Skip to content

Commit 69f65f0

Browse files
chore: enhance description of shell_exec (#3321)
Co-authored-by: Wendong-Fan <[email protected]>
1 parent 6ac84d6 commit 69f65f0

File tree

1 file changed

+108
-76
lines changed

1 file changed

+108
-76
lines changed

camel/toolkits/terminal_toolkit/terminal_toolkit.py

Lines changed: 108 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,10 @@
4242
try:
4343
import docker
4444
from docker.errors import APIError, NotFound
45-
from docker.models.containers import Container
4645
except ImportError:
4746
docker = None
4847
NotFound = None
4948
APIError = None
50-
Container = None
5149

5250

5351
def _to_plain(text: str) -> str:
@@ -400,9 +398,11 @@ def reader():
400398
with self._session_lock:
401399
if session_id in self.shell_sessions:
402400
self.shell_sessions[session_id]["error"] = str(e)
403-
except Exception:
404-
# Swallow any secondary errors during cleanup
405-
pass
401+
except Exception as cleanup_error:
402+
logger.warning(
403+
f"[SESSION {session_id}] Failed to store error state: "
404+
f"{cleanup_error}"
405+
)
406406
finally:
407407
try:
408408
with self._session_lock:
@@ -480,39 +480,40 @@ def _collect_output_until_idle(
480480

481481
warning_message = (
482482
"\n--- WARNING: Process is still actively outputting "
483-
"after max wait time. Consider using shell_wait() "
484-
"before sending the next command. ---"
483+
"after max wait time. Consider waiting before "
484+
"sending the next command. ---"
485485
)
486486
return "".join(output_parts) + warning_message
487487

488488
def shell_exec(self, id: str, command: str, block: bool = True) -> str:
489-
r"""This function executes a shell command. The command can run in
490-
blocking mode (waits for completion) or non-blocking mode
491-
(runs in the background). A unique session ID is created for
492-
each session.
489+
r"""Executes a shell command in blocking or non-blocking mode.
493490
494491
Args:
495-
command (str): The command to execute.
496-
block (bool): If True, the command runs synchronously,
497-
waiting for it to complete or time out, and returns
498-
its full output. If False, the command runs
499-
asynchronously in the background.
500-
id (Optional[str]): A specific ID for the session. If not provided,
501-
a unique ID is generated for non-blocking sessions.
492+
id (str): A unique identifier for the command's session. This ID is
493+
used to interact with non-blocking processes.
494+
command (str): The shell command to execute.
495+
block (bool, optional): Determines the execution mode. Defaults to
496+
True. If `True` (blocking mode), the function waits for the
497+
command to complete and returns the full output. Use this for
498+
most commands. If `False` (non-blocking mode), the function
499+
starts the command in the background. Use this only for
500+
interactive sessions, long-running tasks, or servers.
502501
503502
Returns:
504-
str: If block is True, returns the complete stdout and stderr.
505-
If block is False, returns a message containing the new
506-
session ID and the initial output from the command after
507-
it goes idle.
503+
str: The output of the command execution, which varies by mode.
504+
In blocking mode, returns the complete standard output and
505+
standard error from the command.
506+
In non-blocking mode, returns a confirmation message with the
507+
session `id`. To interact with the background process, use
508+
other functions: `shell_view(id)` to see output,
509+
`shell_write_to_process(id, "input")` to send input, and
510+
`shell_kill_process(id)` to terminate.
508511
"""
509512
if self.safe_mode:
510513
is_safe, message = self._sanitize_command(command)
511514
if not is_safe:
512515
return f"Error: {message}"
513516
command = message
514-
else:
515-
command = command
516517

517518
if self.use_docker_backend:
518519
# For Docker, we always run commands in a shell
@@ -570,7 +571,10 @@ def shell_exec(self, id: str, command: str, block: bool = True) -> str:
570571
log_entry += f"--- Error ---\n{error_msg}\n"
571572
return error_msg
572573
except Exception as e:
573-
if "Read timed out" in str(e):
574+
if (
575+
isinstance(e, (subprocess.TimeoutExpired, TimeoutError))
576+
or "timed out" in str(e).lower()
577+
):
574578
error_msg = (
575579
f"Error: Command timed out after "
576580
f"{self.timeout} seconds."
@@ -593,6 +597,14 @@ def shell_exec(self, id: str, command: str, block: bool = True) -> str:
593597
f"> {command}\n",
594598
)
595599

600+
# PYTHONUNBUFFERED=1 for real-time output
601+
# Without this, Python subprocesses buffer output (4KB buffer)
602+
# and shell_view() won't see output until buffer fills or process
603+
# exits
604+
env_vars = os.environ.copy()
605+
env_vars["PYTHONUNBUFFERED"] = "1"
606+
docker_env = {"PYTHONUNBUFFERED": "1"}
607+
596608
with self._session_lock:
597609
self.shell_sessions[session_id] = {
598610
"id": session_id,
@@ -606,6 +618,8 @@ def shell_exec(self, id: str, command: str, block: bool = True) -> str:
606618
else "local",
607619
}
608620

621+
process = None
622+
exec_socket = None
609623
try:
610624
if not self.use_docker_backend:
611625
process = subprocess.Popen(
@@ -617,6 +631,7 @@ def shell_exec(self, id: str, command: str, block: bool = True) -> str:
617631
text=True,
618632
cwd=self.working_dir,
619633
encoding="utf-8",
634+
env=env_vars,
620635
)
621636
with self._session_lock:
622637
self.shell_sessions[session_id]["process"] = process
@@ -630,6 +645,7 @@ def shell_exec(self, id: str, command: str, block: bool = True) -> str:
630645
stdin=True,
631646
tty=True,
632647
workdir=self.docker_workdir,
648+
environment=docker_env,
633649
)
634650
exec_id = exec_instance['Id']
635651
exec_socket = self.docker_api_client.exec_start(
@@ -643,15 +659,29 @@ def shell_exec(self, id: str, command: str, block: bool = True) -> str:
643659

644660
self._start_output_reader_thread(session_id)
645661

646-
# time.sleep(0.1)
647-
initial_output = self._collect_output_until_idle(session_id)
648-
662+
# Return immediately with session ID and instructions
649663
return (
650-
f"Session started with ID: {session_id}\n\n"
651-
f"[Initial Output]:\n{initial_output}"
664+
f"Session '{session_id}' started.\n\n"
665+
f"You could use:\n"
666+
f" - shell_view('{session_id}') - get output\n"
667+
f" - shell_write_to_process('{session_id}', '<input>')"
668+
f" - send input\n"
669+
f" - shell_kill_process('{session_id}') - terminate"
652670
)
653671

654672
except Exception as e:
673+
# Clean up resources on failure
674+
if process is not None:
675+
try:
676+
process.terminate()
677+
except Exception:
678+
pass
679+
if exec_socket is not None:
680+
try:
681+
exec_socket.close()
682+
except Exception:
683+
pass
684+
655685
with self._session_lock:
656686
if session_id in self.shell_sessions:
657687
self.shell_sessions[session_id]["running"] = False
@@ -714,18 +744,16 @@ def shell_write_to_process(self, id: str, command: str) -> str:
714744
return f"Error writing to session '{id}': {e}"
715745

716746
def shell_view(self, id: str) -> str:
717-
r"""This function retrieves any new output from a non-blocking session
718-
since the last time this function was called. If the process has
719-
terminated, it drains the output queue and appends a termination
720-
message. If the process is still running, it simply returns any
721-
new output.
747+
r"""Retrieves new output from a non-blocking session.
748+
749+
This function returns only NEW output since the last call. It does NOT
750+
wait or block - it returns immediately with whatever is available.
722751
723752
Args:
724753
id (str): The unique session ID of the non-blocking process.
725754
726755
Returns:
727-
str: The new output from the process's stdout and stderr. Returns
728-
an empty string if there is no new output.
756+
str: New output if available, or a status message.
729757
"""
730758
with self._session_lock:
731759
if id not in self.shell_sessions:
@@ -734,56 +762,39 @@ def shell_view(self, id: str) -> str:
734762
is_running = session["running"]
735763

736764
# If session is terminated, drain the queue and return
737-
# with a status message.
738765
if not is_running:
739766
final_output = []
740767
try:
741768
while True:
742769
final_output.append(session["output_stream"].get_nowait())
743770
except Empty:
744771
pass
745-
return "".join(final_output) + "\n--- SESSION TERMINATED ---"
746772

747-
# Otherwise, just drain the queue for a live session.
773+
if final_output:
774+
return "".join(final_output) + "\n\n--- SESSION TERMINATED ---"
775+
else:
776+
return "--- SESSION TERMINATED (no new output) ---"
777+
778+
# For a running session, check for new output
748779
output = []
749780
try:
750781
while True:
751782
output.append(session["output_stream"].get_nowait())
752783
except Empty:
753784
pass
754785

755-
return "".join(output)
756-
757-
def shell_wait(self, id: str, wait_seconds: float = 5.0) -> str:
758-
r"""This function waits for a specified duration for a
759-
non-blocking process to produce more output or terminate.
760-
761-
Args:
762-
id (str): The unique session ID of the non-blocking process.
763-
wait_seconds (float): The maximum number of seconds to wait.
764-
765-
Returns:
766-
str: All output collected during the wait period.
767-
"""
768-
with self._session_lock:
769-
if id not in self.shell_sessions:
770-
return f"Error: No session found with ID '{id}'."
771-
session = self.shell_sessions[id]
772-
if not session["running"]:
773-
return (
774-
"Session is no longer running. "
775-
"Use shell_view to get final output."
776-
)
777-
778-
output_collected = []
779-
end_time = time.time() + wait_seconds
780-
while time.time() < end_time and session["running"]:
781-
new_output = self.shell_view(id)
782-
if new_output:
783-
output_collected.append(new_output)
784-
time.sleep(0.2)
785-
786-
return "".join(output_collected)
786+
if output:
787+
return "".join(output)
788+
else:
789+
# No new output - guide the agent
790+
return (
791+
"[No new output]\n"
792+
"Session is running but idle. Actions could take:\n"
793+
" - For interactive sessions: Send input "
794+
"with shell_write_to_process()\n"
795+
" - For long tasks: Check again later (don't poll "
796+
"too frequently)"
797+
)
787798

788799
def shell_kill_process(self, id: str) -> str:
789800
r"""This function forcibly terminates a running non-blocking process.
@@ -894,8 +905,17 @@ def shell_ask_user_for_help(self, id: str, prompt: str) -> str:
894905
except EOFError:
895906
return f"User input interrupted for session '{id}'."
896907

897-
def __del__(self):
898-
# Clean up any sessions
908+
def __enter__(self):
909+
r"""Context manager entry."""
910+
return self
911+
912+
def __exit__(self, exc_type, exc_val, exc_tb):
913+
r"""Context manager exit - clean up all sessions."""
914+
self.cleanup()
915+
return False
916+
917+
def cleanup(self):
918+
r"""Clean up all active sessions."""
899919
with self._session_lock:
900920
session_ids = list(self.shell_sessions.keys())
901921
for session_id in session_ids:
@@ -904,7 +924,20 @@ def __del__(self):
904924
"running", False
905925
)
906926
if is_running:
907-
self.shell_kill_process(session_id)
927+
try:
928+
self.shell_kill_process(session_id)
929+
except Exception as e:
930+
logger.warning(
931+
f"Failed to kill session '{session_id}' "
932+
f"during cleanup: {e}"
933+
)
934+
935+
def __del__(self):
936+
r"""Fallback cleanup in destructor."""
937+
try:
938+
self.cleanup()
939+
except Exception:
940+
pass
908941

909942
def get_tools(self) -> List[FunctionTool]:
910943
r"""Returns a list of FunctionTool objects representing the functions
@@ -917,7 +950,6 @@ def get_tools(self) -> List[FunctionTool]:
917950
return [
918951
FunctionTool(self.shell_exec),
919952
FunctionTool(self.shell_view),
920-
FunctionTool(self.shell_wait),
921953
FunctionTool(self.shell_write_to_process),
922954
FunctionTool(self.shell_kill_process),
923955
FunctionTool(self.shell_ask_user_for_help),

0 commit comments

Comments
 (0)