Skip to content

Commit 607a678

Browse files
committed
fix: .NET: Concurrency Support for Orchestrations
Concurrent run support was recently added to workflows, but Orchestrations did not fully update to support it. A few executors were missing Cross-Run Shareable annotations, and the ConcurrentEnd executor needed to be factory-instantiated. This also ports the fix for #1613 from #1637, to avoid waiting on that PR.
1 parent aba505d commit 607a678

17 files changed

+447
-90
lines changed

dotnet/src/Microsoft.Agents.AI.Workflows/AgentWorkflowBuilder.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Collections.Generic;
55
using System.Diagnostics;
66
using System.Linq;
7+
using System.Threading.Tasks;
78
using Microsoft.Agents.AI.Workflows.Specialized;
89
using Microsoft.Extensions.AI;
910
using Microsoft.Shared.Diagnostics;
@@ -136,7 +137,12 @@ private static Workflow BuildConcurrentCore(
136137
// each agent's accumulator to it. If no aggregation function was provided, we default to returning
137138
// the last message from each agent
138139
aggregator ??= static lists => (from list in lists where list.Count > 0 select list.Last()).ToList();
139-
ConcurrentEndExecutor end = new(agentExecutors.Length, aggregator);
140+
141+
Func<string, string, ValueTask<ConcurrentEndExecutor>> endFactory =
142+
(string _, string __) => new(new ConcurrentEndExecutor(agentExecutors.Length, aggregator));
143+
144+
ExecutorIsh end = endFactory.ConfigureFactory(ConcurrentEndExecutor.ExecutorId);
145+
140146
builder.AddFanInEdge(end, sources: accumulators);
141147

142148
builder = builder.WithOutputFrom(end);

dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorRegistration.cs

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,29 @@ internal sealed class ExecutorRegistration(string id, Type executorType, Executo
1313
public string Id { get; } = Throw.IfNullOrEmpty(id);
1414
public Type ExecutorType { get; } = Throw.IfNull(executorType);
1515
private ExecutorFactoryF ProviderAsync { get; } = Throw.IfNull(provider);
16-
public bool IsNotExecutorInstance { get; } = rawData is not Executor;
17-
public bool IsUnresettableSharedInstance { get; } = rawData is Executor executor &&
18-
// Cross-Run Shareable executors are "trivially" resettable, since they
19-
// have no on-object state.
20-
!executor.IsCrossRunShareable &&
21-
rawData is not IResettableExecutor;
22-
public bool SupportsConcurrent { get; } = (rawData is not Executor executor || executor.IsCrossRunShareable) &&
23-
(rawData is not Workflow workflow || workflow.AllowConcurrent);
16+
17+
public bool IsSharedInstance { get; } = rawData is Executor;
18+
19+
/// <summary>
20+
/// Gets a value whether instances of the executor created from this registration can be reset between subsequent
21+
/// runs from the same <see cref="Workflow"/> instance. This value is not relevant for executors that <see
22+
/// cref="SupportsConcurrent"/>.
23+
/// </summary>
24+
public bool SupportsResetting { get; } = rawData is Executor &&
25+
// Cross-Run Shareable executors are "trivially" resettable, since they
26+
// have no on-object state.
27+
rawData is IResettableExecutor;
28+
29+
/// <summary>
30+
/// Gets a value whether instances of the executor created from this registration can be used in concurrent runs
31+
/// from the same <see cref="Workflow"/> instance.
32+
/// </summary>
33+
public bool SupportsConcurrent { get; } = rawData is not Executor executor || executor.IsCrossRunShareable;
2434

2535
internal async ValueTask<bool> TryResetAsync()
2636
{
27-
if (this.IsUnresettableSharedInstance)
28-
{
29-
return false;
30-
}
31-
32-
// If the executor supports concurrent use, then resetting is a no-op.
33-
if (this.SupportsConcurrent)
37+
// Non-shared instances do not need resetting
38+
if (!this.IsSharedInstance)
3439
{
3540
return true;
3641
}

dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/CollectChatMessagesExecutor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace Microsoft.Agents.AI.Workflows.Specialized;
1111
/// Provides an executor that batches received chat messages that it then releases when
1212
/// receiving a <see cref="TurnToken"/>.
1313
/// </summary>
14-
internal sealed class CollectChatMessagesExecutor(string id) : ChatProtocolExecutor(id), IResettableExecutor
14+
internal sealed class CollectChatMessagesExecutor(string id) : ChatProtocolExecutor(id, declareCrossRunShareable: true), IResettableExecutor
1515
{
1616
/// <inheritdoc/>
1717
protected override ValueTask TakeTurnAsync(List<ChatMessage> messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default)

dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/ConcurrentEndExecutor.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@ namespace Microsoft.Agents.AI.Workflows.Specialized;
1414
/// </summary>
1515
internal sealed class ConcurrentEndExecutor : Executor, IResettableExecutor
1616
{
17+
public const string ExecutorId = "ConcurrentEnd";
18+
1719
private readonly int _expectedInputs;
1820
private readonly Func<IList<List<ChatMessage>>, List<ChatMessage>> _aggregator;
1921
private List<List<ChatMessage>> _allResults;
2022
private int _remaining;
2123

22-
public ConcurrentEndExecutor(int expectedInputs, Func<IList<List<ChatMessage>>, List<ChatMessage>> aggregator) : base("ConcurrentEnd")
24+
public ConcurrentEndExecutor(int expectedInputs, Func<IList<List<ChatMessage>>, List<ChatMessage>> aggregator) : base(ExecutorId)
2325
{
2426
this._expectedInputs = expectedInputs;
2527
this._aggregator = Throw.IfNull(aggregator);

dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffAgentExecutor.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ namespace Microsoft.Agents.AI.Workflows.Specialized;
1414
/// <summary>Executor used to represent an agent in a handoffs workflow, responding to <see cref="HandoffState"/> events.</summary>
1515
internal sealed class HandoffAgentExecutor(
1616
AIAgent agent,
17-
string? handoffInstructions) : Executor(agent.GetDescriptiveId()), IResettableExecutor
17+
string? handoffInstructions) : Executor(agent.GetDescriptiveId(), declareCrossRunShareable: true), IResettableExecutor
1818
{
1919
private static readonly JsonElement s_handoffSchema = AIFunctionFactory.Create(
2020
([Description("The reason for the handoff")] string? reasonForHandoff) => { }).JsonSchema;
@@ -70,9 +70,9 @@ protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) =>
7070
List<ChatMessage>? roleChanges = allMessages.ChangeAssistantToUserForOtherParticipants(this._agent.DisplayName);
7171

7272
await foreach (var update in this._agent.RunStreamingAsync(allMessages,
73-
options: this._agentOptions,
74-
cancellationToken: cancellationToken)
75-
.ConfigureAwait(false))
73+
options: this._agentOptions,
74+
cancellationToken: cancellationToken)
75+
.ConfigureAwait(false))
7676
{
7777
await AddUpdateAsync(update, cancellationToken).ConfigureAwait(false);
7878

dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffsEndExecutor.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
namespace Microsoft.Agents.AI.Workflows.Specialized;
66

77
/// <summary>Executor used at the end of a handoff workflow to raise a final completed event.</summary>
8-
internal sealed class HandoffsEndExecutor() : Executor("HandoffEnd"), IResettableExecutor
8+
internal sealed class HandoffsEndExecutor() : Executor(ExecutorId, declareCrossRunShareable: true), IResettableExecutor
99
{
10+
public const string ExecutorId = "HandoffEnd";
11+
1012
protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) =>
1113
routeBuilder.AddHandler<HandoffState>((handoff, context, cancellationToken) =>
1214
context.YieldOutputAsync(handoff.Messages, cancellationToken));

dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffsStartExecutor.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
namespace Microsoft.Agents.AI.Workflows.Specialized;
99

1010
/// <summary>Executor used at the start of a handoffs workflow to accumulate messages and emit them as HandoffState upon receiving a turn token.</summary>
11-
internal sealed class HandoffsStartExecutor() : ChatProtocolExecutor("HandoffStart", DefaultOptions), IResettableExecutor
11+
internal sealed class HandoffsStartExecutor() : ChatProtocolExecutor(ExecutorId, DefaultOptions, declareCrossRunShareable: true), IResettableExecutor
1212
{
13+
internal const string ExecutorId = "HandoffStart";
14+
1315
private static ChatProtocolExecutorOptions DefaultOptions => new()
1416
{
1517
StringMessageChatRole = ChatRole.User

dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/OutputMessagesExecutor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public static partial class AgentWorkflowBuilder
1313
/// Provides an executor that batches received chat messages that it then publishes as the final result
1414
/// when receiving a <see cref="TurnToken"/>.
1515
/// </summary>
16-
internal sealed class OutputMessagesExecutor() : ChatProtocolExecutor("OutputMessages"), IResettableExecutor
16+
internal sealed class OutputMessagesExecutor() : ChatProtocolExecutor("OutputMessages", declareCrossRunShareable: true), IResettableExecutor
1717
{
1818
protected override ValueTask TakeTurnAsync(List<ChatMessage> messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default)
1919
=> context.YieldOutputAsync(messages, cancellationToken);

dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,14 @@ internal Workflow(string startExecutorId, string? name = null, string? descripti
8686
}
8787

8888
private bool _needsReset;
89-
private bool IsResettable => this.Registrations.Values.All(registration => !registration.IsUnresettableSharedInstance);
90-
89+
private bool HasResettable => this.Registrations.Values.Any(registration => registration.SupportsResetting);
9190
private async ValueTask<bool> TryResetExecutorRegistrationsAsync()
9291
{
93-
if (this.IsResettable)
92+
if (this.HasResettable)
9493
{
9594
foreach (ExecutorRegistration registration in this.Registrations.Values)
9695
{
96+
// TryResetAsync returns true if the executor does not need resetting
9797
if (!await registration.TryResetAsync().ConfigureAwait(false))
9898
{
9999
return false;
@@ -158,7 +158,7 @@ internal void TakeOwnership(object ownerToken, bool subworkflow = false, object?
158158
});
159159
}
160160

161-
this._needsReset = true;
161+
this._needsReset = this.HasResettable;
162162
this._ownedAsSubworkflow = subworkflow;
163163
}
164164

dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentWorkflowBuilderTests.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -382,11 +382,12 @@ public async Task BuildGroupChat_AgentsRunInOrderAsync(int maxIterations)
382382
}
383383

384384
private static async Task<(string UpdateText, List<ChatMessage>? Result)> RunWorkflowAsync(
385-
Workflow workflow, List<ChatMessage> input)
385+
Workflow workflow, List<ChatMessage> input, ExecutionEnvironment executionEnvironment = ExecutionEnvironment.InProcess_Lockstep)
386386
{
387387
StringBuilder sb = new();
388388

389-
await using StreamingRun run = await InProcessExecution.Lockstep.StreamAsync(workflow, input);
389+
IWorkflowExecutionEnvironment environment = executionEnvironment.ToWorkflowExecutionEnvironment();
390+
await using StreamingRun run = await environment.StreamAsync(workflow, input);
390391
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
391392

392393
WorkflowOutputEvent? output = null;

0 commit comments

Comments
 (0)