diff --git a/libs/langchain_v1/langchain/agents/_internal/__init__.py b/libs/langchain_v1/langchain/agents/_internal/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/libs/langchain_v1/langchain/agents/_internal/file_utils.py b/libs/langchain_v1/langchain/agents/_internal/file_utils.py new file mode 100644 index 0000000000000..9449a803a24e7 --- /dev/null +++ b/libs/langchain_v1/langchain/agents/_internal/file_utils.py @@ -0,0 +1,283 @@ +"""Shared utility functions for file operations in middleware.""" + +from __future__ import annotations + +import os +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Literal + +from typing_extensions import TypedDict + +if TYPE_CHECKING: + from collections.abc import Sequence + + +class FileData(TypedDict): + """Data structure for storing file contents with metadata.""" + + content: list[str] + """Lines of the file.""" + + created_at: str + """ISO 8601 timestamp of file creation.""" + + modified_at: str + """ISO 8601 timestamp of last modification.""" + + +def file_data_reducer( + left: dict[str, FileData] | None, right: dict[str, FileData | None] +) -> dict[str, FileData]: + """Custom reducer that merges file updates. + + Args: + left: Existing files dict. + right: New files dict to merge (None values delete files). + + Returns: + Merged dict where right overwrites left for matching keys. + """ + if left is None: + # Filter out None values when initializing + return {k: v for k, v in right.items() if v is not None} + + # Merge, filtering out None values (deletions) + result = {**left} + for k, v in right.items(): + if v is None: + result.pop(k, None) + else: + result[k] = v + return result + + +def validate_path(path: str, *, allowed_prefixes: Sequence[str] | None = None) -> str: + """Validate and normalize file path for security. + + Args: + path: The path to validate. + allowed_prefixes: Optional list of allowed path prefixes. + + Returns: + Normalized canonical path. + + Raises: + ValueError: If path contains traversal sequences or violates prefix rules. + """ + # Reject paths with traversal attempts + if ".." in path or path.startswith("~"): + msg = f"Path traversal not allowed: {path}" + raise ValueError(msg) + + # Normalize path (resolve ., //, etc.) + normalized = os.path.normpath(path) + + # Convert to forward slashes for consistency + normalized = normalized.replace("\\", "/") + + # Ensure path starts with / + if not normalized.startswith("/"): + normalized = f"/{normalized}" + + # Check allowed prefixes if specified + if allowed_prefixes is not None and not any( + normalized.startswith(prefix) for prefix in allowed_prefixes + ): + msg = f"Path must start with one of {allowed_prefixes}: {path}" + raise ValueError(msg) + + return normalized + + +def format_content_with_line_numbers( + content: str | list[str], + *, + format_style: Literal["pipe", "tab"] = "pipe", + start_line: int = 1, +) -> str: + r"""Format file content with line numbers. + + Args: + content: File content as string or list of lines. + format_style: "pipe" for "1|content" or "tab" for " 1\tcontent". + start_line: Starting line number. + + Returns: + Formatted content with line numbers. + """ + if isinstance(content, str): + lines = content.split("\n") + # Remove trailing empty line from split + if lines and lines[-1] == "": + lines = lines[:-1] + else: + lines = content + + if format_style == "pipe": + return "\n".join(f"{i + start_line}|{line}" for i, line in enumerate(lines)) + + return "\n".join(f"{i + start_line:6d}\t{line[:2000]}" for i, line in enumerate(lines)) + + +def apply_string_replacement( + content: str, + old_string: str, + new_string: str, + *, + replace_all: bool = False, +) -> tuple[str, int]: + """Apply string replacement to content. + + Args: + content: Original content. + old_string: String to replace. + new_string: Replacement string. + replace_all: If True, replace all occurrences. Otherwise, replace first. + + Returns: + Tuple of (new_content, replacement_count). + """ + if replace_all: + count = content.count(old_string) + new_content = content.replace(old_string, new_string) + else: + count = 1 + new_content = content.replace(old_string, new_string, 1) + + return new_content, count + + +def create_file_data( + content: str | list[str], + *, + created_at: str | None = None, +) -> FileData: + """Create a FileData object from content. + + Args: + content: File content as string or list of lines. + created_at: Optional creation timestamp. If None, uses current time. + + Returns: + FileData object. + """ + lines = content.split("\n") if isinstance(content, str) else content + + now = datetime.now(timezone.utc).isoformat() + + return { + "content": lines, + "created_at": created_at or now, + "modified_at": now, + } + + +def update_file_data( + file_data: FileData, + content: str | list[str], +) -> FileData: + """Update a FileData object with new content. + + Args: + file_data: Existing FileData object. + content: New file content as string or list of lines. + + Returns: + Updated FileData object with new modified_at timestamp. + """ + lines = content.split("\n") if isinstance(content, str) else content + + now = datetime.now(timezone.utc).isoformat() + + return { + "content": lines, + "created_at": file_data["created_at"], + "modified_at": now, + } + + +def file_data_to_string(file_data: FileData) -> str: + """Convert FileData to plain string content. + + Args: + file_data: FileData object. + + Returns: + File content as string. + """ + return "\n".join(file_data["content"]) + + +def list_directory(files: dict[str, FileData], path: str) -> list[str]: + """List files in a directory. + + Args: + files: Files dict mapping paths to FileData. + path: Normalized directory path. + + Returns: + Sorted list of file paths in the directory. + """ + # Ensure path ends with / for directory matching + dir_path = path if path.endswith("/") else f"{path}/" + + matching_files = [] + for file_path in files: + if file_path.startswith(dir_path): + # Get relative path from directory + relative = file_path[len(dir_path) :] + # Only include direct children (no subdirectories) + if "/" not in relative: + matching_files.append(file_path) + + return sorted(matching_files) + + +def check_empty_content(content: str) -> str | None: + """Check if file content is empty and return warning message. + + Args: + content: File content. + + Returns: + Warning message if empty, None otherwise. + """ + if not content or content.strip() == "": + return "System reminder: File exists but has empty contents" + return None + + +def has_memories_prefix(file_path: str) -> bool: + """Check if file path has the memories prefix. + + Args: + file_path: File path. + + Returns: + True if file path has the memories prefix, False otherwise. + """ + return file_path.startswith("/memories/") + + +def append_memories_prefix(file_path: str) -> str: + """Append the memories prefix to a file path. + + Args: + file_path: File path. + + Returns: + File path with the memories prefix. + """ + return f"/memories{file_path}" + + +def strip_memories_prefix(file_path: str) -> str: + """Strip the memories prefix from a file path. + + Args: + file_path: File path. + + Returns: + File path without the memories prefix. + """ + return file_path.replace("/memories", "") diff --git a/libs/langchain_v1/langchain/agents/deepagents.py b/libs/langchain_v1/langchain/agents/deepagents.py new file mode 100644 index 0000000000000..02e7073ab929f --- /dev/null +++ b/libs/langchain_v1/langchain/agents/deepagents.py @@ -0,0 +1,137 @@ +"""Deepagents come with planning, filesystem, and subagents.""" + +from collections.abc import Callable, Sequence +from typing import Any + +from langchain_anthropic import ChatAnthropic +from langchain_core.language_models import BaseChatModel +from langchain_core.tools import BaseTool +from langgraph.cache.base import BaseCache +from langgraph.graph.state import CompiledStateGraph +from langgraph.store.base import BaseStore +from langgraph.types import Checkpointer + +from langchain.agents.factory import create_agent +from langchain.agents.middleware.filesystem import FilesystemMiddleware +from langchain.agents.middleware.human_in_the_loop import HumanInTheLoopMiddleware, ToolConfig +from langchain.agents.middleware.planning import PlanningMiddleware +from langchain.agents.middleware.prompt_caching import AnthropicPromptCachingMiddleware +from langchain.agents.middleware.subagents import ( + CompiledSubAgent, + SubAgent, + SubAgentMiddleware, +) +from langchain.agents.middleware.summarization import SummarizationMiddleware +from langchain.agents.middleware.types import AgentMiddleware + +BASE_AGENT_PROMPT = "In order to complete the objective that the user asks of you, you have access to a number of standard tools." # noqa: E501 + + +def get_default_model() -> ChatAnthropic: + """Get the default model for deep agents. + + Returns: + ChatAnthropic instance configured with Claude Sonnet 4. + """ + return ChatAnthropic( + model_name="claude-sonnet-4-20250514", + timeout=None, + stop=None, + max_tokens=64000, + ) + + +def create_deep_agent( + model: str | BaseChatModel | None = None, + tools: Sequence[BaseTool | Callable | dict[str, Any]] | None = None, + *, + system_prompt: str | None = None, + middleware: Sequence[AgentMiddleware] = (), + subagents: list[SubAgent | CompiledSubAgent] | None = None, + context_schema: type[Any] | None = None, + checkpointer: Checkpointer | None = None, + store: BaseStore | None = None, + use_longterm_memory: bool = False, + tool_configs: dict[str, bool | ToolConfig] | None = None, + is_async: bool = False, + debug: bool = False, + name: str | None = None, + cache: BaseCache | None = None, +) -> CompiledStateGraph: + """Create a deep agent. + + This agent will by default have access to a tool to write todos (write_todos), + four file editing tools: write_file, ls, read_file, edit_file, and a tool to call + subagents. + + Args: + tools: The tools the agent should have access to. + system_prompt: The additional instructions the agent should have. Will go in + the system prompt. + middleware: Additional middleware to apply after standard middleware. + model: The model to use. + subagents: The subagents to use. Each subagent should be a dictionary with the + following keys: + - `name` + - `description` (used by the main agent to decide whether to call the + sub agent) + - `prompt` (used as the system prompt in the subagent) + - (optional) `tools` + - (optional) `model` (either a LanguageModelLike instance or dict + settings) + - (optional) `middleware` (list of AgentMiddleware) + context_schema: The schema of the deep agent. + checkpointer: Optional checkpointer for persisting agent state between runs. + store: Optional store for persisting longterm memories. + use_longterm_memory: Whether to use longterm memory - you must provide a store + in order to use longterm memory. + tool_configs: Optional Dict[str, HumanInTheLoopConfig] mapping tool names to + interrupt configs. + is_async: Whether to use async mode. If True, the agent will use async tools. + debug: Whether to enable debug mode. Passed through to create_agent. + name: The name of the agent. Passed through to create_agent. + cache: The cache to use for the agent. Passed through to create_agent. + + Returns: + A configured deep agent. + """ + if model is None: + model = get_default_model() + + deepagent_middleware = [ + PlanningMiddleware(), + FilesystemMiddleware( + use_longterm_memory=use_longterm_memory, + ), + SubAgentMiddleware( + default_subagent_tools=tools, + default_subagent_model=model, + subagents=subagents if subagents is not None else [], + is_async=is_async, + ), + SummarizationMiddleware( + model=model, + max_tokens_before_summary=120000, + messages_to_keep=20, + ), + AnthropicPromptCachingMiddleware(ttl="5m", unsupported_model_behavior="ignore"), + ] + if tool_configs is not None: + deepagent_middleware.append(HumanInTheLoopMiddleware(interrupt_on=tool_configs)) + if middleware is not None: + deepagent_middleware.extend(middleware) + + return create_agent( + model, + system_prompt=system_prompt + "\n\n" + BASE_AGENT_PROMPT + if system_prompt + else BASE_AGENT_PROMPT, + tools=tools, + middleware=deepagent_middleware, + context_schema=context_schema, + checkpointer=checkpointer, + store=store, + debug=debug, + name=name, + cache=cache, + ) diff --git a/libs/langchain_v1/langchain/agents/middleware/__init__.py b/libs/langchain_v1/langchain/agents/middleware/__init__.py index d7066c6792072..e87d0f22edb16 100644 --- a/libs/langchain_v1/langchain/agents/middleware/__init__.py +++ b/libs/langchain_v1/langchain/agents/middleware/__init__.py @@ -1,15 +1,26 @@ """Middleware plugins for agents.""" +from .anthropic_tools import ( + AnthropicToolsState, + FileData, + FilesystemClaudeMemoryMiddleware, + FilesystemClaudeTextEditorMiddleware, + StateClaudeMemoryMiddleware, + StateClaudeTextEditorMiddleware, +) from .context_editing import ( ClearToolUsesEdit, ContextEditingMiddleware, ) +from .file_search import FilesystemFileSearchMiddleware, StateFileSearchMiddleware +from .filesystem import FilesystemMiddleware from .human_in_the_loop import HumanInTheLoopMiddleware from .model_call_limit import ModelCallLimitMiddleware from .model_fallback import ModelFallbackMiddleware from .pii import PIIDetectionError, PIIMiddleware from .planning import PlanningMiddleware from .prompt_caching import AnthropicPromptCachingMiddleware +from .subagents import SubAgentMiddleware from .summarization import SummarizationMiddleware from .tool_call_limit import ToolCallLimitMiddleware from .tool_selection import LLMToolSelectorMiddleware @@ -31,8 +42,14 @@ "AgentState", # should move to langchain-anthropic if we decide to keep it "AnthropicPromptCachingMiddleware", + "AnthropicToolsState", "ClearToolUsesEdit", "ContextEditingMiddleware", + "FileData", + "FilesystemClaudeMemoryMiddleware", + "FilesystemClaudeTextEditorMiddleware", + "FilesystemFileSearchMiddleware", + "FilesystemMiddleware", "HumanInTheLoopMiddleware", "LLMToolSelectorMiddleware", "ModelCallLimitMiddleware", @@ -41,6 +58,10 @@ "PIIDetectionError", "PIIMiddleware", "PlanningMiddleware", + "StateClaudeMemoryMiddleware", + "StateClaudeTextEditorMiddleware", + "StateFileSearchMiddleware", + "SubAgentMiddleware", "SummarizationMiddleware", "ToolCallLimitMiddleware", "after_agent", diff --git a/libs/langchain_v1/langchain/agents/middleware/anthropic_tools.py b/libs/langchain_v1/langchain/agents/middleware/anthropic_tools.py new file mode 100644 index 0000000000000..056e681d5a0b0 --- /dev/null +++ b/libs/langchain_v1/langchain/agents/middleware/anthropic_tools.py @@ -0,0 +1,910 @@ +"""Anthropic text editor and memory tool middleware. + +This module provides client-side implementations of Anthropic's text editor and +memory tools using schema-less tool definitions and tool call interception. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING, Annotated, Any, cast + +from langchain_core.messages import AIMessage, ToolMessage +from langgraph.types import Command +from typing_extensions import NotRequired + +from langchain.agents._internal.file_utils import ( + FileData, + apply_string_replacement, + create_file_data, + file_data_reducer, + file_data_to_string, + format_content_with_line_numbers, + list_directory, + update_file_data, + validate_path, +) +from langchain.agents.middleware.types import AgentMiddleware, AgentState, ModelRequest + +if TYPE_CHECKING: + from collections.abc import Callable, Sequence + + from langchain.tools.tool_node import ToolCallRequest + +# Tool type constants +TEXT_EDITOR_TOOL_TYPE = "text_editor_20250728" +TEXT_EDITOR_TOOL_NAME = "str_replace_based_edit_tool" +MEMORY_TOOL_TYPE = "memory_20250818" +MEMORY_TOOL_NAME = "memory" + +MEMORY_SYSTEM_PROMPT = """IMPORTANT: ALWAYS VIEW YOUR MEMORY DIRECTORY BEFORE \ +DOING ANYTHING ELSE. +MEMORY PROTOCOL: +1. Use the `view` command of your `memory` tool to check for earlier progress. +2. ... (work on the task) ... + - As you make progress, record status / progress / thoughts etc in your memory. +ASSUME INTERRUPTION: Your context window might be reset at any moment, so you risk \ +losing any progress that is not recorded in your memory directory.""" + + +class AnthropicToolsState(AgentState): + """State schema for Anthropic text editor and memory tools.""" + + text_editor_files: NotRequired[Annotated[dict[str, FileData], file_data_reducer]] + """Virtual file system for text editor tools.""" + + memory_files: NotRequired[Annotated[dict[str, FileData], file_data_reducer]] + """Virtual file system for memory tools.""" + + +class _StateClaudeFileToolMiddleware(AgentMiddleware): + """Base class for state-based file tool middleware (internal).""" + + state_schema = AnthropicToolsState + + def __init__( + self, + *, + tool_type: str, + tool_name: str, + state_key: str, + allowed_path_prefixes: Sequence[str] | None = None, + system_prompt: str | None = None, + ) -> None: + """Initialize the middleware. + + Args: + tool_type: Tool type identifier. + tool_name: Tool name. + state_key: State key for file storage. + allowed_path_prefixes: Optional list of allowed path prefixes. + system_prompt: Optional system prompt to inject. + """ + self.tool_type = tool_type + self.tool_name = tool_name + self.state_key = state_key + self.allowed_prefixes = allowed_path_prefixes + self.system_prompt = system_prompt + + def wrap_model_call( + self, + request: ModelRequest, + handler: Callable[[ModelRequest], AIMessage], + ) -> AIMessage: + """Inject tool and optional system prompt.""" + # Add tool + tools = list(request.tools or []) + tools.append( + { + "type": self.tool_type, + "name": self.tool_name, + } + ) + request.tools = tools + + # Inject system prompt if provided + if self.system_prompt: + request.system_prompt = ( + request.system_prompt + "\n\n" + self.system_prompt + if request.system_prompt + else self.system_prompt + ) + + return handler(request) + + def wrap_tool_call( + self, request: ToolCallRequest, handler: Callable[[ToolCallRequest], ToolMessage | Command] + ) -> ToolMessage | Command: + """Intercept tool calls.""" + tool_call = request.tool_call + tool_name = tool_call.get("name") + + if tool_name != self.tool_name: + return handler(request) + + # Handle tool call + try: + args = tool_call.get("args", {}) + command = args.get("command") + state = request.state + + if command == "view": + return self._handle_view(args, state, tool_call["id"]) + if command == "create": + return self._handle_create(args, state, tool_call["id"]) + if command == "str_replace": + return self._handle_str_replace(args, state, tool_call["id"]) + if command == "insert": + return self._handle_insert(args, state, tool_call["id"]) + if command == "delete": + return self._handle_delete(args, state, tool_call["id"]) + if command == "rename": + return self._handle_rename(args, state, tool_call["id"]) + + msg = f"Unknown command: {command}" + return ToolMessage( + content=msg, + tool_call_id=tool_call["id"], + name=tool_name, + status="error", + ) + except (ValueError, FileNotFoundError) as e: + return ToolMessage( + content=str(e), + tool_call_id=tool_call["id"], + name=tool_name, + status="error", + ) + + def _handle_view( + self, args: dict, state: AnthropicToolsState, tool_call_id: str | None + ) -> Command: + """Handle view command.""" + path = args["path"] + normalized_path = validate_path(path, allowed_prefixes=self.allowed_prefixes) + + files = cast("dict[str, Any]", state.get(self.state_key, {})) + file_data = files.get(normalized_path) + + if file_data is None: + # Try directory listing + matching = list_directory(files, normalized_path) + + if matching: + content = "\n".join(matching) + return Command( + update={ + "messages": [ + ToolMessage( + content=content, + tool_call_id=tool_call_id, + name=self.tool_name, + ) + ] + } + ) + + msg = f"File not found: {path}" + raise FileNotFoundError(msg) + + # Format file content with line numbers + content = format_content_with_line_numbers(file_data["content"], format_style="pipe") + + return Command( + update={ + "messages": [ + ToolMessage( + content=content, + tool_call_id=tool_call_id, + name=self.tool_name, + ) + ] + } + ) + + def _handle_create( + self, args: dict, state: AnthropicToolsState, tool_call_id: str | None + ) -> Command: + """Handle create command.""" + path = args["path"] + file_text = args["file_text"] + + normalized_path = validate_path(path, allowed_prefixes=self.allowed_prefixes) + + # Get existing files + files = cast("dict[str, Any]", state.get(self.state_key, {})) + existing = files.get(normalized_path) + + # Create or update file data + if existing: + new_file_data = update_file_data(existing, file_text) + else: + new_file_data = create_file_data(file_text) + + return Command( + update={ + self.state_key: { + normalized_path: new_file_data, + }, + "messages": [ + ToolMessage( + content=f"File created: {path}", + tool_call_id=tool_call_id, + name=self.tool_name, + ) + ], + } + ) + + def _handle_str_replace( + self, args: dict, state: AnthropicToolsState, tool_call_id: str | None + ) -> Command: + """Handle str_replace command.""" + path = args["path"] + old_str = args["old_str"] + new_str = args.get("new_str", "") + + normalized_path = validate_path(path, allowed_prefixes=self.allowed_prefixes) + + # Read file + files = cast("dict[str, Any]", state.get(self.state_key, {})) + file_data = files.get(normalized_path) + if file_data is None: + msg = f"File not found: {path}" + raise FileNotFoundError(msg) + + content = file_data_to_string(file_data) + + # Replace string + if old_str not in content: + msg = f"String not found in file: {old_str}" + raise ValueError(msg) + new_content, _ = apply_string_replacement(content, old_str, new_str, replace_all=False) + + # Update file + new_file_data = update_file_data(file_data, new_content) + + return Command( + update={ + self.state_key: { + normalized_path: new_file_data, + }, + "messages": [ + ToolMessage( + content=f"String replaced in {path}", + tool_call_id=tool_call_id, + name=self.tool_name, + ) + ], + } + ) + + def _handle_insert( + self, args: dict, state: AnthropicToolsState, tool_call_id: str | None + ) -> Command: + """Handle insert command.""" + path = args["path"] + insert_line = args["insert_line"] + text_to_insert = args["new_str"] + + normalized_path = validate_path(path, allowed_prefixes=self.allowed_prefixes) + + # Read file + files = cast("dict[str, Any]", state.get(self.state_key, {})) + file_data = files.get(normalized_path) + if file_data is None: + msg = f"File not found: {path}" + raise FileNotFoundError(msg) + + lines_content = file_data["content"] + new_lines = text_to_insert.split("\n") + + # Insert after insert_line (0-indexed) + updated_lines = lines_content[:insert_line] + new_lines + lines_content[insert_line:] + + # Update file + new_file_data = update_file_data(file_data, updated_lines) + + return Command( + update={ + self.state_key: { + normalized_path: new_file_data, + }, + "messages": [ + ToolMessage( + content=f"Text inserted in {path}", + tool_call_id=tool_call_id, + name=self.tool_name, + ) + ], + } + ) + + def _handle_delete( + self, + args: dict, + state: AnthropicToolsState, # noqa: ARG002 + tool_call_id: str | None, + ) -> Command: + """Handle delete command.""" + path = args["path"] + + normalized_path = validate_path(path, allowed_prefixes=self.allowed_prefixes) + + return Command( + update={ + self.state_key: {normalized_path: None}, + "messages": [ + ToolMessage( + content=f"File deleted: {path}", + tool_call_id=tool_call_id, + name=self.tool_name, + ) + ], + } + ) + + def _handle_rename( + self, args: dict, state: AnthropicToolsState, tool_call_id: str | None + ) -> Command: + """Handle rename command.""" + old_path = args["old_path"] + new_path = args["new_path"] + + normalized_old = validate_path(old_path, allowed_prefixes=self.allowed_prefixes) + normalized_new = validate_path(new_path, allowed_prefixes=self.allowed_prefixes) + + # Read file + files = cast("dict[str, Any]", state.get(self.state_key, {})) + file_data = files.get(normalized_old) + if file_data is None: + msg = f"File not found: {old_path}" + raise ValueError(msg) + + # Update timestamp + content = file_data["content"] + new_file_data = update_file_data(file_data, content) + + return Command( + update={ + self.state_key: { + normalized_old: None, + normalized_new: new_file_data, + }, + "messages": [ + ToolMessage( + content=f"File renamed: {old_path} -> {new_path}", + tool_call_id=tool_call_id, + name=self.tool_name, + ) + ], + } + ) + + +class StateClaudeTextEditorMiddleware(_StateClaudeFileToolMiddleware): + """State-based text editor tool middleware. + + Provides Anthropic's text_editor tool using LangGraph state for storage. + Files persist for the conversation thread. + + Example: + ```python + from langchain.agents import create_agent + from langchain.agents.middleware import StateTextEditorToolMiddleware + + agent = create_agent( + model=model, + tools=[], + middleware=[StateTextEditorToolMiddleware()], + ) + ``` + """ + + def __init__( + self, + *, + allowed_path_prefixes: Sequence[str] | None = None, + ) -> None: + """Initialize the text editor middleware. + + Args: + allowed_path_prefixes: Optional list of allowed path prefixes. + If specified, only paths starting with these prefixes are allowed. + """ + super().__init__( + tool_type=TEXT_EDITOR_TOOL_TYPE, + tool_name=TEXT_EDITOR_TOOL_NAME, + state_key="text_editor_files", + allowed_path_prefixes=allowed_path_prefixes, + ) + + +class StateClaudeMemoryMiddleware(_StateClaudeFileToolMiddleware): + """State-based memory tool middleware. + + Provides Anthropic's memory tool using LangGraph state for storage. + Files persist for the conversation thread. Enforces /memories prefix + and injects Anthropic's recommended system prompt. + + Example: + ```python + from langchain.agents import create_agent + from langchain.agents.middleware import StateMemoryToolMiddleware + + agent = create_agent( + model=model, + tools=[], + middleware=[StateMemoryToolMiddleware()], + ) + ``` + """ + + def __init__( + self, + *, + allowed_path_prefixes: Sequence[str] | None = None, + system_prompt: str = MEMORY_SYSTEM_PROMPT, + ) -> None: + """Initialize the memory middleware. + + Args: + allowed_path_prefixes: Optional list of allowed path prefixes. + Defaults to ["/memories"]. + system_prompt: System prompt to inject. Defaults to Anthropic's + recommended memory prompt. + """ + super().__init__( + tool_type=MEMORY_TOOL_TYPE, + tool_name=MEMORY_TOOL_NAME, + state_key="memory_files", + allowed_path_prefixes=allowed_path_prefixes or ["/memories"], + system_prompt=system_prompt, + ) + + +class _FilesystemClaudeFileToolMiddleware(AgentMiddleware): + """Base class for filesystem-based file tool middleware (internal).""" + + def __init__( + self, + *, + tool_type: str, + tool_name: str, + root_path: str, + allowed_prefixes: list[str] | None = None, + max_file_size_mb: int = 10, + system_prompt: str | None = None, + ) -> None: + """Initialize the middleware. + + Args: + tool_type: Tool type identifier. + tool_name: Tool name. + root_path: Root directory for file operations. + allowed_prefixes: Optional list of allowed virtual path prefixes. + max_file_size_mb: Maximum file size in MB. + system_prompt: Optional system prompt to inject. + """ + self.tool_type = tool_type + self.tool_name = tool_name + self.root_path = Path(root_path).resolve() + self.allowed_prefixes = allowed_prefixes or ["/"] + self.max_file_size_bytes = max_file_size_mb * 1024 * 1024 + self.system_prompt = system_prompt + + # Create root directory if it doesn't exist + self.root_path.mkdir(parents=True, exist_ok=True) + + def wrap_model_call( + self, + request: ModelRequest, + handler: Callable[[ModelRequest], AIMessage], + ) -> AIMessage: + """Inject tool and optional system prompt.""" + # Add tool + tools = list(request.tools or []) + tools.append( + { + "type": self.tool_type, + "name": self.tool_name, + } + ) + request.tools = tools + + # Inject system prompt if provided + if self.system_prompt: + request.system_prompt = ( + request.system_prompt + "\n\n" + self.system_prompt + if request.system_prompt + else self.system_prompt + ) + + return handler(request) + + def wrap_tool_call( + self, request: ToolCallRequest, handler: Callable[[ToolCallRequest], ToolMessage | Command] + ) -> ToolMessage | Command: + """Intercept tool calls.""" + tool_call = request.tool_call + tool_name = tool_call.get("name") + + if tool_name != self.tool_name: + return handler(request) + + # Handle tool call + try: + args = tool_call.get("args", {}) + command = args.get("command") + + if command == "view": + return self._handle_view(args, tool_call["id"]) + if command == "create": + return self._handle_create(args, tool_call["id"]) + if command == "str_replace": + return self._handle_str_replace(args, tool_call["id"]) + if command == "insert": + return self._handle_insert(args, tool_call["id"]) + if command == "delete": + return self._handle_delete(args, tool_call["id"]) + if command == "rename": + return self._handle_rename(args, tool_call["id"]) + + msg = f"Unknown command: {command}" + return ToolMessage( + content=msg, + tool_call_id=tool_call["id"], + name=tool_name, + status="error", + ) + except (ValueError, FileNotFoundError) as e: + return ToolMessage( + content=str(e), + tool_call_id=tool_call["id"], + name=tool_name, + status="error", + ) + + def _validate_and_resolve_path(self, path: str) -> Path: + """Validate and resolve a virtual path to filesystem path. + + Args: + path: Virtual path (e.g., /file.txt or /src/main.py). + + Returns: + Resolved absolute filesystem path within root_path. + + Raises: + ValueError: If path contains traversal attempts, escapes root directory, + or violates allowed_prefixes restrictions. + """ + # Normalize path + if not path.startswith("/"): + path = "/" + path + + # Check for path traversal + if ".." in path or "~" in path: + msg = "Path traversal not allowed" + raise ValueError(msg) + + # Convert virtual path to filesystem path + # Remove leading / and resolve relative to root + relative = path.lstrip("/") + full_path = (self.root_path / relative).resolve() + + # Ensure path is within root + try: + full_path.relative_to(self.root_path) + except ValueError: + msg = f"Path outside root directory: {path}" + raise ValueError(msg) from None + + # Check allowed prefixes + virtual_path = "/" + str(full_path.relative_to(self.root_path)) + if self.allowed_prefixes: + allowed = any( + virtual_path.startswith(prefix) or virtual_path == prefix.rstrip("/") + for prefix in self.allowed_prefixes + ) + if not allowed: + msg = f"Path must start with one of: {self.allowed_prefixes}" + raise ValueError(msg) + + return full_path + + def _handle_view(self, args: dict, tool_call_id: str | None) -> Command: + """Handle view command.""" + path = args["path"] + full_path = self._validate_and_resolve_path(path) + + if not full_path.exists() or not full_path.is_file(): + msg = f"File not found: {path}" + raise FileNotFoundError(msg) + + # Check file size + if full_path.stat().st_size > self.max_file_size_bytes: + msg = f"File too large: {path} exceeds {self.max_file_size_bytes / 1024 / 1024}MB" + raise ValueError(msg) + + # Read file + try: + content = full_path.read_text() + except UnicodeDecodeError as e: + msg = f"Cannot decode file {path}: {e}" + raise ValueError(msg) from e + + # Format with line numbers + formatted_content = format_content_with_line_numbers(content, format_style="pipe") + + return Command( + update={ + "messages": [ + ToolMessage( + content=formatted_content, + tool_call_id=tool_call_id, + name=self.tool_name, + ) + ] + } + ) + + def _handle_create(self, args: dict, tool_call_id: str | None) -> Command: + """Handle create command.""" + path = args["path"] + file_text = args["file_text"] + + full_path = self._validate_and_resolve_path(path) + + # Create parent directories + full_path.parent.mkdir(parents=True, exist_ok=True) + + # Write file + full_path.write_text(file_text + "\n") + + return Command( + update={ + "messages": [ + ToolMessage( + content=f"File created: {path}", + tool_call_id=tool_call_id, + name=self.tool_name, + ) + ] + } + ) + + def _handle_str_replace(self, args: dict, tool_call_id: str | None) -> Command: + """Handle str_replace command.""" + path = args["path"] + old_str = args["old_str"] + new_str = args.get("new_str", "") + + full_path = self._validate_and_resolve_path(path) + + if not full_path.exists(): + msg = f"File not found: {path}" + raise FileNotFoundError(msg) + + # Read file + content = full_path.read_text() + + # Replace string + if old_str not in content: + msg = f"String not found in file: {old_str}" + raise ValueError(msg) + new_content, _ = apply_string_replacement(content, old_str, new_str, replace_all=False) + + # Write back + full_path.write_text(new_content) + + return Command( + update={ + "messages": [ + ToolMessage( + content=f"String replaced in {path}", + tool_call_id=tool_call_id, + name=self.tool_name, + ) + ] + } + ) + + def _handle_insert(self, args: dict, tool_call_id: str | None) -> Command: + """Handle insert command.""" + path = args["path"] + insert_line = args["insert_line"] + text_to_insert = args["new_str"] + + full_path = self._validate_and_resolve_path(path) + + if not full_path.exists(): + msg = f"File not found: {path}" + raise FileNotFoundError(msg) + + # Read file + content = full_path.read_text() + lines = content.split("\n") + # Handle trailing newline + if lines and lines[-1] == "": + lines = lines[:-1] + had_trailing_newline = True + else: + had_trailing_newline = False + + new_lines = text_to_insert.split("\n") + + # Insert after insert_line (0-indexed) + updated_lines = lines[:insert_line] + new_lines + lines[insert_line:] + + # Write back + new_content = "\n".join(updated_lines) + if had_trailing_newline: + new_content += "\n" + full_path.write_text(new_content) + + return Command( + update={ + "messages": [ + ToolMessage( + content=f"Text inserted in {path}", + tool_call_id=tool_call_id, + name=self.tool_name, + ) + ] + } + ) + + def _handle_delete(self, args: dict, tool_call_id: str | None) -> Command: + """Handle delete command.""" + import shutil + + path = args["path"] + full_path = self._validate_and_resolve_path(path) + + if full_path.is_file(): + full_path.unlink() + elif full_path.is_dir(): + shutil.rmtree(full_path) + # If doesn't exist, silently succeed + + return Command( + update={ + "messages": [ + ToolMessage( + content=f"File deleted: {path}", + tool_call_id=tool_call_id, + name=self.tool_name, + ) + ] + } + ) + + def _handle_rename(self, args: dict, tool_call_id: str | None) -> Command: + """Handle rename command.""" + old_path = args["old_path"] + new_path = args["new_path"] + + old_full = self._validate_and_resolve_path(old_path) + new_full = self._validate_and_resolve_path(new_path) + + if not old_full.exists(): + msg = f"File not found: {old_path}" + raise ValueError(msg) + + # Create parent directory for new path + new_full.parent.mkdir(parents=True, exist_ok=True) + + # Rename + old_full.rename(new_full) + + return Command( + update={ + "messages": [ + ToolMessage( + content=f"File renamed: {old_path} -> {new_path}", + tool_call_id=tool_call_id, + name=self.tool_name, + ) + ] + } + ) + + +class FilesystemClaudeTextEditorMiddleware(_FilesystemClaudeFileToolMiddleware): + """Filesystem-based text editor tool middleware. + + Provides Anthropic's text_editor tool using local filesystem for storage. + User handles persistence via volumes, git, or other mechanisms. + + Example: + ```python + from langchain.agents import create_agent + from langchain.agents.middleware import FilesystemTextEditorToolMiddleware + + agent = create_agent( + model=model, + tools=[], + middleware=[FilesystemTextEditorToolMiddleware(root_path="/workspace")], + ) + ``` + """ + + def __init__( + self, + *, + root_path: str, + allowed_prefixes: list[str] | None = None, + max_file_size_mb: int = 10, + ) -> None: + """Initialize the text editor middleware. + + Args: + root_path: Root directory for file operations. + allowed_prefixes: Optional list of allowed virtual path prefixes (default: ["/"]). + max_file_size_mb: Maximum file size in MB (default: 10). + """ + super().__init__( + tool_type=TEXT_EDITOR_TOOL_TYPE, + tool_name=TEXT_EDITOR_TOOL_NAME, + root_path=root_path, + allowed_prefixes=allowed_prefixes, + max_file_size_mb=max_file_size_mb, + ) + + +class FilesystemClaudeMemoryMiddleware(_FilesystemClaudeFileToolMiddleware): + """Filesystem-based memory tool middleware. + + Provides Anthropic's memory tool using local filesystem for storage. + User handles persistence via volumes, git, or other mechanisms. + Enforces /memories prefix and injects Anthropic's recommended system prompt. + + Example: + ```python + from langchain.agents import create_agent + from langchain.agents.middleware import FilesystemMemoryToolMiddleware + + agent = create_agent( + model=model, + tools=[], + middleware=[FilesystemMemoryToolMiddleware(root_path="/workspace")], + ) + ``` + """ + + def __init__( + self, + *, + root_path: str, + allowed_prefixes: list[str] | None = None, + max_file_size_mb: int = 10, + system_prompt: str = MEMORY_SYSTEM_PROMPT, + ) -> None: + """Initialize the memory middleware. + + Args: + root_path: Root directory for file operations. + allowed_prefixes: Optional list of allowed virtual path prefixes. + Defaults to ["/memories"]. + max_file_size_mb: Maximum file size in MB (default: 10). + system_prompt: System prompt to inject. Defaults to Anthropic's + recommended memory prompt. + """ + super().__init__( + tool_type=MEMORY_TOOL_TYPE, + tool_name=MEMORY_TOOL_NAME, + root_path=root_path, + allowed_prefixes=allowed_prefixes or ["/memories"], + max_file_size_mb=max_file_size_mb, + system_prompt=system_prompt, + ) + + +__all__ = [ + "AnthropicToolsState", + "FileData", + "FilesystemClaudeMemoryMiddleware", + "FilesystemClaudeTextEditorMiddleware", + "StateClaudeMemoryMiddleware", + "StateClaudeTextEditorMiddleware", +] diff --git a/libs/langchain_v1/langchain/agents/middleware/file_search.py b/libs/langchain_v1/langchain/agents/middleware/file_search.py new file mode 100644 index 0000000000000..34d31ddebe54b --- /dev/null +++ b/libs/langchain_v1/langchain/agents/middleware/file_search.py @@ -0,0 +1,550 @@ +"""File search middleware for Anthropic text editor and memory tools. + +This module provides Glob and Grep search tools that operate on files stored +in state or filesystem. +""" + +from __future__ import annotations + +import fnmatch +import json +import re +import subprocess +from contextlib import suppress +from datetime import datetime, timezone +from pathlib import Path, PurePosixPath +from typing import Annotated, Any, Literal, cast + +from langchain_core.tools import InjectedToolArg, tool + +from langchain.agents.middleware.anthropic_tools import AnthropicToolsState +from langchain.agents.middleware.types import AgentMiddleware + + +class StateFileSearchMiddleware(AgentMiddleware): + """Provides Glob and Grep search over state-based files. + + This middleware adds two tools that search through virtual files in state: + - Glob: Fast file pattern matching by file path + - Grep: Fast content search using regular expressions + + Example: + ```python + from langchain.agents import create_agent + from langchain.agents.middleware import ( + StateTextEditorToolMiddleware, + StateFileSearchMiddleware, + ) + + agent = create_agent( + model=model, + tools=[], + middleware=[ + StateTextEditorToolMiddleware(), + StateFileSearchMiddleware(), + ], + ) + ``` + """ + + state_schema = AnthropicToolsState + + def __init__( + self, + *, + state_key: str = "text_editor_files", + ) -> None: + """Initialize the search middleware. + + Args: + state_key: State key to search (default: "text_editor_files"). + Use "memory_files" to search memory tool files. + """ + self.state_key = state_key + + # Create tool instances + @tool + def glob_search( # noqa: D417 + pattern: str, + path: str = "/", + state: Annotated[AnthropicToolsState, InjectedToolArg] = None, # type: ignore[assignment] + ) -> str: + """Fast file pattern matching tool that works with any codebase size. + + Supports glob patterns like **/*.js or src/**/*.ts. + Returns matching file paths sorted by modification time. + Use this tool when you need to find files by name patterns. + + Args: + pattern: The glob pattern to match files against. + path: The directory to search in. If not specified, searches from root. + + Returns: + Newline-separated list of matching file paths, sorted by modification + time (most recently modified first). Returns "No files found" if no + matches. + """ + # Normalize base path + base_path = path if path.startswith("/") else "/" + path + + # Get files from state + files = cast("dict[str, Any]", state.get(self.state_key, {})) + + # Match files + matches = [] + for file_path, file_data in files.items(): + if file_path.startswith(base_path): + # Get relative path from base + if base_path == "/": + relative = file_path[1:] # Remove leading / + elif file_path == base_path: + relative = Path(file_path).name + elif file_path.startswith(base_path + "/"): + relative = file_path[len(base_path) + 1 :] + else: + continue + + # Match against pattern + # Handle ** pattern which requires special care + # PurePosixPath.match doesn't match single-level paths against **/pattern + is_match = PurePosixPath(relative).match(pattern) + if not is_match and pattern.startswith("**/"): + # Also try matching without the **/ prefix for files in base dir + is_match = PurePosixPath(relative).match(pattern[3:]) + + if is_match: + matches.append((file_path, file_data["modified_at"])) + + if not matches: + return "No files found" + + # Sort by modification time + matches.sort(key=lambda x: x[1], reverse=True) + file_paths = [path for path, _ in matches] + + return "\n".join(file_paths) + + @tool + def grep_search( # noqa: D417 + pattern: str, + path: str = "/", + include: str | None = None, + output_mode: Literal["files_with_matches", "content", "count"] = "files_with_matches", + state: Annotated[AnthropicToolsState, InjectedToolArg] = None, # type: ignore[assignment] + ) -> str: + """Fast content search tool that works with any codebase size. + + Searches file contents using regular expressions. Supports full regex + syntax and filters files by pattern with the include parameter. + + Args: + pattern: The regular expression pattern to search for in file contents. + path: The directory to search in. If not specified, searches from root. + include: File pattern to filter (e.g., "*.js", "*.{ts,tsx}"). + output_mode: Output format: + - "files_with_matches": Only file paths containing matches (default) + - "content": Matching lines with file:line:content format + - "count": Count of matches per file + + Returns: + Search results formatted according to output_mode. Returns "No matches + found" if no results. + """ + # Normalize base path + base_path = path if path.startswith("/") else "/" + path + + # Compile regex pattern (for validation) + try: + regex = re.compile(pattern) + except re.error as e: + return f"Invalid regex pattern: {e}" + + # Search files + files = cast("dict[str, Any]", state.get(self.state_key, {})) + results: dict[str, list[tuple[int, str]]] = {} + + for file_path, file_data in files.items(): + if not file_path.startswith(base_path): + continue + + # Check include filter + if include: + basename = Path(file_path).name + if not self._match_include(basename, include): + continue + + # Search file content + for line_num, line in enumerate(file_data["content"], 1): + if regex.search(line): + if file_path not in results: + results[file_path] = [] + results[file_path].append((line_num, line)) + + if not results: + return "No matches found" + + # Format output based on mode + return self._format_grep_results(results, output_mode) + + self.glob_search = glob_search + self.grep_search = grep_search + self.tools = [glob_search, grep_search] + + def _match_include(self, basename: str, pattern: str) -> bool: + """Match filename against include pattern.""" + # Handle brace expansion {a,b,c} + if "{" in pattern and "}" in pattern: + start = pattern.index("{") + end = pattern.index("}") + prefix = pattern[:start] + suffix = pattern[end + 1 :] + alternatives = pattern[start + 1 : end].split(",") + + for alt in alternatives: + expanded = prefix + alt + suffix + if fnmatch.fnmatch(basename, expanded): + return True + return False + return fnmatch.fnmatch(basename, pattern) + + def _format_grep_results( + self, + results: dict[str, list[tuple[int, str]]], + output_mode: str, + ) -> str: + """Format grep results based on output mode.""" + if output_mode == "files_with_matches": + # Just return file paths + return "\n".join(sorted(results.keys())) + + if output_mode == "content": + # Return file:line:content format + lines = [] + for file_path in sorted(results.keys()): + for line_num, line in results[file_path]: + lines.append(f"{file_path}:{line_num}:{line}") + return "\n".join(lines) + + if output_mode == "count": + # Return file:count format + lines = [] + for file_path in sorted(results.keys()): + count = len(results[file_path]) + lines.append(f"{file_path}:{count}") + return "\n".join(lines) + + # Default to files_with_matches + return "\n".join(sorted(results.keys())) + + +class FilesystemFileSearchMiddleware(AgentMiddleware): + """Provides Glob and Grep search over filesystem files. + + This middleware adds two tools that search through local filesystem: + - Glob: Fast file pattern matching by file path + - Grep: Fast content search using ripgrep or Python fallback + + Example: + ```python + from langchain.agents import create_agent + from langchain.agents.middleware import ( + FilesystemTextEditorToolMiddleware, + FilesystemFileSearchMiddleware, + ) + + agent = create_agent( + model=model, + tools=[], + middleware=[ + FilesystemTextEditorToolMiddleware(root_path="/workspace"), + FilesystemFileSearchMiddleware(root_path="/workspace"), + ], + ) + ``` + """ + + def __init__( + self, + *, + root_path: str, + use_ripgrep: bool = True, + max_file_size_mb: int = 10, + ) -> None: + """Initialize the search middleware. + + Args: + root_path: Root directory to search. + use_ripgrep: Whether to use ripgrep for search (default: True). + Falls back to Python if ripgrep unavailable. + max_file_size_mb: Maximum file size to search in MB (default: 10). + """ + self.root_path = Path(root_path).resolve() + self.use_ripgrep = use_ripgrep + self.max_file_size_bytes = max_file_size_mb * 1024 * 1024 + + # Create tool instances as closures that capture self + @tool + def glob_search(pattern: str, path: str = "/") -> str: + """Fast file pattern matching tool that works with any codebase size. + + Supports glob patterns like **/*.js or src/**/*.ts. + Returns matching file paths sorted by modification time. + Use this tool when you need to find files by name patterns. + + Args: + pattern: The glob pattern to match files against. + path: The directory to search in. If not specified, searches from root. + + Returns: + Newline-separated list of matching file paths, sorted by modification + time (most recently modified first). Returns "No files found" if no + matches. + """ + try: + base_full = self._validate_and_resolve_path(path) + except ValueError: + return "No files found" + + if not base_full.exists() or not base_full.is_dir(): + return "No files found" + + # Use pathlib glob + matching: list[tuple[str, str]] = [] + for match in base_full.glob(pattern): + if match.is_file(): + # Convert to virtual path + virtual_path = "/" + str(match.relative_to(self.root_path)) + stat = match.stat() + modified_at = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat() + matching.append((virtual_path, modified_at)) + + if not matching: + return "No files found" + + file_paths = [p for p, _ in matching] + return "\n".join(file_paths) + + @tool + def grep_search( + pattern: str, + path: str = "/", + include: str | None = None, + output_mode: Literal["files_with_matches", "content", "count"] = "files_with_matches", + ) -> str: + """Fast content search tool that works with any codebase size. + + Searches file contents using regular expressions. Supports full regex + syntax and filters files by pattern with the include parameter. + + Args: + pattern: The regular expression pattern to search for in file contents. + path: The directory to search in. If not specified, searches from root. + include: File pattern to filter (e.g., "*.js", "*.{ts,tsx}"). + output_mode: Output format: + - "files_with_matches": Only file paths containing matches (default) + - "content": Matching lines with file:line:content format + - "count": Count of matches per file + + Returns: + Search results formatted according to output_mode. Returns "No matches + found" if no results. + """ + # Compile regex pattern (for validation) + try: + re.compile(pattern) + except re.error as e: + return f"Invalid regex pattern: {e}" + + # Try ripgrep first if enabled + results = None + if self.use_ripgrep: + with suppress( + FileNotFoundError, + subprocess.CalledProcessError, + subprocess.TimeoutExpired, + ): + results = self._ripgrep_search(pattern, path, include) + + # Python fallback if ripgrep failed or is disabled + if results is None: + results = self._python_search(pattern, path, include) + + if not results: + return "No matches found" + + # Format output based on mode + return self._format_grep_results(results, output_mode) + + self.glob_search = glob_search + self.grep_search = grep_search + self.tools = [glob_search, grep_search] + + def _validate_and_resolve_path(self, path: str) -> Path: + """Validate and resolve a virtual path to filesystem path.""" + # Normalize path + if not path.startswith("/"): + path = "/" + path + + # Check for path traversal + if ".." in path or "~" in path: + msg = "Path traversal not allowed" + raise ValueError(msg) + + # Convert virtual path to filesystem path + relative = path.lstrip("/") + full_path = (self.root_path / relative).resolve() + + # Ensure path is within root + try: + full_path.relative_to(self.root_path) + except ValueError: + msg = f"Path outside root directory: {path}" + raise ValueError(msg) from None + + return full_path + + def _ripgrep_search( + self, pattern: str, base_path: str, include: str | None + ) -> dict[str, list[tuple[int, str]]]: + """Search using ripgrep subprocess.""" + try: + base_full = self._validate_and_resolve_path(base_path) + except ValueError: + return {} + + if not base_full.exists(): + return {} + + # Build ripgrep command + cmd = ["rg", "--json", pattern, str(base_full)] + + if include: + # Convert glob pattern to ripgrep glob + cmd.extend(["--glob", include]) + + try: + result = subprocess.run( # noqa: S603 + cmd, + capture_output=True, + text=True, + timeout=30, + check=False, + ) + except (subprocess.TimeoutExpired, FileNotFoundError): + # Fallback to Python search if ripgrep unavailable or times out + return self._python_search(pattern, base_path, include) + + # Parse ripgrep JSON output + results: dict[str, list[tuple[int, str]]] = {} + for line in result.stdout.splitlines(): + try: + data = json.loads(line) + if data["type"] == "match": + path = data["data"]["path"]["text"] + # Convert to virtual path + virtual_path = "/" + str(Path(path).relative_to(self.root_path)) + line_num = data["data"]["line_number"] + line_text = data["data"]["lines"]["text"].rstrip("\n") + + if virtual_path not in results: + results[virtual_path] = [] + results[virtual_path].append((line_num, line_text)) + except (json.JSONDecodeError, KeyError): + continue + + return results + + def _python_search( + self, pattern: str, base_path: str, include: str | None + ) -> dict[str, list[tuple[int, str]]]: + """Search using Python regex (fallback).""" + try: + base_full = self._validate_and_resolve_path(base_path) + except ValueError: + return {} + + if not base_full.exists(): + return {} + + regex = re.compile(pattern) + results: dict[str, list[tuple[int, str]]] = {} + + # Walk directory tree + for file_path in base_full.rglob("*"): + if not file_path.is_file(): + continue + + # Check include filter + if include and not self._match_include(file_path.name, include): + continue + + # Skip files that are too large + if file_path.stat().st_size > self.max_file_size_bytes: + continue + + try: + content = file_path.read_text() + except (UnicodeDecodeError, PermissionError): + continue + + # Search content + for line_num, line in enumerate(content.splitlines(), 1): + if regex.search(line): + virtual_path = "/" + str(file_path.relative_to(self.root_path)) + if virtual_path not in results: + results[virtual_path] = [] + results[virtual_path].append((line_num, line)) + + return results + + def _match_include(self, basename: str, pattern: str) -> bool: + """Match filename against include pattern.""" + # Handle brace expansion {a,b,c} + if "{" in pattern and "}" in pattern: + start = pattern.index("{") + end = pattern.index("}") + prefix = pattern[:start] + suffix = pattern[end + 1 :] + alternatives = pattern[start + 1 : end].split(",") + + for alt in alternatives: + expanded = prefix + alt + suffix + if fnmatch.fnmatch(basename, expanded): + return True + return False + return fnmatch.fnmatch(basename, pattern) + + def _format_grep_results( + self, + results: dict[str, list[tuple[int, str]]], + output_mode: str, + ) -> str: + """Format grep results based on output mode.""" + if output_mode == "files_with_matches": + # Just return file paths + return "\n".join(sorted(results.keys())) + + if output_mode == "content": + # Return file:line:content format + lines = [] + for file_path in sorted(results.keys()): + for line_num, line in results[file_path]: + lines.append(f"{file_path}:{line_num}:{line}") + return "\n".join(lines) + + if output_mode == "count": + # Return file:count format + lines = [] + for file_path in sorted(results.keys()): + count = len(results[file_path]) + lines.append(f"{file_path}:{count}") + return "\n".join(lines) + + # Default to files_with_matches + return "\n".join(sorted(results.keys())) + + +__all__ = [ + "FilesystemFileSearchMiddleware", + "StateFileSearchMiddleware", +] diff --git a/libs/langchain_v1/langchain/agents/middleware/filesystem.py b/libs/langchain_v1/langchain/agents/middleware/filesystem.py new file mode 100644 index 0000000000000..ec4ddaa7e5666 --- /dev/null +++ b/libs/langchain_v1/langchain/agents/middleware/filesystem.py @@ -0,0 +1,538 @@ +"""Middleware for providing filesystem tools to an agent.""" +# ruff: noqa: E501 + +from collections.abc import Callable +from typing import TYPE_CHECKING, Annotated, Any + +from typing_extensions import NotRequired + +if TYPE_CHECKING: + from langgraph.runtime import Runtime + +from langchain_core.messages import AIMessage, ToolMessage +from langchain_core.tools import BaseTool, InjectedToolCallId, tool +from langgraph.config import get_config +from langgraph.runtime import Runtime, get_runtime +from langgraph.store.base import BaseStore, Item +from langgraph.types import Command + +from langchain.agents._internal.file_utils import ( + FileData, + append_memories_prefix, + check_empty_content, + create_file_data, + file_data_reducer, + file_data_to_string, + format_content_with_line_numbers, + has_memories_prefix, + strip_memories_prefix, + update_file_data, + validate_path, +) +from langchain.agents.middleware.types import AgentMiddleware, AgentState, ModelRequest +from langchain.tools.tool_node import InjectedState + + +class FilesystemState(AgentState): + """State for the filesystem middleware.""" + + files: Annotated[NotRequired[dict[str, FileData]], file_data_reducer] + """Files in the filesystem.""" + + +LIST_FILES_TOOL_DESCRIPTION = """Lists all files in the filesystem, optionally filtering by directory. + +Usage: +- The list_files tool will return a list of all files in the filesystem. +- You can optionally provide a path parameter to list files in a specific directory. +- This is very useful for exploring the file system and finding the right file to read or edit. +- You should almost ALWAYS use this tool before using the Read or Edit tools.""" +LIST_FILES_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = ( + "\n- Files from the longterm filesystem will be prefixed with the /memories/ path." +) + +READ_FILE_TOOL_DESCRIPTION = """Reads a file from the filesystem. You can access any file directly by using this tool. +Assume this tool is able to read all files on the machine. If the User provides a path to a file assume that path is valid. It is okay to read a file that does not exist; an error will be returned. + +Usage: +- The file_path parameter must be an absolute path, not a relative path +- By default, it reads up to 2000 lines starting from the beginning of the file +- You can optionally specify a line offset and limit (especially handy for long files), but it's recommended to read the whole file by not providing these parameters +- Any lines longer than 2000 characters will be truncated +- Results are returned using cat -n format, with line numbers starting at 1 +- You have the capability to call multiple tools in a single response. It is always better to speculatively read multiple files as a batch that are potentially useful. +- If you read a file that exists but has empty contents you will receive a system reminder warning in place of file contents. +- You should ALWAYS make sure a file has been read before editing it.""" +READ_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = ( + "\n- file_paths prefixed with the /memories/ path will be read from the longterm filesystem." +) + +EDIT_FILE_TOOL_DESCRIPTION = """Performs exact string replacements in files. + +Usage: +- You must use your `Read` tool at least once in the conversation before editing. This tool will error if you attempt an edit without reading the file. +- When editing text from Read tool output, ensure you preserve the exact indentation (tabs/spaces) as it appears AFTER the line number prefix. The line number prefix format is: spaces + line number + tab. Everything after that tab is the actual file content to match. Never include any part of the line number prefix in the old_string or new_string. +- ALWAYS prefer editing existing files. NEVER write new files unless explicitly required. +- Only use emojis if the user explicitly requests it. Avoid adding emojis to files unless asked. +- The edit will FAIL if `old_string` is not unique in the file. Either provide a larger string with more surrounding context to make it unique or use `replace_all` to change every instance of `old_string`. +- Use `replace_all` for replacing and renaming strings across the file. This parameter is useful if you want to rename a variable for instance.""" +EDIT_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = "\n- You can edit files in the longterm filesystem by prefixing the filename with the /memories/ path." + +WRITE_FILE_TOOL_DESCRIPTION = """Writes to a new file in the filesystem. + +Usage: +- The file_path parameter must be an absolute path, not a relative path +- The content parameter must be a string +- The write_file tool will create the a new file. +- Prefer to edit existing files over creating new ones when possible. +- file_paths prefixed with the /memories/ path will be written to the longterm filesystem.""" +WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT = ( + "\n- file_paths prefixed with the /memories/ path will be written to the longterm filesystem." +) + +FILESYSTEM_SYSTEM_PROMPT = """## Filesystem Tools `ls`, `read_file`, `write_file`, `edit_file` + +You have access to a filesystem which you can interact with using these tools. +All file paths must start with a /. + +- ls: list all files in the filesystem +- read_file: read a file from the filesystem +- write_file: write to a file in the filesystem +- edit_file: edit a file in the filesystem""" +FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT = """ + +You also have access to a longterm filesystem in which you can store files that you want to keep around for longer than the current conversation. +In order to interact with the longterm filesystem, you can use those same tools, but filenames must be prefixed with the /memories/ path. +Remember, to interact with the longterm filesystem, you must prefix the filename with the /memories/ path.""" + + +def _get_namespace() -> tuple[str] | tuple[str, str]: + namespace = "filesystem" + config = get_config() + if config is None: + return (namespace,) + assistant_id = config.get("metadata", {}).get("assistant_id") + if assistant_id is None: + return (namespace,) + return (assistant_id, "filesystem") + + +def _get_store(runtime: Runtime[Any]) -> BaseStore: + if runtime.store is None: + msg = "Longterm memory is enabled, but no store is available" + raise ValueError(msg) + return runtime.store + + +def _convert_store_item_to_file_data(store_item: Item) -> FileData: + if "content" not in store_item.value or not isinstance(store_item.value["content"], list): + msg = "Store item does not contain content" + raise ValueError(msg) + if "created_at" not in store_item.value or not isinstance(store_item.value["created_at"], str): + msg = "Store item does not contain created_at" + raise ValueError(msg) + if "modified_at" not in store_item.value or not isinstance( + store_item.value["modified_at"], str + ): + msg = "Store item does not contain modified_at" + raise ValueError(msg) + return FileData( + content=store_item.value["content"], + created_at=store_item.value["created_at"], + modified_at=store_item.value["modified_at"], + ) + + +def _convert_file_data_to_store_item(file_data: FileData) -> dict[str, Any]: + return { + "content": file_data["content"], + "created_at": file_data["created_at"], + "modified_at": file_data["modified_at"], + } + + +def _get_file_data_from_state(state: FilesystemState, file_path: str) -> FileData: + mock_filesystem = state.get("files", {}) + if file_path not in mock_filesystem: + msg = f"File '{file_path}' not found" + raise ValueError(msg) + return mock_filesystem[file_path] + + +def _ls_tool_generator( + custom_description: str | None = None, *, has_longterm_memory: bool +) -> BaseTool: + tool_description = LIST_FILES_TOOL_DESCRIPTION + if custom_description: + tool_description = custom_description + elif has_longterm_memory: + tool_description += LIST_FILES_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT + + def _get_filenames_from_state(state: FilesystemState) -> list[str]: + files_dict = state.get("files", {}) + return list(files_dict.keys()) + + def _filter_files_by_path(filenames: list[str], path: str | None) -> list[str]: + if path is None: + return filenames + normalized_path = validate_path(path) + return [f for f in filenames if f.startswith(normalized_path)] + + if has_longterm_memory: + + @tool(description=tool_description) + def ls( + state: Annotated[FilesystemState, InjectedState], path: str | None = None + ) -> list[str]: + files = _get_filenames_from_state(state) + # Add filenames from longterm memory + runtime = get_runtime() + store = _get_store(runtime) + namespace = _get_namespace() + longterm_files = store.search(namespace) + longterm_files_prefixed = [append_memories_prefix(f.key) for f in longterm_files] + files.extend(longterm_files_prefixed) + return _filter_files_by_path(files, path) + else: + + @tool(description=tool_description) + def ls( + state: Annotated[FilesystemState, InjectedState], path: str | None = None + ) -> list[str]: + files = _get_filenames_from_state(state) + return _filter_files_by_path(files, path) + + return ls + + +def _read_file_tool_generator( + custom_description: str | None = None, *, has_longterm_memory: bool +) -> BaseTool: + tool_description = READ_FILE_TOOL_DESCRIPTION + if custom_description: + tool_description = custom_description + elif has_longterm_memory: + tool_description += READ_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT + + def _read_file_data_content(file_data: FileData, offset: int, limit: int) -> str: + content = file_data_to_string(file_data) + empty_msg = check_empty_content(content) + if empty_msg: + return empty_msg + lines = content.splitlines() + start_idx = offset + end_idx = min(start_idx + limit, len(lines)) + if start_idx >= len(lines): + return f"Error: Line offset {offset} exceeds file length ({len(lines)} lines)" + selected_lines = lines[start_idx:end_idx] + return format_content_with_line_numbers( + selected_lines, format_style="tab", start_line=start_idx + 1 + ) + + if has_longterm_memory: + + @tool(description=tool_description) + def read_file( + file_path: str, + state: Annotated[FilesystemState, InjectedState], + offset: int = 0, + limit: int = 2000, + ) -> str: + file_path = validate_path(file_path) + if has_memories_prefix(file_path): + stripped_file_path = strip_memories_prefix(file_path) + runtime = get_runtime() + store = _get_store(runtime) + namespace = _get_namespace() + item: Item | None = store.get(namespace, stripped_file_path) + if item is None: + return f"Error: File '{file_path}' not found" + file_data = _convert_store_item_to_file_data(item) + else: + try: + file_data = _get_file_data_from_state(state, file_path) + except ValueError as e: + return str(e) + return _read_file_data_content(file_data, offset, limit) + + else: + + @tool(description=tool_description) + def read_file( + file_path: str, + state: Annotated[FilesystemState, InjectedState], + offset: int = 0, + limit: int = 2000, + ) -> str: + file_path = validate_path(file_path) + try: + file_data = _get_file_data_from_state(state, file_path) + except ValueError as e: + return str(e) + return _read_file_data_content(file_data, offset, limit) + + return read_file + + +def _write_file_tool_generator( + custom_description: str | None = None, *, has_longterm_memory: bool +) -> BaseTool: + tool_description = WRITE_FILE_TOOL_DESCRIPTION + if custom_description: + tool_description = custom_description + elif has_longterm_memory: + tool_description += WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT + + def _write_file_to_state( + state: FilesystemState, tool_call_id: str, file_path: str, content: str + ) -> Command | str: + mock_filesystem = state.get("files", {}) + existing = mock_filesystem.get(file_path) + if existing: + return f"Cannot write to {file_path} because it already exists. Read and then make an edit, or write to a new path." + new_file_data = create_file_data(content) + return Command( + update={ + "files": {file_path: new_file_data}, + "messages": [ToolMessage(f"Updated file {file_path}", tool_call_id=tool_call_id)], + } + ) + + if has_longterm_memory: + + @tool(description=tool_description) + def write_file( + file_path: str, + content: str, + state: Annotated[FilesystemState, InjectedState], + tool_call_id: Annotated[str, InjectedToolCallId], + ) -> Command | str: + file_path = validate_path(file_path) + if has_memories_prefix(file_path): + stripped_file_path = strip_memories_prefix(file_path) + runtime = get_runtime() + store = _get_store(runtime) + namespace = _get_namespace() + if store.get(namespace, stripped_file_path) is not None: + return f"Cannot write to {file_path} because it already exists. Read and then make an edit, or write to a new path." + new_file_data = create_file_data(content) + store.put( + namespace, stripped_file_path, _convert_file_data_to_store_item(new_file_data) + ) + return f"Updated longterm memories file {file_path}" + return _write_file_to_state(state, tool_call_id, file_path, content) + + else: + + @tool(description=tool_description) + def write_file( + file_path: str, + content: str, + state: Annotated[FilesystemState, InjectedState], + tool_call_id: Annotated[str, InjectedToolCallId], + ) -> Command | str: + file_path = validate_path(file_path) + return _write_file_to_state(state, tool_call_id, file_path, content) + + return write_file + + +def _edit_file_tool_generator( + custom_description: str | None = None, *, has_longterm_memory: bool +) -> BaseTool: + tool_description = EDIT_FILE_TOOL_DESCRIPTION + if custom_description: + tool_description = custom_description + elif has_longterm_memory: + tool_description += EDIT_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT + + if has_longterm_memory: + + @tool(description=tool_description) + def edit_file( + file_path: str, + old_string: str, + new_string: str, + state: Annotated[FilesystemState, InjectedState], + tool_call_id: Annotated[str, InjectedToolCallId], + *, + replace_all: bool = False, + ) -> Command | str: + file_path = validate_path(file_path) + is_longterm_memory = has_memories_prefix(file_path) + if is_longterm_memory: + stripped_file_path = strip_memories_prefix(file_path) + runtime = get_runtime() + store = _get_store(runtime) + namespace = _get_namespace() + item: Item | None = store.get(namespace, stripped_file_path) + if item is None: + return f"Error: File '{file_path}' not found" + file_data = _convert_store_item_to_file_data(item) + else: + try: + file_data = _get_file_data_from_state(state, file_path) + except ValueError as e: + return str(e) + + content = file_data_to_string(file_data) + occurrences = content.count(old_string) + if occurrences == 0: + return f"Error: String not found in file: '{old_string}'" + if occurrences > 1 and not replace_all: + return f"Error: String '{old_string}' appears {occurrences} times in file. Use replace_all=True to replace all instances, or provide a more specific string with surrounding context." + new_content = content.replace(old_string, new_string) + new_file_data = update_file_data(file_data, new_content) + result_msg = ( + f"Successfully replaced {occurrences} instance(s) of the string in '{file_path}'" + ) + if is_longterm_memory: + store.put( + namespace, stripped_file_path, _convert_file_data_to_store_item(new_file_data) + ) + return result_msg + return Command( + update={ + "files": {file_path: new_file_data}, + "messages": [ToolMessage(result_msg, tool_call_id=tool_call_id)], + } + ) + else: + + @tool(description=tool_description) + def edit_file( + file_path: str, + old_string: str, + new_string: str, + state: Annotated[FilesystemState, InjectedState], + tool_call_id: Annotated[str, InjectedToolCallId], + *, + replace_all: bool = False, + ) -> Command | str: + file_path = validate_path(file_path) + try: + file_data = _get_file_data_from_state(state, file_path) + except ValueError as e: + return str(e) + content = file_data_to_string(file_data) + occurrences = content.count(old_string) + if occurrences == 0: + return f"Error: String not found in file: '{old_string}'" + if occurrences > 1 and not replace_all: + return f"Error: String '{old_string}' appears {occurrences} times in file. Use replace_all=True to replace all instances, or provide a more specific string with surrounding context." + new_content = content.replace(old_string, new_string) + new_file_data = update_file_data(file_data, new_content) + result_msg = ( + f"Successfully replaced {occurrences} instance(s) of the string in '{file_path}'" + ) + return Command( + update={ + "files": {file_path: new_file_data}, + "messages": [ToolMessage(result_msg, tool_call_id=tool_call_id)], + } + ) + + return edit_file + + +TOOL_GENERATORS = { + "ls": _ls_tool_generator, + "read_file": _read_file_tool_generator, + "write_file": _write_file_tool_generator, + "edit_file": _edit_file_tool_generator, +} + + +def _get_filesystem_tools( + custom_tool_descriptions: dict[str, str] | None = None, *, has_longterm_memory: bool +) -> list[BaseTool]: + """Get filesystem tools. + + Args: + has_longterm_memory: Whether to enable longterm memory support. + custom_tool_descriptions: Optional custom descriptions for tools. + + Returns: + List of configured filesystem tools. + """ + if custom_tool_descriptions is None: + custom_tool_descriptions = {} + tools = [] + for tool_name, tool_generator in TOOL_GENERATORS.items(): + tool = tool_generator( + custom_tool_descriptions.get(tool_name), has_longterm_memory=has_longterm_memory + ) + tools.append(tool) + return tools + + +class FilesystemMiddleware(AgentMiddleware): + """Middleware for providing filesystem tools to an agent. + + Args: + use_longterm_memory: Whether to enable longterm memory support. + system_prompt_extension: Optional custom system prompt. + custom_tool_descriptions: Optional custom tool descriptions. + + Returns: + List of configured filesystem tools. + + Raises: + ValueError: If longterm memory is enabled but no store is available. + + Example: + ```python + from langchain.agents.middleware.filesystem import FilesystemMiddleware + from langchain.agents import create_agent + + agent = create_agent(middleware=[FilesystemMiddleware(use_longterm_memory=False)]) + ``` + """ + + state_schema = FilesystemState + + def __init__( + self, + *, + use_longterm_memory: bool = False, + system_prompt_extension: str | None = None, + custom_tool_descriptions: dict[str, str] | None = None, + ) -> None: + """Initialize the filesystem middleware. + + Args: + use_longterm_memory: Whether to enable longterm memory support. + system_prompt_extension: Optional custom system prompt. + custom_tool_descriptions: Optional custom tool descriptions. + """ + self.use_longterm_memory = use_longterm_memory + self.system_prompt_extension = FILESYSTEM_SYSTEM_PROMPT + if system_prompt_extension is not None: + self.system_prompt_extension = system_prompt_extension + elif use_longterm_memory: + self.system_prompt_extension += FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT + + self.tools = _get_filesystem_tools( + custom_tool_descriptions, has_longterm_memory=use_longterm_memory + ) + + def before_model_call(self, request: ModelRequest, runtime: Runtime[Any]) -> ModelRequest: + """If use_longterm_memory is True, we must have a store available.""" + if self.use_longterm_memory and runtime.store is None: + msg = "Longterm memory is enabled, but no store is available" + raise ValueError(msg) + return request + + def wrap_model_call( + self, + request: ModelRequest, + handler: Callable[[ModelRequest], AIMessage], + ) -> AIMessage: + """Update the system prompt to include instructions on using the filesystem.""" + if self.system_prompt_extension is not None: + request.system_prompt = ( + request.system_prompt + "\n\n" + self.system_prompt_extension + if request.system_prompt + else self.system_prompt_extension + ) + return handler(request) diff --git a/libs/langchain_v1/langchain/agents/middleware/prompt_caching.py b/libs/langchain_v1/langchain/agents/middleware/prompt_caching.py index ae640a7459d80..80569d61f440d 100644 --- a/libs/langchain_v1/langchain/agents/middleware/prompt_caching.py +++ b/libs/langchain_v1/langchain/agents/middleware/prompt_caching.py @@ -51,7 +51,7 @@ def wrap_model_call( try: from langchain_anthropic import ChatAnthropic except ImportError: - ChatAnthropic = None # noqa: N806 + ChatAnthropic = None # type: ignore[assignment,misc] # noqa: N806 msg: str | None = None diff --git a/libs/langchain_v1/langchain/agents/middleware/subagents.py b/libs/langchain_v1/langchain/agents/middleware/subagents.py new file mode 100644 index 0000000000000..52d3596418eeb --- /dev/null +++ b/libs/langchain_v1/langchain/agents/middleware/subagents.py @@ -0,0 +1,360 @@ +"""Middleware for providing subagents to an agent via a `task` tool.""" + +from collections.abc import Callable, Sequence +from typing import Annotated, Any, TypedDict, cast + +from langchain_core.language_models import BaseChatModel +from langchain_core.messages import AIMessage, HumanMessage, ToolMessage +from langchain_core.runnables import Runnable +from langchain_core.tools import BaseTool, tool +from langgraph.types import Command +from typing_extensions import NotRequired + +from langchain.agents.middleware.filesystem import FilesystemMiddleware +from langchain.agents.middleware.planning import PlanningMiddleware +from langchain.agents.middleware.prompt_caching import AnthropicPromptCachingMiddleware +from langchain.agents.middleware.summarization import SummarizationMiddleware +from langchain.agents.middleware.types import AgentMiddleware, ModelRequest +from langchain.tools import InjectedState, InjectedToolCallId + + +class SubAgent(TypedDict): + """A subagent constructed with user-defined parameters.""" + + name: str + """The name of the subagent.""" + + description: str + """The description of the subagent.""" + + system_prompt: str + """The system prompt to use for the subagent.""" + + tools: Sequence[BaseTool | Callable | dict[str, Any]] + """The tools to use for the subagent.""" + + model: NotRequired[str | BaseChatModel] + """The model for the subagent.""" + + middleware: NotRequired[list[AgentMiddleware]] + """The middleware to use for the subagent.""" + + +class CompiledSubAgent(TypedDict): + """A Runnable passed in as a subagent.""" + + name: str + """The name of the subagent.""" + + description: str + """The description of the subagent.""" + + runnable: Runnable + """The Runnable to use for the subagent.""" + + +DEFAULT_SUBAGENT_PROMPT = "In order to complete the objective that the user asks of you, you have access to a number of standard tools." # noqa: E501 + +TASK_TOOL_DESCRIPTION = """Launch an ephemeral subagent to handle complex, multi-step independent tasks with isolated context windows. + +Available agent types and the tools they have access to: +- general-purpose: General-purpose agent for researching complex questions, searching for files and content, and executing multi-step tasks. When you are searching for a keyword or file and are not confident that you will find the right match in the first few tries use this agent to perform the search for you. This agent has access to all tools as the main agent. +{other_agents} + +When using the Task tool, you must specify a subagent_type parameter to select which agent type to use. + +## Usage notes: +1. Launch multiple agents concurrently whenever possible, to maximize performance; to do that, use a single message with multiple tool uses +2. When the agent is done, it will return a single message back to you. The result returned by the agent is not visible to the user. To show the user the result, you should send a text message back to the user with a concise summary of the result. +3. Each agent invocation is stateless. You will not be able to send additional messages to the agent, nor will the agent be able to communicate with you outside of its final report. Therefore, your prompt should contain a highly detailed task description for the agent to perform autonomously and you should specify exactly what information the agent should return back to you in its final and only message to you. +4. The agent's outputs should generally be trusted +5. Clearly tell the agent whether you expect it to create content, perform analysis, or just do research (search, file reads, web fetches, etc.), since it is not aware of the user's intent +6. If the agent description mentions that it should be used proactively, then you should try your best to use it without the user having to ask for it first. Use your judgement. +7. When only the general-purpose agent is provided, you should use it for all tasks. It is great for isolating context and token usage, and completing specific, complex tasks, as it has all the same capabilities as the main agent. + +### Example usage of the general-purpose agent: + + +"general-purpose": use this agent for general purpose tasks, it has access to all tools as the main agent. + + + +User: "I want to conduct research on the accomplishments of Lebron James, Michael Jordan, and Kobe Bryant, and then compare them." +Assistant: *Uses the task tool in parallel to conduct isolated research on each of the three players* +Assistant: *Synthesizes the results of the three isolated research tasks and responds to the User* + +Research is a complex, multi-step task in it of itself. +The research of each individual player is not dependent on the research of the other players. +The assistant uses the task tool to break down the complex objective into three isolated tasks. +Each research task only needs to worry about context and tokens about one player, then returns synthesized information about each player as the Tool Result. +This means each research task can dive deep and spend tokens and context deeply researching each player, but the final result is synthesized information, and saves us tokens in the long run when comparing the players to each other. + + + + +User: "Analyze a single large code repository for security vulnerabilities and generate a report." +Assistant: *Launches a single `task` subagent for the repository analysis* +Assistant: *Receives report and integrates results into final summary* + +Subagent is used to isolate a large, context-heavy task, even though there is only one. This prevents the main thread from being overloaded with details. +If the user then asks followup questions, we have a concise report to reference instead of the entire history of analysis and tool calls, which is good and saves us time and money. + + + + +User: "Schedule two meetings for me and prepare agendas for each." +Assistant: *Calls the task tool in parallel to launch two `task` subagents (one per meeting) to prepare agendas* +Assistant: *Returns final schedules and agendas* + +Tasks are simple individually, but subagents help silo agenda preparation. +Each subagent only needs to worry about the agenda for one meeting. + + + + +User: "I want to order a pizza from Dominos, order a burger from McDonald's, and order a salad from Subway." +Assistant: *Calls tools directly in parallel to order a pizza from Dominos, a burger from McDonald's, and a salad from Subway* + +The assistant did not use the task tool because the objective is super simple and clear and only requires a few trivial tool calls. +It is better to just complete the task directly and NOT use the `task`tool. + + + +### Example usage with custom agents: + + +"content-reviewer": use this agent after you are done creating significant content or documents +"greeting-responder": use this agent when to respond to user greetings with a friendly joke +"research-analyst": use this agent to conduct thorough research on complex topics + + + +user: "Please write a function that checks if a number is prime" +assistant: Sure let me write a function that checks if a number is prime +assistant: First let me use the Write tool to write a function that checks if a number is prime +assistant: I'm going to use the Write tool to write the following code: + +function isPrime(n) {{ + if (n <= 1) return false + for (let i = 2; i * i <= n; i++) {{ + if (n % i === 0) return false + }} + return true +}} + + +Since significant content was created and the task was completed, now use the content-reviewer agent to review the work + +assistant: Now let me use the content-reviewer agent to review the code +assistant: Uses the Task tool to launch with the content-reviewer agent + + + +user: "Can you help me research the environmental impact of different renewable energy sources and create a comprehensive report?" + +This is a complex research task that would benefit from using the research-analyst agent to conduct thorough analysis + +assistant: I'll help you research the environmental impact of renewable energy sources. Let me use the research-analyst agent to conduct comprehensive research on this topic. +assistant: Uses the Task tool to launch with the research-analyst agent, providing detailed instructions about what research to conduct and what format the report should take + + + +user: "Hello" + +Since the user is greeting, use the greeting-responder agent to respond with a friendly joke + +assistant: "I'm going to use the Task tool to launch with the greeting-responder agent" +""" # noqa: E501 + + +def _get_subagents( + default_subagent_model: str | BaseChatModel, + default_subagent_tools: Sequence[BaseTool | Callable | dict[str, Any]], + subagents: list[SubAgent | CompiledSubAgent], +) -> tuple[dict[str, Any], list[str]]: + from langchain.agents.factory import create_agent + + default_subagent_middleware = [ + PlanningMiddleware(), + FilesystemMiddleware(), + SummarizationMiddleware( + model=default_subagent_model, + max_tokens_before_summary=120000, + messages_to_keep=20, + ), + AnthropicPromptCachingMiddleware(ttl="5m", unsupported_model_behavior="ignore"), + ] + + # Create the general-purpose subagent + general_purpose_subagent = create_agent( + model=default_subagent_model, + system_prompt=DEFAULT_SUBAGENT_PROMPT, + tools=default_subagent_tools, + middleware=default_subagent_middleware, + ) + agents: dict[str, Any] = {"general-purpose": general_purpose_subagent} + subagent_descriptions = [] + for _agent in subagents: + subagent_descriptions.append(f"- {_agent['name']}: {_agent['description']}") + if "runnable" in _agent: + custom_agent = cast("CompiledSubAgent", _agent) + agents[custom_agent["name"]] = custom_agent["runnable"] + continue + _tools = _agent.get("tools", list(default_subagent_tools)) + + subagent_model = _agent.get("model", default_subagent_model) + + if "middleware" in _agent: + _middleware = [*default_subagent_middleware, *_agent["middleware"]] + else: + _middleware = default_subagent_middleware + + agents[_agent["name"]] = create_agent( + subagent_model, + system_prompt=_agent["system_prompt"], + tools=_tools, + middleware=_middleware, + checkpointer=False, + ) + return agents, subagent_descriptions + + +def _create_task_tool( + default_subagent_model: str | BaseChatModel, + default_subagent_tools: Sequence[BaseTool | Callable | dict[str, Any]], + subagents: list[SubAgent | CompiledSubAgent], + *, + is_async: bool = False, +) -> BaseTool: + subagent_graphs, subagent_descriptions = _get_subagents( + default_subagent_model, default_subagent_tools, subagents + ) + subagent_description_str = "\n".join(subagent_descriptions) + + def _return_command_with_state_update(result: dict, tool_call_id: str) -> Command: + state_update = {k: v for k, v in result.items() if k not in ["todos", "messages"]} + return Command( + update={ + **state_update, + "messages": [ + ToolMessage(result["messages"][-1].content, tool_call_id=tool_call_id) + ], + } + ) + + task_tool_description = TASK_TOOL_DESCRIPTION.format(other_agents=subagent_description_str) + if is_async: + + @tool(description=task_tool_description) + async def task( + description: str, + subagent_type: str, + state: Annotated[dict, InjectedState], + tool_call_id: Annotated[str, InjectedToolCallId], + ) -> str | Command: + if subagent_type not in subagent_graphs: + msg = ( + f"Error: invoked agent of type {subagent_type}, " + f"the only allowed types are {[f'`{k}`' for k in subagent_graphs]}" + ) + raise ValueError(msg) + subagent = subagent_graphs[subagent_type] + state["messages"] = [HumanMessage(content=description)] + if "todos" in state: + del state["todos"] + result = await subagent.ainvoke(state) + return _return_command_with_state_update(result, tool_call_id) + else: + + @tool(description=task_tool_description) + def task( + description: str, + subagent_type: str, + state: Annotated[dict, InjectedState], + tool_call_id: Annotated[str, InjectedToolCallId], + ) -> str | Command: + if subagent_type not in subagent_graphs: + msg = ( + f"Error: invoked agent of type {subagent_type}, " + f"the only allowed types are {[f'`{k}`' for k in subagent_graphs]}" + ) + raise ValueError(msg) + subagent = subagent_graphs[subagent_type] + state["messages"] = [HumanMessage(content=description)] + if "todos" in state: + del state["todos"] + result = subagent.invoke(state) + return _return_command_with_state_update(result, tool_call_id) + + return task + + +class SubAgentMiddleware(AgentMiddleware): + """Middleware for providing subagents to an agent via a `task` tool. + + This middleware adds a `task` tool to the agent that can be used to invoke subagents. + Subagents are useful for handling complex tasks that require multiple steps, or tasks + that require a lot of context to resolve. + + A chief benefit of subagents is that they can handle multi-step tasks, and then return + a clean, concise response to the main agent. + + Subagents are also great for different domains of expertise that require a narrower + subset of tools and focus. + + This middleware comes with a default general-purpose subagent that can be used to + handle the same tasks as the main agent, but with isolated context. + + Args: + default_subagent_model: The model to use for the general-purpose subagent. + Can be a LanguageModelLike or a dict for init_chat_model. + default_subagent_tools: The tools to use for the general-purpose subagent. + subagents: A list of additional subagents to provide to the agent. + system_prompt_extension: Additional instructions on how the main agent should use subagents. + is_async: Whether the `task` tool should be asynchronous. + + Example: + ```python + from langchain.agents.middleware.subagents import SubAgentMiddleware + from langchain.agents import create_agent + + agent = create_agent("openai:gpt-4o", middleware=[SubAgentMiddleware(subagents=[])]) + + # Agent now has access to the `task` tool + result = await agent.invoke({"messages": [HumanMessage("Help me refactor my codebase")]}) + ``` + """ + + def __init__( + self, + *, + default_subagent_model: str | BaseChatModel, + default_subagent_tools: Sequence[BaseTool | Callable | dict[str, Any]] | None = None, + subagents: list[SubAgent | CompiledSubAgent] | None = None, + system_prompt_extension: str | None = None, + is_async: bool = False, + ) -> None: + """Initialize the SubAgentMiddleware.""" + super().__init__() + self.system_prompt_extension = system_prompt_extension + task_tool = _create_task_tool( + default_subagent_model, + default_subagent_tools or [], + subagents or [], + is_async=is_async, + ) + self.tools = [task_tool] + + def wrap_model_call( + self, + request: ModelRequest, + handler: Callable[[ModelRequest], AIMessage], + ) -> AIMessage: + """Update the system prompt to include instructions on using subagents.""" + if self.system_prompt_extension is not None: + request.system_prompt = ( + request.system_prompt + "\n\n" + self.system_prompt_extension + if request.system_prompt + else self.system_prompt_extension + ) + return handler(request) diff --git a/libs/langchain_v1/pyproject.toml b/libs/langchain_v1/pyproject.toml index a4a2ee01a9167..e3c9d04a534ca 100644 --- a/libs/langchain_v1/pyproject.toml +++ b/libs/langchain_v1/pyproject.toml @@ -57,7 +57,8 @@ test = [ "toml>=0.10.2,<1.0.0", "langchain-tests", "langchain-text-splitters", - "langchain-openai" + "langchain-openai", + "langchain-anthropic", ] lint = [ "ruff>=0.12.2,<0.13.0", diff --git a/libs/langchain_v1/tests/integration_tests/agents/middleware/test_filesystem_middleware.py b/libs/langchain_v1/tests/integration_tests/agents/middleware/test_filesystem_middleware.py new file mode 100644 index 0000000000000..39bcff25ef9d5 --- /dev/null +++ b/libs/langchain_v1/tests/integration_tests/agents/middleware/test_filesystem_middleware.py @@ -0,0 +1,722 @@ +from langchain.agents.middleware.filesystem import ( + FilesystemMiddleware, + WRITE_FILE_TOOL_DESCRIPTION, + WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT, +) +from langchain.agents import create_agent +from langchain.agents.deepagents import create_deep_agent +from langchain_core.messages import HumanMessage +from langchain_anthropic import ChatAnthropic +from langgraph.store.memory import InMemoryStore +from langgraph.checkpoint.memory import MemorySaver +from langchain.agents._internal.file_utils import FileData +import pytest +import uuid + + +@pytest.mark.requires("langchain_anthropic") +class TestFilesystem: + def test_create_deepagent_without_store_and_with_longterm_memory_should_fail(self): + with pytest.raises(ValueError): + deepagent = create_deep_agent(tools=[], use_longterm_memory=True) + deepagent.invoke( + {"messages": [HumanMessage(content="List all of the files in your filesystem?")]} + ) + + def test_filesystem_system_prompt_override(self): + agent = create_agent( + model=ChatAnthropic(model="claude-3-5-sonnet-20240620"), + middleware=[ + FilesystemMiddleware( + use_longterm_memory=False, + system_prompt_extension="In every single response, you must say the word 'pokemon'! You love it!", + ) + ], + ) + response = agent.invoke({"messages": [HumanMessage(content="What do you like?")]}) + assert "pokemon" in response["messages"][1].text.lower() + + def test_filesystem_system_prompt_override_with_longterm_memory(self): + agent = create_agent( + model=ChatAnthropic(model="claude-3-5-sonnet-20240620"), + middleware=[ + FilesystemMiddleware( + use_longterm_memory=True, + system_prompt_extension="In every single response, you must say the word 'pokemon'! You love it!", + ) + ], + store=InMemoryStore(), + ) + response = agent.invoke({"messages": [HumanMessage(content="What do you like?")]}) + assert "pokemon" in response["messages"][1].text.lower() + + def test_filesystem_tool_prompt_override(self): + agent = create_agent( + model=ChatAnthropic(model="claude-3-5-sonnet-20240620"), + middleware=[ + FilesystemMiddleware( + use_longterm_memory=False, + custom_tool_descriptions={ + "ls": "Charmander", + "read_file": "Bulbasaur", + "edit_file": "Squirtle", + }, + ) + ], + ) + tools = agent.nodes["tools"].bound._tools_by_name + assert "ls" in tools + assert tools["ls"].description == "Charmander" + assert "read_file" in tools + assert tools["read_file"].description == "Bulbasaur" + assert "write_file" in tools + assert tools["write_file"].description == WRITE_FILE_TOOL_DESCRIPTION + assert "edit_file" in tools + assert tools["edit_file"].description == "Squirtle" + + def test_filesystem_tool_prompt_override_with_longterm_memory(self): + agent = create_agent( + model=ChatAnthropic(model="claude-3-5-sonnet-20240620"), + middleware=[ + FilesystemMiddleware( + use_longterm_memory=True, + custom_tool_descriptions={ + "ls": "Charmander", + "read_file": "Bulbasaur", + "edit_file": "Squirtle", + }, + ) + ], + store=InMemoryStore(), + ) + tools = agent.nodes["tools"].bound._tools_by_name + assert "ls" in tools + assert tools["ls"].description == "Charmander" + assert "read_file" in tools + assert tools["read_file"].description == "Bulbasaur" + assert "write_file" in tools + assert ( + tools["write_file"].description + == WRITE_FILE_TOOL_DESCRIPTION + WRITE_FILE_TOOL_DESCRIPTION_LONGTERM_SUPPLEMENT + ) + assert "edit_file" in tools + assert tools["edit_file"].description == "Squirtle" + + def test_ls_longterm_without_path(self): + checkpointer = MemorySaver() + store = InMemoryStore() + store.put( + ("filesystem",), + "/test.txt", + { + "content": ["Hello world"], + "created_at": "2021-01-01", + "modified_at": "2021-01-01", + }, + ) + store.put( + ("filesystem",), + "/pokemon/charmander.txt", + { + "content": ["Ember"], + "created_at": "2021-01-01", + "modified_at": "2021-01-01", + }, + ) + agent = create_agent( + model=ChatAnthropic(model="claude-3-5-sonnet-20240620"), + middleware=[ + FilesystemMiddleware( + use_longterm_memory=True, + ) + ], + checkpointer=checkpointer, + store=store, + ) + config = {"configurable": {"thread_id": uuid.uuid4()}} + response = agent.invoke( + { + "messages": [HumanMessage(content="List all of your files")], + "files": { + "/pizza.txt": FileData( + content=["Hello world"], + created_at="2021-01-01", + modified_at="2021-01-01", + ), + "/pokemon/squirtle.txt": FileData( + content=["Splash"], + created_at="2021-01-01", + modified_at="2021-01-01", + ), + }, + }, + config=config, + ) + messages = response["messages"] + ls_message = next( + message for message in messages if message.type == "tool" and message.name == "ls" + ) + assert "/pizza.txt" in ls_message.text + assert "/pokemon/squirtle.txt" in ls_message.text + assert "/memories/test.txt" in ls_message.text + assert "/memories/pokemon/charmander.txt" in ls_message.text + + def test_ls_longterm_with_path(self): + checkpointer = MemorySaver() + store = InMemoryStore() + store.put( + ("filesystem",), + "/test.txt", + { + "content": ["Hello world"], + "created_at": "2021-01-01", + "modified_at": "2021-01-01", + }, + ) + store.put( + ("filesystem",), + "/pokemon/charmander.txt", + { + "content": ["Ember"], + "created_at": "2021-01-01", + "modified_at": "2021-01-01", + }, + ) + agent = create_agent( + model=ChatAnthropic(model="claude-3-5-sonnet-20240620"), + middleware=[ + FilesystemMiddleware( + use_longterm_memory=True, + ) + ], + checkpointer=checkpointer, + store=store, + ) + config = {"configurable": {"thread_id": uuid.uuid4()}} + response = agent.invoke( + { + "messages": [ + HumanMessage(content="List all of your files in the /pokemon directory") + ], + "files": { + "/pizza.txt": FileData( + content=["Hello world"], + created_at="2021-01-01", + modified_at="2021-01-01", + ), + "/pokemon/squirtle.txt": FileData( + content=["Splash"], + created_at="2021-01-01", + modified_at="2021-01-01", + ), + }, + }, + config=config, + ) + messages = response["messages"] + ls_message = next( + message for message in messages if message.type == "tool" and message.name == "ls" + ) + assert "/pokemon/squirtle.txt" in ls_message.text + assert "/memories/pokemon/charmander.txt" not in ls_message.text + + def test_read_file_longterm_local_file(self): + checkpointer = MemorySaver() + store = InMemoryStore() + store.put( + ("filesystem",), + "/test.txt", + { + "content": ["Hello world"], + "created_at": "2021-01-01", + "modified_at": "2021-01-01", + }, + ) + agent = create_agent( + model=ChatAnthropic(model="claude-3-5-sonnet-20240620"), + middleware=[ + FilesystemMiddleware( + use_longterm_memory=True, + ) + ], + checkpointer=checkpointer, + store=store, + ) + config = {"configurable": {"thread_id": uuid.uuid4()}} + response = agent.invoke( + { + "messages": [HumanMessage(content="Read test.txt from local memory")], + "files": { + "/test.txt": FileData( + content=["Goodbye world"], + created_at="2021-01-01", + modified_at="2021-01-01", + ) + }, + }, + config=config, + ) + messages = response["messages"] + read_file_message = next( + message + for message in messages + if message.type == "tool" and message.name == "read_file" + ) + assert read_file_message is not None + assert "Goodbye world" in read_file_message.content + + def test_read_file_longterm_store_file(self): + checkpointer = MemorySaver() + store = InMemoryStore() + store.put( + ("filesystem",), + "/test.txt", + { + "content": ["Hello world"], + "created_at": "2021-01-01", + "modified_at": "2021-01-01", + }, + ) + agent = create_agent( + model=ChatAnthropic(model="claude-3-5-sonnet-20240620"), + middleware=[ + FilesystemMiddleware( + use_longterm_memory=True, + ) + ], + checkpointer=checkpointer, + store=store, + ) + config = {"configurable": {"thread_id": uuid.uuid4()}} + response = agent.invoke( + { + "messages": [HumanMessage(content="Read test.txt from longterm memory")], + "files": { + "/test.txt": FileData( + content=["Goodbye world"], + created_at="2021-01-01", + modified_at="2021-01-01", + ) + }, + }, + config=config, + ) + messages = response["messages"] + read_file_message = next( + message + for message in messages + if message.type == "tool" and message.name == "read_file" + ) + assert read_file_message is not None + assert "Hello world" in read_file_message.content + + def test_read_file_longterm(self): + checkpointer = MemorySaver() + store = InMemoryStore() + store.put( + ("filesystem",), + "/test.txt", + { + "content": ["Hello world"], + "created_at": "2021-01-01", + "modified_at": "2021-01-01", + }, + ) + store.put( + ("filesystem",), + "/pokemon/charmander.txt", + { + "content": ["Ember"], + "created_at": "2021-01-01", + "modified_at": "2021-01-01", + }, + ) + agent = create_agent( + model=ChatAnthropic(model="claude-3-5-sonnet-20240620"), + middleware=[ + FilesystemMiddleware( + use_longterm_memory=True, + ) + ], + checkpointer=checkpointer, + store=store, + ) + config = {"configurable": {"thread_id": uuid.uuid4()}} + response = agent.invoke( + { + "messages": [ + HumanMessage( + content="Read the contents of the file about charmander from longterm memory." + ) + ], + "files": {}, + }, + config=config, + ) + messages = response["messages"] + ai_msg_w_toolcall = next( + message + for message in messages + if message.type == "ai" + and any( + tc["name"] == "read_file" + and tc["args"]["file_path"] == "/memories/pokemon/charmander.txt" + for tc in message.tool_calls + ) + ) + assert ai_msg_w_toolcall is not None + + def test_write_file_longterm(self): + checkpointer = MemorySaver() + store = InMemoryStore() + agent = create_agent( + model=ChatAnthropic(model="claude-3-5-sonnet-20240620"), + middleware=[ + FilesystemMiddleware( + use_longterm_memory=True, + ) + ], + checkpointer=checkpointer, + store=store, + ) + config = {"configurable": {"thread_id": uuid.uuid4()}} + response = agent.invoke( + { + "messages": [ + HumanMessage( + content="Write a haiku about Charmander to longterm memory in /charmander.txt, use the word 'fiery'" + ) + ], + "files": {}, + }, + config=config, + ) + messages = response["messages"] + write_file_message = next( + message + for message in messages + if message.type == "tool" and message.name == "write_file" + ) + assert write_file_message is not None + file_item = store.get(("filesystem",), "/charmander.txt") + assert file_item is not None + assert any("fiery" in c for c in file_item.value["content"]) or any( + "Fiery" in c for c in file_item.value["content"] + ) + + def test_write_file_fail_already_exists_in_store(self): + checkpointer = MemorySaver() + store = InMemoryStore() + store.put( + ("filesystem",), + "/charmander.txt", + { + "content": ["Hello world"], + "created_at": "2021-01-01", + "modified_at": "2021-01-01", + }, + ) + agent = create_agent( + model=ChatAnthropic(model="claude-3-5-sonnet-20240620"), + middleware=[ + FilesystemMiddleware( + use_longterm_memory=True, + ) + ], + checkpointer=checkpointer, + store=store, + ) + config = {"configurable": {"thread_id": uuid.uuid4()}} + response = agent.invoke( + { + "messages": [ + HumanMessage( + content="Write a haiku about Charmander to longterm memory in /charmander.txt, use the word 'fiery'" + ) + ], + "files": {}, + }, + config=config, + ) + messages = response["messages"] + write_file_message = next( + message + for message in messages + if message.type == "tool" and message.name == "write_file" + ) + assert write_file_message is not None + assert "Cannot write" in write_file_message.content + + def test_write_file_fail_already_exists_in_local(self): + checkpointer = MemorySaver() + store = InMemoryStore() + agent = create_agent( + model=ChatAnthropic(model="claude-3-5-sonnet-20240620"), + middleware=[ + FilesystemMiddleware( + use_longterm_memory=True, + ) + ], + checkpointer=checkpointer, + store=store, + ) + config = {"configurable": {"thread_id": uuid.uuid4()}} + response = agent.invoke( + { + "messages": [ + HumanMessage( + content="Write a haiku about Charmander to /charmander.txt, use the word 'fiery'" + ) + ], + "files": { + "/charmander.txt": FileData( + content=["Hello world"], + created_at="2021-01-01", + modified_at="2021-01-01", + ) + }, + }, + config=config, + ) + messages = response["messages"] + write_file_message = next( + message + for message in messages + if message.type == "tool" and message.name == "write_file" + ) + assert write_file_message is not None + assert "Cannot write" in write_file_message.content + + def test_edit_file_longterm(self): + checkpointer = MemorySaver() + store = InMemoryStore() + store.put( + ("filesystem",), + "/charmander.txt", + { + "content": ["The fire burns brightly. The fire burns hot."], + "created_at": "2021-01-01", + "modified_at": "2021-01-01", + }, + ) + agent = create_agent( + model=ChatAnthropic(model="claude-3-5-sonnet-20240620"), + middleware=[ + FilesystemMiddleware( + use_longterm_memory=True, + ) + ], + checkpointer=checkpointer, + store=store, + ) + config = {"configurable": {"thread_id": uuid.uuid4()}} + response = agent.invoke( + { + "messages": [ + HumanMessage( + content="Edit the longterm memory file about charmander, to replace all instances of the word 'fire' with 'embers'" + ) + ], + "files": {}, + }, + config=config, + ) + messages = response["messages"] + edit_file_message = next( + message + for message in messages + if message.type == "tool" and message.name == "edit_file" + ) + assert edit_file_message is not None + assert store.get(("filesystem",), "/charmander.txt").value["content"] == [ + "The embers burns brightly. The embers burns hot." + ] + + def test_longterm_memory_multiple_tools(self): + checkpointer = MemorySaver() + store = InMemoryStore() + agent = create_agent( + model=ChatAnthropic(model="claude-3-5-sonnet-20240620"), + middleware=[ + FilesystemMiddleware( + use_longterm_memory=True, + ) + ], + checkpointer=checkpointer, + store=store, + ) + assert_longterm_mem_tools(agent, store) + + def test_longterm_memory_multiple_tools_deepagent(self): + checkpointer = MemorySaver() + store = InMemoryStore() + agent = create_deep_agent(use_longterm_memory=True, checkpointer=checkpointer, store=store) + assert_longterm_mem_tools(agent, store) + + def test_shortterm_memory_multiple_tools_deepagent(self): + checkpointer = MemorySaver() + store = InMemoryStore() + agent = create_deep_agent(use_longterm_memory=False, checkpointer=checkpointer, store=store) + assert_shortterm_mem_tools(agent) + + +# Take actions on multiple threads to test longterm memory +def assert_longterm_mem_tools(agent, store): + # Write a longterm memory file + config = {"configurable": {"thread_id": uuid.uuid4()}} + agent.invoke( + { + "messages": [ + HumanMessage( + content="Write a haiku about Charmander to longterm memory in /charmander.txt, use the word 'fiery'" + ) + ] + }, + config=config, + ) + namespaces = store.list_namespaces() + assert len(namespaces) == 1 + assert namespaces[0] == ("filesystem",) + file_item = store.get(("filesystem",), "/charmander.txt") + assert file_item is not None + assert file_item.key == "/charmander.txt" + + # Read the longterm memory file + config2 = {"configurable": {"thread_id": uuid.uuid4()}} + response = agent.invoke( + { + "messages": [ + HumanMessage( + content="Read the haiku about Charmander from longterm memory at /charmander.txt" + ) + ] + }, + config=config2, + ) + messages = response["messages"] + read_file_message = next( + message for message in messages if message.type == "tool" and message.name == "read_file" + ) + assert "fiery" in read_file_message.content or "Fiery" in read_file_message.content + + # List all of the files in longterm memory + config3 = {"configurable": {"thread_id": uuid.uuid4()}} + response = agent.invoke( + {"messages": [HumanMessage(content="List all of the files in longterm memory")]}, + config=config3, + ) + messages = response["messages"] + ls_message = next( + message for message in messages if message.type == "tool" and message.name == "ls" + ) + assert "/memories/charmander.txt" in ls_message.content + + # Edit the longterm memory file + config4 = {"configurable": {"thread_id": uuid.uuid4()}} + response = agent.invoke( + { + "messages": [ + HumanMessage( + content="Edit the haiku about Charmander in longterm memory to use the word 'ember'" + ) + ] + }, + config=config4, + ) + file_item = store.get(("filesystem",), "/charmander.txt") + assert file_item is not None + assert file_item.key == "/charmander.txt" + assert any("ember" in c for c in file_item.value["content"]) or any( + "Ember" in c for c in file_item.value["content"] + ) + + # Read the longterm memory file + config5 = {"configurable": {"thread_id": uuid.uuid4()}} + response = agent.invoke( + { + "messages": [ + HumanMessage( + content="Read the haiku about Charmander from longterm memory at /charmander.txt" + ) + ] + }, + config=config5, + ) + messages = response["messages"] + read_file_message = next( + message for message in messages if message.type == "tool" and message.name == "read_file" + ) + assert "ember" in read_file_message.content or "Ember" in read_file_message.content + + +def assert_shortterm_mem_tools(agent): + # Write a shortterm memory file + config = {"configurable": {"thread_id": uuid.uuid4()}} + response = agent.invoke( + { + "messages": [ + HumanMessage( + content="Write a haiku about Charmander to /charmander.txt, use the word 'fiery'" + ) + ] + }, + config=config, + ) + files = response["files"] + assert "/charmander.txt" in files + + # Read the shortterm memory file + response = agent.invoke( + { + "messages": [ + HumanMessage(content="Read the haiku about Charmander from /charmander.txt") + ] + }, + config=config, + ) + messages = response["messages"] + read_file_message = next( + message + for message in reversed(messages) + if message.type == "tool" and message.name == "read_file" + ) + assert "fiery" in read_file_message.content or "Fiery" in read_file_message.content + + # List all of the files in shortterm memory + response = agent.invoke( + {"messages": [HumanMessage(content="List all of the files in your filesystem")]}, + config=config, + ) + messages = response["messages"] + ls_message = next( + message for message in messages if message.type == "tool" and message.name == "ls" + ) + assert "/charmander.txt" in ls_message.content + + # Edit the shortterm memory file + response = agent.invoke( + { + "messages": [ + HumanMessage(content="Edit the haiku about Charmander to use the word 'ember'") + ] + }, + config=config, + ) + files = response["files"] + assert "/charmander.txt" in files + assert any("ember" in c for c in files["/charmander.txt"]["content"]) or any( + "Ember" in c for c in files["/charmander.txt"]["content"] + ) + + # Read the shortterm memory file + response = agent.invoke( + {"messages": [HumanMessage(content="Read the haiku about Charmander at /charmander.txt")]}, + config=config, + ) + messages = response["messages"] + read_file_message = next( + message + for message in reversed(messages) + if message.type == "tool" and message.name == "read_file" + ) + assert "ember" in read_file_message.content or "Ember" in read_file_message.content diff --git a/libs/langchain_v1/tests/integration_tests/agents/middleware/test_subagent_middleware.py b/libs/langchain_v1/tests/integration_tests/agents/middleware/test_subagent_middleware.py new file mode 100644 index 0000000000000..b3dfd2b41e8ab --- /dev/null +++ b/libs/langchain_v1/tests/integration_tests/agents/middleware/test_subagent_middleware.py @@ -0,0 +1,226 @@ +from langchain.agents.middleware.subagents import SubAgentMiddleware +from langchain.agents.middleware import AgentMiddleware +from langchain_core.tools import tool +from langchain.agents import create_agent +from langchain_core.messages import HumanMessage +import pytest + + +@tool +def get_weather(city: str) -> str: + """Get the weather in a city.""" + return f"The weather in {city} is sunny." + + +class WeatherMiddleware(AgentMiddleware): + tools = [get_weather] + + +def assert_expected_subgraph_actions(expected_tool_calls, agent, inputs): + current_idx = 0 + for update in agent.stream( + inputs, + subgraphs=True, + stream_mode="updates", + ): + if "model" in update[1]: + ai_message = update[1]["model"]["messages"][-1] + tool_calls = ai_message.tool_calls + for tool_call in tool_calls: + if tool_call["name"] == expected_tool_calls[current_idx]["name"]: + if "model" in expected_tool_calls[current_idx]: + assert ( + ai_message.response_metadata["model_name"] + == expected_tool_calls[current_idx]["model"] + ) + for arg in expected_tool_calls[current_idx]["args"]: + assert arg in tool_call["args"] + assert ( + tool_call["args"][arg] == expected_tool_calls[current_idx]["args"][arg] + ) + current_idx += 1 + assert current_idx == len(expected_tool_calls) + + +@pytest.mark.requires("langchain_anthropic", "langchain_openai") +class TestSubagentMiddleware: + """Integration tests for the SubagentMiddleware class.""" + + def test_general_purpose_subagent(self): + agent = create_agent( + model="claude-sonnet-4-20250514", + system_prompt="Use the general-purpose subagent to get the weather in a city.", + middleware=[ + SubAgentMiddleware( + default_subagent_model="claude-sonnet-4-20250514", + default_subagent_tools=[get_weather], + ) + ], + ) + assert "task" in agent.nodes["tools"].bound._tools_by_name.keys() + response = agent.invoke( + {"messages": [HumanMessage(content="What is the weather in Tokyo?")]} + ) + assert response["messages"][1].tool_calls[0]["name"] == "task" + assert response["messages"][1].tool_calls[0]["args"]["subagent_type"] == "general-purpose" + + def test_defined_subagent(self): + agent = create_agent( + model="claude-sonnet-4-20250514", + system_prompt="Use the task tool to call a subagent.", + middleware=[ + SubAgentMiddleware( + default_subagent_model="claude-sonnet-4-20250514", + default_subagent_tools=[], + subagents=[ + { + "name": "weather", + "description": "This subagent can get weather in cities.", + "system_prompt": "Use the get_weather tool to get the weather in a city.", + "tools": [get_weather], + } + ], + ) + ], + ) + assert "task" in agent.nodes["tools"].bound._tools_by_name.keys() + response = agent.invoke( + {"messages": [HumanMessage(content="What is the weather in Tokyo?")]} + ) + assert response["messages"][1].tool_calls[0]["name"] == "task" + assert response["messages"][1].tool_calls[0]["args"]["subagent_type"] == "weather" + + def test_defined_subagent_tool_calls(self): + agent = create_agent( + model="claude-sonnet-4-20250514", + system_prompt="Use the task tool to call a subagent.", + middleware=[ + SubAgentMiddleware( + default_subagent_model="claude-sonnet-4-20250514", + default_subagent_tools=[], + subagents=[ + { + "name": "weather", + "description": "This subagent can get weather in cities.", + "system_prompt": "Use the get_weather tool to get the weather in a city.", + "tools": [get_weather], + } + ], + ) + ], + ) + expected_tool_calls = [ + {"name": "task", "args": {"subagent_type": "weather"}}, + {"name": "get_weather", "args": {}}, + ] + assert_expected_subgraph_actions( + expected_tool_calls, + agent, + {"messages": [HumanMessage(content="What is the weather in Tokyo?")]}, + ) + + def test_defined_subagent_custom_model(self): + agent = create_agent( + model="claude-sonnet-4-20250514", + system_prompt="Use the task tool to call a subagent.", + middleware=[ + SubAgentMiddleware( + default_subagent_model="claude-sonnet-4-20250514", + default_subagent_tools=[], + subagents=[ + { + "name": "weather", + "description": "This subagent can get weather in cities.", + "system_prompt": "Use the get_weather tool to get the weather in a city.", + "tools": [get_weather], + "model": "gpt-4.1", + } + ], + ) + ], + ) + expected_tool_calls = [ + { + "name": "task", + "args": {"subagent_type": "weather"}, + "model": "claude-sonnet-4-20250514", + }, + {"name": "get_weather", "args": {}, "model": "gpt-4.1-2025-04-14"}, + ] + assert_expected_subgraph_actions( + expected_tool_calls, + agent, + {"messages": [HumanMessage(content="What is the weather in Tokyo?")]}, + ) + + def test_defined_subagent_custom_middleware(self): + agent = create_agent( + model="claude-sonnet-4-20250514", + system_prompt="Use the task tool to call a subagent.", + middleware=[ + SubAgentMiddleware( + default_subagent_model="claude-sonnet-4-20250514", + default_subagent_tools=[], + subagents=[ + { + "name": "weather", + "description": "This subagent can get weather in cities.", + "system_prompt": "Use the get_weather tool to get the weather in a city.", + "tools": [], # No tools, only in middleware + "model": "gpt-4.1", + "middleware": [WeatherMiddleware()], + } + ], + ) + ], + ) + expected_tool_calls = [ + { + "name": "task", + "args": {"subagent_type": "weather"}, + "model": "claude-sonnet-4-20250514", + }, + {"name": "get_weather", "args": {}, "model": "gpt-4.1-2025-04-14"}, + ] + assert_expected_subgraph_actions( + expected_tool_calls, + agent, + {"messages": [HumanMessage(content="What is the weather in Tokyo?")]}, + ) + + def test_defined_subagent_custom_runnable(self): + custom_subagent = create_agent( + model="gpt-4.1-2025-04-14", + system_prompt="Use the get_weather tool to get the weather in a city.", + tools=[get_weather], + ) + agent = create_agent( + model="claude-sonnet-4-20250514", + system_prompt="Use the task tool to call a subagent.", + middleware=[ + SubAgentMiddleware( + default_subagent_model="claude-sonnet-4-20250514", + default_subagent_tools=[], + subagents=[ + { + "name": "weather", + "description": "This subagent can get weather in cities.", + "runnable": custom_subagent, + } + ], + ) + ], + ) + expected_tool_calls = [ + { + "name": "task", + "args": {"subagent_type": "weather"}, + "model": "claude-sonnet-4-20250514", + }, + {"name": "get_weather", "args": {}, "model": "gpt-4.1-2025-04-14"}, + ] + assert_expected_subgraph_actions( + expected_tool_calls, + agent, + {"messages": [HumanMessage(content="What is the weather in Tokyo?")]}, + ) diff --git a/libs/langchain_v1/tests/integration_tests/agents/test_deepagents.py b/libs/langchain_v1/tests/integration_tests/agents/test_deepagents.py new file mode 100644 index 0000000000000..e25b80bd344cc --- /dev/null +++ b/libs/langchain_v1/tests/integration_tests/agents/test_deepagents.py @@ -0,0 +1,306 @@ +from langchain.agents.deepagents import create_deep_agent +from langchain.agents import create_agent +from langchain_core.tools import tool, InjectedToolCallId +from typing import Annotated +from langchain.tools.tool_node import InjectedState +from langchain.agents.middleware import AgentMiddleware, AgentState +from langgraph.types import Command +from langchain_core.messages import ToolMessage +import pytest + + +def assert_all_deepagent_qualities(agent): + assert "todos" in agent.stream_channels + assert "files" in agent.stream_channels + assert "write_todos" in agent.nodes["tools"].bound._tools_by_name.keys() + assert "ls" in agent.nodes["tools"].bound._tools_by_name.keys() + assert "read_file" in agent.nodes["tools"].bound._tools_by_name.keys() + assert "write_file" in agent.nodes["tools"].bound._tools_by_name.keys() + assert "edit_file" in agent.nodes["tools"].bound._tools_by_name.keys() + assert "task" in agent.nodes["tools"].bound._tools_by_name.keys() + + +SAMPLE_MODEL = "claude-3-5-sonnet-20240620" + + +@tool(description="Use this tool to get the weather") +def get_weather(location: str): + return f"The weather in {location} is sunny." + + +@tool(description="Use this tool to get the latest soccer scores") +def get_soccer_scores(team: str): + return f"The latest soccer scores for {team} are 2-1." + + +@tool(description="Sample tool") +def sample_tool(sample_input: str): + return sample_input + + +@tool(description="Sample tool with injected state") +def sample_tool_with_injected_state(sample_input: str, state: Annotated[dict, InjectedState]): + return sample_input + state["sample_input"] + + +TOY_BASKETBALL_RESEARCH = "Lebron James is the best basketball player of all time with over 40k points and 21 seasons in the NBA." + + +@tool(description="Use this tool to conduct research into basketball and save it to state") +def research_basketball( + topic: str, + state: Annotated[dict, InjectedState], + tool_call_id: Annotated[str, InjectedToolCallId], +): + current_research = state.get("research", "") + research = f"{current_research}\n\nResearching on {topic}... Done! {TOY_BASKETBALL_RESEARCH}" + return Command( + update={ + "research": research, + "messages": [ToolMessage(research, tool_call_id=tool_call_id)], + } + ) + + +class ResearchState(AgentState): + research: str + + +class ResearchMiddlewareWithTools(AgentMiddleware): + state_schema = ResearchState + tools = [research_basketball] + + +class ResearchMiddleware(AgentMiddleware): + state_schema = ResearchState + + +class SampleMiddlewareWithTools(AgentMiddleware): + tools = [sample_tool] + + +class SampleState(AgentState): + sample_input: str + + +class SampleMiddlewareWithToolsAndState(AgentMiddleware): + state_schema = SampleState + tools = [sample_tool] + + +class WeatherToolMiddleware(AgentMiddleware): + tools = [get_weather] + + +@pytest.mark.requires("langchain_anthropic") +class TestDeepAgentsFilesystem: + def test_base_deep_agent(self): + agent = create_deep_agent() + assert_all_deepagent_qualities(agent) + + def test_deep_agent_with_tool(self): + agent = create_deep_agent(tools=[sample_tool]) + assert_all_deepagent_qualities(agent) + assert "sample_tool" in agent.nodes["tools"].bound._tools_by_name.keys() + + def test_deep_agent_with_middleware_with_tool(self): + agent = create_deep_agent(middleware=[SampleMiddlewareWithTools()]) + assert_all_deepagent_qualities(agent) + assert "sample_tool" in agent.nodes["tools"].bound._tools_by_name.keys() + + def test_deep_agent_with_middleware_with_tool_and_state(self): + agent = create_deep_agent(middleware=[SampleMiddlewareWithToolsAndState()]) + assert_all_deepagent_qualities(agent) + assert "sample_tool" in agent.nodes["tools"].bound._tools_by_name.keys() + assert "sample_input" in agent.stream_channels + + def test_deep_agent_with_subagents(self): + subagents = [ + { + "name": "weather_agent", + "description": "Use this agent to get the weather", + "system_prompt": "You are a weather agent.", + "tools": [get_weather], + "model": SAMPLE_MODEL, + } + ] + agent = create_deep_agent(tools=[sample_tool], subagents=subagents) + assert_all_deepagent_qualities(agent) + result = agent.invoke( + {"messages": [{"role": "user", "content": "What is the weather in Tokyo?"}]} + ) + agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"] + tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls] + assert any( + [ + tool_call["name"] == "task" + and tool_call["args"].get("subagent_type") == "weather_agent" + for tool_call in tool_calls + ] + ) + + def test_deep_agent_with_subagents_gen_purpose(self): + subagents = [ + { + "name": "weather_agent", + "description": "Use this agent to get the weather", + "system_prompt": "You are a weather agent.", + "tools": [get_weather], + "model": SAMPLE_MODEL, + } + ] + agent = create_deep_agent(tools=[sample_tool], subagents=subagents) + assert_all_deepagent_qualities(agent) + result = agent.invoke( + { + "messages": [ + { + "role": "user", + "content": "Use the general purpose subagent to call the sample tool", + } + ] + } + ) + agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"] + tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls] + assert any( + [ + tool_call["name"] == "task" + and tool_call["args"].get("subagent_type") == "general-purpose" + for tool_call in tool_calls + ] + ) + + def test_deep_agent_with_subagents_with_middleware(self): + subagents = [ + { + "name": "weather_agent", + "description": "Use this agent to get the weather", + "system_prompt": "You are a weather agent.", + "tools": [], + "model": SAMPLE_MODEL, + "middleware": [WeatherToolMiddleware()], + } + ] + agent = create_deep_agent(tools=[sample_tool], subagents=subagents) + assert_all_deepagent_qualities(agent) + result = agent.invoke( + {"messages": [{"role": "user", "content": "What is the weather in Tokyo?"}]} + ) + agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"] + tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls] + assert any( + [ + tool_call["name"] == "task" + and tool_call["args"].get("subagent_type") == "weather_agent" + for tool_call in tool_calls + ] + ) + + def test_deep_agent_with_custom_subagents(self): + subagents = [ + { + "name": "weather_agent", + "description": "Use this agent to get the weather", + "system_prompt": "You are a weather agent.", + "tools": [get_weather], + "model": SAMPLE_MODEL, + }, + { + "name": "soccer_agent", + "description": "Use this agent to get the latest soccer scores", + "runnable": create_agent( + model=SAMPLE_MODEL, + tools=[get_soccer_scores], + system_prompt="You are a soccer agent.", + ), + }, + ] + agent = create_deep_agent(tools=[sample_tool], subagents=subagents) + assert_all_deepagent_qualities(agent) + result = agent.invoke( + { + "messages": [ + { + "role": "user", + "content": "Look up the weather in Tokyo, and the latest scores for Manchester City!", + } + ] + } + ) + agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"] + tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls] + assert any( + [ + tool_call["name"] == "task" + and tool_call["args"].get("subagent_type") == "weather_agent" + for tool_call in tool_calls + ] + ) + assert any( + [ + tool_call["name"] == "task" + and tool_call["args"].get("subagent_type") == "soccer_agent" + for tool_call in tool_calls + ] + ) + + def test_deep_agent_with_extended_state_and_subagents(self): + subagents = [ + { + "name": "basketball_info_agent", + "description": "Use this agent to get surface level info on any basketball topic", + "system_prompt": "You are a basketball info agent.", + "middleware": [ResearchMiddlewareWithTools()], + } + ] + agent = create_deep_agent( + tools=[sample_tool], subagents=subagents, middleware=[ResearchMiddleware()] + ) + assert_all_deepagent_qualities(agent) + assert "research" in agent.stream_channels + result = agent.invoke( + {"messages": [{"role": "user", "content": "Get surface level info on lebron james"}]}, + config={"recursion_limit": 100}, + ) + agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"] + tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls] + assert any( + [ + tool_call["name"] == "task" + and tool_call["args"].get("subagent_type") == "basketball_info_agent" + for tool_call in tool_calls + ] + ) + assert TOY_BASKETBALL_RESEARCH in result["research"] + + def test_deep_agent_with_subagents_no_tools(self): + subagents = [ + { + "name": "basketball_info_agent", + "description": "Use this agent to get surface level info on any basketball topic", + "system_prompt": "You are a basketball info agent.", + } + ] + agent = create_deep_agent(tools=[sample_tool], subagents=subagents) + assert_all_deepagent_qualities(agent) + result = agent.invoke( + { + "messages": [ + { + "role": "user", + "content": "Use the basketball info subagent to call the sample tool", + } + ] + }, + config={"recursion_limit": 100}, + ) + agent_messages = [msg for msg in result.get("messages", []) if msg.type == "ai"] + tool_calls = [tool_call for msg in agent_messages for tool_call in msg.tool_calls] + assert any( + [ + tool_call["name"] == "task" + and tool_call["args"].get("subagent_type") == "basketball_info_agent" + for tool_call in tool_calls + ] + ) diff --git a/libs/langchain_v1/tests/unit_tests/agents/middleware/test_anthropic_tools.py b/libs/langchain_v1/tests/unit_tests/agents/middleware/test_anthropic_tools.py new file mode 100644 index 0000000000000..7e871c69aad36 --- /dev/null +++ b/libs/langchain_v1/tests/unit_tests/agents/middleware/test_anthropic_tools.py @@ -0,0 +1,276 @@ +"""Unit tests for Anthropic text editor and memory tool middleware.""" + +import pytest +from langchain.agents.middleware.anthropic_tools import ( + AnthropicToolsState, + StateClaudeMemoryMiddleware, + StateClaudeTextEditorMiddleware, +) +from langchain.agents._internal.file_utils import validate_path +from langchain_core.messages import ToolMessage +from langgraph.types import Command + + +class TestPathValidation: + """Test path validation and security.""" + + def test_basic_path_normalization(self) -> None: + """Test basic path normalization.""" + assert validate_path("/foo/bar") == "/foo/bar" + assert validate_path("foo/bar") == "/foo/bar" + assert validate_path("/foo//bar") == "/foo/bar" + assert validate_path("/foo/./bar") == "/foo/bar" + + def test_path_traversal_blocked(self) -> None: + """Test that path traversal attempts are blocked.""" + with pytest.raises(ValueError, match="Path traversal not allowed"): + validate_path("/foo/../etc/passwd") + + with pytest.raises(ValueError, match="Path traversal not allowed"): + validate_path("../etc/passwd") + + with pytest.raises(ValueError, match="Path traversal not allowed"): + validate_path("~/.ssh/id_rsa") + + def test_allowed_prefixes(self) -> None: + """Test path prefix validation.""" + # Should pass + assert ( + validate_path("/workspace/file.txt", allowed_prefixes=["/workspace"]) + == "/workspace/file.txt" + ) + + # Should fail + with pytest.raises(ValueError, match="Path must start with"): + validate_path("/etc/passwd", allowed_prefixes=["/workspace"]) + + with pytest.raises(ValueError, match="Path must start with"): + validate_path("/workspacemalicious/file.txt", allowed_prefixes=["/workspace/"]) + + def test_memories_prefix(self) -> None: + """Test /memories prefix validation for memory tools.""" + assert ( + validate_path("/memories/notes.txt", allowed_prefixes=["/memories"]) + == "/memories/notes.txt" + ) + + with pytest.raises(ValueError, match="Path must start with"): + validate_path("/other/notes.txt", allowed_prefixes=["/memories"]) + + +class TestTextEditorMiddleware: + """Test text editor middleware functionality.""" + + def test_middleware_initialization(self) -> None: + """Test middleware initializes correctly.""" + middleware = StateClaudeTextEditorMiddleware() + assert middleware.state_schema == AnthropicToolsState + assert middleware.tool_type == "text_editor_20250728" + assert middleware.tool_name == "str_replace_based_edit_tool" + assert middleware.state_key == "text_editor_files" + + # With path restrictions + middleware = StateClaudeTextEditorMiddleware(allowed_path_prefixes=["/workspace"]) + assert middleware.allowed_prefixes == ["/workspace"] + + +class TestMemoryMiddleware: + """Test memory middleware functionality.""" + + def test_middleware_initialization(self) -> None: + """Test middleware initializes correctly.""" + middleware = StateClaudeMemoryMiddleware() + assert middleware.state_schema == AnthropicToolsState + assert middleware.tool_type == "memory_20250818" + assert middleware.tool_name == "memory" + assert middleware.state_key == "memory_files" + assert middleware.system_prompt # Should have default prompt + + def test_custom_system_prompt(self) -> None: + """Test custom system prompt can be set.""" + custom_prompt = "Custom memory instructions" + middleware = StateClaudeMemoryMiddleware(system_prompt=custom_prompt) + assert middleware.system_prompt == custom_prompt + + +class TestFileOperations: + """Test file operation implementations via wrap_tool_call.""" + + def test_view_operation(self) -> None: + """Test view command execution.""" + middleware = StateClaudeTextEditorMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/test.txt": { + "content": ["line1", "line2", "line3"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + } + }, + } + + args = {"command": "view", "path": "/test.txt"} + result = middleware._handle_view(args, state, "test_id") + + assert isinstance(result, Command) + assert result.update is not None + messages = result.update.get("messages", []) + assert len(messages) == 1 + assert isinstance(messages[0], ToolMessage) + assert messages[0].content == "1|line1\n2|line2\n3|line3" + assert messages[0].tool_call_id == "test_id" + + def test_create_operation(self) -> None: + """Test create command execution.""" + middleware = StateClaudeTextEditorMiddleware() + + state: AnthropicToolsState = {"messages": []} + + args = {"command": "create", "path": "/test.txt", "file_text": "line1\nline2"} + result = middleware._handle_create(args, state, "test_id") + + assert isinstance(result, Command) + assert result.update is not None + files = result.update.get("text_editor_files", {}) + assert "/test.txt" in files + assert files["/test.txt"]["content"] == ["line1", "line2"] + + def test_path_prefix_enforcement(self) -> None: + """Test that path prefixes are enforced.""" + middleware = StateClaudeTextEditorMiddleware(allowed_path_prefixes=["/workspace"]) + + state: AnthropicToolsState = {"messages": []} + + # Should fail with /etc/passwd + args = {"command": "create", "path": "/etc/passwd", "file_text": "test"} + + try: + middleware._handle_create(args, state, "test_id") + assert False, "Should have raised ValueError" + except ValueError as e: + assert "Path must start with" in str(e) + + def test_memories_prefix_enforcement(self) -> None: + """Test that /memories prefix is enforced for memory middleware.""" + middleware = StateClaudeMemoryMiddleware() + + state: AnthropicToolsState = {"messages": []} + + # Should fail with /other/path + args = {"command": "create", "path": "/other/path.txt", "file_text": "test"} + + try: + middleware._handle_create(args, state, "test_id") + assert False, "Should have raised ValueError" + except ValueError as e: + assert "/memories" in str(e) + + def test_str_replace_operation(self) -> None: + """Test str_replace command execution.""" + middleware = StateClaudeTextEditorMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/test.txt": { + "content": ["Hello world", "Goodbye world"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + } + }, + } + + args = { + "command": "str_replace", + "path": "/test.txt", + "old_str": "world", + "new_str": "universe", + } + result = middleware._handle_str_replace(args, state, "test_id") + + assert isinstance(result, Command) + files = result.update.get("text_editor_files", {}) + # Should only replace first occurrence + assert files["/test.txt"]["content"] == ["Hello universe", "Goodbye world"] + + def test_insert_operation(self) -> None: + """Test insert command execution.""" + middleware = StateClaudeTextEditorMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/test.txt": { + "content": ["line1", "line2"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + } + }, + } + + args = { + "command": "insert", + "path": "/test.txt", + "insert_line": 0, + "new_str": "inserted", + } + result = middleware._handle_insert(args, state, "test_id") + + assert isinstance(result, Command) + files = result.update.get("text_editor_files", {}) + assert files["/test.txt"]["content"] == ["inserted", "line1", "line2"] + + def test_delete_operation(self) -> None: + """Test delete command execution (memory only).""" + middleware = StateClaudeMemoryMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "memory_files": { + "/memories/test.txt": { + "content": ["line1"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + } + }, + } + + args = {"command": "delete", "path": "/memories/test.txt"} + result = middleware._handle_delete(args, state, "test_id") + + assert isinstance(result, Command) + files = result.update.get("memory_files", {}) + # Deleted files are marked as None in state + assert files.get("/memories/test.txt") is None + + def test_rename_operation(self) -> None: + """Test rename command execution (memory only).""" + middleware = StateClaudeMemoryMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "memory_files": { + "/memories/old.txt": { + "content": ["line1"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + } + }, + } + + args = { + "command": "rename", + "old_path": "/memories/old.txt", + "new_path": "/memories/new.txt", + } + result = middleware._handle_rename(args, state, "test_id") + + assert isinstance(result, Command) + files = result.update.get("memory_files", {}) + # Old path is marked as None (deleted) + assert files.get("/memories/old.txt") is None + # New path has the file data + assert files.get("/memories/new.txt") is not None + assert files["/memories/new.txt"]["content"] == ["line1"] diff --git a/libs/langchain_v1/tests/unit_tests/agents/middleware/test_file_search.py b/libs/langchain_v1/tests/unit_tests/agents/middleware/test_file_search.py new file mode 100644 index 0000000000000..c659ca7840be5 --- /dev/null +++ b/libs/langchain_v1/tests/unit_tests/agents/middleware/test_file_search.py @@ -0,0 +1,461 @@ +"""Unit tests for file search middleware.""" + +import pytest +from langchain.agents.middleware.anthropic_tools import AnthropicToolsState +from langchain.agents.middleware.file_search import StateFileSearchMiddleware +from langchain_core.messages import ToolMessage + + +class TestSearchMiddlewareInitialization: + """Test search middleware initialization.""" + + def test_middleware_initialization(self) -> None: + """Test middleware initializes correctly.""" + middleware = StateFileSearchMiddleware() + assert middleware.state_schema == AnthropicToolsState + assert middleware.state_key == "text_editor_files" + + def test_custom_state_key(self) -> None: + """Test middleware with custom state key.""" + middleware = StateFileSearchMiddleware(state_key="memory_files") + assert middleware.state_key == "memory_files" + + +class TestGlobSearch: + """Test Glob file pattern matching.""" + + def test_glob_basic_pattern(self) -> None: + """Test basic glob pattern matching.""" + middleware = StateFileSearchMiddleware() + + test_state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/src/main.py": { + "content": ["print('hello')"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + "/src/utils.py": { + "content": ["def helper(): pass"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + "/README.md": { + "content": ["# Project"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + }, + } + + # Call tool function directly (state is injected in real usage) + result = middleware.glob_search.func(pattern="*.py", state=test_state) + + assert isinstance(result, str) + assert "/src/main.py" in result + assert "/src/utils.py" in result + assert "/README.md" not in result + + def test_glob_recursive_pattern(self) -> None: + """Test recursive glob pattern matching.""" + middleware = StateFileSearchMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/src/main.py": { + "content": [], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + "/src/utils/helper.py": { + "content": [], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + "/tests/test_main.py": { + "content": [], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + }, + } + + result = middleware.glob_search.func(pattern="**/*.py", state=state) + + assert isinstance(result, str) + lines = result.split("\n") + assert len(lines) == 3 + assert all(".py" in line for line in lines) + + def test_glob_with_base_path(self) -> None: + """Test glob with base path restriction.""" + middleware = StateFileSearchMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/src/main.py": { + "content": [], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + "/tests/test.py": { + "content": [], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + }, + } + + result = middleware.glob_search.func(pattern="**/*.py", path="/src", state=state) + + assert isinstance(result, str) + assert "/src/main.py" in result + assert "/tests/test.py" not in result + + def test_glob_no_matches(self) -> None: + """Test glob with no matching files.""" + middleware = StateFileSearchMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/src/main.py": { + "content": [], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + }, + } + + result = middleware.glob_search.func(pattern="*.ts", state=state) + + assert isinstance(result, str) + assert result == "No files found" + + def test_glob_sorts_by_modified_time(self) -> None: + """Test that glob results are sorted by modification time.""" + middleware = StateFileSearchMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/old.py": { + "content": [], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + "/new.py": { + "content": [], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-02T00:00:00", + }, + }, + } + + result = middleware.glob_search.func(pattern="*.py", state=state) + + lines = result.split("\n") + # Most recent first + assert lines[0] == "/new.py" + assert lines[1] == "/old.py" + + +class TestGrepSearch: + """Test Grep content search.""" + + def test_grep_files_with_matches_mode(self) -> None: + """Test grep with files_with_matches output mode.""" + middleware = StateFileSearchMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/src/main.py": { + "content": ["def foo():", " pass"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + "/src/utils.py": { + "content": ["def bar():", " return None"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + "/README.md": { + "content": ["# Documentation", "No code here"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + }, + } + + result = middleware.grep_search.func(pattern=r"def \w+\(\):", state=state) + + assert isinstance(result, str) + assert "/src/main.py" in result + assert "/src/utils.py" in result + assert "/README.md" not in result + # Should only have file paths, not line content + assert "def foo():" not in result + + def test_grep_content_mode(self) -> None: + """Test grep with content output mode.""" + middleware = StateFileSearchMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/src/main.py": { + "content": ["def foo():", " pass", "def bar():"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + }, + } + + result = middleware.grep_search.func( + pattern=r"def \w+\(\):", output_mode="content", state=state + ) + + assert isinstance(result, str) + lines = result.split("\n") + assert len(lines) == 2 + assert lines[0] == "/src/main.py:1:def foo():" + assert lines[1] == "/src/main.py:3:def bar():" + + def test_grep_count_mode(self) -> None: + """Test grep with count output mode.""" + middleware = StateFileSearchMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/src/main.py": { + "content": ["TODO: fix this", "print('hello')", "TODO: add tests"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + "/src/utils.py": { + "content": ["TODO: implement"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + }, + } + + result = middleware.grep_search.func(pattern=r"TODO", output_mode="count", state=state) + + assert isinstance(result, str) + lines = result.split("\n") + assert "/src/main.py:2" in lines + assert "/src/utils.py:1" in lines + + def test_grep_with_include_filter(self) -> None: + """Test grep with include file pattern filter.""" + middleware = StateFileSearchMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/src/main.py": { + "content": ["import os"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + "/src/main.ts": { + "content": ["import os from 'os'"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + }, + } + + result = middleware.grep_search.func(pattern="import", include="*.py", state=state) + + assert isinstance(result, str) + assert "/src/main.py" in result + assert "/src/main.ts" not in result + + def test_grep_with_brace_expansion_filter(self) -> None: + """Test grep with brace expansion in include filter.""" + middleware = StateFileSearchMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/src/main.ts": { + "content": ["const x = 1"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + "/src/App.tsx": { + "content": ["const y = 2"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + "/src/main.py": { + "content": ["z = 3"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + }, + } + + result = middleware.grep_search.func(pattern="const", include="*.{ts,tsx}", state=state) + + assert isinstance(result, str) + assert "/src/main.ts" in result + assert "/src/App.tsx" in result + assert "/src/main.py" not in result + + def test_grep_with_base_path(self) -> None: + """Test grep with base path restriction.""" + middleware = StateFileSearchMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/src/main.py": { + "content": ["import foo"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + "/tests/test.py": { + "content": ["import foo"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + }, + } + + result = middleware.grep_search.func(pattern="import", path="/src", state=state) + + assert isinstance(result, str) + assert "/src/main.py" in result + assert "/tests/test.py" not in result + + def test_grep_no_matches(self) -> None: + """Test grep with no matching content.""" + middleware = StateFileSearchMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/src/main.py": { + "content": ["print('hello')"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + }, + } + + result = middleware.grep_search.func(pattern=r"TODO", state=state) + + assert isinstance(result, str) + assert result == "No matches found" + + def test_grep_invalid_regex(self) -> None: + """Test grep with invalid regex pattern.""" + middleware = StateFileSearchMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": {}, + } + + result = middleware.grep_search.func(pattern=r"[unclosed", state=state) + + assert isinstance(result, str) + assert "Invalid regex pattern" in result + + +class TestSearchWithDifferentBackends: + """Test searching with different backend configurations.""" + + def test_glob_default_backend(self) -> None: + """Test that glob searches the default backend (text_editor_files).""" + middleware = StateFileSearchMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/src/main.py": { + "content": [], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + }, + "memory_files": { + "/memories/notes.txt": { + "content": [], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + }, + } + + result = middleware.glob_search.func(pattern="**/*", state=state) + + assert isinstance(result, str) + assert "/src/main.py" in result + # Should NOT find memory_files since default backend is text_editor_files + assert "/memories/notes.txt" not in result + + def test_grep_default_backend(self) -> None: + """Test that grep searches the default backend (text_editor_files).""" + middleware = StateFileSearchMiddleware() + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/src/main.py": { + "content": ["TODO: implement"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + }, + "memory_files": { + "/memories/tasks.txt": { + "content": ["TODO: review"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + }, + } + + result = middleware.grep_search.func(pattern=r"TODO", state=state) + + assert isinstance(result, str) + assert "/src/main.py" in result + # Should NOT find memory_files since default backend is text_editor_files + assert "/memories/tasks.txt" not in result + + def test_search_with_single_store(self) -> None: + """Test searching with a specific state key.""" + middleware = StateFileSearchMiddleware(state_key="text_editor_files") + + state: AnthropicToolsState = { + "messages": [], + "text_editor_files": { + "/src/main.py": { + "content": ["code"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + }, + "memory_files": { + "/memories/notes.txt": { + "content": ["notes"], + "created_at": "2025-01-01T00:00:00", + "modified_at": "2025-01-01T00:00:00", + }, + }, + } + + result = middleware.grep_search.func(pattern=r".*", state=state) + + assert isinstance(result, str) + assert "/src/main.py" in result + assert "/memories/notes.txt" not in result diff --git a/libs/langchain_v1/tests/unit_tests/agents/middleware/test_filesystem_middleware.py b/libs/langchain_v1/tests/unit_tests/agents/middleware/test_filesystem_middleware.py new file mode 100644 index 0000000000000..b958c39928cd3 --- /dev/null +++ b/libs/langchain_v1/tests/unit_tests/agents/middleware/test_filesystem_middleware.py @@ -0,0 +1,114 @@ +from langchain.agents.middleware.filesystem import ( + FileData, + FilesystemState, + FilesystemMiddleware, + FILESYSTEM_SYSTEM_PROMPT, + FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT, +) +from langgraph.store.memory import InMemoryStore +from langgraph.runtime import Runtime + + +class TestFilesystem: + def test_init_local(self): + middleware = FilesystemMiddleware(use_longterm_memory=False) + assert middleware.use_longterm_memory is False + assert middleware.system_prompt_extension == FILESYSTEM_SYSTEM_PROMPT + assert len(middleware.tools) == 4 + + def test_init_longterm(self): + middleware = FilesystemMiddleware(use_longterm_memory=True) + assert middleware.use_longterm_memory is True + assert middleware.system_prompt_extension == ( + FILESYSTEM_SYSTEM_PROMPT + FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT + ) + assert len(middleware.tools) == 4 + + def test_init_custom_system_prompt_shortterm(self): + middleware = FilesystemMiddleware( + use_longterm_memory=False, system_prompt_extension="Custom system prompt" + ) + assert middleware.use_longterm_memory is False + assert middleware.system_prompt_extension == "Custom system prompt" + assert len(middleware.tools) == 4 + + def test_init_custom_system_prompt_longterm(self): + middleware = FilesystemMiddleware( + use_longterm_memory=True, system_prompt_extension="Custom system prompt" + ) + assert middleware.use_longterm_memory is True + assert middleware.system_prompt_extension == "Custom system prompt" + assert len(middleware.tools) == 4 + + def test_init_custom_tool_descriptions_shortterm(self): + middleware = FilesystemMiddleware( + use_longterm_memory=False, custom_tool_descriptions={"ls": "Custom ls tool description"} + ) + assert middleware.use_longterm_memory is False + assert middleware.system_prompt_extension == FILESYSTEM_SYSTEM_PROMPT + ls_tool = next(tool for tool in middleware.tools if tool.name == "ls") + assert ls_tool.description == "Custom ls tool description" + + def test_init_custom_tool_descriptions_longterm(self): + middleware = FilesystemMiddleware( + use_longterm_memory=True, custom_tool_descriptions={"ls": "Custom ls tool description"} + ) + assert middleware.use_longterm_memory is True + assert middleware.system_prompt_extension == ( + FILESYSTEM_SYSTEM_PROMPT + FILESYSTEM_SYSTEM_PROMPT_LONGTERM_SUPPLEMENT + ) + ls_tool = next(tool for tool in middleware.tools if tool.name == "ls") + assert ls_tool.description == "Custom ls tool description" + + def test_ls_shortterm(self): + state = FilesystemState( + messages=[], + files={ + "test.txt": FileData( + content=["Hello world"], + modified_at="2021-01-01", + created_at="2021-01-01", + ), + "test2.txt": FileData( + content=["Goodbye world"], + modified_at="2021-01-01", + created_at="2021-01-01", + ), + }, + ) + middleware = FilesystemMiddleware(use_longterm_memory=False) + ls_tool = next(tool for tool in middleware.tools if tool.name == "ls") + result = ls_tool.invoke({"state": state}) + assert result == ["test.txt", "test2.txt"] + + def test_ls_shortterm_with_path(self): + state = FilesystemState( + messages=[], + files={ + "/test.txt": FileData( + content=["Hello world"], + modified_at="2021-01-01", + created_at="2021-01-01", + ), + "/pokemon/test2.txt": FileData( + content=["Goodbye world"], + modified_at="2021-01-01", + created_at="2021-01-01", + ), + "/pokemon/charmander.txt": FileData( + content=["Ember"], + modified_at="2021-01-01", + created_at="2021-01-01", + ), + "/pokemon/water/squirtle.txt": FileData( + content=["Water"], + modified_at="2021-01-01", + created_at="2021-01-01", + ), + }, + ) + middleware = FilesystemMiddleware(use_longterm_memory=False) + ls_tool = next(tool for tool in middleware.tools if tool.name == "ls") + result = ls_tool.invoke({"state": state, "path": "pokemon/"}) + assert "/pokemon/test2.txt" in result + assert "/pokemon/charmander.txt" in result diff --git a/libs/langchain_v1/tests/unit_tests/agents/middleware/test_subagent_middleware.py b/libs/langchain_v1/tests/unit_tests/agents/middleware/test_subagent_middleware.py new file mode 100644 index 0000000000000..d7c721a2c9a48 --- /dev/null +++ b/libs/langchain_v1/tests/unit_tests/agents/middleware/test_subagent_middleware.py @@ -0,0 +1,35 @@ +from langchain.agents.middleware.subagents import TASK_TOOL_DESCRIPTION, SubAgentMiddleware + +import pytest + + +@pytest.mark.requires("langchain_openai") +class TestSubagentMiddleware: + """Test the SubagentMiddleware class.""" + + def test_subagent_middleware_init(self): + middleware = SubAgentMiddleware( + default_subagent_model="gpt-4o-mini", + ) + assert middleware is not None + assert middleware.system_prompt_extension == None + assert len(middleware.tools) == 1 + assert middleware.tools[0].name == "task" + assert middleware.tools[0].description == TASK_TOOL_DESCRIPTION.format(other_agents="") + + def test_default_subagent_with_tools(self): + middleware = SubAgentMiddleware( + default_subagent_model="gpt-4o-mini", + default_subagent_tools=[], + ) + assert middleware is not None + assert middleware.system_prompt_extension == None + + def test_default_subagent_custom_system_prompt_extension(self): + middleware = SubAgentMiddleware( + default_subagent_model="gpt-4o-mini", + default_subagent_tools=[], + system_prompt_extension="Use the task tool to call a subagent.", + ) + assert middleware is not None + assert middleware.system_prompt_extension == "Use the task tool to call a subagent." diff --git a/libs/langchain_v1/uv.lock b/libs/langchain_v1/uv.lock index 0242907e69f87..0243d6152e287 100644 --- a/libs/langchain_v1/uv.lock +++ b/libs/langchain_v1/uv.lock @@ -1607,6 +1607,7 @@ lint = [ { name = "ruff" }, ] test = [ + { name = "langchain-anthropic" }, { name = "langchain-openai" }, { name = "langchain-tests" }, { name = "langchain-text-splitters" }, @@ -1659,6 +1660,7 @@ provides-extras = ["community", "anthropic", "openai", "google-vertexai", "googl [package.metadata.requires-dev] lint = [{ name = "ruff", specifier = ">=0.12.2,<0.13.0" }] test = [ + { name = "langchain-anthropic" }, { name = "langchain-openai", editable = "../partners/openai" }, { name = "langchain-tests", editable = "../standard-tests" }, { name = "langchain-text-splitters", editable = "../text-splitters" },