Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ public static class Program
private static async Task Main()
{
// Create the executors
UppercaseExecutor uppercase = new();
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");

ReverseTextExecutor reverse = new();

// Build the workflow by connecting executors sequentially
Expand All @@ -40,23 +42,6 @@ private static async Task Main()
}
}

/// <summary>
/// First executor: converts input text to uppercase.
/// </summary>
internal sealed class UppercaseExecutor() : Executor<string, string>("UppercaseExecutor")
{
/// <summary>
/// Processes the input message by converting it to uppercase.
/// </summary>
/// <param name="message">The input text to convert</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>The input text converted to uppercase</returns>
public override ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default) =>
ValueTask.FromResult(message.ToUpperInvariant()); // The return value will be sent as a message along an edge to subsequent executors
}

/// <summary>
/// Second executor: reverses the input text and completes the workflow.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private static async Task Main()
.Build();

// Step 2: Configure the sub-workflow as an executor for use in the parent workflow
ExecutorIsh subWorkflowExecutor = subWorkflow.ConfigureSubWorkflow("TextProcessingSubWorkflow");
ExecutorBinding subWorkflowExecutor = subWorkflow.BindAsExecutor("TextProcessingSubWorkflow");

// Step 3: Build a main workflow that uses the sub-workflow as an executor
Console.WriteLine("Building main workflow that uses the sub-workflow as an executor...\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ I cannot process this request as it appears to contain unsafe content.

## What You'll Learn

1. **How to mix executors and agents** - Understanding that both are treated as `ExecutorIsh` internally
1. **How to mix executors and agents** - Understanding that both are treated as `ExecutorBinding` internally
2. **When to use executors vs agents** - Executors for deterministic logic, agents for AI-powered decisions
3. **How to process agent outputs** - Using executors to sync, format, or aggregate agent responses
4. **Building complex pipelines** - Chaining multiple heterogeneous components together
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ public void Connect(IModeledAction source, IModeledAction target, Func<object?,
Debug.WriteLine($"> CONNECT: {source.Id} => {target.Id}{(condition is null ? string.Empty : " (?)")}");

this.WorkflowBuilder.AddEdge(
GetExecutorIsh(source),
GetExecutorIsh(target),
GetExecutorBinding(source),
GetExecutorBinding(target),
condition);
}

private static ExecutorIsh GetExecutorIsh(IModeledAction action) =>
private static ExecutorBinding GetExecutorBinding(IModeledAction action) =>
action switch
{
RequestPortAction port => port.RequestPort,
Expand Down
27 changes: 27 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.Workflows/AIAgentBinding.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft. All rights reserved.

using Microsoft.Agents.AI.Workflows.Specialized;
using Microsoft.Shared.Diagnostics;

namespace Microsoft.Agents.AI.Workflows;

/// <summary>
/// Represents the workflow binding details for an AI agent, including configuration options for event emission.
/// </summary>
/// <param name="Agent">The AI agent.</param>
/// <param name="EmitEvents">Specifies whether the agent should emit events. If null, the default behavior is applied.</param>
public record AIAgentBinding(AIAgent Agent, bool EmitEvents = false)
: ExecutorBinding(Throw.IfNull(Agent).Name ?? Throw.IfNull(Agent.Id),
(_) => new(new AIAgentHostExecutor(Agent, EmitEvents)),
typeof(AIAgentHostExecutor),
Agent)
{
/// <inheritdoc/>
public override bool IsSharedInstance => false;

/// <inheritdoc/>
public override bool SupportsConcurrentSharedExecution => true;

/// <inheritdoc/>
public override bool SupportsResetting => false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private static Workflow BuildSequentialCore(string? workflowName, params IEnumer
// Create a builder that chains the agents together in sequence. The workflow simply begins
// with the first agent in the sequence.
WorkflowBuilder? builder = null;
ExecutorIsh? previous = null;
ExecutorBinding? previous = null;
foreach (var agent in agents)
{
AgentRunStreamingExecutor agentExecutor = new(agent, includeInputInOutput: true);
Expand Down Expand Up @@ -125,8 +125,8 @@ private static Workflow BuildConcurrentCore(
// so that the final accumulator receives a single list of messages from each agent. Otherwise, the
// accumulator would not be able to determine what came from what agent, as there's currently no
// provenance tracking exposed in the workflow context passed to a handler.
ExecutorIsh[] agentExecutors = (from agent in agents select (ExecutorIsh)new AgentRunStreamingExecutor(agent, includeInputInOutput: false)).ToArray();
ExecutorIsh[] accumulators = [.. from agent in agentExecutors select (ExecutorIsh)new CollectChatMessagesExecutor($"Batcher/{agent.Id}")];
ExecutorBinding[] agentExecutors = (from agent in agents select (ExecutorBinding)new AgentRunStreamingExecutor(agent, includeInputInOutput: false)).ToArray();
ExecutorBinding[] accumulators = [.. from agent in agentExecutors select (ExecutorBinding)new CollectChatMessagesExecutor($"Batcher/{agent.Id}")];
builder.AddFanOutEdge(start, targets: agentExecutors);
for (int i = 0; i < agentExecutors.Length; i++)
{
Expand All @@ -141,7 +141,7 @@ private static Workflow BuildConcurrentCore(
Func<string, string, ValueTask<ConcurrentEndExecutor>> endFactory =
(string _, string __) => new(new ConcurrentEndExecutor(agentExecutors.Length, aggregator));

ExecutorIsh end = endFactory.ConfigureFactory(ConcurrentEndExecutor.ExecutorId);
ExecutorBinding end = endFactory.BindExecutor(ConcurrentEndExecutor.ExecutorId);

builder.AddFanInEdge(end, sources: accumulators);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public bool IsMatch(Executor executor) =>
this.ExecutorType.IsMatch(executor.GetType())
&& this.ExecutorId == executor.Id;

public bool IsMatch(ExecutorRegistration registration) =>
this.ExecutorType.IsMatch(registration.ExecutorType)
&& this.ExecutorId == registration.Id;
public bool IsMatch(ExecutorBinding binding) =>
this.ExecutorType.IsMatch(binding.ExecutorType)
&& this.ExecutorId == binding.Id;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ namespace Microsoft.Agents.AI.Workflows.Checkpointing;

internal static class RepresentationExtensions
{
public static ExecutorInfo ToExecutorInfo(this ExecutorRegistration registration)
public static ExecutorInfo ToExecutorInfo(this ExecutorBinding binding)
{
Throw.IfNull(registration);
return new ExecutorInfo(new TypeId(registration.ExecutorType), registration.Id);
Throw.IfNull(binding);
return new ExecutorInfo(new TypeId(binding.ExecutorType), binding.Id);
}

public static EdgeInfo ToEdgeInfo(this Edge edge)
Expand All @@ -38,8 +38,8 @@ public static WorkflowInfo ToWorkflowInfo(this Workflow workflow)
Throw.IfNull(workflow);

Dictionary<string, ExecutorInfo> executors =
workflow.Registrations.Values.ToDictionary(
keySelector: registration => registration.Id,
workflow.ExecutorBindings.Values.ToDictionary(
keySelector: binding => binding.Id,
elementSelector: ToExecutorInfo);

Dictionary<string, List<EdgeInfo>> edges = workflow.Edges.Keys.ToDictionary(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ internal sealed class WorkflowInfo
{
[JsonConstructor]
internal WorkflowInfo(
Dictionary<string, ExecutorInfo> executors,
Dictionary<string, ExecutorInfo> executorBindings,
Dictionary<string, List<EdgeInfo>> edges,
HashSet<RequestPortInfo> requestPorts,
string startExecutorId,
HashSet<string>? outputExecutorIds)
{
this.Executors = Throw.IfNull(executors);
this.ExecutorBindings = Throw.IfNull(executorBindings);
this.Edges = Throw.IfNull(edges);
this.RequestPorts = Throw.IfNull(requestPorts);

this.StartExecutorId = Throw.IfNullOrEmpty(startExecutorId);
this.OutputExecutorIds = outputExecutorIds ?? [];
}

public Dictionary<string, ExecutorInfo> Executors { get; }
public Dictionary<string, ExecutorInfo> ExecutorBindings { get; }
public Dictionary<string, List<EdgeInfo>> Edges { get; }
public HashSet<RequestPortInfo> RequestPorts { get; }

Expand All @@ -47,10 +47,10 @@ public bool IsMatch(Workflow workflow)
}

// Validate the executors
if (workflow.Registrations.Count != this.Executors.Count ||
this.Executors.Keys.Any(
executorId => workflow.Registrations.TryGetValue(executorId, out ExecutorRegistration? registration)
&& !this.Executors[executorId].IsMatch(registration)))
if (workflow.ExecutorBindings.Count != this.ExecutorBindings.Count ||
this.ExecutorBindings.Keys.Any(
executorId => workflow.ExecutorBindings.TryGetValue(executorId, out ExecutorBinding? registration)
&& !this.ExecutorBindings[executorId].IsMatch(registration)))
{
return false;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Threading.Tasks;
using Microsoft.Shared.Diagnostics;

namespace Microsoft.Agents.AI.Workflows;

// TODO: Unwrap the Configured object, just like for SubworkflowBinding
internal record ConfiguredExecutorBinding(Configured<Executor> ConfiguredExecutor, Type ExecutorType)
: ExecutorBinding(Throw.IfNull(ConfiguredExecutor).Id,
ConfiguredExecutor.BoundFactoryAsync,
ExecutorType,
ConfiguredExecutor.Raw)
{
/// <inheritdoc/>
public override bool IsSharedInstance { get; } = ConfiguredExecutor.Raw is Executor;

protected override async ValueTask<bool> ResetCoreAsync()
{
if (this.ConfiguredExecutor.Raw is IResettableExecutor resettable)
{
await resettable.ResetAsync().ConfigureAwait(false);
}

return false;
}

/// <inheritdoc/>
public override bool SupportsConcurrentSharedExecution => true;

/// <inheritdoc/>
public override bool SupportsResetting => false;
}
2 changes: 2 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public abstract class Executor : IIdentified
private static readonly string s_namespace = typeof(Executor).Namespace!;
private static readonly ActivitySource s_activitySource = new(s_namespace);

// TODO: Add overloads for binding with a configuration/options object once the Configured<T> hierarchy goes away.

/// <summary>
/// Initialize the executor with a unique identifier
/// </summary>
Expand Down
129 changes: 129 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.Workflows/ExecutorBinding.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;

namespace Microsoft.Agents.AI.Workflows;

/// <summary>
/// Represents the binding information for a workflow executor, including its identifier, factory method, type, and
/// optional raw value.
/// </summary>
/// <param name="Id">The unique identifier for the executor in the workflow.</param>
/// <param name="FactoryAsync">A factory function that creates an instance of the executor. The function accepts two string parameters and returns
/// a ValueTask containing the created Executor instance.</param>
/// <param name="ExecutorType">The type of the executor. Must be a type derived from Executor.</param>
/// <param name="RawValue">An optional raw value associated with the binding.</param>
public abstract record class ExecutorBinding(string Id, Func<string, ValueTask<Executor>>? FactoryAsync, Type ExecutorType, object? RawValue = null)
: IIdentified,
IEquatable<IIdentified>,
IEquatable<string>
{
/// <summary>
/// Gets a value indicating whether the binding is a placeholder (i.e., does not have a factory method defined).
/// </summary>
[MemberNotNullWhen(false, nameof(FactoryAsync))]
public bool IsPlaceholder => this.FactoryAsync == null;

/// <summary>
/// Gets a value whether the executor created from this binding is a shared instance across all runs.
/// </summary>
public abstract bool IsSharedInstance { get; }

/// <summary>
/// Gets a value whether instances of the executor created from this binding can be used in concurrent runs
/// from the same <see cref="Workflow"/> instance.
/// </summary>
public abstract bool SupportsConcurrentSharedExecution { get; }

/// <summary>
/// Gets a value whether instances of the executor created from this binding can be reset between subsequent
/// runs from the same <see cref="Workflow"/> instance. This value is not relevant for executors that <see
/// cref="SupportsConcurrentSharedExecution"/>.
/// </summary>
public abstract bool SupportsResetting { get; }

/// <inheritdoc/>
public override string ToString() => $"{this.Id}:{(this.IsPlaceholder ? ":<unbound>" : this.ExecutorType.Name)}";

private Executor CheckId(Executor executor)
{
if (executor.Id != this.Id)
{
throw new InvalidOperationException(
$"Executor ID mismatch: expected '{this.Id}', but got '{executor.Id}'.");
}

return executor;
}

internal async ValueTask<Executor> CreateInstanceAsync(string runId)
=> !this.IsPlaceholder
? this.CheckId(await this.FactoryAsync(runId).ConfigureAwait(false))
: throw new InvalidOperationException(
$"Cannot create executor with ID '{this.Id}': Binding ({this.GetType().Name}) is a placeholder.");

/// <inheritdoc/>
public virtual bool Equals(ExecutorBinding? other) =>
other is not null && other.Id == this.Id;

/// <inheritdoc/>
public bool Equals(IIdentified? other) =>
other is not null && other.Id == this.Id;

/// <inheritdoc/>
public bool Equals(string? other) =>
other is not null && other == this.Id;

internal ValueTask<bool> TryResetAsync()
{
// Non-shared instances do not need resetting
if (!this.IsSharedInstance)
{
return new(true);
}

// If the executor supports concurrent use, then resetting is a no-op.
if (!this.SupportsResetting)
{
return new(false);
}

return this.ResetCoreAsync();
}

/// <summary>
/// Resets the executor's shared resources to their initial state. Must be overridden by bindings that support
/// resetting.
/// </summary>
/// <exception cref="InvalidOperationException"></exception>
protected virtual ValueTask<bool> ResetCoreAsync() => throw new InvalidOperationException("ExecutorBindings that support resetting must override ResetCoreAsync()");

/// <inheritdoc/>
public override int GetHashCode() => this.Id.GetHashCode();

/// <summary>
/// Defines an implicit conversion from an Executor to a <see cref="ExecutorBinding"/>.
/// </summary>
/// <param name="executor">The Executor instance to convert.</param>
public static implicit operator ExecutorBinding(Executor executor) => executor.BindExecutor();

/// <summary>
/// Defines an implicit conversion from a string identifier to an <see cref="ExecutorPlaceholder"/>.
/// </summary>
/// <param name="id">The string identifier to convert to a placeholder.</param>
public static implicit operator ExecutorBinding(string id) => new ExecutorPlaceholder(id);

/// <summary>
/// Defines an implicit conversion from a <see cref="RequestPort "/>to an <see cref="ExecutorBinding"/>.
/// </summary>
/// <param name="port">The RequestPort instance to convert.</param>
public static implicit operator ExecutorBinding(RequestPort port) => port.BindAsExecutor();

/// <summary>
/// Defines an implicit conversion from an <see cref="AIAgent"/> to an <see cref="ExecutorBinding"/> instance.
/// </summary>
/// <param name="agent"></param>
public static implicit operator ExecutorBinding(AIAgent agent) => agent.BindAsExecutor();
}
Loading
Loading