Skip to content

Commit 6a72b58

Browse files
committed
refactor: Unify ExecutorIsh and ExecutorRegistration
* Switch to more modern Record type-tree for Sum Types * Unify APIs for getting ExecutorRegistration * Fix an issue where workflows consisting entirely of cross-run shareable executors which are not instance-resettable do not properly clear state when running non-concurrently.
1 parent 0fc1b88 commit 6a72b58

24 files changed

+618
-500
lines changed

dotnet/samples/GettingStarted/Workflows/_Foundational/06_SubWorkflows/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ private static async Task Main()
4040
.Build();
4141

4242
// Step 2: Configure the sub-workflow as an executor for use in the parent workflow
43-
ExecutorIsh subWorkflowExecutor = subWorkflow.ConfigureSubWorkflow("TextProcessingSubWorkflow");
43+
ExecutorRegistration subWorkflowExecutor = subWorkflow.AsExecutor("TextProcessingSubWorkflow");
4444

4545
// Step 3: Build a main workflow that uses the sub-workflow as an executor
4646
Console.WriteLine("Building main workflow that uses the sub-workflow as an executor...\n");

dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/WorkflowModelBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public void Connect(IModeledAction source, IModeledAction target, Func<object?,
2424
condition);
2525
}
2626

27-
private static ExecutorIsh GetExecutorIsh(IModeledAction action) =>
27+
private static ExecutorRegistration GetExecutorIsh(IModeledAction action) =>
2828
action switch
2929
{
3030
RequestPortAction port => port.RequestPort,
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
3+
using Microsoft.Agents.AI.Workflows.Specialized;
4+
using Microsoft.Shared.Diagnostics;
5+
6+
namespace Microsoft.Agents.AI.Workflows;
7+
8+
/// <summary>
9+
/// Represents the registration details for an AI agent, including configuration options for event emission.
10+
/// </summary>
11+
/// <param name="Agent">The AI agent.</param>
12+
/// <param name="EmitEvents">Specifies whether the agent should emit events. If null, the default behavior is applied.</param>
13+
public record AIAgentRegistration(AIAgent Agent, bool EmitEvents = false)
14+
: ExecutorRegistration(Throw.IfNull(Agent).Name ?? Throw.IfNull(Agent.Id),
15+
(_) => new(new AIAgentHostExecutor(Agent, EmitEvents)),
16+
typeof(AIAgentHostExecutor),
17+
Agent)
18+
{
19+
/// <inheritdoc/>
20+
public override bool SupportsConcurrentSharedExecution => true;
21+
22+
/// <inheritdoc/>
23+
public override bool SupportsResetting => false;
24+
}

dotnet/src/Microsoft.Agents.AI.Workflows/AgentWorkflowBuilder.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ private static Workflow BuildSequentialCore(string? workflowName, params IEnumer
3939
// Create a builder that chains the agents together in sequence. The workflow simply begins
4040
// with the first agent in the sequence.
4141
WorkflowBuilder? builder = null;
42-
ExecutorIsh? previous = null;
42+
ExecutorRegistration? previous = null;
4343
foreach (var agent in agents)
4444
{
4545
AgentRunStreamingExecutor agentExecutor = new(agent, includeInputInOutput: true);
@@ -124,8 +124,8 @@ private static Workflow BuildConcurrentCore(
124124
// so that the final accumulator receives a single list of messages from each agent. Otherwise, the
125125
// accumulator would not be able to determine what came from what agent, as there's currently no
126126
// provenance tracking exposed in the workflow context passed to a handler.
127-
ExecutorIsh[] agentExecutors = (from agent in agents select (ExecutorIsh)new AgentRunStreamingExecutor(agent, includeInputInOutput: false)).ToArray();
128-
ExecutorIsh[] accumulators = [.. from agent in agentExecutors select (ExecutorIsh)new CollectChatMessagesExecutor($"Batcher/{agent.Id}")];
127+
ExecutorRegistration[] agentExecutors = (from agent in agents select (ExecutorRegistration)new AgentRunStreamingExecutor(agent, includeInputInOutput: false)).ToArray();
128+
ExecutorRegistration[] accumulators = [.. from agent in agentExecutors select (ExecutorRegistration)new CollectChatMessagesExecutor($"Batcher/{agent.Id}")];
129129
builder.AddFanOutEdge(start, targets: agentExecutors);
130130
for (int i = 0; i < agentExecutors.Length; i++)
131131
{
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
3+
using System;
4+
using Microsoft.Shared.Diagnostics;
5+
6+
namespace Microsoft.Agents.AI.Workflows;
7+
8+
// TODO: Unwrap the Configured object, just like for WorkflowRegistration
9+
internal record ConfiguredExecutorRegistration(Configured<Executor> ConfiguredExecutor, Type ExecutorType)
10+
: ExecutorRegistration(Throw.IfNull(ConfiguredExecutor).Id,
11+
ConfiguredExecutor.BoundFactoryAsync,
12+
ExecutorType,
13+
ConfiguredExecutor.Raw)
14+
{
15+
/// <inheritdoc/>
16+
public override bool SupportsConcurrentSharedExecution => true;
17+
18+
/// <inheritdoc/>
19+
public override bool SupportsResetting => false;
20+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
3+
using System.Threading.Tasks;
4+
using Microsoft.Shared.Diagnostics;
5+
6+
namespace Microsoft.Agents.AI.Workflows;
7+
8+
/// <summary>
9+
/// Initializes a new instance of the ExecutorRegistrationEx class using the specified executor.
10+
/// </summary>
11+
/// <param name="ExecutorInstance">The executor instance to register. Cannot be null.</param>
12+
public record ExecutorInstanceRegistration(Executor ExecutorInstance)
13+
: ExecutorRegistration(Throw.IfNull(ExecutorInstance).Id,
14+
(_) => new(ExecutorInstance),
15+
ExecutorInstance.GetType(),
16+
ExecutorInstance)
17+
{
18+
/// <inheritdoc/>
19+
public override bool SupportsConcurrentSharedExecution => this.ExecutorInstance.IsCrossRunShareable;
20+
21+
/// <inheritdoc/>
22+
public override bool SupportsResetting => this.ExecutorInstance is IResettableExecutor;
23+
24+
/// <inheritdoc/>
25+
protected override async ValueTask<bool> ResetCoreAsync()
26+
{
27+
if (this.ExecutorInstance is IResettableExecutor resettable)
28+
{
29+
await resettable.ResetAsync().ConfigureAwait(false);
30+
return true;
31+
}
32+
33+
return false;
34+
}
35+
}

0 commit comments

Comments
 (0)