Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 108 additions & 76 deletions camel/toolkits/terminal_toolkit/terminal_toolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,10 @@
try:
import docker
from docker.errors import APIError, NotFound
from docker.models.containers import Container
except ImportError:
docker = None
NotFound = None
APIError = None
Container = None


def _to_plain(text: str) -> str:
Expand Down Expand Up @@ -400,9 +398,11 @@ def reader():
with self._session_lock:
if session_id in self.shell_sessions:
self.shell_sessions[session_id]["error"] = str(e)
except Exception:
# Swallow any secondary errors during cleanup
pass
except Exception as cleanup_error:
logger.warning(
f"[SESSION {session_id}] Failed to store error state: "
f"{cleanup_error}"
)
finally:
try:
with self._session_lock:
Expand Down Expand Up @@ -480,39 +480,40 @@ def _collect_output_until_idle(

warning_message = (
"\n--- WARNING: Process is still actively outputting "
"after max wait time. Consider using shell_wait() "
"before sending the next command. ---"
"after max wait time. Consider waiting before "
"sending the next command. ---"
)
return "".join(output_parts) + warning_message

def shell_exec(self, id: str, command: str, block: bool = True) -> str:
r"""This function executes a shell command. The command can run in
blocking mode (waits for completion) or non-blocking mode
(runs in the background). A unique session ID is created for
each session.
r"""Executes a shell command in blocking or non-blocking mode.

Args:
command (str): The command to execute.
block (bool): If True, the command runs synchronously,
waiting for it to complete or time out, and returns
its full output. If False, the command runs
asynchronously in the background.
id (Optional[str]): A specific ID for the session. If not provided,
a unique ID is generated for non-blocking sessions.
id (str): A unique identifier for the command's session. This ID is
used to interact with non-blocking processes.
command (str): The shell command to execute.
block (bool, optional): Determines the execution mode. Defaults to
True. If `True` (blocking mode), the function waits for the
command to complete and returns the full output. Use this for
most commands . If `False` (non-blocking mode), the function
starts the command in the background. Use this only for
interactive sessions or long-running tasks, or servers.

Returns:
str: If block is True, returns the complete stdout and stderr.
If block is False, returns a message containing the new
session ID and the initial output from the command after
it goes idle.
str: The output of the command execution, which varies by mode.
In blocking mode, returns the complete standard output and
standard error from the command.
In non-blocking mode, returns a confirmation message with the
session `id`. To interact with the background process, use
other functions: `shell_view(id)` to see output,
`shell_write_to_process(id, "input")` to send input, and
`shell_kill_process(id)` to terminate.
"""
if self.safe_mode:
is_safe, message = self._sanitize_command(command)
if not is_safe:
return f"Error: {message}"
command = message
else:
command = command

if self.use_docker_backend:
# For Docker, we always run commands in a shell
Expand Down Expand Up @@ -570,7 +571,10 @@ def shell_exec(self, id: str, command: str, block: bool = True) -> str:
log_entry += f"--- Error ---\n{error_msg}\n"
return error_msg
except Exception as e:
if "Read timed out" in str(e):
if (
isinstance(e, (subprocess.TimeoutExpired, TimeoutError))
or "timed out" in str(e).lower()
):
error_msg = (
f"Error: Command timed out after "
f"{self.timeout} seconds."
Expand All @@ -593,6 +597,14 @@ def shell_exec(self, id: str, command: str, block: bool = True) -> str:
f"> {command}\n",
)

# PYTHONUNBUFFERED=1 for real-time output
# Without this, Python subprocesses buffer output (4KB buffer)
# and shell_view() won't see output until buffer fills or process
# exits
env_vars = os.environ.copy()
env_vars["PYTHONUNBUFFERED"] = "1"
docker_env = {"PYTHONUNBUFFERED": "1"}

with self._session_lock:
self.shell_sessions[session_id] = {
"id": session_id,
Expand All @@ -606,6 +618,8 @@ def shell_exec(self, id: str, command: str, block: bool = True) -> str:
else "local",
}

process = None
exec_socket = None
try:
if not self.use_docker_backend:
process = subprocess.Popen(
Expand All @@ -617,6 +631,7 @@ def shell_exec(self, id: str, command: str, block: bool = True) -> str:
text=True,
cwd=self.working_dir,
encoding="utf-8",
env=env_vars,
)
with self._session_lock:
self.shell_sessions[session_id]["process"] = process
Expand All @@ -630,6 +645,7 @@ def shell_exec(self, id: str, command: str, block: bool = True) -> str:
stdin=True,
tty=True,
workdir=self.docker_workdir,
environment=docker_env,
)
exec_id = exec_instance['Id']
exec_socket = self.docker_api_client.exec_start(
Expand All @@ -643,15 +659,29 @@ def shell_exec(self, id: str, command: str, block: bool = True) -> str:

self._start_output_reader_thread(session_id)

# time.sleep(0.1)
initial_output = self._collect_output_until_idle(session_id)

# Return immediately with session ID and instructions
return (
f"Session started with ID: {session_id}\n\n"
f"[Initial Output]:\n{initial_output}"
f"Session '{session_id}' started.\n\n"
f"You could use:\n"
f" - shell_view('{session_id}') - get output\n"
f" - shell_write_to_process('{session_id}', '<input>')"
f" - send input\n"
f" - shell_kill_process('{session_id}') - terminate"
)

except Exception as e:
# Clean up resources on failure
if process is not None:
try:
process.terminate()
except Exception:
pass
if exec_socket is not None:
try:
exec_socket.close()
except Exception:
pass

with self._session_lock:
if session_id in self.shell_sessions:
self.shell_sessions[session_id]["running"] = False
Expand Down Expand Up @@ -714,18 +744,16 @@ def shell_write_to_process(self, id: str, command: str) -> str:
return f"Error writing to session '{id}': {e}"

def shell_view(self, id: str) -> str:
r"""This function retrieves any new output from a non-blocking session
since the last time this function was called. If the process has
terminated, it drains the output queue and appends a termination
message. If the process is still running, it simply returns any
new output.
r"""Retrieves new output from a non-blocking session.

This function returns only NEW output since the last call. It does NOT
wait or block - it returns immediately with whatever is available.

Args:
id (str): The unique session ID of the non-blocking process.

Returns:
str: The new output from the process's stdout and stderr. Returns
an empty string if there is no new output.
str: New output if available, or a status message.
"""
with self._session_lock:
if id not in self.shell_sessions:
Expand All @@ -734,56 +762,39 @@ def shell_view(self, id: str) -> str:
is_running = session["running"]

# If session is terminated, drain the queue and return
# with a status message.
if not is_running:
final_output = []
try:
while True:
final_output.append(session["output_stream"].get_nowait())
except Empty:
pass
return "".join(final_output) + "\n--- SESSION TERMINATED ---"

# Otherwise, just drain the queue for a live session.
if final_output:
return "".join(final_output) + "\n\n--- SESSION TERMINATED ---"
else:
return "--- SESSION TERMINATED (no new output) ---"

# For running session, check for new output
output = []
try:
while True:
output.append(session["output_stream"].get_nowait())
except Empty:
pass

return "".join(output)

def shell_wait(self, id: str, wait_seconds: float = 5.0) -> str:
r"""This function waits for a specified duration for a
non-blocking process to produce more output or terminate.

Args:
id (str): The unique session ID of the non-blocking process.
wait_seconds (float): The maximum number of seconds to wait.

Returns:
str: All output collected during the wait period.
"""
with self._session_lock:
if id not in self.shell_sessions:
return f"Error: No session found with ID '{id}'."
session = self.shell_sessions[id]
if not session["running"]:
return (
"Session is no longer running. "
"Use shell_view to get final output."
)

output_collected = []
end_time = time.time() + wait_seconds
while time.time() < end_time and session["running"]:
new_output = self.shell_view(id)
if new_output:
output_collected.append(new_output)
time.sleep(0.2)

return "".join(output_collected)
if output:
return "".join(output)
else:
# No new output - guide the agent
return (
"[No new output]\n"
"Session is running but idle. Actions could take:\n"
" - For interactive sessions: Send input "
"with shell_write_to_process()\n"
" - For long tasks: Check again later (don't poll "
"too frequently)"
)

def shell_kill_process(self, id: str) -> str:
r"""This function forcibly terminates a running non-blocking process.
Expand Down Expand Up @@ -894,8 +905,17 @@ def shell_ask_user_for_help(self, id: str, prompt: str) -> str:
except EOFError:
return f"User input interrupted for session '{id}'."

def __del__(self):
# Clean up any sessions
def __enter__(self):
r"""Context manager entry."""
return self

def __exit__(self, exc_type, exc_val, exc_tb):
r"""Context manager exit - clean up all sessions."""
self.cleanup()
return False

def cleanup(self):
r"""Clean up all active sessions."""
with self._session_lock:
session_ids = list(self.shell_sessions.keys())
for session_id in session_ids:
Expand All @@ -904,7 +924,20 @@ def __del__(self):
"running", False
)
if is_running:
self.shell_kill_process(session_id)
try:
self.shell_kill_process(session_id)
except Exception as e:
logger.warning(
f"Failed to kill session '{session_id}' "
f"during cleanup: {e}"
)

def __del__(self):
r"""Fallback cleanup in destructor."""
try:
self.cleanup()
except Exception:
pass

def get_tools(self) -> List[FunctionTool]:
r"""Returns a list of FunctionTool objects representing the functions
Expand All @@ -917,7 +950,6 @@ def get_tools(self) -> List[FunctionTool]:
return [
FunctionTool(self.shell_exec),
FunctionTool(self.shell_view),
FunctionTool(self.shell_wait),
FunctionTool(self.shell_write_to_process),
FunctionTool(self.shell_kill_process),
FunctionTool(self.shell_ask_user_for_help),
Expand Down
Loading