|
4 | 4 | using System.Collections.Generic; |
5 | 5 | using System.Linq; |
6 | 6 | using System.Runtime.CompilerServices; |
| 7 | +using System.Text.Json; |
7 | 8 | using System.Threading; |
8 | 9 | using System.Threading.Tasks; |
9 | 10 | using Microsoft.Agents.AI.Hosting.OpenAI.Responses.Converters; |
10 | 11 | using Microsoft.Agents.AI.Hosting.OpenAI.Responses.Models; |
11 | 12 | using Microsoft.Agents.AI.Hosting.OpenAI.Responses.Streaming; |
| 13 | +using Microsoft.Agents.AI.Workflows; |
12 | 14 | using Microsoft.Extensions.AI; |
13 | 15 |
|
14 | 16 | namespace Microsoft.Agents.AI.Hosting.OpenAI.Responses; |
@@ -50,6 +52,13 @@ internal static async IAsyncEnumerable<StreamingResponseEvent> ToStreamingRespon |
50 | 52 | cancellationToken.ThrowIfCancellationRequested(); |
51 | 53 | var update = updateEnumerator.Current; |
52 | 54 |
|
| 55 | + // Special-case for agent framework workflow events. |
| 56 | + if (update.RawRepresentation is WorkflowEvent workflowEvent) |
| 57 | + { |
| 58 | + yield return CreateWorkflowEventResponse(workflowEvent, seq.Increment(), outputIndex); |
| 59 | + continue; |
| 60 | + } |
| 61 | + |
53 | 62 | if (!IsSameMessage(update, previousUpdate)) |
54 | 63 | { |
55 | 64 | // Finalize the current generator when moving to a new message. |
@@ -192,4 +201,47 @@ static bool IsSameValue(string? str1, string? str2) => |
192 | 201 | static bool IsSameRole(ChatRole? value1, ChatRole? value2) => |
193 | 202 | !value1.HasValue || !value2.HasValue || value1.Value == value2.Value; |
194 | 203 | } |
| 204 | + |
| 205 | + private static StreamingWorkflowEventComplete CreateWorkflowEventResponse(WorkflowEvent workflowEvent, int sequenceNumber, int outputIndex) |
| 206 | + { |
| 207 | + // Extract executor_id if this is an ExecutorEvent |
| 208 | + string? executorId = null; |
| 209 | + if (workflowEvent is ExecutorEvent execEvent) |
| 210 | + { |
| 211 | + executorId = execEvent.ExecutorId; |
| 212 | + } |
| 213 | + JsonElement eventData; |
| 214 | + if (JsonSerializer.IsReflectionEnabledByDefault) |
| 215 | + { |
| 216 | + var eventDataDict = new Dictionary<string, object?> |
| 217 | + { |
| 218 | + ["event_type"] = workflowEvent.GetType().Name, |
| 219 | + ["data"] = workflowEvent.Data, |
| 220 | + ["executor_id"] = executorId, |
| 221 | + ["timestamp"] = DateTime.UtcNow.ToString("O") |
| 222 | + }; |
| 223 | + |
| 224 | +#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code |
| 225 | +#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. |
| 226 | + eventData = JsonSerializer.SerializeToElement(eventDataDict, ResponsesJsonSerializerOptions.Default); |
| 227 | +#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. |
| 228 | +#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code |
| 229 | + } |
| 230 | + else |
| 231 | + { |
| 232 | + eventData = JsonSerializer.SerializeToElement( |
| 233 | + "Unsupported. Workflow event serialization is currently only supported when JsonSerializer.IsReflectionEnabledByDefault is true.", |
| 234 | + ResponsesJsonContext.Default.String); |
| 235 | + } |
| 236 | + |
| 237 | + // Create the properly typed streaming workflow event |
| 238 | + return new StreamingWorkflowEventComplete |
| 239 | + { |
| 240 | + SequenceNumber = sequenceNumber, |
| 241 | + OutputIndex = outputIndex, |
| 242 | + Data = eventData, |
| 243 | + ExecutorId = executorId, |
| 244 | + ItemId = $"wf_{Guid.NewGuid().ToString("N")[..8]}" |
| 245 | + }; |
| 246 | + } |
195 | 247 | } |
0 commit comments