diff --git a/python/packages/core/agent_framework/_workflows/_agent_executor.py b/python/packages/core/agent_framework/_workflows/_agent_executor.py index 0c05abbb69..8fa85b7f84 100644 --- a/python/packages/core/agent_framework/_workflows/_agent_executor.py +++ b/python/packages/core/agent_framework/_workflows/_agent_executor.py @@ -2,11 +2,14 @@ import logging from dataclasses import dataclass -from typing import Any +from typing import Any, cast + +from agent_framework import FunctionApprovalRequestContent, FunctionApprovalResponseContent from .._agents import AgentProtocol, ChatAgent from .._threads import AgentThread from .._types import AgentRunResponse, AgentRunResponseUpdate, ChatMessage +from ._checkpoint_encoding import decode_checkpoint_value, encode_checkpoint_value from ._conversation_state import encode_chat_messages from ._events import ( AgentRunEvent, @@ -14,6 +17,7 @@ ) from ._executor import Executor, handler from ._message_utils import normalize_messages_input +from ._request_info_mixin import response_handler from ._workflow_context import WorkflowContext logger = logging.getLogger(__name__) @@ -83,6 +87,8 @@ def __init__( super().__init__(exec_id) self._agent = agent self._agent_thread = agent_thread or self._agent.get_new_thread() + self._pending_agent_requests: dict[str, FunctionApprovalRequestContent] = {} + self._pending_responses_to_agent: list[FunctionApprovalResponseContent] = [] self._output_response = output_response self._cache: list[ChatMessage] = [] @@ -93,50 +99,6 @@ def workflow_output_types(self) -> list[type[Any]]: return [AgentRunResponse] return [] - async def _run_agent_and_emit(self, ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse]) -> None: - """Execute the underlying agent, emit events, and enqueue response. - - Checks ctx.is_streaming() to determine whether to emit incremental AgentRunUpdateEvent - events (streaming mode) or a single AgentRunEvent (non-streaming mode). - """ - if ctx.is_streaming(): - # Streaming mode: emit incremental updates - updates: list[AgentRunResponseUpdate] = [] - async for update in self._agent.run_stream( - self._cache, - thread=self._agent_thread, - ): - updates.append(update) - await ctx.add_event(AgentRunUpdateEvent(self.id, update)) - - if isinstance(self._agent, ChatAgent): - response_format = self._agent.chat_options.response_format - response = AgentRunResponse.from_agent_run_response_updates( - updates, - output_format_type=response_format, - ) - else: - response = AgentRunResponse.from_agent_run_response_updates(updates) - else: - # Non-streaming mode: use run() and emit single event - response = await self._agent.run( - self._cache, - thread=self._agent_thread, - ) - await ctx.add_event(AgentRunEvent(self.id, response)) - - if self._output_response: - await ctx.yield_output(response) - - # Always construct a full conversation snapshot from inputs (cache) - # plus agent outputs (agent_run_response.messages). Do not mutate - # response.messages so AgentRunEvent remains faithful to the raw output. - full_conversation: list[ChatMessage] = list(self._cache) + list(response.messages) - - agent_response = AgentExecutorResponse(self.id, response, full_conversation=full_conversation) - await ctx.send_message(agent_response) - self._cache.clear() - @handler async def run( self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse] @@ -192,6 +154,31 @@ async def from_messages( self._cache = normalize_messages_input(messages) await self._run_agent_and_emit(ctx) + @response_handler + async def handle_user_input_response( + self, + original_request: FunctionApprovalRequestContent, + response: FunctionApprovalResponseContent, + ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse], + ) -> None: + """Handle user input responses for function approvals during agent execution. + + This will hold the executor's execution until all pending user input requests are resolved. + + Args: + original_request: The original function approval request sent by the agent. + response: The user's response to the function approval request. + ctx: The workflow context for emitting events and outputs. + """ + self._pending_responses_to_agent.append(response) + self._pending_agent_requests.pop(original_request.id, None) + + if not self._pending_agent_requests: + # All pending requests have been resolved; resume agent execution + self._cache = normalize_messages_input(ChatMessage(role="user", contents=self._pending_responses_to_agent)) + self._pending_responses_to_agent.clear() + await self._run_agent_and_emit(ctx) + async def snapshot_state(self) -> dict[str, Any]: """Capture current executor state for checkpointing. @@ -226,6 +213,8 @@ async def snapshot_state(self) -> dict[str, Any]: return { "cache": encode_chat_messages(self._cache), "agent_thread": serialized_thread, + "pending_agent_requests": encode_checkpoint_value(self._pending_agent_requests), + "pending_responses_to_agent": encode_checkpoint_value(self._pending_responses_to_agent), } async def restore_state(self, state: dict[str, Any]) -> None: @@ -258,7 +247,109 @@ async def restore_state(self, state: dict[str, Any]) -> None: else: self._agent_thread = self._agent.get_new_thread() + pending_requests_payload = state.get("pending_agent_requests") + if pending_requests_payload: + self._pending_agent_requests = decode_checkpoint_value(pending_requests_payload) + + pending_responses_payload = state.get("pending_responses_to_agent") + if pending_responses_payload: + self._pending_responses_to_agent = decode_checkpoint_value(pending_responses_payload) + def reset(self) -> None: """Reset the internal cache of the executor.""" logger.debug("AgentExecutor %s: Resetting cache", self.id) self._cache.clear() + + async def _run_agent_and_emit(self, ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse]) -> None: + """Execute the underlying agent, emit events, and enqueue response. + + Checks ctx.is_streaming() to determine whether to emit incremental AgentRunUpdateEvent + events (streaming mode) or a single AgentRunEvent (non-streaming mode). + """ + if ctx.is_streaming(): + # Streaming mode: emit incremental updates + response = await self._run_agent_streaming(cast(WorkflowContext, ctx)) + else: + # Non-streaming mode: use run() and emit single event + response = await self._run_agent(cast(WorkflowContext, ctx)) + + if response is None: + # Agent did not complete (e.g., waiting for user input); do not emit response + logger.info("AgentExecutor %s: Agent did not complete, awaiting user input", self.id) + return + + if self._output_response: + await ctx.yield_output(response) + + # Always construct a full conversation snapshot from inputs (cache) + # plus agent outputs (agent_run_response.messages). Do not mutate + # response.messages so AgentRunEvent remains faithful to the raw output. + full_conversation: list[ChatMessage] = list(self._cache) + list(response.messages) + + agent_response = AgentExecutorResponse(self.id, response, full_conversation=full_conversation) + await ctx.send_message(agent_response) + self._cache.clear() + + async def _run_agent(self, ctx: WorkflowContext) -> AgentRunResponse | None: + """Execute the underlying agent in non-streaming mode. + + Args: + ctx: The workflow context for emitting events. + + Returns: + The complete AgentRunResponse, or None if waiting for user input. + """ + response = await self._agent.run( + self._cache, + thread=self._agent_thread, + ) + await ctx.add_event(AgentRunEvent(self.id, response)) + + # Handle any user input requests + if response.user_input_requests: + for user_input_request in response.user_input_requests: + self._pending_agent_requests[user_input_request.id] = user_input_request + await ctx.request_info(user_input_request, FunctionApprovalResponseContent) + return None + + return response + + async def _run_agent_streaming(self, ctx: WorkflowContext) -> AgentRunResponse | None: + """Execute the underlying agent in streaming mode and collect the full response. + + Args: + ctx: The workflow context for emitting events. + + Returns: + The complete AgentRunResponse, or None if waiting for user input. + """ + updates: list[AgentRunResponseUpdate] = [] + user_input_requests: list[FunctionApprovalRequestContent] = [] + async for update in self._agent.run_stream( + self._cache, + thread=self._agent_thread, + ): + updates.append(update) + await ctx.add_event(AgentRunUpdateEvent(self.id, update)) + + if update.user_input_requests: + user_input_requests.extend(update.user_input_requests) + + # Build the final AgentRunResponse from the collected updates + if isinstance(self._agent, ChatAgent): + response_format = self._agent.chat_options.response_format + response = AgentRunResponse.from_agent_run_response_updates( + updates, + output_format_type=response_format, + ) + else: + response = AgentRunResponse.from_agent_run_response_updates(updates) + + # Handle any user input requests after the streaming completes + if user_input_requests: + for user_input_request in user_input_requests: + self._pending_agent_requests[user_input_request.id] = user_input_request + await ctx.request_info(user_input_request, FunctionApprovalResponseContent) + return None + + return response diff --git a/python/packages/core/tests/workflow/test_agent_executor.py b/python/packages/core/tests/workflow/test_agent_executor.py index 3bda2fcaad..77fd969f12 100644 --- a/python/packages/core/tests/workflow/test_agent_executor.py +++ b/python/packages/core/tests/workflow/test_agent_executor.py @@ -111,6 +111,10 @@ async def test_agent_executor_checkpoint_stores_and_restores_state() -> None: chat_store_state = thread_state["chat_message_store_state"] # type: ignore[index] assert "messages" in chat_store_state, "Message store state should include messages" + # Verify checkpoint contains pending requests from agents and responses to be sent + assert "pending_agent_requests" in executor_state + assert "pending_responses_to_agent" in executor_state + # Create a new agent and executor for restoration # This simulates starting from a fresh state and restoring from checkpoint restored_agent = _CountingAgent(id="test_agent", name="TestAgent") diff --git a/python/packages/core/tests/workflow/test_agent_executor_tool_calls.py b/python/packages/core/tests/workflow/test_agent_executor_tool_calls.py index 8124f6253d..a7849120b0 100644 --- a/python/packages/core/tests/workflow/test_agent_executor_tool_calls.py +++ b/python/packages/core/tests/workflow/test_agent_executor_tool_calls.py @@ -5,19 +5,32 @@ from collections.abc import AsyncIterable from typing import Any +from typing_extensions import Never + from agent_framework import ( AgentExecutor, + AgentExecutorResponse, AgentRunResponse, AgentRunResponseUpdate, AgentRunUpdateEvent, AgentThread, BaseAgent, + ChatAgent, ChatMessage, + ChatResponse, + ChatResponseUpdate, + FunctionApprovalRequestContent, FunctionCallContent, FunctionResultContent, + RequestInfoEvent, Role, TextContent, WorkflowBuilder, + WorkflowContext, + WorkflowOutputEvent, + ai_function, + executor, + use_function_invocation, ) @@ -120,3 +133,235 @@ async def test_agent_executor_emits_tool_calls_in_streaming_mode() -> None: assert events[3].data is not None assert isinstance(events[3].data.contents[0], TextContent) assert "sunny" in events[3].data.contents[0].text + + +@ai_function(approval_mode="always_require") +def mock_tool_requiring_approval(query: str) -> str: + """Mock tool that requires approval before execution.""" + return f"Executed tool with query: {query}" + + +@use_function_invocation +class MockChatClient: + """Simple implementation of a chat client.""" + + def __init__(self, parallel_request: bool = False) -> None: + self.additional_properties: dict[str, Any] = {} + self._iteration: int = 0 + self._parallel_request: bool = parallel_request + + async def get_response( + self, + messages: str | ChatMessage | list[str] | list[ChatMessage], + **kwargs: Any, + ) -> ChatResponse: + if self._iteration == 0: + if self._parallel_request: + response = ChatResponse( + messages=ChatMessage( + role="assistant", + contents=[ + FunctionCallContent( + call_id="1", name="mock_tool_requiring_approval", arguments='{"query": "test"}' + ), + FunctionCallContent( + call_id="2", name="mock_tool_requiring_approval", arguments='{"query": "test"}' + ), + ], + ) + ) + else: + response = ChatResponse( + messages=ChatMessage( + role="assistant", + contents=[ + FunctionCallContent( + call_id="1", name="mock_tool_requiring_approval", arguments='{"query": "test"}' + ) + ], + ) + ) + else: + response = ChatResponse(messages=ChatMessage(role="assistant", text="Tool executed successfully.")) + + self._iteration += 1 + return response + + async def get_streaming_response( + self, + messages: str | ChatMessage | list[str] | list[ChatMessage], + **kwargs: Any, + ) -> AsyncIterable[ChatResponseUpdate]: + if self._iteration == 0: + if self._parallel_request: + yield ChatResponseUpdate( + contents=[ + FunctionCallContent( + call_id="1", name="mock_tool_requiring_approval", arguments='{"query": "test"}' + ), + FunctionCallContent( + call_id="2", name="mock_tool_requiring_approval", arguments='{"query": "test"}' + ), + ], + role="assistant", + ) + else: + yield ChatResponseUpdate( + contents=[ + FunctionCallContent( + call_id="1", name="mock_tool_requiring_approval", arguments='{"query": "test"}' + ) + ], + role="assistant", + ) + else: + yield ChatResponseUpdate(text=TextContent(text="Tool executed "), role="assistant") + yield ChatResponseUpdate(contents=[TextContent(text="successfully.")], role="assistant") + + self._iteration += 1 + + +@executor(id="test_executor") +async def test_executor(agent_executor_response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None: + await ctx.yield_output(agent_executor_response.agent_run_response.text) + + +async def test_agent_executor_tool_call_with_approval() -> None: + """Test that AgentExecutor handles tool calls requiring approval.""" + # Arrange + agent = ChatAgent( + chat_client=MockChatClient(), + name="ApprovalAgent", + tools=[mock_tool_requiring_approval], + ) + + workflow = WorkflowBuilder().set_start_executor(agent).add_edge(agent, test_executor).build() + + # Act + events = await workflow.run("Invoke tool requiring approval") + + # Assert + assert len(events.get_request_info_events()) == 1 + approval_request = events.get_request_info_events()[0] + assert isinstance(approval_request.data, FunctionApprovalRequestContent) + assert approval_request.data.function_call.name == "mock_tool_requiring_approval" + assert approval_request.data.function_call.arguments == '{"query": "test"}' + + # Act + events = await workflow.send_responses({approval_request.request_id: approval_request.data.create_response(True)}) + + # Assert + final_response = events.get_outputs() + assert len(final_response) == 1 + assert final_response[0] == "Tool executed successfully." + + +async def test_agent_executor_tool_call_with_approval_streaming() -> None: + """Test that AgentExecutor handles tool calls requiring approval in streaming mode.""" + # Arrange + agent = ChatAgent( + chat_client=MockChatClient(), + name="ApprovalAgent", + tools=[mock_tool_requiring_approval], + ) + + workflow = WorkflowBuilder().set_start_executor(agent).add_edge(agent, test_executor).build() + + # Act + request_info_events: list[RequestInfoEvent] = [] + async for event in workflow.run_stream("Invoke tool requiring approval"): + if isinstance(event, RequestInfoEvent): + request_info_events.append(event) + + # Assert + assert len(request_info_events) == 1 + approval_request = request_info_events[0] + assert isinstance(approval_request.data, FunctionApprovalRequestContent) + assert approval_request.data.function_call.name == "mock_tool_requiring_approval" + assert approval_request.data.function_call.arguments == '{"query": "test"}' + + # Act + output: str | None = None + async for event in workflow.send_responses_streaming({ + approval_request.request_id: approval_request.data.create_response(True) + }): + if isinstance(event, WorkflowOutputEvent): + output = event.data + + # Assert + assert output is not None + assert output == "Tool executed successfully." + + +async def test_agent_executor_parallel_tool_call_with_approval() -> None: + """Test that AgentExecutor handles parallel tool calls requiring approval.""" + # Arrange + agent = ChatAgent( + chat_client=MockChatClient(parallel_request=True), + name="ApprovalAgent", + tools=[mock_tool_requiring_approval], + ) + + workflow = WorkflowBuilder().set_start_executor(agent).add_edge(agent, test_executor).build() + + # Act + events = await workflow.run("Invoke tool requiring approval") + + # Assert + assert len(events.get_request_info_events()) == 2 + for approval_request in events.get_request_info_events(): + assert isinstance(approval_request.data, FunctionApprovalRequestContent) + assert approval_request.data.function_call.name == "mock_tool_requiring_approval" + assert approval_request.data.function_call.arguments == '{"query": "test"}' + + # Act + responses = { + approval_request.request_id: approval_request.data.create_response(True) # type: ignore + for approval_request in events.get_request_info_events() + } + events = await workflow.send_responses(responses) + + # Assert + final_response = events.get_outputs() + assert len(final_response) == 1 + assert final_response[0] == "Tool executed successfully." + + +async def test_agent_executor_parallel_tool_call_with_approval_streaming() -> None: + """Test that AgentExecutor handles parallel tool calls requiring approval in streaming mode.""" + # Arrange + agent = ChatAgent( + chat_client=MockChatClient(parallel_request=True), + name="ApprovalAgent", + tools=[mock_tool_requiring_approval], + ) + + workflow = WorkflowBuilder().set_start_executor(agent).add_edge(agent, test_executor).build() + + # Act + request_info_events: list[RequestInfoEvent] = [] + async for event in workflow.run_stream("Invoke tool requiring approval"): + if isinstance(event, RequestInfoEvent): + request_info_events.append(event) + + # Assert + assert len(request_info_events) == 2 + for approval_request in request_info_events: + assert isinstance(approval_request.data, FunctionApprovalRequestContent) + assert approval_request.data.function_call.name == "mock_tool_requiring_approval" + assert approval_request.data.function_call.arguments == '{"query": "test"}' + + # Act + responses = { + approval_request.request_id: approval_request.data.create_response(True) # type: ignore + for approval_request in request_info_events + } + + output: str | None = None + async for event in workflow.send_responses_streaming(responses): + if isinstance(event, WorkflowOutputEvent): + output = event.data + + # Assert + assert output is not None + assert output == "Tool executed successfully." diff --git a/python/samples/README.md b/python/samples/README.md index f70a390892..f29a9fa6b6 100644 --- a/python/samples/README.md +++ b/python/samples/README.md @@ -281,6 +281,7 @@ This directory contains samples demonstrating the capabilities of Microsoft Agen | File | Description | |------|-------------| | [`getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py`](./getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py) | Sample: Human in the loop guessing game | +| [`getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py`](./getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py) | Sample: Agents with Approval Requests in Workflows | ### Observability diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index 49cbb81e21..1def803373 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -78,6 +78,7 @@ Once comfortable with these, explore the rest of the samples below. |---|---|---| | Human-In-The-Loop (Guessing Game) | [human-in-the-loop/guessing_game_with_human_input.py](./human-in-the-loop/guessing_game_with_human_input.py) | Interactive request/response prompts with a human | | Azure Agents Tool Feedback Loop | [agents/azure_chat_agents_tool_calls_with_feedback.py](./agents/azure_chat_agents_tool_calls_with_feedback.py) | Two-agent workflow that streams tool calls and pauses for human guidance between passes | +| Agents with Approval Requests in Workflows | [human-in-the-loop/agents_with_approval_requests.py](./human-in-the-loop/agents_with_approval_requests.py) | Agents that create approval requests during workflow execution and wait for human approval to proceed | ### observability diff --git a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py new file mode 100644 index 0000000000..a51088e886 --- /dev/null +++ b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py @@ -0,0 +1,340 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +import json +from dataclasses import dataclass +from typing import Annotated, Never + +from agent_framework import ( + AgentExecutorResponse, + ChatMessage, + Executor, + FunctionApprovalRequestContent, + FunctionApprovalResponseContent, + WorkflowBuilder, + WorkflowContext, + ai_function, + executor, + handler, +) +from agent_framework.openai import OpenAIChatClient + +""" +Sample: Agents in a workflow with AI functions requiring approval + +This sample creates a workflow that automatically replies to incoming emails. +If historical email data is needed, it uses an AI function to read the data, +which requires human approval before execution. + +This sample works as follows: +1. An incoming email is received by the workflow. +2. The EmailPreprocessor executor preprocesses the email, adding special notes if the sender is important. +3. The preprocessed email is sent to the Email Writer agent, which generates a response. +4. If the agent needs to read historical email data, it calls the read_historical_email_data AI function, + which triggers an approval request. +5. The sample automatically approves the request for demonstration purposes. +6. Once approved, the AI function executes and returns the historical email data to the agent. +7. The agent uses the historical data to compose a comprehensive email response. +8. The response is sent to the conclude_workflow_executor, which yields the final response. + +Purpose: +Show how to integrate AI functions with approval requests into a workflow. + +Demonstrate: +- Creating AI functions that require approval before execution. +- Building a workflow that includes an agent and executors. +- Handling approval requests during workflow execution. + +Prerequisites: +- Azure AI Agent Service configured, along with the required environment variables. +- Authentication via azure-identity. Use AzureCliCredential and run az login before executing the sample. +- Basic familiarity with WorkflowBuilder, edges, events, RequestInfoEvent, and streaming runs. +""" + + +@ai_function +def get_current_date() -> str: + """Get the current date in YYYY-MM-DD format.""" + # For demonstration purposes, we return a fixed date. + return "2025-11-07" + + +@ai_function +def get_team_members_email_addresses() -> list[dict[str, str]]: + """Get the email addresses of team members.""" + # In a real implementation, this might query a database or directory service. + return [ + { + "name": "Alice", + "email": "alice@contoso.com", + "position": "Software Engineer", + "manager": "John Doe", + }, + { + "name": "Bob", + "email": "bob@contoso.com", + "position": "Product Manager", + "manager": "John Doe", + }, + { + "name": "Charlie", + "email": "charlie@contoso.com", + "position": "Senior Software Engineer", + "manager": "John Doe", + }, + { + "name": "Mike", + "email": "mike@contoso.com", + "position": "Principal Software Engineer Manager", + "manager": "VP of Engineering", + }, + ] + + +@ai_function +def get_my_information() -> dict[str, str]: + """Get my personal information.""" + return { + "name": "John Doe", + "email": "john@contoso.com", + "position": "Software Engineer Manager", + "manager": "Mike", + } + + +@ai_function(approval_mode="always_require") +async def read_historical_email_data( + email_address: Annotated[str, "The email address to read historical data from"], + start_date: Annotated[str, "The start date in YYYY-MM-DD format"], + end_date: Annotated[str, "The end date in YYYY-MM-DD format"], +) -> list[dict[str, str]]: + """Read historical email data for a given email address and date range.""" + historical_data = { + "alice@contoso.com": [ + { + "from": "alice@contoso.com", + "to": "john@contoso.com", + "date": "2025-11-05", + "subject": "Bug Bash Results", + "body": "We just completed the bug bash and found a few issues that need immediate attention.", + }, + { + "from": "alice@contoso.com", + "to": "john@contoso.com", + "date": "2025-11-03", + "subject": "Code Freeze", + "body": "We are entering code freeze starting tomorrow.", + }, + ], + "bob@contoso.com": [ + { + "from": "bob@contoso.com", + "to": "john@contoso.com", + "date": "2025-11-04", + "subject": "Team Outing", + "body": "Don't forget about the team outing this Friday!", + }, + { + "from": "bob@contoso.com", + "to": "john@contoso.com", + "date": "2025-11-02", + "subject": "Requirements Update", + "body": "The requirements for the new feature have been updated. Please review them.", + }, + ], + "charlie@contoso.com": [ + { + "from": "charlie@contoso.com", + "to": "john@contoso.com", + "date": "2025-11-05", + "subject": "Project Update", + "body": "The bug bash went well. A few critical bugs but should be fixed by the end of the week.", + }, + { + "from": "charlie@contoso.com", + "to": "john@contoso.com", + "date": "2025-11-06", + "subject": "Code Review", + "body": "Please review my latest code changes.", + }, + ], + } + + emails = historical_data.get(email_address, []) + return [email for email in emails if start_date <= email["date"] <= end_date] + + +@ai_function(approval_mode="always_require") +async def send_email( + to: Annotated[str, "The recipient email address"], + subject: Annotated[str, "The email subject"], + body: Annotated[str, "The email body"], +) -> str: + """Send an email.""" + await asyncio.sleep(1) # Simulate sending email + return "Email successfully sent." + + +@dataclass +class Email: + sender: str + subject: str + body: str + + +class EmailPreprocessor(Executor): + def __init__(self, special_email_addresses: set[str]) -> None: + super().__init__(id="email_preprocessor") + self.special_email_addresses = special_email_addresses + + @handler + async def preprocess(self, email: Email, ctx: WorkflowContext[str]) -> None: + """Preprocess the incoming email.""" + message = str(email) + if email.sender in self.special_email_addresses: + note = ( + "Pay special attention to this sender. This email is very important. " + "Gather relevant information from all previous emails within my team before responding." + ) + message = f"{note}\n\n{message}" + + await ctx.send_message(message) + + +@executor(id="conclude_workflow_executor") +async def conclude_workflow( + email_response: AgentExecutorResponse, + ctx: WorkflowContext[Never, str], +) -> None: + """Conclude the workflow by yielding the final email response.""" + await ctx.yield_output(email_response.agent_run_response.text) + + +async def main() -> None: + # Create the agent and executors + chat_client = OpenAIChatClient() + email_writer = chat_client.create_agent( + name="Email Writer", + instructions=("You are an excellent email assistant. You respond to incoming emails."), + # tools with `approval_mode="always_require"` will trigger approval requests + tools=[ + read_historical_email_data, + send_email, + get_current_date, + get_team_members_email_addresses, + get_my_information, + ], + ) + email_preprocessor = EmailPreprocessor(special_email_addresses={"mike@contoso.com"}) + + # Build the workflow + workflow = ( + WorkflowBuilder() + .set_start_executor(email_preprocessor) + .add_edge(email_preprocessor, email_writer) + .add_edge(email_writer, conclude_workflow) + .build() + ) + + # Simulate an incoming email + incoming_email = Email( + sender="mike@contoso.com", + subject="Important: Project Update", + body="Please provide your team's status update on the project since last week.", + ) + + responses: dict[str, FunctionApprovalResponseContent] = {} + output: list[ChatMessage] | None = None + while True: + if responses: + events = await workflow.send_responses(responses) + responses.clear() + else: + events = await workflow.run(incoming_email) + + request_info_events = events.get_request_info_events() + for request_info_event in request_info_events: + # We should only expect FunctionApprovalRequestContent in this sample + if not isinstance(request_info_event.data, FunctionApprovalRequestContent): + raise ValueError(f"Unexpected request info content type: {type(request_info_event.data)}") + + # Pretty print the function call details + arguments = json.dumps(request_info_event.data.function_call.parse_arguments(), indent=2) + print( + f"Received approval request for function: {request_info_event.data.function_call.name} " + f"with args:\n{arguments}" + ) + + # For demo purposes, we automatically approve the request + # The expected response type of the request is `FunctionApprovalResponseContent`, + # which can be created via `create_response` method on the request content + print("Performing automatic approval for demo purposes...") + responses[request_info_event.request_id] = request_info_event.data.create_response(approved=True) + + # Once we get an output event, we can conclude the workflow + # Outputs can only be produced by the conclude_workflow_executor in this sample + if outputs := events.get_outputs(): + # We expect only one output from the conclude_workflow_executor + output = outputs[0] + break + + if not output: + raise RuntimeError("Workflow did not produce any output event.") + + print("Final email response conversation:") + print(output) + + """ + Sample Output: + Received approval request for function: read_historical_email_data with args: + { + "email_address": "alice@contoso.com", + "start_date": "2025-10-31", + "end_date": "2025-11-07" + } + Performing automatic approval for demo purposes... + Received approval request for function: read_historical_email_data with args: + { + "email_address": "bob@contoso.com", + "start_date": "2025-10-31", + "end_date": "2025-11-07" + } + Performing automatic approval for demo purposes... + Received approval request for function: read_historical_email_data with args: + { + "email_address": "charlie@contoso.com", + "start_date": "2025-10-31", + "end_date": "2025-11-07" + } + Performing automatic approval for demo purposes... + Received approval request for function: send_email with args: + { + "to": "mike@contoso.com", + "subject": "Team's Status Update on the Project", + "body": " + Hi Mike, + + Here's the status update from our team: + - **Bug Bash and Code Freeze:** + - We recently completed a bug bash, during which several issues were identified. Alice and Charlie are working on fixing these critical bugs, and we anticipate resolving them by the end of this week. + - We have entered a code freeze as of November 4, 2025. + + - **Requirements Update:** + - Bob has updated the requirements for a new feature, and all team members are reviewing these changes to ensure alignment. + + - **Ongoing Reviews:** + - Charlie has submitted his latest code changes for review to ensure they meet our quality standards. + + Please let me know if you need more detailed information or have any questions. + + Best regards, + John" + } + Performing automatic approval for demo purposes... + Final email response conversation: + I've sent the status update to Mike with the relevant information from the team. Let me know if there's anything else you need + """ # noqa: E501 + + +if __name__ == "__main__": + asyncio.run(main())