diff --git a/dotnet/samples/GettingStarted/Workflows/_Foundational/01_ExecutorsAndEdges/Program.cs b/dotnet/samples/GettingStarted/Workflows/_Foundational/01_ExecutorsAndEdges/Program.cs index 68e2effd04..af1dcb50d9 100644 --- a/dotnet/samples/GettingStarted/Workflows/_Foundational/01_ExecutorsAndEdges/Program.cs +++ b/dotnet/samples/GettingStarted/Workflows/_Foundational/01_ExecutorsAndEdges/Program.cs @@ -20,7 +20,9 @@ public static class Program private static async Task Main() { // Create the executors - UppercaseExecutor uppercase = new(); + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + ReverseTextExecutor reverse = new(); // Build the workflow by connecting executors sequentially @@ -40,23 +42,6 @@ private static async Task Main() } } -/// -/// First executor: converts input text to uppercase. -/// -internal sealed class UppercaseExecutor() : Executor("UppercaseExecutor") -{ - /// - /// Processes the input message by converting it to uppercase. - /// - /// The input text to convert - /// Workflow context for accessing workflow services and adding events - /// The to monitor for cancellation requests. - /// The default is . - /// The input text converted to uppercase - public override ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default) => - ValueTask.FromResult(message.ToUpperInvariant()); // The return value will be sent as a message along an edge to subsequent executors -} - /// /// Second executor: reverses the input text and completes the workflow. /// diff --git a/dotnet/samples/GettingStarted/Workflows/_Foundational/06_SubWorkflows/Program.cs b/dotnet/samples/GettingStarted/Workflows/_Foundational/06_SubWorkflows/Program.cs index de00c35ae8..7f9980e047 100644 --- a/dotnet/samples/GettingStarted/Workflows/_Foundational/06_SubWorkflows/Program.cs +++ b/dotnet/samples/GettingStarted/Workflows/_Foundational/06_SubWorkflows/Program.cs @@ -40,7 +40,7 @@ private static async Task Main() .Build(); // Step 2: Configure the sub-workflow as an executor for use in the parent workflow - ExecutorIsh subWorkflowExecutor = subWorkflow.ConfigureSubWorkflow("TextProcessingSubWorkflow"); + ExecutorBinding subWorkflowExecutor = subWorkflow.BindAsExecutor("TextProcessingSubWorkflow"); // Step 3: Build a main workflow that uses the sub-workflow as an executor Console.WriteLine("Building main workflow that uses the sub-workflow as an executor...\n"); diff --git a/dotnet/samples/GettingStarted/Workflows/_Foundational/07_MixedWorkflowAgentsAndExecutors/README.md b/dotnet/samples/GettingStarted/Workflows/_Foundational/07_MixedWorkflowAgentsAndExecutors/README.md index 1fd263888b..4ec203892b 100644 --- a/dotnet/samples/GettingStarted/Workflows/_Foundational/07_MixedWorkflowAgentsAndExecutors/README.md +++ b/dotnet/samples/GettingStarted/Workflows/_Foundational/07_MixedWorkflowAgentsAndExecutors/README.md @@ -138,7 +138,7 @@ I cannot process this request as it appears to contain unsafe content. ## What You'll Learn -1. **How to mix executors and agents** - Understanding that both are treated as `ExecutorIsh` internally +1. **How to mix executors and agents** - Understanding that both are treated as `ExecutorBinding` internally 2. **When to use executors vs agents** - Executors for deterministic logic, agents for AI-powered decisions 3. **How to process agent outputs** - Using executors to sync, format, or aggregate agent responses 4. **Building complex pipelines** - Chaining multiple heterogeneous components together diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/WorkflowModelBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/WorkflowModelBuilder.cs index 1d3826371d..d70b0841ce 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/WorkflowModelBuilder.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/WorkflowModelBuilder.cs @@ -19,12 +19,12 @@ public void Connect(IModeledAction source, IModeledAction target, Func CONNECT: {source.Id} => {target.Id}{(condition is null ? string.Empty : " (?)")}"); this.WorkflowBuilder.AddEdge( - GetExecutorIsh(source), - GetExecutorIsh(target), + GetExecutorBinding(source), + GetExecutorBinding(target), condition); } - private static ExecutorIsh GetExecutorIsh(IModeledAction action) => + private static ExecutorBinding GetExecutorBinding(IModeledAction action) => action switch { RequestPortAction port => port.RequestPort, diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/AIAgentBinding.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/AIAgentBinding.cs new file mode 100644 index 0000000000..171f7d9eec --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/AIAgentBinding.cs @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.Agents.AI.Workflows.Specialized; +using Microsoft.Shared.Diagnostics; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Represents the workflow binding details for an AI agent, including configuration options for event emission. +/// +/// The AI agent. +/// Specifies whether the agent should emit events. If null, the default behavior is applied. +public record AIAgentBinding(AIAgent Agent, bool EmitEvents = false) + : ExecutorBinding(Throw.IfNull(Agent).Name ?? Throw.IfNull(Agent.Id), + (_) => new(new AIAgentHostExecutor(Agent, EmitEvents)), + typeof(AIAgentHostExecutor), + Agent) +{ + /// + public override bool IsSharedInstance => false; + + /// + public override bool SupportsConcurrentSharedExecution => true; + + /// + public override bool SupportsResetting => false; +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/AgentWorkflowBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/AgentWorkflowBuilder.cs index 8b527d5c44..81d254aa53 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/AgentWorkflowBuilder.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/AgentWorkflowBuilder.cs @@ -40,7 +40,7 @@ private static Workflow BuildSequentialCore(string? workflowName, params IEnumer // Create a builder that chains the agents together in sequence. The workflow simply begins // with the first agent in the sequence. WorkflowBuilder? builder = null; - ExecutorIsh? previous = null; + ExecutorBinding? previous = null; foreach (var agent in agents) { AgentRunStreamingExecutor agentExecutor = new(agent, includeInputInOutput: true); @@ -125,8 +125,8 @@ private static Workflow BuildConcurrentCore( // so that the final accumulator receives a single list of messages from each agent. Otherwise, the // accumulator would not be able to determine what came from what agent, as there's currently no // provenance tracking exposed in the workflow context passed to a handler. - ExecutorIsh[] agentExecutors = (from agent in agents select (ExecutorIsh)new AgentRunStreamingExecutor(agent, includeInputInOutput: false)).ToArray(); - ExecutorIsh[] accumulators = [.. from agent in agentExecutors select (ExecutorIsh)new CollectChatMessagesExecutor($"Batcher/{agent.Id}")]; + ExecutorBinding[] agentExecutors = (from agent in agents select (ExecutorBinding)new AgentRunStreamingExecutor(agent, includeInputInOutput: false)).ToArray(); + ExecutorBinding[] accumulators = [.. from agent in agentExecutors select (ExecutorBinding)new CollectChatMessagesExecutor($"Batcher/{agent.Id}")]; builder.AddFanOutEdge(start, targets: agentExecutors); for (int i = 0; i < agentExecutors.Length; i++) { @@ -141,7 +141,7 @@ private static Workflow BuildConcurrentCore( Func> endFactory = (string _, string __) => new(new ConcurrentEndExecutor(agentExecutors.Length, aggregator)); - ExecutorIsh end = endFactory.ConfigureFactory(ConcurrentEndExecutor.ExecutorId); + ExecutorBinding end = endFactory.BindExecutor(ConcurrentEndExecutor.ExecutorId); builder.AddFanInEdge(end, sources: accumulators); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/ExecutorInfo.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/ExecutorInfo.cs index c9994a91d0..6e019b4928 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/ExecutorInfo.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/ExecutorInfo.cs @@ -12,7 +12,7 @@ public bool IsMatch(Executor executor) => this.ExecutorType.IsMatch(executor.GetType()) && this.ExecutorId == executor.Id; - public bool IsMatch(ExecutorRegistration registration) => - this.ExecutorType.IsMatch(registration.ExecutorType) - && this.ExecutorId == registration.Id; + public bool IsMatch(ExecutorBinding binding) => + this.ExecutorType.IsMatch(binding.ExecutorType) + && this.ExecutorId == binding.Id; } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/RepresentationExtensions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/RepresentationExtensions.cs index 1878cbdd21..3d76c965bd 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/RepresentationExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/RepresentationExtensions.cs @@ -9,10 +9,10 @@ namespace Microsoft.Agents.AI.Workflows.Checkpointing; internal static class RepresentationExtensions { - public static ExecutorInfo ToExecutorInfo(this ExecutorRegistration registration) + public static ExecutorInfo ToExecutorInfo(this ExecutorBinding binding) { - Throw.IfNull(registration); - return new ExecutorInfo(new TypeId(registration.ExecutorType), registration.Id); + Throw.IfNull(binding); + return new ExecutorInfo(new TypeId(binding.ExecutorType), binding.Id); } public static EdgeInfo ToEdgeInfo(this Edge edge) @@ -38,8 +38,8 @@ public static WorkflowInfo ToWorkflowInfo(this Workflow workflow) Throw.IfNull(workflow); Dictionary executors = - workflow.Registrations.Values.ToDictionary( - keySelector: registration => registration.Id, + workflow.ExecutorBindings.Values.ToDictionary( + keySelector: binding => binding.Id, elementSelector: ToExecutorInfo); Dictionary> edges = workflow.Edges.Keys.ToDictionary( diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/WorkflowInfo.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/WorkflowInfo.cs index 59a41db405..4a1447d8bf 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/WorkflowInfo.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/WorkflowInfo.cs @@ -11,13 +11,13 @@ internal sealed class WorkflowInfo { [JsonConstructor] internal WorkflowInfo( - Dictionary executors, + Dictionary executorBindings, Dictionary> edges, HashSet requestPorts, string startExecutorId, HashSet? outputExecutorIds) { - this.Executors = Throw.IfNull(executors); + this.ExecutorBindings = Throw.IfNull(executorBindings); this.Edges = Throw.IfNull(edges); this.RequestPorts = Throw.IfNull(requestPorts); @@ -25,7 +25,7 @@ internal WorkflowInfo( this.OutputExecutorIds = outputExecutorIds ?? []; } - public Dictionary Executors { get; } + public Dictionary ExecutorBindings { get; } public Dictionary> Edges { get; } public HashSet RequestPorts { get; } @@ -47,10 +47,10 @@ public bool IsMatch(Workflow workflow) } // Validate the executors - if (workflow.Registrations.Count != this.Executors.Count || - this.Executors.Keys.Any( - executorId => workflow.Registrations.TryGetValue(executorId, out ExecutorRegistration? registration) - && !this.Executors[executorId].IsMatch(registration))) + if (workflow.ExecutorBindings.Count != this.ExecutorBindings.Count || + this.ExecutorBindings.Keys.Any( + executorId => workflow.ExecutorBindings.TryGetValue(executorId, out ExecutorBinding? registration) + && !this.ExecutorBindings[executorId].IsMatch(registration))) { return false; } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/ConfiguredExecutorBinding.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/ConfiguredExecutorBinding.cs new file mode 100644 index 0000000000..cfbfeba355 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/ConfiguredExecutorBinding.cs @@ -0,0 +1,34 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Threading.Tasks; +using Microsoft.Shared.Diagnostics; + +namespace Microsoft.Agents.AI.Workflows; + +// TODO: Unwrap the Configured object, just like for SubworkflowBinding +internal record ConfiguredExecutorBinding(Configured ConfiguredExecutor, Type ExecutorType) + : ExecutorBinding(Throw.IfNull(ConfiguredExecutor).Id, + ConfiguredExecutor.BoundFactoryAsync, + ExecutorType, + ConfiguredExecutor.Raw) +{ + /// + public override bool IsSharedInstance { get; } = ConfiguredExecutor.Raw is Executor; + + protected override async ValueTask ResetCoreAsync() + { + if (this.ConfiguredExecutor.Raw is IResettableExecutor resettable) + { + await resettable.ResetAsync().ConfigureAwait(false); + } + + return false; + } + + /// + public override bool SupportsConcurrentSharedExecution => true; + + /// + public override bool SupportsResetting => false; +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs index 4c2821476a..e0b53429f9 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs @@ -27,6 +27,8 @@ public abstract class Executor : IIdentified private static readonly string s_namespace = typeof(Executor).Namespace!; private static readonly ActivitySource s_activitySource = new(s_namespace); + // TODO: Add overloads for binding with a configuration/options object once the Configured hierarchy goes away. + /// /// Initialize the executor with a unique identifier /// diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorBinding.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorBinding.cs new file mode 100644 index 0000000000..f4c196426c --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorBinding.cs @@ -0,0 +1,129 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Diagnostics.CodeAnalysis; +using System.Threading.Tasks; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Represents the binding information for a workflow executor, including its identifier, factory method, type, and +/// optional raw value. +/// +/// The unique identifier for the executor in the workflow. +/// A factory function that creates an instance of the executor. The function accepts two string parameters and returns +/// a ValueTask containing the created Executor instance. +/// The type of the executor. Must be a type derived from Executor. +/// An optional raw value associated with the binding. +public abstract record class ExecutorBinding(string Id, Func>? FactoryAsync, Type ExecutorType, object? RawValue = null) + : IIdentified, + IEquatable, + IEquatable +{ + /// + /// Gets a value indicating whether the binding is a placeholder (i.e., does not have a factory method defined). + /// + [MemberNotNullWhen(false, nameof(FactoryAsync))] + public bool IsPlaceholder => this.FactoryAsync == null; + + /// + /// Gets a value whether the executor created from this binding is a shared instance across all runs. + /// + public abstract bool IsSharedInstance { get; } + + /// + /// Gets a value whether instances of the executor created from this binding can be used in concurrent runs + /// from the same instance. + /// + public abstract bool SupportsConcurrentSharedExecution { get; } + + /// + /// Gets a value whether instances of the executor created from this binding can be reset between subsequent + /// runs from the same instance. This value is not relevant for executors that . + /// + public abstract bool SupportsResetting { get; } + + /// + public override string ToString() => $"{this.Id}:{(this.IsPlaceholder ? ":" : this.ExecutorType.Name)}"; + + private Executor CheckId(Executor executor) + { + if (executor.Id != this.Id) + { + throw new InvalidOperationException( + $"Executor ID mismatch: expected '{this.Id}', but got '{executor.Id}'."); + } + + return executor; + } + + internal async ValueTask CreateInstanceAsync(string runId) + => !this.IsPlaceholder + ? this.CheckId(await this.FactoryAsync(runId).ConfigureAwait(false)) + : throw new InvalidOperationException( + $"Cannot create executor with ID '{this.Id}': Binding ({this.GetType().Name}) is a placeholder."); + + /// + public virtual bool Equals(ExecutorBinding? other) => + other is not null && other.Id == this.Id; + + /// + public bool Equals(IIdentified? other) => + other is not null && other.Id == this.Id; + + /// + public bool Equals(string? other) => + other is not null && other == this.Id; + + internal ValueTask TryResetAsync() + { + // Non-shared instances do not need resetting + if (!this.IsSharedInstance) + { + return new(true); + } + + // If the executor supports concurrent use, then resetting is a no-op. + if (!this.SupportsResetting) + { + return new(false); + } + + return this.ResetCoreAsync(); + } + + /// + /// Resets the executor's shared resources to their initial state. Must be overridden by bindings that support + /// resetting. + /// + /// + protected virtual ValueTask ResetCoreAsync() => throw new InvalidOperationException("ExecutorBindings that support resetting must override ResetCoreAsync()"); + + /// + public override int GetHashCode() => this.Id.GetHashCode(); + + /// + /// Defines an implicit conversion from an Executor to a . + /// + /// The Executor instance to convert. + public static implicit operator ExecutorBinding(Executor executor) => executor.BindExecutor(); + + /// + /// Defines an implicit conversion from a string identifier to an . + /// + /// The string identifier to convert to a placeholder. + public static implicit operator ExecutorBinding(string id) => new ExecutorPlaceholder(id); + + /// + /// Defines an implicit conversion from a to an . + /// + /// The RequestPort instance to convert. + public static implicit operator ExecutorBinding(RequestPort port) => port.BindAsExecutor(); + + /// + /// Defines an implicit conversion from an to an instance. + /// + /// + public static implicit operator ExecutorBinding(AIAgent agent) => agent.BindAsExecutor(); +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorBindingExtensions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorBindingExtensions.cs new file mode 100644 index 0000000000..c308ce8ea1 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorBindingExtensions.cs @@ -0,0 +1,434 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.ComponentModel; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Shared.Diagnostics; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Extension methods for configuring executors and functions as instances. +/// +public static class ExecutorBindingExtensions +{ + /// + /// Configures an instance for use in a workflow. + /// + /// + /// Note that Executor Ids must be unique within a workflow. + /// + /// The executor instance. + /// An instance wrapping the specified . + public static ExecutorBinding BindExecutor(this Executor executor) + => new ExecutorInstanceBinding(executor); + + /// + /// Configures a factory method for creating an of type , using the + /// type name as the id. + /// + /// + /// Note that Executor Ids must be unique within a workflow. + /// + /// Although this will generally result in a delay-instantiated once messages are available + /// for it, it will be instantiated if a for the is requested, + /// and it is the starting executor. + /// + /// The type of the resulting executor + /// The factory method. + /// An instance that resolves to the result of the factory call when messages get sent to it. + public static ExecutorBinding BindExecutor(this Func> factoryAsync) + where TExecutor : Executor + => BindExecutor((config, runId) => factoryAsync(config.Id, runId), id: typeof(TExecutor).Name, options: null); + + /// + /// Configures a factory method for creating an of type , using the + /// type name as the id. + /// + /// + /// Note that Executor Ids must be unique within a workflow. + /// + /// Although this will generally result in a delay-instantiated once messages are available + /// for it, it will be instantiated if a for the is requested, + /// and it is the starting executor. + /// + /// The type of the resulting executor + /// The factory method. + /// An instance that resolves to the result of the factory call when messages get sent to it. + [Obsolete("Use BindExecutor() instead.")] + [EditorBrowsable(EditorBrowsableState.Never)] + public static ExecutorBinding ConfigureFactory(this Func> factoryAsync) + where TExecutor : Executor + => factoryAsync.BindExecutor(); + + /// + /// Configures a factory method for creating an of type , with + /// the specified id. + /// + /// + /// Although this will generally result in a delay-instantiated once messages are available + /// for it, it will be instantiated if a for the is requested, + /// and it is the starting executor. + /// + /// The type of the resulting executor + /// The factory method. + /// An id for the executor to be instantiated. + /// An instance that resolves to the result of the factory call when messages get sent to it. + public static ExecutorBinding BindExecutor(this Func> factoryAsync, string id) + where TExecutor : Executor + => BindExecutor((_, runId) => factoryAsync(id, runId), id, options: null); + + /// + /// Configures a factory method for creating an of type , with + /// the specified id. + /// + /// + /// Although this will generally result in a delay-instantiated once messages are available + /// for it, it will be instantiated if a for the is requested, + /// and it is the starting executor. + /// + /// The type of the resulting executor + /// The factory method. + /// An id for the executor to be instantiated. + /// An instance that resolves to the result of the factory call when messages get sent to it. + [Obsolete("Use BindExecutor() instead.")] + [EditorBrowsable(EditorBrowsableState.Never)] + public static ExecutorBinding ConfigureFactory(this Func> factoryAsync, string id) + where TExecutor : Executor + => factoryAsync.BindExecutor(id); + + /// + /// Configures a factory method for creating an of type , with + /// the specified id and options. + /// + /// + /// Although this will generally result in a delay-instantiated once messages are available + /// for it, it will be instantiated if a for the is requested, + /// and it is the starting executor. + /// + /// The type of the resulting executor + /// The type of options object to be passed to the factory method. + /// The factory method. + /// An id for the executor to be instantiated. + /// An optional parameter specifying the options. + /// An instance that resolves to the result of the factory call when messages get sent to it. + public static ExecutorBinding BindExecutor(this Func, string, ValueTask> factoryAsync, string id, TOptions? options = null) + where TExecutor : Executor + where TOptions : ExecutorOptions + { + Configured configured = new(factoryAsync, id, options); + + return new ConfiguredExecutorBinding(configured.Super(), typeof(TExecutor)); + } + + /// + /// Configures a factory method for creating an of type , with + /// the specified id and options. + /// + /// + /// Although this will generally result in a delay-instantiated once messages are available + /// for it, it will be instantiated if a for the is requested, + /// and it is the starting executor. + /// + /// The type of the resulting executor + /// The type of options object to be passed to the factory method. + /// The factory method. + /// An id for the executor to be instantiated. + /// An optional parameter specifying the options. + /// An instance that resolves to the result of the factory call when messages get sent to it. + [Obsolete("Use RegisterExecutor() instead")] + [EditorBrowsable(EditorBrowsableState.Never)] + public static ExecutorBinding ConfigureFactory(this Func, string, ValueTask> factoryAsync, string id, TOptions? options = null) + where TExecutor : Executor + where TOptions : ExecutorOptions + => factoryAsync.BindExecutor(id, options); + + private static ConfiguredExecutorBinding ToBinding(this FunctionExecutor executor, Delegate raw) + => new(Configured.FromInstance(executor, raw: raw) + .Super, Executor>(), + typeof(FunctionExecutor)); + + private static ConfiguredExecutorBinding ToBinding(this FunctionExecutor executor, Delegate raw) + => new(Configured.FromInstance(executor, raw: raw) + .Super, Executor>(), + typeof(FunctionExecutor)); + + /// + /// Configures a sub-workflow executor for the specified workflow, using the provided identifier and options. + /// + /// The workflow instance to be executed as a sub-workflow. Cannot be null. + /// A unique identifier for the sub-workflow execution. Used to distinguish this sub-workflow instance. + /// Optional configuration options for the sub-workflow executor. If null, default options are used. + /// An ExecutorRegistration instance representing the configured sub-workflow executor. + [Obsolete("Use AsExecutor() instead")] + [EditorBrowsable(EditorBrowsableState.Never)] + public static ExecutorBinding ConfigureSubWorkflow(this Workflow workflow, string id, ExecutorOptions? options = null) + => workflow.BindAsExecutor(id, options); + + /// + /// Configures a sub-workflow executor for the specified workflow, using the provided identifier and options. + /// + /// The workflow instance to be executed as a sub-workflow. Cannot be null. + /// A unique identifier for the sub-workflow execution. Used to distinguish this sub-workflow instance. + /// Optional configuration options for the sub-workflow executor. If null, default options are used. + /// An instance representing the configured sub-workflow executor. + public static ExecutorBinding BindAsExecutor(this Workflow workflow, string id, ExecutorOptions? options = null) + => new SubworkflowBinding(workflow, id, options); + + /// + /// Configures a function-based asynchronous message handler as an executor with the specified identifier and + /// options. + /// + /// The type of input message. + /// A delegate that defines the asynchronous function to execute for each input message. + /// An optional unique identifier for the executor. If null, will use the function argument as an id. + /// Configuration options for the executor. If null, default options will be used. + /// Declare that the message handler may be used simultaneously by multiple runs concurrently. + /// An instance that wraps the provided asynchronous message handler and configuration. + public static ExecutorBinding BindAsExecutor(this Func messageHandlerAsync, string id, ExecutorOptions? options = null, bool threadsafe = false) + => new FunctionExecutor(id, messageHandlerAsync, options, declareCrossRunShareable: threadsafe).ToBinding(messageHandlerAsync); + + /// + /// Configures a function-based asynchronous message handler as an executor with the specified identifier and + /// options. + /// + /// The type of input message. + /// A delegate that defines the asynchronous function to execute for each input message. + /// An optional unique identifier for the executor. If null, will use the function argument as an id. + /// Configuration options for the executor. If null, default options will be used. + /// Declare that the message handler may be used simultaneously by multiple runs concurrently. + /// An instance that wraps the provided asynchronous message handler and configuration. + public static ExecutorBinding BindAsExecutor(this Func messageHandlerAsync, string id, ExecutorOptions? options = null, bool threadsafe = false) + => ((Func)((input, _, __) => messageHandlerAsync(input))) + .BindAsExecutor(id, options, threadsafe); + + /// + /// Configures a function-based asynchronous message handler as an executor with the specified identifier and + /// options. + /// + /// The type of input message. + /// A delegate that defines the asynchronous function to execute for each input message. + /// An optional unique identifier for the executor. If null, will use the function argument as an id. + /// Configuration options for the executor. If null, default options will be used. + /// Declare that the message handler may be used simultaneously by multiple runs concurrently. + /// An instance that wraps the provided asynchronous message handler and configuration. + public static ExecutorBinding BindAsExecutor(this Func messageHandlerAsync, string id, ExecutorOptions? options = null, bool threadsafe = false) + => ((Func)((input, ctx, __) => messageHandlerAsync(input, ctx))) + .BindAsExecutor(id, options, threadsafe); + + /// + /// Configures a function-based asynchronous message handler as an executor with the specified identifier and + /// options. + /// + /// The type of input message. + /// A delegate that defines the asynchronous function to execute for each input message. + /// An optional unique identifier for the executor. If null, will use the function argument as an id. + /// Configuration options for the executor. If null, default options will be used. + /// Declare that the message handler may be used simultaneously by multiple runs concurrently. + /// An instance that wraps the provided asynchronous message handler and configuration. + public static ExecutorBinding BindAsExecutor(this Func messageHandlerAsync, string id, ExecutorOptions? options = null, bool threadsafe = false) + => ((Func)((input, _, ct) => messageHandlerAsync(input, ct))) + .BindAsExecutor(id, options, threadsafe); + + /// + /// Configures a function-based message handler as an executor with the specified identifier and + /// options. + /// + /// The type of input message. + /// A delegate that defines the function to execute for each input message. + /// An optional unique identifier for the executor. If null, will use the function argument as an id. + /// Configuration options for the executor. If null, default options will be used. + /// Declare that the message handler may be used simultaneously by multiple runs concurrently. + /// An instance that wraps the provided asynchronous message handler and configuration. + public static ExecutorBinding BindAsExecutor(this Action messageHandler, string id, ExecutorOptions? options = null, bool threadsafe = false) + => new FunctionExecutor(id, messageHandler, options, declareCrossRunShareable: threadsafe).ToBinding(messageHandler); + + /// + /// Configures a function-based message handler as an executor with the specified identifier and + /// options. + /// + /// The type of input message. + /// A delegate that defines the function to execute for each input message. + /// An optional unique identifier for the executor. If null, will use the function argument as an id. + /// Configuration options for the executor. If null, default options will be used. + /// Declare that the message handler may be used simultaneously by multiple runs concurrently. + /// An instance that wraps the provided asynchronous message handler and configuration. + public static ExecutorBinding BindAsExecutor(this Action messageHandler, string id, ExecutorOptions? options = null, bool threadsafe = false) + => ((Action)((input, _, __) => messageHandler(input))) + .BindAsExecutor(id, options, threadsafe); + + /// + /// Configures a function-based message handler as an executor with the specified identifier and + /// options. + /// + /// The type of input message. + /// A delegate that defines the function to execute for each input message. + /// An optional unique identifier for the executor. If null, will use the function argument as an id. + /// Configuration options for the executor. If null, default options will be used. + /// Declare that the message handler may be used simultaneously by multiple runs concurrently. + /// An instance that wraps the provided asynchronous message handler and configuration. + public static ExecutorBinding BindAsExecutor(this Action messageHandler, string id, ExecutorOptions? options = null, bool threadsafe = false) + => ((Action)((input, ctx, __) => messageHandler(input, ctx))) + .BindAsExecutor(id, options, threadsafe); + + /// + /// Configures a function-based message handler as an executor with the specified identifier and + /// options. + /// + /// The type of input message. + /// A delegate that defines the function to execute for each input message. + /// An optional unique identifier for the executor. If null, will use the function argument as an id. + /// Configuration options for the executor. If null, default options will be used. + /// Declare that the message handler may be used simultaneously by multiple runs concurrently. + /// An instance that wraps the provided asynchronous message handler and configuration. + public static ExecutorBinding BindAsExecutor(this Action messageHandler, string id, ExecutorOptions? options = null, bool threadsafe = false) + => ((Action)((input, _, ct) => messageHandler(input, ct))) + .BindAsExecutor(id, options, threadsafe); + + /// + /// Configures a function-based asynchronous message handler as an executor with the specified identifier and + /// options. + /// + /// The type of input message. + /// The type of output message. + /// A delegate that defines the asynchronous function to execute for each input message. + /// A unique identifier for the executor. + /// Configuration options for the executor. If null, default options will be used. + /// Declare that the message handler may be used simultaneously by multiple runs concurrently. + /// An instance that wraps the provided asynchronous message handler and configuration. + public static ExecutorBinding BindAsExecutor(this Func> messageHandlerAsync, string id, ExecutorOptions? options = null, bool threadsafe = false) + => new FunctionExecutor(Throw.IfNull(id), messageHandlerAsync, options, declareCrossRunShareable: threadsafe).ToBinding(messageHandlerAsync); + + /// + /// Configures a function-based asynchronous message handler as an executor with the specified identifier and + /// options. + /// + /// The type of input message. + /// The type of output message. + /// A delegate that defines the asynchronous function to execute for each input message. + /// An optional unique identifier for the executor. If null, will use the function argument as an id. + /// Configuration options for the executor. If null, default options will be used. + /// Declare that the message handler may be used simultaneously by multiple runs concurrently. + /// An instance that wraps the provided asynchronous message handler and configuration. + public static ExecutorBinding BindAsExecutor(this Func> messageHandlerAsync, string id, ExecutorOptions? options = null, bool threadsafe = false) + => ((Func>)((input, _, __) => messageHandlerAsync(input))) + .BindAsExecutor(id, options, threadsafe); + + /// + /// Configures a function-based asynchronous message handler as an executor with the specified identifier and + /// options. + /// + /// The type of input message. + /// The type of output message. + /// A delegate that defines the asynchronous function to execute for each input message. + /// An optional unique identifier for the executor. If null, will use the function argument as an id. + /// Configuration options for the executor. If null, default options will be used. + /// Declare that the message handler may be used simultaneously by multiple runs concurrently. + /// An instance that wraps the provided asynchronous message handler and configuration. + public static ExecutorBinding BindAsExecutor(this Func> messageHandlerAsync, string id, ExecutorOptions? options = null, bool threadsafe = false) + => ((Func>)((input, ctx, __) => messageHandlerAsync(input, ctx))) + .BindAsExecutor(id, options, threadsafe); + + /// + /// Configures a function-based asynchronous message handler as an executor with the specified identifier and + /// options. + /// + /// The type of input message. + /// The type of output message. + /// A delegate that defines the asynchronous function to execute for each input message. + /// An optional unique identifier for the executor. If null, will use the function argument as an id. + /// Configuration options for the executor. If null, default options will be used. + /// Declare that the message handler may be used simultaneously by multiple runs concurrently. + /// An instance that wraps the provided asynchronous message handler and configuration. + public static ExecutorBinding BindAsExecutor(this Func> messageHandlerAsync, string id, ExecutorOptions? options = null, bool threadsafe = false) + => ((Func>)((input, _, ct) => messageHandlerAsync(input, ct))) + .BindAsExecutor(id, options, threadsafe); + + /// + /// Configures a function-based message handler as an executor with the specified identifier and options. + /// + /// The type of input message. + /// The type of output message. + /// A delegate that defines the function to execute for each input message. + /// An optional unique identifier for the executor. If null, will use the function argument as an id. + /// Configuration options for the executor. If null, default options will be used. + /// Declare that the message handler may be used simultaneously by multiple runs concurrently. + /// An instance that wraps the provided asynchronous message handler and configuration. + public static ExecutorBinding BindAsExecutor(this Func messageHandler, string id, ExecutorOptions? options = null, bool threadsafe = false) + => new FunctionExecutor(id, messageHandler, options, declareCrossRunShareable: threadsafe).ToBinding(messageHandler); + + /// + /// Configures a function-based message handler as an executor with the specified identifier and options. + /// + /// The type of input message. + /// The type of output message. + /// A delegate that defines the function to execute for each input message. + /// An optional unique identifier for the executor. If null, will use the function argument as an id. + /// Configuration options for the executor. If null, default options will be used. + /// Declare that the message handler may be used simultaneously by multiple runs concurrently. + /// An instance that wraps the provided asynchronous message handler and configuration. + public static ExecutorBinding BindAsExecutor(this Func messageHandler, string id, ExecutorOptions? options = null, bool threadsafe = false) + => ((Func)((input, _, __) => messageHandler(input))) + .BindAsExecutor(id, options, threadsafe); + + /// + /// Configures a function-based message handler as an executor with the specified identifier and options. + /// + /// The type of input message. + /// The type of output message. + /// A delegate that defines the function to execute for each input message. + /// An optional unique identifier for the executor. If null, will use the function argument as an id. + /// Configuration options for the executor. If null, default options will be used. + /// Declare that the message handler may be used simultaneously by multiple runs concurrently. + /// An instance that wraps the provided asynchronous message handler and configuration. + public static ExecutorBinding BindAsExecutor(this Func messageHandler, string id, ExecutorOptions? options = null, bool threadsafe = false) + => ((Func)((input, ctx, __) => messageHandler(input, ctx))) + .BindAsExecutor(id, options, threadsafe); + + /// + /// Configures a function-based message handler as an executor with the specified identifier and options. + /// + /// The type of input message. + /// The type of output message. + /// A delegate that defines the function to execute for each input message. + /// An optional unique identifier for the executor. If null, will use the function argument as an id. + /// Configuration options for the executor. If null, default options will be used. + /// Declare that the message handler may be used simultaneously by multiple runs concurrently. + /// An instance that wraps the provided asynchronous message handler and configuration. + public static ExecutorBinding BindAsExecutor(this Func messageHandler, string id, ExecutorOptions? options = null, bool threadsafe = false) + => ((Func)((input, _, ct) => messageHandler(input, ct))) + .BindAsExecutor(id, options, threadsafe); + + /// + /// Configures a function-based aggregating executor with the specified identifier and options. + /// + /// The type of input message. + /// The type of the accumulating object. + /// A delegate the defines the aggregation procedure + /// A unique identifier for the executor. + /// Configuration options for the executor. If null, default options will be used. + /// Declare that the message handler may be used simultaneously by multiple runs concurrently. + /// An instance that wraps the provided asynchronous message handler and configuration. + public static ExecutorBinding BindAsExecutor(this Func aggregatorFunc, string id, ExecutorOptions? options = null, bool threadsafe = false) + => new AggregatingExecutor(id, aggregatorFunc, options, declareCrossRunShareable: threadsafe); + + /// + /// Configure an as an executor for use in a workflow. + /// + /// The agent instance. + /// Specifies whether the agent should emit streaming events. + /// An instance that wraps the provided agent. + public static ExecutorBinding BindAsExecutor(this AIAgent agent, bool emitEvents = false) + => new AIAgentBinding(agent, emitEvents); + + /// + /// Configure a as an executor for use in a workflow. + /// + /// The port configuration. + /// Specifies whether the port should accept requests already wrapped in + /// . + /// A instance that wraps the provided port. + public static ExecutorBinding BindAsExecutor(this RequestPort port, bool allowWrappedRequests = true) + => new RequestPortBinding(port, allowWrappedRequests); +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorInstanceBinding.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorInstanceBinding.cs new file mode 100644 index 0000000000..916e28a930 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorInstanceBinding.cs @@ -0,0 +1,39 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Threading.Tasks; +using Microsoft.Shared.Diagnostics; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Represents the workflow binding details for a shared executor instance, including configuration options +/// for event emission. +/// +/// The executor instance to bind. Cannot be null. +public record ExecutorInstanceBinding(Executor ExecutorInstance) + : ExecutorBinding(Throw.IfNull(ExecutorInstance).Id, + (_) => new(ExecutorInstance), + ExecutorInstance.GetType(), + ExecutorInstance) +{ + /// + public override bool SupportsConcurrentSharedExecution => this.ExecutorInstance.IsCrossRunShareable; + + /// + public override bool SupportsResetting => this.ExecutorInstance is IResettableExecutor; + + /// + public override bool IsSharedInstance => true; + + /// + protected override async ValueTask ResetCoreAsync() + { + if (this.ExecutorInstance is IResettableExecutor resettable) + { + await resettable.ResetAsync().ConfigureAwait(false); + return true; + } + + return false; + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorIsh.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorIsh.cs deleted file mode 100644 index 63807c5576..0000000000 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorIsh.cs +++ /dev/null @@ -1,369 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. - -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Agents.AI.Workflows.Specialized; -using Microsoft.Shared.Diagnostics; - -namespace Microsoft.Agents.AI.Workflows; - -/// -/// Extension methods for configuring executors and functions as instances. -/// -public static class ExecutorIshConfigurationExtensions -{ - /// - /// Configures a factory method for creating an of type , using the - /// type name as the id. - /// - /// - /// Note that Executor Ids must be unique within a workflow. - /// - /// Although this will generally result in a delay-instantiated once messages are available - /// for it, it will be instantiated if a for the is requested, - /// and it is the starting executor. - /// - /// The type of the resulting executor - /// The factory method. - /// An ExecutorIsh instance that resolves to the result of the factory call when messages get sent to it. - public static ExecutorIsh ConfigureFactory(this Func> factoryAsync) - where TExecutor : Executor - => ConfigureFactory((config, runId) => factoryAsync(config.Id, runId), typeof(TExecutor).Name, options: null); - - /// - /// Configures a factory method for creating an of type , with - /// the specified id. - /// - /// - /// Although this will generally result in a delay-instantiated once messages are available - /// for it, it will be instantiated if a for the is requested, - /// and it is the starting executor. - /// - /// The type of the resulting executor - /// The factory method. - /// An id for the executor to be instantiated. - /// An ExecutorIsh instance that resolves to the result of the factory call when messages get sent to it. - public static ExecutorIsh ConfigureFactory(this Func> factoryAsync, string id) - where TExecutor : Executor - => ConfigureFactory((_, runId) => factoryAsync(id, runId), id, options: null); - - /// - /// Configures a factory method for creating an of type , with - /// the specified id and options. - /// - /// - /// Although this will generally result in a delay-instantiated once messages are available - /// for it, it will be instantiated if a for the is requested, - /// and it is the starting executor. - /// - /// The type of the resulting executor - /// The type of options object to be passed to the factory method. - /// The factory method. - /// An id for the executor to be instantiated. - /// An optional parameter specifying the options. - /// An ExecutorIsh instance that resolves to the result of the factory call when messages get sent to it. - public static ExecutorIsh ConfigureFactory(this Func, string, ValueTask> factoryAsync, string id, TOptions? options = null) - where TExecutor : Executor - where TOptions : ExecutorOptions - { - Configured configured = new(factoryAsync, id, options); - - return new ExecutorIsh(configured.Super(), typeof(TExecutor), ExecutorIsh.Type.Executor); - } - - private static ExecutorIsh ToExecutorIsh(this FunctionExecutor executor, Delegate raw) => new(Configured.FromInstance(executor, raw: raw) - .Super, Executor>(), - typeof(FunctionExecutor), - ExecutorIsh.Type.Function); - - private static ExecutorIsh ToExecutorIsh(this FunctionExecutor executor, Delegate raw) => new(Configured.FromInstance(executor, raw: raw) - .Super, Executor>(), - typeof(FunctionExecutor), - ExecutorIsh.Type.Function); - - /// - /// Configures a sub-workflow executor for the specified workflow, using the provided identifier and options. - /// - /// The workflow instance to be executed as a sub-workflow. Cannot be null. - /// A unique identifier for the sub-workflow execution. Used to distinguish this sub-workflow instance. - /// Optional configuration options for the sub-workflow executor. If null, default options are used. - /// An ExecutorIsh instance representing the configured sub-workflow executor. - public static ExecutorIsh ConfigureSubWorkflow(this Workflow workflow, string id, ExecutorOptions? options = null) - { - object ownershipToken = new(); - workflow.TakeOwnership(ownershipToken, subworkflow: true); - - Configured configured = new(InitHostExecutorAsync, id, options, raw: workflow); - return new ExecutorIsh(configured.Super(), typeof(WorkflowHostExecutor), ExecutorIsh.Type.Workflow); - - ValueTask InitHostExecutorAsync(Config config, string runId) - { - return new(new WorkflowHostExecutor(config.Id, workflow, runId, ownershipToken, config.Options)); - } - } - - /// - /// Configures a function-based asynchronous message handler as an executor with the specified identifier and - /// options. - /// - /// The type of input message. - /// A delegate that defines the asynchronous function to execute for each input message. - /// A optional unique identifier for the executor. If null, will use the function argument as an id. - /// Configuration options for the executor. If null, default options will be used. - /// Declare that the message handler may be used simultaneously by multiple runs concurrently. - /// An ExecutorIsh instance that wraps the provided asynchronous message handler and configuration. - public static ExecutorIsh AsExecutor(this Func messageHandlerAsync, string id, ExecutorOptions? options = null, bool threadsafe = false) - => new FunctionExecutor(id, messageHandlerAsync, options, declareCrossRunShareable: threadsafe).ToExecutorIsh(messageHandlerAsync); - - /// - /// Configures a function-based asynchronous message handler as an executor with the specified identifier and - /// options. - /// - /// The type of input message. - /// The type of output message. - /// A delegate that defines the asynchronous function to execute for each input message. - /// A unique identifier for the executor. - /// Configuration options for the executor. If null, default options will be used. - /// Declare that the message handler may be used simultaneously by multiple runs concurrently. - /// An ExecutorIsh instance that wraps the provided asynchronous message handler and configuration. - public static ExecutorIsh AsExecutor(this Func> messageHandlerAsync, string id, ExecutorOptions? options = null, bool threadsafe = false) - => new FunctionExecutor(Throw.IfNull(id), messageHandlerAsync, options, declareCrossRunShareable: threadsafe).ToExecutorIsh(messageHandlerAsync); - - /// - /// Configures a function-based aggregating executor with the specified identifier and options. - /// - /// The type of input message. - /// The type of the accumulating object. - /// A delegate the defines the aggregation procedure - /// A unique identifier for the executor. - /// Configuration options for the executor. If null, default options will be used. - /// Declare that the message handler may be used simultaneously by multiple runs concurrently. - /// An ExecutorIsh instance that wraps the provided asynchronous message handler and configuration. - public static ExecutorIsh AsExecutor(this Func aggregatorFunc, string id, ExecutorOptions? options = null, bool threadsafe = false) - => new AggregatingExecutor(id, aggregatorFunc, options, declareCrossRunShareable: threadsafe); -} - -/// -/// A tagged union representing an object that can function like an in a , -/// or a reference to one by ID. -/// -public sealed class ExecutorIsh : - IIdentified, - IEquatable, - IEquatable, - IEquatable -{ - /// - /// The type of the . - /// - public enum Type - { - /// - /// An unbound executor reference, identified only by ID. - /// - Unbound, - /// - /// An actual instance. - /// - Executor, - /// - /// A function delegate to be wrapped as an executor. - /// - Function, - /// - /// An for servicing external requests. - /// - RequestPort, - /// - /// An instance. - /// - Agent, - /// - /// A nested instance. - /// - Workflow, - } - - /// - /// Gets the type of data contained in this instance. - /// - public Type ExecutorType { get; init; } - - private readonly string? _idValue; - - private readonly Configured? _configuredExecutor; - private readonly System.Type? _configuredExecutorType; - - internal readonly RequestPort? _requestPortValue; - private readonly AIAgent? _aiAgentValue; - - /// - /// Initializes a new instance of the class as an unbound reference by ID. - /// - /// A unique identifier for an in the - public ExecutorIsh(string id) - { - this.ExecutorType = Type.Unbound; - this._idValue = Throw.IfNull(id); - } - - internal ExecutorIsh(Configured configured, System.Type configuredExecutorType, Type type) - { - this.ExecutorType = type; - this._configuredExecutor = configured; - this._configuredExecutorType = configuredExecutorType; - } - - /// - /// Initializes a new instance of the ExecutorIsh class using the specified executor. - /// - /// The executor instance to be wrapped. - public ExecutorIsh(Executor executor) - { - this.ExecutorType = Type.Executor; - this._configuredExecutor = Configured.FromInstance(Throw.IfNull(executor)); - this._configuredExecutorType = executor.GetType(); - } - - /// - /// Initializes a new instance of the ExecutorIsh class using the specified input port. - /// - /// The input port to associate to be wrapped. - public ExecutorIsh(RequestPort port) - { - this.ExecutorType = Type.RequestPort; - this._requestPortValue = Throw.IfNull(port); - } - - /// - /// Initializes a new instance of the ExecutorIsh class using the specified AI agent. - /// - /// - public ExecutorIsh(AIAgent aiAgent) - { - this.ExecutorType = Type.Agent; - this._aiAgentValue = Throw.IfNull(aiAgent); - } - - internal bool IsUnbound => this.ExecutorType == Type.Unbound; - - /// - public string Id => this.ExecutorType switch - { - Type.Unbound => this._idValue ?? throw new InvalidOperationException("This ExecutorIsh is unbound and has no ID."), - Type.Executor => this._configuredExecutor!.Id, - Type.RequestPort => this._requestPortValue!.Id, - Type.Agent => this._aiAgentValue!.Id, - Type.Function => this._configuredExecutor!.Id, - Type.Workflow => this._configuredExecutor!.Id, - _ => throw new InvalidOperationException($"Unknown ExecutorIsh type: {this.ExecutorType}") - }; - - internal object? RawData => this.ExecutorType switch - { - Type.Unbound => this._idValue, - Type.Executor => this._configuredExecutor!.Raw ?? this._configuredExecutor, - Type.RequestPort => this._requestPortValue, - Type.Agent => this._aiAgentValue, - Type.Function => this._configuredExecutor!.Raw ?? this._configuredExecutor, - Type.Workflow => this._configuredExecutor!.Raw ?? this._configuredExecutor, - _ => throw new InvalidOperationException($"Unknown ExecutorIsh type: {this.ExecutorType}") - }; - - /// - /// Gets the registration details for the current executor. - /// - /// The returned registration depends on the type of the executor. If the executor is unbound, an - /// is thrown. For other executor types, the registration includes the - /// appropriate ID, type, and provider based on the executor's configuration. - internal ExecutorRegistration Registration => new(this.Id, this.RuntimeType, this.ExecutorProvider, this.RawData); - - private System.Type RuntimeType => this.ExecutorType switch - { - Type.Unbound => throw new InvalidOperationException($"ExecutorIsh with ID '{this.Id}' is unbound."), - Type.Executor => this._configuredExecutorType!, - Type.RequestPort => typeof(RequestInfoExecutor), - Type.Agent => typeof(AIAgentHostExecutor), - Type.Function => this._configuredExecutorType!, - Type.Workflow => this._configuredExecutorType!, - _ => throw new InvalidOperationException($"Unknown ExecutorIsh type: {this.ExecutorType}") - }; - - /// - /// Gets an that can be used to obtain an instance - /// corresponding to this . - /// - private Func> ExecutorProvider => this.ExecutorType switch - { - Type.Unbound => throw new InvalidOperationException($"Executor with ID '{this.Id}' is unbound."), - Type.Executor => this._configuredExecutor!.BoundFactoryAsync, - Type.RequestPort => (runId) => new(new RequestInfoExecutor(this._requestPortValue!)), - Type.Agent => (runId) => new(new AIAgentHostExecutor(this._aiAgentValue!)), - Type.Function => this._configuredExecutor!.BoundFactoryAsync, - Type.Workflow => this._configuredExecutor!.BoundFactoryAsync, - _ => throw new InvalidOperationException($"Unknown ExecutorIsh type: {this.ExecutorType}") - }; - - /// - /// Defines an implicit conversion from an instance to an object. - /// - /// The instance to convert to . - public static implicit operator ExecutorIsh(Executor executor) => new(executor); - - /// - /// Defines an implicit conversion from an to an instance. - /// - /// The to convert to an . - public static implicit operator ExecutorIsh(RequestPort inputPort) => new(inputPort); - - /// - /// Defines an implicit conversion from an to an instance. - /// - /// The to convert to an . - public static implicit operator ExecutorIsh(AIAgent aiAgent) => new(aiAgent); - - /// - /// Defines an implicit conversion from a string to an instance. - /// - /// The string ID to convert to an . - public static implicit operator ExecutorIsh(string id) => new(id); - - /// - public bool Equals(ExecutorIsh? other) => - other is not null && other.Id == this.Id; - - /// - public bool Equals(IIdentified? other) => - other is not null && other.Id == this.Id; - - /// - public bool Equals(string? other) => - other is not null && other == this.Id; - - /// - public override bool Equals(object? obj) => - obj switch - { - null => false, - ExecutorIsh ish => this.Equals(ish), - IIdentified identified => this.Equals(identified), - string str => this.Equals(str), - _ => false - }; - - /// - public override int GetHashCode() => this.Id.GetHashCode(); - - /// - public override string ToString() => this.ExecutorType switch - { - Type.Unbound => $"'{this.Id}':", - Type.Executor => $"'{this.Id}':{this._configuredExecutorType!.Name}", - Type.RequestPort => $"'{this.Id}':Input({this._requestPortValue!.Request.Name}->{this._requestPortValue!.Response.Name})", - Type.Agent => $"{this.Id}':AIAgent(@{this._aiAgentValue!.GetType().Name})", - Type.Function => $"'{this.Id}':{this._configuredExecutorType!.Name}", - Type.Workflow => $"'{this.Id}':{this._configuredExecutorType!.Name}", - _ => $"'{this.Id}':" - }; -} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorPlaceholder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorPlaceholder.cs new file mode 100644 index 0000000000..f3dc902839 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorPlaceholder.cs @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Represents a placeholder entry for an , identified by a unique ID. +/// +/// The unique identifier for the placeholder registration. +public record ExecutorPlaceholder(string Id) + : ExecutorBinding(Id, + null, + typeof(Executor), + Id) +{ + /// + public override bool SupportsConcurrentSharedExecution => false; + + /// + public override bool SupportsResetting => false; + + /// + public override bool IsSharedInstance => false; +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorRegistration.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorRegistration.cs deleted file mode 100644 index eef749966c..0000000000 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorRegistration.cs +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. - -using System; -using System.Threading.Tasks; -using Microsoft.Shared.Diagnostics; - -using ExecutorFactoryF = System.Func>; - -namespace Microsoft.Agents.AI.Workflows; - -internal sealed class ExecutorRegistration(string id, Type executorType, ExecutorFactoryF provider, object? rawData) -{ - public string Id { get; } = Throw.IfNullOrEmpty(id); - public Type ExecutorType { get; } = Throw.IfNull(executorType); - private ExecutorFactoryF ProviderAsync { get; } = Throw.IfNull(provider); - - public bool IsSharedInstance { get; } = rawData is Executor; - - /// - /// Gets a value whether instances of the executor created from this registration can be reset between subsequent - /// runs from the same instance. This value is not relevant for executors that . - /// - public bool SupportsResetting { get; } = rawData is Executor && - // Cross-Run Shareable executors are "trivially" resettable, since they - // have no on-object state. - rawData is IResettableExecutor; - - /// - /// Gets a value whether instances of the executor created from this registration can be used in concurrent runs - /// from the same instance. - /// - public bool SupportsConcurrent { get; } = rawData is not Executor executor || executor.IsCrossRunShareable; - - internal async ValueTask TryResetAsync() - { - // Non-shared instances do not need resetting - if (!this.IsSharedInstance) - { - return true; - } - - // Technically we definitely know this is true, since if rawData is an Executor, if it was not resettable - // then we would have returned in the first condition, and if rawData is not an Executor, we would have - // returned in the second condition. That only leaves the possibility of rawData is Executor and also - // IResettableExecutor. - if (this.RawExecutorishData is IResettableExecutor resettableExecutor) - { - await resettableExecutor.ResetAsync().ConfigureAwait(false); - return true; - } - - return false; - } - - internal object? RawExecutorishData { get; } = rawData; - - public override string ToString() => $"{this.ExecutorType.Name}({this.Id})"; - - private Executor CheckId(Executor executor) - { - if (executor.Id != this.Id) - { - throw new InvalidOperationException( - $"Executor ID mismatch: expected '{this.Id}', but got '{executor.Id}'."); - } - - return executor; - } - - public async ValueTask CreateInstanceAsync(string runId) => this.CheckId(await this.ProviderAsync(runId).ConfigureAwait(false)); -} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/FunctionExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/FunctionExecutor.cs index 63db9456b4..a3371dc302 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/FunctionExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/FunctionExecutor.cs @@ -38,7 +38,9 @@ ValueTask RunActionAsync(TInput input, IWorkflowContext workflowContext, Cancell /// /// A unique identifier for the executor. /// A synchronous function to execute for each input message and workflow context. - public FunctionExecutor(string id, Action handlerSync) : this(id, WrapAction(handlerSync)) + /// Configuration options for the executor. If null, default options will be used. + /// Declare that this executor may be used simultaneously by multiple runs safely. + public FunctionExecutor(string id, Action handlerSync, ExecutorOptions? options = null, bool declareCrossRunShareable = false) : this(id, WrapAction(handlerSync), options, declareCrossRunShareable) { } } @@ -76,7 +78,9 @@ ValueTask RunFuncAsync(TInput input, IWorkflowContext workflowContext, /// /// A unique identifier for the executor. /// A synchronous function to execute for each input message and workflow context. - public FunctionExecutor(string id, Func handlerSync) : this(id, WrapFunc(handlerSync)) + /// Configuration options for the executor. If null, default options will be used. + /// Declare that this executor may be used simultaneously by multiple runs safely. + public FunctionExecutor(string id, Func handlerSync, ExecutorOptions? options = null, bool declareCrossRunShareable = false) : this(id, WrapFunc(handlerSync), options, declareCrossRunShareable) { } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/GroupChatWorkflowBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/GroupChatWorkflowBuilder.cs index 92b73083a9..c02a609f75 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/GroupChatWorkflowBuilder.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/GroupChatWorkflowBuilder.cs @@ -50,12 +50,12 @@ public GroupChatWorkflowBuilder AddParticipants(params IEnumerable agen public Workflow Build() { AIAgent[] agents = this._participants.ToArray(); - Dictionary agentMap = agents.ToDictionary(a => a, a => (ExecutorIsh)new AgentRunStreamingExecutor(a, includeInputInOutput: true)); + Dictionary agentMap = agents.ToDictionary(a => a, a => (ExecutorBinding)new AgentRunStreamingExecutor(a, includeInputInOutput: true)); Func> groupChatHostFactory = (string id, string runId) => new(new GroupChatHost(id, agents, agentMap, this._managerFactory)); - ExecutorIsh host = groupChatHostFactory.ConfigureFactory(nameof(GroupChatHost)); + ExecutorBinding host = groupChatHostFactory.BindExecutor(nameof(GroupChatHost)); WorkflowBuilder builder = new(host); foreach (var participant in agentMap.Values) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs index 874464812e..1750f779f2 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs @@ -73,7 +73,7 @@ public async ValueTask EnsureExecutorAsync(string executorId, IStepTra async Task CreateExecutorAsync(string id) { - if (!this._workflow.Registrations.TryGetValue(executorId, out var registration)) + if (!this._workflow.ExecutorBindings.TryGetValue(executorId, out var registration)) { throw new InvalidOperationException($"Executor with ID '{executorId}' is not registered."); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/RequestPortBinding.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/RequestPortBinding.cs new file mode 100644 index 0000000000..726895a1a7 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/RequestPortBinding.cs @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.Specialized; +using Microsoft.Shared.Diagnostics; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Represents the registration details for a request port, including configuration for allowing wrapped requests. +/// +/// The request port. +/// true to allow wrapped requests to be handled by the port; otherwise, false. +/// The default is true. +public record RequestPortBinding(RequestPort Port, bool AllowWrapped = true) + : ExecutorBinding(Throw.IfNull(Port).Id, + (_) => new ValueTask(new RequestInfoExecutor(Port, AllowWrapped)), + typeof(RequestInfoExecutor), + Port) +{ + /// + public override bool IsSharedInstance => false; + + /// + public override bool SupportsConcurrentSharedExecution => true; + + /// + public override bool SupportsResetting => false; +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/GroupChatHost.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/GroupChatHost.cs index 16f749d5a3..76e3f10bd2 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/GroupChatHost.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/GroupChatHost.cs @@ -10,11 +10,11 @@ namespace Microsoft.Agents.AI.Workflows.Specialized; internal sealed class GroupChatHost( string id, AIAgent[] agents, - Dictionary agentMap, + Dictionary agentMap, Func, GroupChatManager> managerFactory) : Executor(id), IResettableExecutor { private readonly AIAgent[] _agents = agents; - private readonly Dictionary _agentMap = agentMap; + private readonly Dictionary _agentMap = agentMap; private readonly Func, GroupChatManager> _managerFactory = managerFactory; private readonly List _pendingMessages = []; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/SubworkflowBinding.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/SubworkflowBinding.cs new file mode 100644 index 0000000000..1f29ffe426 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/SubworkflowBinding.cs @@ -0,0 +1,44 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.Specialized; +using Microsoft.Shared.Diagnostics; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Represents the workflow binding details for a subworkflow, including its instance, identifier, and optional +/// executor options. +/// +/// +/// +/// +public record SubworkflowBinding(Workflow WorkflowInstance, string Id, ExecutorOptions? ExecutorOptions = null) + : ExecutorBinding(Throw.IfNull(Id), + CreateWorkflowExecutorFactory(WorkflowInstance, Id, ExecutorOptions), + typeof(WorkflowHostExecutor), + WorkflowInstance) +{ + private static Func> CreateWorkflowExecutorFactory(Workflow workflow, string id, ExecutorOptions? options) + { + object ownershipToken = new(); + workflow.TakeOwnership(ownershipToken, subworkflow: true); + + return InitHostExecutorAsync; + + ValueTask InitHostExecutorAsync(string runId) + { + return new(new WorkflowHostExecutor(id, workflow, runId, ownershipToken, options)); + } + } + + /// + public override bool IsSharedInstance => false; + + /// + public override bool SupportsConcurrentSharedExecution => true; + + /// + public override bool SupportsResetting => false; +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/SwitchBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/SwitchBuilder.cs index 306acd4b4e..e180650935 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/SwitchBuilder.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/SwitchBuilder.cs @@ -13,7 +13,7 @@ namespace Microsoft.Agents.AI.Workflows; /// public sealed class SwitchBuilder { - private readonly List _executors = []; + private readonly List _executors = []; private readonly Dictionary _executorIndicies = []; private readonly List<(Func Predicate, HashSet OutgoingIndicies)> _caseMap = []; private readonly HashSet _defaultIndicies = []; @@ -30,14 +30,14 @@ public sealed class SwitchBuilder /// One or more executors to associate with the predicate. Each executor will be invoked if the predicate matches. /// Cannot be null. /// The current instance, allowing for method chaining. - public SwitchBuilder AddCase(Func predicate, params IEnumerable executors) + public SwitchBuilder AddCase(Func predicate, params IEnumerable executors) { Throw.IfNull(predicate); Throw.IfNull(executors); HashSet indicies = []; - foreach (ExecutorIsh executor in executors) + foreach (ExecutorBinding executor in executors) { if (!this._executorIndicies.TryGetValue(executor.Id, out int index)) { @@ -60,11 +60,11 @@ public SwitchBuilder AddCase(Func predicate, params IEnumerable /// /// - public SwitchBuilder WithDefault(params IEnumerable executors) + public SwitchBuilder WithDefault(params IEnumerable executors) { Throw.IfNull(executors); - foreach (ExecutorIsh executor in executors) + foreach (ExecutorBinding executor in executors) { if (!this._executorIndicies.TryGetValue(executor.Id, out int index)) { @@ -79,7 +79,7 @@ public SwitchBuilder WithDefault(params IEnumerable executors) return this; } - internal WorkflowBuilder ReduceToFanOut(WorkflowBuilder builder, ExecutorIsh source) + internal WorkflowBuilder ReduceToFanOut(WorkflowBuilder builder, ExecutorBinding source) { List<(Func Predicate, HashSet OutgoingIndicies)> caseMap = this._caseMap; HashSet defaultIndicies = this._defaultIndicies; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Visualization/WorkflowVisualizer.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Visualization/WorkflowVisualizer.cs index 886002320f..b871fbc881 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Visualization/WorkflowVisualizer.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Visualization/WorkflowVisualizer.cs @@ -69,7 +69,7 @@ private static void EmitWorkflowDigraph(Workflow workflow, List lines, s lines.Add($"{indent}\"{MapId(startExecutorId)}\" [fillcolor=lightgreen, label=\"{startExecutorId}\\n(Start)\"];"); // Add other executor nodes - foreach (var executorId in workflow.Registrations.Keys) + foreach (var executorId in workflow.ExecutorBindings.Keys) { if (executorId != startExecutorId) { @@ -108,7 +108,7 @@ private static void EmitWorkflowDigraph(Workflow workflow, List lines, s private static void EmitSubWorkflowsDigraph(Workflow workflow, List lines, string indent) { - foreach (var kvp in workflow.Registrations) + foreach (var kvp in workflow.ExecutorBindings) { var execId = kvp.Key; var registration = kvp.Value; @@ -145,7 +145,7 @@ string sanitize(string input) lines.Add($"{indent}{MapId(startExecutorId)}[\"{startExecutorId} (Start)\"];"); // Add other executor nodes - foreach (var executorId in workflow.Registrations.Keys) + foreach (var executorId in workflow.ExecutorBindings.Keys) { if (executorId != startExecutorId) { @@ -264,9 +264,9 @@ private static string ComputeShortHash(string input) #endif } - private static bool TryGetNestedWorkflow(ExecutorRegistration registration, [NotNullWhen(true)] out Workflow? workflow) + private static bool TryGetNestedWorkflow(ExecutorBinding registration, [NotNullWhen(true)] out Workflow? workflow) { - if (registration.RawExecutorishData is Workflow subWorkflow) + if (registration.RawValue is Workflow subWorkflow) { workflow = subWorkflow; return true; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs index 3636500438..a4f6be1210 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs @@ -19,7 +19,7 @@ public class Workflow /// /// A dictionary of executor providers, keyed by executor ID. /// - internal Dictionary Registrations { get; init; } = []; + internal Dictionary ExecutorBindings { get; init; } = []; internal Dictionary> Edges { get; init; } = []; internal HashSet OutputExecutors { get; init; } = []; @@ -41,7 +41,7 @@ public Dictionary> ReflectEdges() /// Gets the collection of external request ports, keyed by their ID. /// /// - /// Each port has a corresponding entry in the dictionary. + /// Each port has a corresponding entry in the dictionary. /// public Dictionary ReflectPorts() { @@ -66,10 +66,10 @@ public Dictionary ReflectPorts() /// public string? Description { get; internal init; } - internal bool AllowConcurrent => this.Registrations.Values.All(registration => registration.SupportsConcurrent); + internal bool AllowConcurrent => this.ExecutorBindings.Values.All(registration => registration.SupportsConcurrentSharedExecution); internal IEnumerable NonConcurrentExecutorIds => - this.Registrations.Values.Where(r => !r.SupportsConcurrent).Select(r => r.Id); + this.ExecutorBindings.Values.Where(r => !r.SupportsConcurrentSharedExecution).Select(r => r.Id); /// /// Initializes a new instance of the class with the specified starting executor identifier @@ -86,12 +86,14 @@ internal Workflow(string startExecutorId, string? name = null, string? descripti } private bool _needsReset; - private bool HasResettable => this.Registrations.Values.Any(registration => registration.SupportsResetting); + private bool HasResettableExecutors => + this.ExecutorBindings.Values.Any(registration => registration.SupportsResetting); + private async ValueTask TryResetExecutorRegistrationsAsync() { - if (this.HasResettable) + if (this.HasResettableExecutors) { - foreach (ExecutorRegistration registration in this.Registrations.Values) + foreach (ExecutorBinding registration in this.ExecutorBindings.Values) { // TryResetAsync returns true if the executor does not need resetting if (!await registration.TryResetAsync().ConfigureAwait(false)) @@ -158,7 +160,7 @@ internal void TakeOwnership(object ownerToken, bool subworkflow = false, object? }); } - this._needsReset = this.HasResettable; + this._needsReset = this.HasResettableExecutors; this._ownedAsSubworkflow = subworkflow; } @@ -188,7 +190,7 @@ internal async ValueTask ReleaseOwnershipAsync(object ownerToken) /// a the protocol this follows. public async ValueTask DescribeProtocolAsync(CancellationToken cancellationToken = default) { - ExecutorRegistration startExecutorRegistration = this.Registrations[this.StartExecutorId]; + ExecutorBinding startExecutorRegistration = this.ExecutorBindings[this.StartExecutorId]; Executor startExecutor = await startExecutorRegistration.CreateInstanceAsync(string.Empty) .ConfigureAwait(false); return startExecutor.DescribeProtocol(); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs index 9797843fa7..c277a84b36 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs @@ -19,7 +19,7 @@ namespace Microsoft.Agents.AI.Workflows; /// Use the WorkflowBuilder to incrementally add executors and edges, including fan-in and fan-out /// patterns, before building a strongly-typed workflow instance. Executors must be bound before building the workflow. /// All executors must be bound by calling into if they were intially specified as -/// . +/// . public class WorkflowBuilder { private readonly record struct EdgeConnection(string SourceId, string TargetId) @@ -28,11 +28,11 @@ private readonly record struct EdgeConnection(string SourceId, string TargetId) } private int _edgeCount; - private readonly Dictionary _executors = []; + private readonly Dictionary _executors = []; private readonly Dictionary> _edges = []; private readonly HashSet _unboundExecutors = []; private readonly HashSet _conditionlessConnections = []; - private readonly Dictionary _inputPorts = []; + private readonly Dictionary _requestPorts = []; private readonly HashSet _outputExecutors = []; private readonly string _startExecutorId; @@ -46,57 +46,56 @@ private readonly record struct EdgeConnection(string SourceId, string TargetId) /// Initializes a new instance of the WorkflowBuilder class with the specified starting executor. /// /// The executor that defines the starting point of the workflow. Cannot be null. - public WorkflowBuilder(ExecutorIsh start) + public WorkflowBuilder(ExecutorBinding start) { this._startExecutorId = this.Track(start).Id; } - private ExecutorIsh Track(ExecutorIsh executorish) + private ExecutorBinding Track(ExecutorBinding registration) { // If the executor is unbound, create an entry for it, unless it already exists. // Otherwise, update the entry for it, and remove the unbound tag - if (executorish.IsUnbound && !this._executors.ContainsKey(executorish.Id)) + if (registration.IsPlaceholder && !this._executors.ContainsKey(registration.Id)) { // If this is an unbound executor, we need to track it separately - this._unboundExecutors.Add(executorish.Id); + this._unboundExecutors.Add(registration.Id); } - else if (!executorish.IsUnbound) + else if (!registration.IsPlaceholder) { - ExecutorRegistration incoming = executorish.Registration; // If there is already a bound executor with this ID, we need to validate (to best efforts) // that the two are matching (at least based on type) - if (this._executors.TryGetValue(executorish.Id, out ExecutorRegistration? existing)) + if (this._executors.TryGetValue(registration.Id, out ExecutorBinding? existing)) { - if (existing.ExecutorType != incoming.ExecutorType) + if (existing.ExecutorType != registration.ExecutorType) { throw new InvalidOperationException( - $"Cannot bind executor with ID '{executorish.Id}' because an executor with the same ID but a different type ({existing.ExecutorType.Name} vs {incoming.ExecutorType.Name}) is already bound."); + $"Cannot bind executor with ID '{registration.Id}' because an executor with the same ID but a different type ({existing.ExecutorType.Name} vs {registration.ExecutorType.Name}) is already bound."); } - if (existing.RawExecutorishData is not null && - !ReferenceEquals(existing.RawExecutorishData, incoming.RawExecutorishData)) + if (existing.RawValue is not null && + !ReferenceEquals(existing.RawValue, registration.RawValue)) { throw new InvalidOperationException( - $"Cannot bind executor with ID '{executorish.Id}' because an executor with the same ID but different instance is already bound."); + $"Cannot bind executor with ID '{registration.Id}' because an executor with the same ID but different instance is already bound."); } } else { - this._executors[executorish.Id] = executorish.Registration; - if (this._unboundExecutors.Contains(executorish.Id)) + this._executors[registration.Id] = registration; + if (this._unboundExecutors.Contains(registration.Id)) { - this._unboundExecutors.Remove(executorish.Id); + this._unboundExecutors.Remove(registration.Id); } } } - if (executorish.ExecutorType == ExecutorIsh.Type.RequestPort) + if (registration is RequestPortBinding portRegistration) { - RequestPort port = executorish._requestPortValue!; - this._inputPorts[port.Id] = port; + RequestPort port = portRegistration.Port; + this._requestPorts[port.Id] = port; } - return executorish; + return registration; } /// @@ -106,9 +105,9 @@ private ExecutorIsh Track(ExecutorIsh executorish) /// /// /// - public WorkflowBuilder WithOutputFrom(params ExecutorIsh[] executors) + public WorkflowBuilder WithOutputFrom(params ExecutorBinding[] executors) { - foreach (ExecutorIsh executor in executors) + foreach (ExecutorBinding executor in executors) { this._outputExecutors.Add(this.Track(executor).Id); } @@ -139,21 +138,21 @@ public WorkflowBuilder WithDescription(string description) } /// - /// Binds the specified executor to the workflow, allowing it to participate in workflow execution. + /// Binds the specified executor (via registration) to the workflow, allowing it to participate in workflow execution. /// - /// The executor instance to bind. The executor must exist in the workflow and not be already bound. + /// The executor instance to bind. The executor must exist in the workflow and not be already bound. /// The current instance, enabling fluent configuration. /// Thrown if the specified executor is already bound or does not exist in the workflow. - public WorkflowBuilder BindExecutor(Executor executor) + public WorkflowBuilder BindExecutor(ExecutorBinding registration) { - if (!this._unboundExecutors.Contains(executor.Id)) + if (Throw.IfNull(registration) is ExecutorPlaceholder) { throw new InvalidOperationException( - $"Executor with ID '{executor.Id}' is already bound or does not exist in the workflow."); + $"Cannot bind executor with ID '{registration.Id}' because it is a placeholder registration. " + + "You must provide a concrete executor instance or registration."); } - this._executors[executor.Id] = new ExecutorIsh(executor).Registration; - this._unboundExecutors.Remove(executor.Id); + this.Track(registration); return this; } @@ -180,7 +179,7 @@ private HashSet EnsureEdgesFor(string sourceId) /// The current instance of . /// Thrown if an unconditional edge between the specified source and target /// executors already exists. - public WorkflowBuilder AddEdge(ExecutorIsh source, ExecutorIsh target, bool idempotent = false) + public WorkflowBuilder AddEdge(ExecutorBinding source, ExecutorBinding target, bool idempotent = false) => this.AddEdge(source, target, null, idempotent); internal static Func? CreateConditionFunc(Func? condition) @@ -236,7 +235,7 @@ public WorkflowBuilder AddEdge(ExecutorIsh source, ExecutorIsh target, bool idem /// The current instance of . /// Thrown if an unconditional edge between the specified source and target /// executors already exists. - public WorkflowBuilder AddEdge(ExecutorIsh source, ExecutorIsh target, Func? condition = null, bool idempotent = false) + public WorkflowBuilder AddEdge(ExecutorBinding source, ExecutorBinding target, Func? condition = null, bool idempotent = false) { // Add an edge from source to target with an optional condition. // This is a low-level builder method that does not enforce any specific executor type. @@ -273,7 +272,7 @@ public WorkflowBuilder AddEdge(ExecutorIsh source, ExecutorIsh target, FuncThe source executor from which the fan-out edge originates. Cannot be null. /// One or more target executors that will receive the fan-out edge. Cannot be null or empty. /// The current instance of . - public WorkflowBuilder AddFanOutEdge(ExecutorIsh source, params IEnumerable targets) + public WorkflowBuilder AddFanOutEdge(ExecutorBinding source, params IEnumerable targets) => this.AddFanOutEdge(source, null, targets); internal static Func>? CreateEdgeAssignerFunc(Func>? partitioner) @@ -305,7 +304,7 @@ public WorkflowBuilder AddFanOutEdge(ExecutorIsh source, params IEnumerable /// One or more target executors that will receive the fan-out edge. Cannot be null or empty. /// The current instance of . - public WorkflowBuilder AddFanOutEdge(ExecutorIsh source, Func>? partitioner = null, params IEnumerable targets) + public WorkflowBuilder AddFanOutEdge(ExecutorBinding source, Func>? partitioner = null, params IEnumerable targets) { Throw.IfNull(source); Throw.IfNull(targets); @@ -339,7 +338,7 @@ public WorkflowBuilder AddFanOutEdge(ExecutorIsh source, FuncThe target executor that receives input from the specified source executors. Cannot be null. /// One or more source executors that provide input to the target. Cannot be null or empty. /// The current instance of . - public WorkflowBuilder AddFanInEdge(ExecutorIsh target, params IEnumerable sources) + public WorkflowBuilder AddFanInEdge(ExecutorBinding target, params IEnumerable sources) { Throw.IfNull(target); Throw.IfNull(sources); @@ -398,9 +397,9 @@ private Workflow BuildInternal(Activity? activity = null) var workflow = new Workflow(this._startExecutorId, this._name, this._description) { - Registrations = this._executors, + ExecutorBindings = this._executors, Edges = this._edges, - Ports = this._inputPorts, + Ports = this._requestPorts, OutputExecutors = this._outputExecutors }; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilderExtensions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilderExtensions.cs index a0d42ac35f..e338f73fc6 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilderExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilderExtensions.cs @@ -24,7 +24,7 @@ public static class WorkflowBuilderExtensions /// The source executor from which messages will be forwarded. /// The target executors to which messages will be forwarded. /// The updated instance. - public static WorkflowBuilder ForwardMessage(this WorkflowBuilder builder, ExecutorIsh source, params IEnumerable executors) + public static WorkflowBuilder ForwardMessage(this WorkflowBuilder builder, ExecutorBinding source, params IEnumerable executors) => builder.ForwardMessage(source, condition: null, executors); /// @@ -38,7 +38,7 @@ public static WorkflowBuilder ForwardMessage(this WorkflowBuilder buil /// all messages of type will be forwarded. /// The target executors to which messages will be forwarded. /// The updated instance. - public static WorkflowBuilder ForwardMessage(this WorkflowBuilder builder, ExecutorIsh source, Func? condition = null, params IEnumerable executors) + public static WorkflowBuilder ForwardMessage(this WorkflowBuilder builder, ExecutorBinding source, Func? condition = null, params IEnumerable executors) { Throw.IfNull(executors); @@ -47,7 +47,7 @@ public static WorkflowBuilder ForwardMessage(this WorkflowBuilder buil #if NET if (executors.TryGetNonEnumeratedCount(out int count) && count == 1) #else - if (executors is ICollection { Count: 1 }) + if (executors is ICollection { Count: 1 }) #endif { return builder.AddEdge(source, executors.First(), predicate); @@ -68,7 +68,7 @@ public static WorkflowBuilder ForwardMessage(this WorkflowBuilder buil /// The source executor from which messages will be forwarded. /// The target executors to which messages, except those of type , will be forwarded. /// The updated instance with the added edges. - public static WorkflowBuilder ForwardExcept(this WorkflowBuilder builder, ExecutorIsh source, params IEnumerable executors) + public static WorkflowBuilder ForwardExcept(this WorkflowBuilder builder, ExecutorBinding source, params IEnumerable executors) { Throw.IfNull(executors); @@ -77,7 +77,7 @@ public static WorkflowBuilder ForwardExcept(this WorkflowBuilder build #if NET if (executors.TryGetNonEnumeratedCount(out int count) && count == 1) #else - if (executors is ICollection { Count: 1 }) + if (executors is ICollection { Count: 1 }) #endif { return builder.AddEdge(source, executors.First(), predicate); @@ -102,7 +102,7 @@ public static WorkflowBuilder ForwardExcept(this WorkflowBuilder build /// An ordered array of executors to be added to the chain after the source. /// The original workflow builder instance with the specified executor chain added. /// Thrown if there is a cycle in the chain. - public static WorkflowBuilder AddChain(this WorkflowBuilder builder, ExecutorIsh source, bool allowRepetition = false, params IEnumerable executors) + public static WorkflowBuilder AddChain(this WorkflowBuilder builder, ExecutorBinding source, bool allowRepetition = false, params IEnumerable executors) { Throw.IfNull(builder); Throw.IfNull(source); @@ -139,7 +139,7 @@ public static WorkflowBuilder AddChain(this WorkflowBuilder builder, ExecutorIsh /// The source executor representing the external system or process to connect. Cannot be null. /// The unique identifier for the input port that will handle the external call. Cannot be null. /// The original workflow builder instance with the external call added. - public static WorkflowBuilder AddExternalCall(this WorkflowBuilder builder, ExecutorIsh source, string portId) + public static WorkflowBuilder AddExternalCall(this WorkflowBuilder builder, ExecutorBinding source, string portId) { Throw.IfNull(builder); Throw.IfNull(source); @@ -160,7 +160,7 @@ public static WorkflowBuilder AddExternalCall(this Workflow /// The source executor that determines the branching condition for the switch. Cannot be null. /// An action used to configure the switch builder, specifying the branches and their conditions. Cannot be null. /// The workflow builder instance with the configured switch step added. - public static WorkflowBuilder AddSwitch(this WorkflowBuilder builder, ExecutorIsh source, Action configureSwitch) + public static WorkflowBuilder AddSwitch(this WorkflowBuilder builder, ExecutorBinding source, Action configureSwitch) { Throw.IfNull(builder); Throw.IfNull(source); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/JsonSerializationTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/JsonSerializationTests.cs index 1c20a7a091..2333a605e2 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/JsonSerializationTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/JsonSerializationTests.cs @@ -167,7 +167,7 @@ private static Workflow CreateTestWorkflow() builder.AddEdge(forwardString, stringToInt) .AddEdge(stringToInt, forwardInt) .AddEdge(forwardInt, intToString) - .AddEdge(intToString, StreamingAggregators.Last().AsExecutor("Aggregate")); + .AddEdge(intToString, StreamingAggregators.Last().BindAsExecutor("Aggregate")); return builder.Build(); } @@ -180,7 +180,7 @@ internal static WorkflowInfo CreateTestWorkflowInfo() private static void ValidateWorkflowInfo(WorkflowInfo actual, WorkflowInfo prototype) { - ValidateExecutorDictionary(prototype.Executors, prototype.Edges, actual.Executors, actual.Edges); + ValidateExecutorDictionary(prototype.ExecutorBindings, prototype.Edges, actual.ExecutorBindings, actual.Edges); ValidateRequestPorts(prototype.RequestPorts, actual.RequestPorts); actual.InputType.Should().Match(prototype.InputType.CreateValidator()); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RepresentationTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RepresentationTests.cs index fa5ef22903..9cf460e658 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RepresentationTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RepresentationTests.cs @@ -2,6 +2,8 @@ using System; using System.Collections.Generic; +using System.Linq; +using System.Reflection; using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -38,34 +40,40 @@ public override IAsyncEnumerable RunStreamingAsync(IEnum private static RequestPort TestRequestPort => RequestPort.Create("ExternalFunction"); - private static async ValueTask RunExecutorishInfoMatchTestAsync(ExecutorIsh target) + private static async ValueTask RunExecutorBindingInfoMatchTestAsync(ExecutorBinding binding) { - ExecutorRegistration registration = target.Registration; - ExecutorInfo info = registration.ToExecutorInfo(); + ExecutorInfo info = binding.ToExecutorInfo(); - info.IsMatch(await registration.CreateInstanceAsync(runId: string.Empty)).Should().BeTrue(); + info.IsMatch(await binding.CreateInstanceAsync(runId: string.Empty)).Should().BeTrue(); } [Fact] - public async Task Test_Executorish_InfosAsync() + public async Task Test_ExecutorBinding_InfosAsync() { int testsRun = 0; - await RunExecutorishTestAsync(new TestExecutor()); - await RunExecutorishTestAsync(TestRequestPort); - await RunExecutorishTestAsync(new TestAgent()); - await RunExecutorishTestAsync(Step1EntryPoint.WorkflowInstance.ConfigureSubWorkflow(nameof(Step1EntryPoint))); + await RunExecutorBindingTestAsync(new TestExecutor()); + await RunExecutorBindingTestAsync(TestRequestPort); + await RunExecutorBindingTestAsync(new TestAgent()); + await RunExecutorBindingTestAsync(Step1EntryPoint.WorkflowInstance.BindAsExecutor(nameof(Step1EntryPoint))); Func function = MessageHandlerAsync; - await RunExecutorishTestAsync(function.AsExecutor("FunctionExecutor")); + await RunExecutorBindingTestAsync(function.BindAsExecutor("FunctionExecutor")); - if (Enum.GetValues(typeof(ExecutorIsh.Type)).Length > testsRun + 1) + Type bindingBaseType = typeof(ExecutorBinding); + Assembly workflowAssembly = bindingBaseType.Assembly; + int expectedTests = workflowAssembly.GetTypes() + .Count(type => type != bindingBaseType + && bindingBaseType.IsAssignableFrom(type)); + expectedTests.Should().BePositive(); + + if (expectedTests > testsRun + 1) { - Assert.Fail("Not all ExecutorIsh types were tested."); + Assert.Fail("Not all ExecutorBinding types were tested."); } - async ValueTask RunExecutorishTestAsync(ExecutorIsh executorish) + async ValueTask RunExecutorBindingTestAsync(ExecutorBinding binding) { - await RunExecutorishInfoMatchTestAsync(executorish); + await RunExecutorBindingInfoMatchTestAsync(binding); testsRun++; } @@ -77,8 +85,8 @@ async ValueTask MessageHandlerAsync(int message, IWorkflowContext workflowContex [Fact] public async Task Test_SpecializedExecutor_InfosAsync() { - await RunExecutorishInfoMatchTestAsync(new AIAgentHostExecutor(new TestAgent())); - await RunExecutorishInfoMatchTestAsync(new RequestInfoExecutor(TestRequestPort)); + await RunExecutorBindingInfoMatchTestAsync(new AIAgentHostExecutor(new TestAgent())); + await RunExecutorBindingInfoMatchTestAsync(new RequestInfoExecutor(TestRequestPort)); } private static string Source(int id) => $"Source/{id}"; diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/07_GroupChat_Workflow_HostAsAgent.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/07_GroupChat_Workflow_HostAsAgent.cs index 8144d1bd11..ffc4d0ba35 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/07_GroupChat_Workflow_HostAsAgent.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/07_GroupChat_Workflow_HostAsAgent.cs @@ -11,21 +11,23 @@ internal static class Step7EntryPoint public static string EchoAgentId => Step6EntryPoint.EchoAgentId; public static string EchoPrefix => Step6EntryPoint.EchoPrefix; - public static async ValueTask RunAsync(TextWriter writer, IWorkflowExecutionEnvironment environment, int maxSteps = 2) + public static async ValueTask RunAsync(TextWriter writer, IWorkflowExecutionEnvironment environment, int maxSteps = 2, int numIterations = 2) { Workflow workflow = Step6EntryPoint.CreateWorkflow(maxSteps); AIAgent agent = workflow.AsAgent("group-chat-agent", "Group Chat Agent"); - AgentThread thread = agent.GetNewThread(); - - await foreach (AgentRunResponseUpdate update in agent.RunStreamingAsync(thread).ConfigureAwait(false)) + for (int i = 0; i < numIterations; i++) { - string updateText = $"{update.AuthorName - ?? update.AgentId - ?? update.Role.ToString() - ?? ChatRole.Assistant.ToString()}: {update.Text}"; - writer.WriteLine(updateText); + AgentThread thread = agent.GetNewThread(); + await foreach (AgentRunResponseUpdate update in agent.RunStreamingAsync(thread).ConfigureAwait(false)) + { + string updateText = $"{update.AuthorName + ?? update.AgentId + ?? update.Role.ToString() + ?? ChatRole.Assistant.ToString()}: {update.Text}"; + writer.WriteLine(updateText); + } } } } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/08_Subworkflow_Simple.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/08_Subworkflow_Simple.cs index 8ec7978606..58372103f4 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/08_Subworkflow_Simple.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/08_Subworkflow_Simple.cs @@ -29,13 +29,13 @@ internal static class Step8EntryPoint public static async ValueTask> RunAsync(TextWriter writer, IWorkflowExecutionEnvironment environment, List textsToProcess) { Func processTextAsyncFunc = ProcessTextAsync; - ExecutorIsh processText = processTextAsyncFunc.AsExecutor("TextProcessor", threadsafe: true); + ExecutorBinding processText = processTextAsyncFunc.BindAsExecutor("TextProcessor", threadsafe: true); Workflow subWorkflow = new WorkflowBuilder(processText).WithOutputFrom(processText).Build(); - ExecutorIsh textProcessor = subWorkflow.ConfigureSubWorkflow("TextProcessor"); + ExecutorBinding textProcessor = subWorkflow.BindAsExecutor("TextProcessor"); Func> createOrchestrator = (id, _) => new(new TextProcessingOrchestrator(id)); - var orchestrator = createOrchestrator.ConfigureFactory(); + var orchestrator = createOrchestrator.BindExecutor(); Workflow workflow = new WorkflowBuilder(orchestrator) .AddEdge(orchestrator, textProcessor) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/09_Subworkflow_ExternalRequest.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/09_Subworkflow_ExternalRequest.cs index 7f9b6189bc..aac3ed1d94 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/09_Subworkflow_ExternalRequest.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/09_Subworkflow_ExternalRequest.cs @@ -62,7 +62,7 @@ internal sealed record class RequestFinished(string Id, string RequestType, Reso internal static class Step9EntryPoint { - public static WorkflowBuilder AddPassthroughRequestHandler(this WorkflowBuilder builder, ExecutorIsh source, ExecutorIsh filter, string? id = null) + public static WorkflowBuilder AddPassthroughRequestHandler(this WorkflowBuilder builder, ExecutorBinding source, ExecutorBinding filter, string? id = null) { id ??= typeof(TRequest).Name; @@ -74,10 +74,10 @@ public static WorkflowBuilder AddPassthroughRequestHandler( .ForwardMessage(filter, executors: [source], condition: message => message.DataIs()); } - public static WorkflowBuilder AddExternalRequest(this WorkflowBuilder builder, ExecutorIsh source, string? id = null) + public static WorkflowBuilder AddExternalRequest(this WorkflowBuilder builder, ExecutorBinding source, string? id = null) => builder.AddExternalRequest(source, out RequestPort _, id); - public static WorkflowBuilder AddExternalRequest(this WorkflowBuilder builder, ExecutorIsh source, out RequestPort inputPort, string? id = null) + public static WorkflowBuilder AddExternalRequest(this WorkflowBuilder builder, ExecutorBinding source, out RequestPort inputPort, string? id = null) { id = id ?? $"{source.Id}.Requests[{typeof(TRequest).Name}=>{typeof(TResponse).Name}]"; @@ -86,7 +86,7 @@ public static WorkflowBuilder AddExternalRequest(this Workf return builder.AddExternalRequest(source, inputPort); } - public static WorkflowBuilder AddExternalRequest(this WorkflowBuilder builder, ExecutorIsh source, RequestPort inputPort) + public static WorkflowBuilder AddExternalRequest(this WorkflowBuilder builder, ExecutorBinding source, RequestPort inputPort) { return builder.ForwardMessage(source, inputPort) .ForwardMessage(source, inputPort) @@ -110,7 +110,7 @@ public static Workflow CreateWorkflow() Coordinator coordinator = new(); ResourceCache cache = new(); QuotaPolicyEngine policyEngine = new(); - ExecutorIsh subworkflow = CreateSubWorkflow().ConfigureSubWorkflow("ResourceWorkflow"); + ExecutorBinding subworkflow = CreateSubWorkflow().BindAsExecutor("ResourceWorkflow"); return new WorkflowBuilder(coordinator) .AddChain(coordinator, allowRepetition: true, subworkflow, coordinator) @@ -522,4 +522,26 @@ async ValueTask CountFinishedRequestAndYieldResultAsync(int state, IWorkflo return state + requests.Count; } } + + internal async ValueTask RunWorkflowHandleEventsAsync(Workflow workflow, TInput input) where TInput : notnull + { + StreamingRun run = await InProcessExecution.StreamAsync(workflow, input); + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + switch (evt) + { + case ExecutorInvokedEvent invoked: + Console.WriteLine($"Executor invoked: {invoked.ExecutorId}"); + break; + case ExecutorCompletedEvent completed: + Console.WriteLine($"Executor completed: {completed.ExecutorId}"); + break; + + // Other event types can be handled here as needed + + default: + break; + } + } + } } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SampleSmokeTest.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SampleSmokeTest.cs index 47733ea6af..dbe3a56d06 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SampleSmokeTest.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SampleSmokeTest.cs @@ -212,6 +212,8 @@ internal async Task Test_RunSample_Step7Async(ExecutionEnvironment environment) string[] lines = result.Split([Environment.NewLine], StringSplitOptions.RemoveEmptyEntries); Assert.Collection(lines, + line => Assert.Contains($"{HelloAgent.DefaultId}: {HelloAgent.Greeting}", line), + line => Assert.Contains($"{Step7EntryPoint.EchoAgentId}: {Step7EntryPoint.EchoPrefix}{HelloAgent.Greeting}", line), line => Assert.Contains($"{HelloAgent.DefaultId}: {HelloAgent.Greeting}", line), line => Assert.Contains($"{Step7EntryPoint.EchoAgentId}: {Step7EntryPoint.EchoPrefix}{HelloAgent.Greeting}", line) ); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowBuilderSmokeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowBuilderSmokeTests.cs index c1ba97b0e9..b5485043b3 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowBuilderSmokeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowBuilderSmokeTests.cs @@ -30,9 +30,9 @@ public void Test_LateBinding_Executor() workflow.StartExecutorId.Should().Be("start"); - workflow.Registrations.Should().HaveCount(1); - workflow.Registrations.Should().ContainKey("start"); - workflow.Registrations["start"].ExecutorType.Should().Be(); + workflow.ExecutorBindings.Should().HaveCount(1); + workflow.ExecutorBindings.Should().ContainKey("start"); + workflow.ExecutorBindings["start"].ExecutorType.Should().Be(); } [Fact] @@ -45,9 +45,9 @@ public void Test_LateImplicitBinding_Executor() workflow.StartExecutorId.Should().Be("start"); - workflow.Registrations.Should().HaveCount(1); - workflow.Registrations.Should().ContainKey("start"); - workflow.Registrations["start"].ExecutorType.Should().Be(); + workflow.ExecutorBindings.Should().HaveCount(1); + workflow.ExecutorBindings.Should().ContainKey("start"); + workflow.ExecutorBindings["start"].ExecutorType.Should().Be(); } [Fact] @@ -77,9 +77,9 @@ public void Test_RebindToSameish_Allowed() workflow.StartExecutorId.Should().Be("start"); - workflow.Registrations.Should().HaveCount(1); - workflow.Registrations.Should().ContainKey("start"); - workflow.Registrations["start"].ExecutorType.Should().Be(); + workflow.ExecutorBindings.Should().HaveCount(1); + workflow.ExecutorBindings.Should().ContainKey("start"); + workflow.ExecutorBindings["start"].ExecutorType.Should().Be(); } [Fact]