diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/AgentWorkflowBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/AgentWorkflowBuilder.cs index d4cdfcc452..8b527d5c44 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/AgentWorkflowBuilder.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/AgentWorkflowBuilder.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using System.Threading.Tasks; using Microsoft.Agents.AI.Workflows.Specialized; using Microsoft.Extensions.AI; using Microsoft.Shared.Diagnostics; @@ -136,7 +137,12 @@ private static Workflow BuildConcurrentCore( // each agent's accumulator to it. If no aggregation function was provided, we default to returning // the last message from each agent aggregator ??= static lists => (from list in lists where list.Count > 0 select list.Last()).ToList(); - ConcurrentEndExecutor end = new(agentExecutors.Length, aggregator); + + Func> endFactory = + (string _, string __) => new(new ConcurrentEndExecutor(agentExecutors.Length, aggregator)); + + ExecutorIsh end = endFactory.ConfigureFactory(ConcurrentEndExecutor.ExecutorId); + builder.AddFanInEdge(end, sources: accumulators); builder = builder.WithOutputFrom(end); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorRegistration.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorRegistration.cs index 661cde559f..eef749966c 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorRegistration.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorRegistration.cs @@ -13,24 +13,29 @@ internal sealed class ExecutorRegistration(string id, Type executorType, Executo public string Id { get; } = Throw.IfNullOrEmpty(id); public Type ExecutorType { get; } = Throw.IfNull(executorType); private ExecutorFactoryF ProviderAsync { get; } = Throw.IfNull(provider); - public bool IsNotExecutorInstance { get; } = rawData is not Executor; - public bool IsUnresettableSharedInstance { get; } = rawData is Executor executor && - // Cross-Run Shareable executors are "trivially" resettable, since they - // have no on-object state. - !executor.IsCrossRunShareable && - rawData is not IResettableExecutor; - public bool SupportsConcurrent { get; } = (rawData is not Executor executor || executor.IsCrossRunShareable) && - (rawData is not Workflow workflow || workflow.AllowConcurrent); + + public bool IsSharedInstance { get; } = rawData is Executor; + + /// + /// Gets a value whether instances of the executor created from this registration can be reset between subsequent + /// runs from the same instance. This value is not relevant for executors that . + /// + public bool SupportsResetting { get; } = rawData is Executor && + // Cross-Run Shareable executors are "trivially" resettable, since they + // have no on-object state. + rawData is IResettableExecutor; + + /// + /// Gets a value whether instances of the executor created from this registration can be used in concurrent runs + /// from the same instance. + /// + public bool SupportsConcurrent { get; } = rawData is not Executor executor || executor.IsCrossRunShareable; internal async ValueTask TryResetAsync() { - if (this.IsUnresettableSharedInstance) - { - return false; - } - - // If the executor supports concurrent use, then resetting is a no-op. - if (this.SupportsConcurrent) + // Non-shared instances do not need resetting + if (!this.IsSharedInstance) { return true; } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/CollectChatMessagesExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/CollectChatMessagesExecutor.cs index 8653dfbab1..5a923b9c52 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/CollectChatMessagesExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/CollectChatMessagesExecutor.cs @@ -11,7 +11,7 @@ namespace Microsoft.Agents.AI.Workflows.Specialized; /// Provides an executor that batches received chat messages that it then releases when /// receiving a . /// -internal sealed class CollectChatMessagesExecutor(string id) : ChatProtocolExecutor(id), IResettableExecutor +internal sealed class CollectChatMessagesExecutor(string id) : ChatProtocolExecutor(id, declareCrossRunShareable: true), IResettableExecutor { /// protected override ValueTask TakeTurnAsync(List messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/ConcurrentEndExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/ConcurrentEndExecutor.cs index 7f509ef9fe..2fc4030a5c 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/ConcurrentEndExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/ConcurrentEndExecutor.cs @@ -14,12 +14,14 @@ namespace Microsoft.Agents.AI.Workflows.Specialized; /// internal sealed class ConcurrentEndExecutor : Executor, IResettableExecutor { + public const string ExecutorId = "ConcurrentEnd"; + private readonly int _expectedInputs; private readonly Func>, List> _aggregator; private List> _allResults; private int _remaining; - public ConcurrentEndExecutor(int expectedInputs, Func>, List> aggregator) : base("ConcurrentEnd") + public ConcurrentEndExecutor(int expectedInputs, Func>, List> aggregator) : base(ExecutorId) { this._expectedInputs = expectedInputs; this._aggregator = Throw.IfNull(aggregator); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffAgentExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffAgentExecutor.cs index 941cc5783e..59dc49f143 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffAgentExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffAgentExecutor.cs @@ -14,7 +14,7 @@ namespace Microsoft.Agents.AI.Workflows.Specialized; /// Executor used to represent an agent in a handoffs workflow, responding to events. internal sealed class HandoffAgentExecutor( AIAgent agent, - string? handoffInstructions) : Executor(agent.GetDescriptiveId()), IResettableExecutor + string? handoffInstructions) : Executor(agent.GetDescriptiveId(), declareCrossRunShareable: true), IResettableExecutor { private static readonly JsonElement s_handoffSchema = AIFunctionFactory.Create( ([Description("The reason for the handoff")] string? reasonForHandoff) => { }).JsonSchema; @@ -70,9 +70,9 @@ protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) => List? roleChanges = allMessages.ChangeAssistantToUserForOtherParticipants(this._agent.DisplayName); await foreach (var update in this._agent.RunStreamingAsync(allMessages, - options: this._agentOptions, - cancellationToken: cancellationToken) - .ConfigureAwait(false)) + options: this._agentOptions, + cancellationToken: cancellationToken) + .ConfigureAwait(false)) { await AddUpdateAsync(update, cancellationToken).ConfigureAwait(false); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffsEndExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffsEndExecutor.cs index 1d825be665..eeabeb5d5a 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffsEndExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffsEndExecutor.cs @@ -5,8 +5,10 @@ namespace Microsoft.Agents.AI.Workflows.Specialized; /// Executor used at the end of a handoff workflow to raise a final completed event. -internal sealed class HandoffsEndExecutor() : Executor("HandoffEnd"), IResettableExecutor +internal sealed class HandoffsEndExecutor() : Executor(ExecutorId, declareCrossRunShareable: true), IResettableExecutor { + public const string ExecutorId = "HandoffEnd"; + protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) => routeBuilder.AddHandler((handoff, context, cancellationToken) => context.YieldOutputAsync(handoff.Messages, cancellationToken)); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffsStartExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffsStartExecutor.cs index e7f6789edf..982b8aabf2 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffsStartExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffsStartExecutor.cs @@ -8,8 +8,10 @@ namespace Microsoft.Agents.AI.Workflows.Specialized; /// Executor used at the start of a handoffs workflow to accumulate messages and emit them as HandoffState upon receiving a turn token. -internal sealed class HandoffsStartExecutor() : ChatProtocolExecutor("HandoffStart", DefaultOptions), IResettableExecutor +internal sealed class HandoffsStartExecutor() : ChatProtocolExecutor(ExecutorId, DefaultOptions, declareCrossRunShareable: true), IResettableExecutor { + internal const string ExecutorId = "HandoffStart"; + private static ChatProtocolExecutorOptions DefaultOptions => new() { StringMessageChatRole = ChatRole.User diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/OutputMessagesExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/OutputMessagesExecutor.cs index a9d1005c0d..e727e30bac 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/OutputMessagesExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/OutputMessagesExecutor.cs @@ -13,7 +13,7 @@ public static partial class AgentWorkflowBuilder /// Provides an executor that batches received chat messages that it then publishes as the final result /// when receiving a . /// - internal sealed class OutputMessagesExecutor() : ChatProtocolExecutor("OutputMessages"), IResettableExecutor + internal sealed class OutputMessagesExecutor() : ChatProtocolExecutor("OutputMessages", declareCrossRunShareable: true), IResettableExecutor { protected override ValueTask TakeTurnAsync(List messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default) => context.YieldOutputAsync(messages, cancellationToken); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs index d2579871fa..3636500438 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs @@ -86,14 +86,14 @@ internal Workflow(string startExecutorId, string? name = null, string? descripti } private bool _needsReset; - private bool IsResettable => this.Registrations.Values.All(registration => !registration.IsUnresettableSharedInstance); - + private bool HasResettable => this.Registrations.Values.Any(registration => registration.SupportsResetting); private async ValueTask TryResetExecutorRegistrationsAsync() { - if (this.IsResettable) + if (this.HasResettable) { foreach (ExecutorRegistration registration in this.Registrations.Values) { + // TryResetAsync returns true if the executor does not need resetting if (!await registration.TryResetAsync().ConfigureAwait(false)) { return false; @@ -158,7 +158,7 @@ internal void TakeOwnership(object ownerToken, bool subworkflow = false, object? }); } - this._needsReset = true; + this._needsReset = this.HasResettable; this._ownedAsSubworkflow = subworkflow; } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentWorkflowBuilderTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentWorkflowBuilderTests.cs index a96f130a6c..0437fc7695 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentWorkflowBuilderTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentWorkflowBuilderTests.cs @@ -382,11 +382,12 @@ public async Task BuildGroupChat_AgentsRunInOrderAsync(int maxIterations) } private static async Task<(string UpdateText, List? Result)> RunWorkflowAsync( - Workflow workflow, List input) + Workflow workflow, List input, ExecutionEnvironment executionEnvironment = ExecutionEnvironment.InProcess_Lockstep) { StringBuilder sb = new(); - await using StreamingRun run = await InProcessExecution.Lockstep.StreamAsync(workflow, input); + IWorkflowExecutionEnvironment environment = executionEnvironment.ToWorkflowExecutionEnvironment(); + await using StreamingRun run = await environment.StreamAsync(workflow, input); await run.TrySendMessageAsync(new TurnToken(emitEvents: true)); WorkflowOutputEvent? output = null; diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/06_GroupChat_Workflow.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/06_GroupChat_Workflow.cs index 33a0fbc13f..a0e57006ed 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/06_GroupChat_Workflow.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/06_GroupChat_Workflow.cs @@ -6,20 +6,23 @@ using System.IO; using System.Linq; using System.Runtime.CompilerServices; -using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.UnitTests; using Microsoft.Extensions.AI; namespace Microsoft.Agents.AI.Workflows.Sample; internal static class Step6EntryPoint { + public const string EchoAgentId = "echo"; + public const string EchoPrefix = "You said: "; + public static Workflow CreateWorkflow(int maxTurns) => AgentWorkflowBuilder .CreateGroupChatBuilderWith(agents => new RoundRobinGroupChatManager(agents) { MaximumIterationCount = maxTurns }) - .AddParticipants(new HelloAgent(), new EchoAgent()) + .AddParticipants(new HelloAgent(), new TestEchoAgent(id: EchoAgentId, prefix: EchoPrefix)) .Build(); public static async ValueTask RunAsync(TextWriter writer, IWorkflowExecutionEnvironment environment, int maxSteps = 2) @@ -85,54 +88,3 @@ public override async IAsyncEnumerable RunStreamingAsync } internal sealed class HelloAgentThread() : InMemoryAgentThread(); - -internal sealed class EchoAgent(string id = nameof(EchoAgent)) : AIAgent -{ - public const string Prefix = "You said: "; - public const string DefaultId = nameof(EchoAgent); - - public override string Id => id; - public override string? Name => id; - - public override AgentThread GetNewThread() - => new EchoAgentThread(); - - public override AgentThread DeserializeThread(JsonElement serializedThread, JsonSerializerOptions? jsonSerializerOptions = null) - => new EchoAgentThread(); - - public override async Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) - { - IEnumerable update = [ - await this.RunStreamingAsync(messages, thread, options, cancellationToken) - .SingleAsync(cancellationToken) - .ConfigureAwait(false)]; - - return update.ToAgentRunResponse(); - } - - public override async IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - var messagesList = messages as IReadOnlyCollection ?? messages.ToList(); - - if (messagesList.Count == 0) - { - throw new ArgumentException("No messages provided to echo.", nameof(messages)); - } - - StringBuilder collectedText = new(Prefix); - foreach (string messageText in messagesList.Select(message => message.Text) - .Where(text => !string.IsNullOrEmpty(text))) - { - collectedText.AppendLine(messageText); - } - - yield return new(ChatRole.Assistant, collectedText.ToString()) - { - AgentId = this.Id, - AuthorName = this.Name, - MessageId = Guid.NewGuid().ToString("N"), - }; - } -} - -internal sealed class EchoAgentThread() : InMemoryAgentThread(); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/07_GroupChat_Workflow_HostAsAgent.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/07_GroupChat_Workflow_HostAsAgent.cs index 7b6d86afb1..8144d1bd11 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/07_GroupChat_Workflow_HostAsAgent.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/07_GroupChat_Workflow_HostAsAgent.cs @@ -8,7 +8,10 @@ namespace Microsoft.Agents.AI.Workflows.Sample; internal static class Step7EntryPoint { - public static async ValueTask RunAsync(TextWriter writer, int maxSteps = 2) + public static string EchoAgentId => Step6EntryPoint.EchoAgentId; + public static string EchoPrefix => Step6EntryPoint.EchoPrefix; + + public static async ValueTask RunAsync(TextWriter writer, IWorkflowExecutionEnvironment environment, int maxSteps = 2) { Workflow workflow = Step6EntryPoint.CreateWorkflow(maxSteps); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/10_Sequential_HostAsAgent.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/10_Sequential_HostAsAgent.cs new file mode 100644 index 0000000000..4670f8b931 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/10_Sequential_HostAsAgent.cs @@ -0,0 +1,40 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Generic; +using System.IO; +using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.UnitTests; +using Microsoft.Extensions.AI; + +namespace Microsoft.Agents.AI.Workflows.Sample; + +internal static class Step10EntryPoint +{ + public static Workflow CreateWorkflow() + { + TestEchoAgent echoAgent = new("echo", "Echo"); + return AgentWorkflowBuilder.BuildSequential(echoAgent); + } + public static Workflow WorkflowInstance => CreateWorkflow(); + + public static async ValueTask RunAsync(TextWriter writer, IWorkflowExecutionEnvironment executionEnvironment, IEnumerable inputs) + { + AIAgent hostAgent = WorkflowInstance.AsAgent("echo-workflow", "EchoW", executionEnvironment: executionEnvironment); + + AgentThread thread = hostAgent.GetNewThread(); + foreach (string input in inputs) + { + AgentRunResponse response; + object? continuationToken = null; + do + { + response = await hostAgent.RunAsync(input, thread, new AgentRunOptions { ContinuationToken = continuationToken }); + } while ((continuationToken = response.ContinuationToken) is { }); + + foreach (ChatMessage message in response.Messages) + { + writer.WriteLine($"{message.AuthorName}: {message.Text}"); + } + } + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/11_Concurrent_HostAsAgent.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/11_Concurrent_HostAsAgent.cs new file mode 100644 index 0000000000..ca2ba46405 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/11_Concurrent_HostAsAgent.cs @@ -0,0 +1,52 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.UnitTests; +using Microsoft.Extensions.AI; + +namespace Microsoft.Agents.AI.Workflows.Sample; + +internal static class Step11EntryPoint +{ + public const int AgentCount = 2; + + public const string EchoAgentIdPrefix = "echo-"; + public const string EchoAgentNamePrefix = "Echo"; + + public static string ExpectedOutputForInput(string input, int agentNumber) + => $"{EchoAgentNamePrefix}{agentNumber}: {input}"; + + public static Workflow CreateWorkflow() + { + TestEchoAgent[] echoAgents = Enumerable.Range(1, AgentCount) + .Select(i => new TestEchoAgent($"{EchoAgentIdPrefix}{i}", $"{EchoAgentNamePrefix}{i}")) + .ToArray(); + + return AgentWorkflowBuilder.BuildConcurrent(echoAgents); + } + public static Workflow WorkflowInstance => CreateWorkflow(); + + public static async ValueTask RunAsync(TextWriter writer, IWorkflowExecutionEnvironment executionEnvironment, IEnumerable inputs) + { + AIAgent hostAgent = WorkflowInstance.AsAgent("echo-workflow", "EchoW", executionEnvironment: executionEnvironment); + + AgentThread thread = hostAgent.GetNewThread(); + foreach (string input in inputs) + { + AgentRunResponse response; + object? continuationToken = null; + do + { + response = await hostAgent.RunAsync(input, thread, new AgentRunOptions { ContinuationToken = continuationToken }); + } while ((continuationToken = response.ContinuationToken) is { }); + + foreach (ChatMessage message in response.Messages) + { + writer.WriteLine($"{message.AuthorName}: {message.Text}"); + } + } + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/12_HandOff_HostAsAgent.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/12_HandOff_HostAsAgent.cs new file mode 100644 index 0000000000..8eb553b868 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/12_HandOff_HostAsAgent.cs @@ -0,0 +1,88 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.UnitTests; +using Microsoft.Extensions.AI; + +namespace Microsoft.Agents.AI.Workflows.Sample; + +internal sealed class HandoffTestEchoAgent(string id, string name, string prefix = "") + : TestEchoAgent(id, name, prefix) +{ + protected override IEnumerable GetEpilogueMessages(AgentRunOptions? options = null) + { + if (options is ChatClientAgentRunOptions chatClientOptions && + chatClientOptions.ChatOptions != null) + { + IEnumerable? handoffs = chatClientOptions.ChatOptions + .Tools? + .Where(tool => tool.Name?.StartsWith(HandoffsWorkflowBuilder.FunctionPrefix, + StringComparison.OrdinalIgnoreCase) is true); + + if (handoffs != null) + { + AITool? handoff = handoffs.FirstOrDefault(); + if (handoff != null) + { + return [new(ChatRole.Assistant, [new FunctionCallContent(Guid.NewGuid().ToString("N"), handoff.Name)]) + { + AuthorName = this.DisplayName, + MessageId = Guid.NewGuid().ToString("N"), + CreatedAt = DateTime.UtcNow + }]; + } + } + } + + return base.GetEpilogueMessages(options); + } +} + +internal static class Step12EntryPoint +{ + public const int AgentCount = 2; + + public const string EchoAgentIdPrefix = "echo-"; + public const string EchoAgentNamePrefix = "Echo"; + + public static string EchoPrefixForAgent(int agentNumber) + => $"{agentNumber}:"; + + public static Workflow CreateWorkflow() + { + TestEchoAgent[] echoAgents = Enumerable.Range(1, AgentCount) + .Select(i => new HandoffTestEchoAgent($"{EchoAgentIdPrefix}{i}", $"{EchoAgentNamePrefix}{i}", EchoPrefixForAgent(i))) + .ToArray(); + + return new HandoffsWorkflowBuilder(echoAgents[0]) + .WithHandoff(echoAgents[0], echoAgents[1]) + .Build(); + } + + public static Workflow WorkflowInstance => CreateWorkflow(); + + public static async ValueTask RunAsync(TextWriter writer, IWorkflowExecutionEnvironment executionEnvironment, IEnumerable inputs) + { + AIAgent hostAgent = WorkflowInstance.AsAgent("echo-workflow", "EchoW", executionEnvironment: executionEnvironment); + + AgentThread thread = hostAgent.GetNewThread(); + foreach (string input in inputs) + { + AgentRunResponse response; + object? continuationToken = null; + do + { + response = await hostAgent.RunAsync(input, thread, new AgentRunOptions { ContinuationToken = continuationToken }); + } while ((continuationToken = response.ContinuationToken) is { }); + + foreach (ChatMessage message in response.Messages) + { + writer.WriteLine(message.Text); + } + } + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SampleSmokeTest.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SampleSmokeTest.cs index 7730063ec2..47733ea6af 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SampleSmokeTest.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SampleSmokeTest.cs @@ -194,23 +194,26 @@ internal async Task Test_RunSample_Step6Async(ExecutionEnvironment environment) Assert.Collection(lines, line => Assert.Contains($"{HelloAgent.DefaultId}: {HelloAgent.Greeting}", line), - line => Assert.Contains($"{EchoAgent.DefaultId}: {EchoAgent.Prefix}{HelloAgent.Greeting}", line) + line => Assert.Contains($"{Step6EntryPoint.EchoAgentId}: {Step6EntryPoint.EchoPrefix}{HelloAgent.Greeting}", line) ); } - [Fact] - public async Task Test_RunSample_Step7Async() + [Theory] + [InlineData(ExecutionEnvironment.InProcess_Lockstep)] + [InlineData(ExecutionEnvironment.InProcess_OffThread)] + [InlineData(ExecutionEnvironment.InProcess_Concurrent)] + internal async Task Test_RunSample_Step7Async(ExecutionEnvironment environment) { using StringWriter writer = new(); - await Step7EntryPoint.RunAsync(writer); + await Step7EntryPoint.RunAsync(writer, environment.ToWorkflowExecutionEnvironment()); string result = writer.ToString(); string[] lines = result.Split([Environment.NewLine], StringSplitOptions.RemoveEmptyEntries); Assert.Collection(lines, line => Assert.Contains($"{HelloAgent.DefaultId}: {HelloAgent.Greeting}", line), - line => Assert.Contains($"{EchoAgent.DefaultId}: {EchoAgent.Prefix}{HelloAgent.Greeting}", line) + line => Assert.Contains($"{Step7EntryPoint.EchoAgentId}: {Step7EntryPoint.EchoPrefix}{HelloAgent.Greeting}", line) ); } @@ -262,6 +265,110 @@ internal async Task Test_RunSample_Step9Async(ExecutionEnvironment environment) using StringWriter writer = new(); _ = await Step9EntryPoint.RunAsync(writer, environment.ToWorkflowExecutionEnvironment()); } + + [Theory] + [InlineData(ExecutionEnvironment.InProcess_Lockstep)] + [InlineData(ExecutionEnvironment.InProcess_OffThread)] + [InlineData(ExecutionEnvironment.InProcess_Concurrent)] + internal async Task Test_RunSample_Step10Async(ExecutionEnvironment environment) + { + List inputs = ["1", "2", "3"]; + + using StringWriter writer = new(); + await Step10EntryPoint.RunAsync(writer, environment.ToWorkflowExecutionEnvironment(), inputs); + + string[] lines = writer.ToString().Split(['\r', '\n'], StringSplitOptions.RemoveEmptyEntries); + Assert.Collection(lines, + inputs.Select(CreateValidator).ToArray()); + + Action CreateValidator(string expected) => actual => actual.Should().Be($"Echo: {expected}"); + } + + [Theory] + [InlineData(ExecutionEnvironment.InProcess_Lockstep)] + [InlineData(ExecutionEnvironment.InProcess_OffThread)] + [InlineData(ExecutionEnvironment.InProcess_Concurrent)] + internal async Task Test_RunSample_Step11Async(ExecutionEnvironment environment) + { + List inputs = ["1", "2", "3"]; + + using StringWriter writer = new(); + await Step11EntryPoint.RunAsync(writer, environment.ToWorkflowExecutionEnvironment(), inputs); + + string[] lines = writer.ToString().Split(['\r', '\n'], StringSplitOptions.RemoveEmptyEntries); + + Array.Sort(lines, StringComparer.OrdinalIgnoreCase); + + string[] expected = Enumerable.Range(1, Step11EntryPoint.AgentCount) + .SelectMany(agentNumber => inputs.Select(input => Step11EntryPoint.ExpectedOutputForInput(input, agentNumber))) + .ToArray(); + + Array.Sort(expected, StringComparer.OrdinalIgnoreCase); + + Assert.Collection(lines, + expected.Select(CreateValidator).ToArray()); + + Action CreateValidator(string expected) => actual => actual.Should().Be(expected); + } + + [Theory] + [InlineData(ExecutionEnvironment.InProcess_Lockstep)] + [InlineData(ExecutionEnvironment.InProcess_OffThread)] + [InlineData(ExecutionEnvironment.InProcess_Concurrent)] + internal async Task Test_RunSample_Step12Async(ExecutionEnvironment environment) + { + List inputs = ["1", "2", "3"]; + + using StringWriter writer = new(); + await Step12EntryPoint.RunAsync(writer, environment.ToWorkflowExecutionEnvironment(), inputs); + + string[] lines = writer.ToString().Split(['\r', '\n'], StringSplitOptions.RemoveEmptyEntries); + + // The expectation is that each agent will echo each input along with every echo from previous agents + // E.g.: + // (user): 1 + // (a1): 1:1 + // (a2): 2:1 + // (a2): 2:1:1 + + // If there were three agents, it would then be followed by: + // (a3): 3:1 + // (a3): 3:1:1 + // (a3): 3:2:1 + // (a3): 3:2:1:1 + + string[] expected = inputs.SelectMany(input => EchoesForInput(input)).ToArray(); + + Console.Error.WriteLine("Expected lines: "); + foreach (string expectedLine in expected) + { + Console.Error.WriteLine($"\t{expectedLine}"); + } + + Console.Error.WriteLine("Actual lines: "); + foreach (string line in lines) + { + Console.Error.WriteLine($"\t{line}"); + } + + Assert.Collection(lines, + expected.Select(CreateValidator).ToArray()); + + IEnumerable EchoesForInput(string input) + { + List echoes = [$"{Step12EntryPoint.EchoPrefixForAgent(1)}{input}"]; + for (int i = 2; i <= Step12EntryPoint.AgentCount; i++) + { + string agentPrefix = Step12EntryPoint.EchoPrefixForAgent(i); + List newEchoes = [$"{agentPrefix}{input}", .. echoes.Select(echo => $"{agentPrefix}{echo}")]; + echoes.AddRange(newEchoes); + } + + return echoes; + } + + Action CreateValidator(string expected) => actual => actual.Should().Be(expected); + } } internal sealed class VerifyingPlaybackResponder diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestEchoAgent.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestEchoAgent.cs new file mode 100644 index 0000000000..369f08bd8b --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestEchoAgent.cs @@ -0,0 +1,97 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.AI; + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +internal class TestEchoAgent(string? id = null, string? name = null, string? prefix = null) : AIAgent +{ + public override string Id => id ?? base.Id; + public override string? Name => name ?? base.Name; + + public override AgentThread DeserializeThread(JsonElement serializedThread, JsonSerializerOptions? jsonSerializerOptions = null) + { + return JsonSerializer.Deserialize(serializedThread, jsonSerializerOptions) ?? this.GetNewThread(); + } + + public override AgentThread GetNewThread() + { + return new EchoAgentThread(); + } + + private static ChatMessage UpdateThread(ChatMessage message, InMemoryAgentThread? thread = null) + { + thread?.MessageStore.Add(message); + + return message; + } + + private IEnumerable EchoMessages(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null) + { + foreach (ChatMessage message in messages) + { + UpdateThread(message, thread as InMemoryAgentThread); + } + + IEnumerable echoMessages + = from message in messages + where message.Role == ChatRole.User && + !string.IsNullOrEmpty(message.Text) + select + UpdateThread(new ChatMessage(ChatRole.Assistant, $"{prefix}{message.Text}") + { + AuthorName = this.DisplayName, + CreatedAt = DateTimeOffset.Now, + MessageId = Guid.NewGuid().ToString("N") + }, thread as InMemoryAgentThread); + + return echoMessages.Concat(this.GetEpilogueMessages(options).Select(m => UpdateThread(m, thread as InMemoryAgentThread))); + } + + protected virtual IEnumerable GetEpilogueMessages(AgentRunOptions? options = null) + { + return Enumerable.Empty(); + } + + public override Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) + { + AgentRunResponse result = + new(this.EchoMessages(messages, thread, options).ToList()) + { + AgentId = this.Id, + CreatedAt = DateTimeOffset.Now, + ResponseId = Guid.NewGuid().ToString("N"), + }; + + return Task.FromResult(result); + } + + public override async IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + string responseId = Guid.NewGuid().ToString("N"); + + foreach (ChatMessage message in this.EchoMessages(messages, thread, options).ToList()) + { + yield return + new(message.Role, message.Contents) + { + AgentId = this.Id, + AuthorName = message.AuthorName, + ResponseId = responseId, + MessageId = message.MessageId, + CreatedAt = message.CreatedAt + }; + } + } + + private sealed class EchoAgentThread : InMemoryAgentThread + { + } +}