Add Support for Long-Running Executors and Workflows
Overview
Workflows currently lack explicit patterns and infrastructure for handling long-running operations that may take minutes, hours, or days to complete. While checkpointing provides pause/resume capability, there's no comprehensive support for:
- External job polling
- Executor-level timeouts
- Retry/backoff strategies
- Durable timers
- Background work patterns
Current State
What Exists
Checkpointing (_checkpoint.py, ICheckpointStore.cs)
- Workflows pause at superstep boundaries
- State persists to storage (file, memory, etc.)
- Can resume from any checkpoint
- Python: workflow.run_stream_from_checkpoint(checkpoint_id, storage)
- .NET: ICheckpointStore<T>.RetrieveCheckpointAsync(runId, key)
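For reference, the existing pause/resume flow looks roughly like the sketch below; the import path and the two-executor graph are assumptions, while with_checkpointing() and run_stream_from_checkpoint() are the calls cited in this issue.

# Minimal sketch of the existing checkpoint/resume flow. The import path is an
# assumption; with_checkpointing() and run_stream_from_checkpoint() are cited above.
from agent_framework import Executor, WorkflowBuilder  # assumed public imports


def build_workflow(start: Executor, work: Executor, storage):
    return (
        WorkflowBuilder()
        .add_edge(start, work)
        .with_checkpointing(storage)  # persist state at superstep boundaries
        .build()
    )


async def resume(checkpoint_id: str, start: Executor, work: Executor, storage) -> None:
    # Rebuild the same graph, then continue from the stored checkpoint.
    workflow = build_workflow(start, work, storage)
    async for event in workflow.run_stream_from_checkpoint(checkpoint_id, storage):
        print(event)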
Human-in-the-Loop (_request_info_executor.py)
- RequestInfoExecutor pauses for external input
- Emits RequestInfoEvent with request details
- Workflow blocks until handle_response() is called
- Pending requests survive checkpointing
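Only RequestInfoEvent and handle_response() are named here, so the caller-side loop below is a rough sketch with assumed signatures and event fields:

# Rough sketch of the caller side of the HITL pattern. run_stream(), the event's
# request_id field, and handle_response()'s signature are assumptions.
from agent_framework import RequestInfoEvent  # assumed import path


async def drive(workflow, request_info_executor, initial_message) -> None:
    async for event in workflow.run_stream(initial_message):
        if isinstance(event, RequestInfoEvent):
            answer = await collect_human_input(event)  # hypothetical helper
            # Unblock the workflow by delivering the external response.
            await request_info_executor.handle_response(answer, event.request_id)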
Iteration Limits (_runner.py:39, _runner.py:150)
- Python: max_iterations parameter prevents runaway workflows
- Raises RuntimeError if the limit is exceeded while messages are still pending
Async Execution
- All executors are async by default
- Python: handlers are async def
- .NET: handlers return ValueTask
- CancellationToken support in .NET
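For context, the baseline today is an ordinary async executor like the sketch below (import path assumed; Executor, @handler, and WorkflowContext are the types referenced throughout this issue):

# Baseline async executor: the handler is awaited to completion before the
# superstep can advance. Import path is an assumption.
from agent_framework import Executor, WorkflowContext, handler


class EchoExecutor(Executor):
    @handler
    async def echo(self, text: str, ctx: WorkflowContext[str]) -> None:
        # No timeout, retry, or progress support wraps this call today.
        await ctx.send_message(text.upper())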
What's Missing
1. Executor-Level Timeouts
- No way to specify max execution time for individual executors
- Long-running agent calls can block indefinitely
- No timeout enforcement at executor boundary
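Until a first-class timeout exists, the only option is to bound slow calls by hand, for example with asyncio.wait_for; a workaround sketch (not framework API; call_agent and the base-class constructor signature are assumptions):

# Workaround sketch: per-handler timeout via asyncio.wait_for.
import asyncio
from datetime import timedelta


class BoundedAgentCallExecutor(Executor):
    def __init__(self, id: str, timeout: timedelta = timedelta(minutes=5)):
        super().__init__(id=id)  # base-class constructor signature is an assumption
        self._timeout = timeout.total_seconds()

    @handler
    async def run(self, prompt: str, ctx: WorkflowContext[str]) -> None:
        try:
            # call_agent is a hypothetical coroutine wrapping the slow agent call.
            reply = await asyncio.wait_for(call_agent(prompt), timeout=self._timeout)
        except asyncio.TimeoutError:
            reply = "agent call timed out"  # or route to an error-handling edge
        await ctx.send_message(reply)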
2. External Job Polling Patterns
- No built-in support for "start job, poll until done"
- Common scenario: submit to external API, wait for completion
- No guidance on implementing polling executors
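What this looks like today is a hand-rolled sleep loop inside the handler, which pins the superstep for the whole duration; a sketch using the same hypothetical ml_service as Use Case 1 below:

# Hand-rolled polling today: the handler (and therefore the superstep) is
# blocked for the entire job, and no checkpoint can be taken mid-poll.
import asyncio


class ManualPollingExecutor(Executor):
    @handler
    async def run(self, request: TrainRequest, ctx: WorkflowContext[TrainResult]) -> None:
        job_id = await ml_service.submit_training(request.model_config)
        while True:
            status = await ml_service.get_job_status(job_id)
            if status.state in ("SUCCEEDED", "FAILED"):
                break
            await asyncio.sleep(30)  # fixed interval, no backoff, not durable
        await ctx.send_message(TrainResult(job_id, status))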
3. Retry and Backoff
- No retry logic for transient failures
- No exponential backoff patterns
- Manual implementation required in every executor
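The boilerplate every executor currently has to reimplement looks roughly like this hand-rolled decorator (a sketch, not framework API):

# Sketch of the retry/backoff boilerplate that currently lives in each executor.
import asyncio
import random
from functools import wraps


def retry_with_backoff(max_attempts: int = 3, initial: float = 1.0, cap: float = 30.0):
    """Retry an async function on exception with jittered exponential backoff."""
    def decorator(fn):
        @wraps(fn)
        async def wrapper(*args, **kwargs):
            delay = initial
            for attempt in range(1, max_attempts + 1):
                try:
                    return await fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise
                    await asyncio.sleep(delay + random.uniform(0, delay / 2))
                    delay = min(delay * 2, cap)
        return wrapper
    return decorator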
4. Durable Timers
- No way to schedule workflow resumption at specific time
- Cannot implement "wait 24 hours then resume"
- No timer persistence across checkpoint/restore
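The naive substitute, asyncio.sleep inside a handler, is not durable: the pending delay exists only in process memory, so a crash or checkpoint/restore loses it (sketch, using the Document type from Use Case 5 below):

# Why asyncio.sleep is not a durable timer: the delay is lost on crash or restore,
# and the worker stays occupied for the full 24 hours.
import asyncio
from datetime import timedelta


class NaiveDelayExecutor(Executor):
    @handler
    async def wait_then_forward(self, doc: Document, ctx: WorkflowContext[Document]) -> None:
        await asyncio.sleep(timedelta(hours=24).total_seconds())  # in-memory only
        await ctx.send_message(doc)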
5. Background Work
- Executors must complete before workflow advances
- No pattern for "fire and forget" operations
- No background task lifecycle management
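The closest approximation today is spawning a raw asyncio task and tracking it manually, with no framework-managed lifecycle (sketch; write_audit_log and the constructor signature are assumptions):

# Fire-and-forget sketch: the framework has no visibility into this task, so it
# is not checkpointed, not awaited at shutdown, and failures go unreported.
import asyncio


class AuditLogExecutor(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)  # base-class constructor signature is an assumption
        self._background: set[asyncio.Task] = set()

    @handler
    async def handle(self, record: str, ctx: WorkflowContext[str]) -> None:
        task = asyncio.create_task(write_audit_log(record))  # hypothetical I/O call
        self._background.add(task)
        task.add_done_callback(self._background.discard)
        await ctx.send_message(record)  # advance the workflow immediately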
6. Workflow-Level Timeouts
- No max wall-clock time for entire workflow
- Cannot enforce "this workflow must complete within 1 hour"
- Only iteration count limits exist
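The only approximation today is to bound the run call from the outside, which cancels abruptly and records no workflow-level failure state (sketch; workflow.run() as the entry point is an assumption):

# Outside-in SLA workaround: cancellation is abrupt and nothing is recorded in
# the workflow itself. workflow.run() as the entry point is an assumption.
import asyncio


async def run_with_sla(workflow, initial_message, sla_seconds: float = 3600.0):
    try:
        return await asyncio.wait_for(workflow.run(initial_message), timeout=sla_seconds)
    except asyncio.TimeoutError:
        raise RuntimeError("workflow exceeded its SLA") from None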
7. Progress Tracking
- No standard pattern for long-running executor progress
- Cannot emit incremental progress for operations like "processing 10,000 items"
- AgentRunUpdateEvent only covers agent streaming
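The closest workaround is to model progress as ordinary messages to a downstream sink, since ctx.send_message() is the only channel available (sketch; ProgressUpdate and handle_item are hypothetical):

# Workaround sketch: progress reported as plain messages to a sink executor.
# ProgressUpdate and handle_item are hypothetical, not framework types.
from dataclasses import dataclass


@dataclass
class ProgressUpdate:
    done: int
    total: int


class BatchProcessor(Executor):
    @handler
    async def process(self, items: list[str], ctx: WorkflowContext[ProgressUpdate]) -> None:
        total = len(items)
        for i, item in enumerate(items, start=1):
            await handle_item(item)  # hypothetical per-item work
            if i % 1000 == 0:
                await ctx.send_message(ProgressUpdate(done=i, total=total))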
Use Cases
Use Case 1: External Job Polling
# Submit a training job, poll until complete (may take hours)
class MLTrainingExecutor(Executor):
    @handler
    async def train(self, request: TrainRequest, ctx: WorkflowContext[TrainResult]) -> None:
        # Start job
        job_id = await ml_service.submit_training(request.model_config)
        # Need to poll, but how?
        # Option A: Loop with sleep (blocks workflow)
        # Option B: Return and re-invoke (need durable timer)
        # Option C: Checkpoint-based polling (complex)
Desired:
class MLTrainingExecutor(LongRunningExecutor):
    @handler
    async def train(self, request: TrainRequest, ctx: WorkflowContext[TrainResult]) -> None:
        job_id = await ml_service.submit_training(request.model_config)
        # Poll with exponential backoff, auto-checkpoint between polls
        result = await ctx.poll_until_complete(
            check=lambda: ml_service.get_job_status(job_id),
            is_complete=lambda status: status.state in ["SUCCEEDED", "FAILED"],
            max_duration=timedelta(hours=6),
            backoff=ExponentialBackoff(initial=10, max=300),
        )
        await ctx.send_message(TrainResult(job_id, result))
Use Case 2: Agent with Timeout
# Prevent agent from running indefinitely
class SummarizationExecutor(AgentExecutor):
    def __init__(self, agent: AgentProtocol):
        super().__init__(agent, timeout=timedelta(minutes=5))
        # Automatically cancels if agent exceeds timeout
Use Case 3: Workflow SLA Enforcement
# Entire workflow must complete within SLA
workflow = (
    WorkflowBuilder()
    .add_edge(...)
    .with_timeout(timedelta(hours=1))  # Fail if not done in 1 hour
    .with_checkpointing(storage)
    .build()
)
Use Case 4: Retry with Backoff
class APICallExecutor(Executor):
    @handler
    @retry(max_attempts=3, backoff=ExponentialBackoff())
    async def call_api(self, request: APIRequest, ctx: WorkflowContext[APIResponse]) -> None:
        # Automatically retries on failure with backoff
        response = await external_api.call(request)
        await ctx.send_message(response)
Use Case 5: Scheduled Workflow Resumption
# Pause workflow, resume after delay
class ApprovalExecutor(Executor):
    @handler
    async def request_approval(self, doc: Document, ctx: WorkflowContext) -> None:
        await send_approval_email(doc)
        # Resume in 24 hours if no response
        await ctx.schedule_resume(after=timedelta(hours=24))
        # Or resume at specific time
        await ctx.schedule_resume(at=datetime(2025, 1, 15, 9, 0))
Related Code
Python:
- python/packages/core/agent_framework/_workflows/_runner.py - workflow execution
- python/packages/core/agent_framework/_workflows/_executor.py - base executor
- python/packages/core/agent_framework/_workflows/_checkpoint.py - checkpointing
- python/packages/core/agent_framework/_workflows/_request_info_executor.py - HITL patterns
- python/samples/getting_started/workflows/checkpoint/ - checkpoint examples
.NET:
- dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ - workflow execution
- dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs - base executor
- dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/ICheckpointStore.cs - checkpointing
- dotnet/src/Microsoft.Agents.AI.Workflows/Execution/InputWaiter.cs - async coordination