-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathDocumentIndexingWorkflow.cs
More file actions
189 lines (169 loc) · 8.16 KB
/
DocumentIndexingWorkflow.cs
File metadata and controls
189 lines (169 loc) · 8.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
// DocumentIndexingWorkflow — sequential embedding of text chunks via DurableEmbeddingGenerator,
// where each GenerateAsync call becomes an independently retried Temporal activity.
using Microsoft.Extensions.AI;
using Temporalio.Common;
using Temporalio.Exceptions;
using Temporalio.Extensions.AI;
using Temporalio.Workflows;
namespace DurableEmbeddings;
/// <summary>
/// Workflow input for <see cref="DocumentIndexingWorkflow"/>: the text to embed, the model
/// to request, and how long each embedding activity may run.
/// </summary>
public sealed class DocumentIndexingInput
{
    /// <summary>
    /// Text chunks to embed, in order. The workflow issues one Temporal activity per chunk.
    /// </summary>
    public required IReadOnlyList<string> Chunks { get; init; }

    /// <summary>
    /// Start-to-close timeout for each embedding activity, passed through to
    /// DurableEmbeddingGenerator. Defaults to two minutes.
    /// </summary>
    public TimeSpan ActivityTimeout { get; init; } = TimeSpan.FromSeconds(120);

    /// <summary>
    /// Embedding model id, copied into <see cref="EmbeddingGenerationOptions.ModelId"/> for every
    /// <c>GenerateAsync</c> call. This keeps the activity summary shown in the Temporal Web UI
    /// (built by <c>DurableEmbeddingGenerator</c> from <c>options.ModelId</c>) from being empty.
    /// </summary>
    public required string ModelId { get; init; }
}
/// <summary>
/// Output of <c>DocumentIndexingWorkflow</c>: a summary of the embeddings produced for a batch
/// of text chunks.
/// </summary>
public sealed class DocumentIndexingResult
{
    /// <summary>
    /// The chunks that were embedded, in their original order.
    /// </summary>
    public required IReadOnlyList<string> Chunks { get; init; }

    /// <summary>
    /// Length of each embedding vector.
    /// </summary>
    public required int Dimensions { get; init; }

    /// <summary>
    /// Dot product of the first and second embeddings, or <see langword="null"/> when fewer than
    /// two chunks were supplied. Larger values indicate more semantically similar texts.
    /// </summary>
    public double? FirstPairSimilarity { get; init; }
}
/// <summary>
/// A Temporal workflow that indexes a batch of text chunks by generating an embedding
/// for each one as a separate, independently retried Temporal activity.
/// </summary>
/// <remarks>
/// <para>
/// This is a realistic RAG (Retrieval Augmented Generation) indexing scenario.
/// In production you would persist the returned vectors to a vector database
/// (e.g. Qdrant, pgvector, Azure AI Search) after the workflow completes.
/// </para>
/// <para>
/// The workflow creates a <see cref="DurableEmbeddingGenerator"/> with
/// <see cref="DurableExecutionOptions"/> derived from the workflow input.
/// The inner generator passed to the constructor is a stub. It is never called,
/// because <c>Workflow.InWorkflow == true</c> causes every <c>GenerateAsync</c>
/// call to be dispatched as a Temporal activity instead.
/// </para>
/// </remarks>
[Workflow]
public sealed class DocumentIndexingWorkflow
{
    /// <summary>
    /// Embeds every chunk of <paramref name="input"/> sequentially, one activity per chunk,
    /// and summarises the result.
    /// </summary>
    /// <param name="input">The chunks to embed, the activity timeout and the model id.</param>
    /// <returns>
    /// The input chunks, the vector length shared by all embeddings (0 when there are no
    /// chunks), and the dot product of the first two embeddings when at least two exist.
    /// </returns>
    /// <exception cref="ApplicationFailureException">
    /// An activity returned no embedding for a chunk, or the embeddings do not all have the
    /// same length. These are raised as application failures so the workflow itself fails.
    /// Any other exception type would only fail the workflow task, which Temporal retries
    /// indefinitely.
    /// </exception>
    [WorkflowRun]
    public async Task<DocumentIndexingResult> RunAsync(DocumentIndexingInput input)
    {
        // Build a DurableEmbeddingGenerator configured with the activity timeout
        // from the workflow input. The stub inner generator is never called inside
        // a workflow: Workflow.InWorkflow == true causes GenerateAsync to dispatch
        // to DurableEmbeddingActivities instead.
        //
        // RetryPolicy: DurableEmbeddingGenerator only forwards the policy when it is non-null.
        // Without it, Temporal applies the server default (retry forever). Embeddings are
        // idempotent, so retries are safe, but a bounded cap of 3 attempts is appropriate:
        // persistent failure should fail the workflow rather than spin indefinitely.
        var options = new DurableExecutionOptions
        {
            ActivityTimeout = input.ActivityTimeout,
            // HeartbeatTimeout defaults to 2 minutes. The activity heartbeats once before
            // calling GenerateAsync (see DurableEmbeddingActivities), which is fine for short
            // calls. For slow self-hosted models or large batches, consider raising
            // HeartbeatTimeout or adding a background heartbeat (see HumanInTheLoop sample).
            RetryPolicy = new RetryPolicy { MaximumAttempts = 3 },
        };

        // EmbeddingGenerationOptions.ModelId drives the Temporal Web UI activity summary
        // (see DurableEmbeddingGenerator.BuildActivitySummary). EmbeddingGenerationOptions
        // is a plain MEAI DTO, so it is safe to construct inside a workflow.
        var embeddingOptions = new EmbeddingGenerationOptions { ModelId = input.ModelId };

        // NullEmbeddingGenerator is a throw-stub that is never invoked at runtime, because
        // DurableEmbeddingGenerator short-circuits to activity dispatch when
        // Workflow.InWorkflow == true. The constructor argument is only there to satisfy
        // the wrapper's signature.
        var generator = new DurableEmbeddingGenerator(
            new NullEmbeddingGenerator(),
            options);

        // Generate one embedding per chunk as a separate Temporal activity.
        //
        // Why one call per chunk instead of batching them all together?
        //
        // • Independent retry: if chunk #3 fails, only chunk #3 is retried.
        //   Chunks #1 and #2 replay from workflow history, with no extra API calls.
        //
        // • Deterministic replay: Temporal replays workflow code on worker restart.
        //   Each completed activity result is read from history, so already-embedded
        //   chunks are never sent to the model again.
        //
        // • Progress visibility: each activity appears individually in the Temporal
        //   UI, making it easy to see how far along a large indexing job is.
        var embeddings = new List<Embedding<float>>(input.Chunks.Count);
        for (var i = 0; i < input.Chunks.Count; i++)
        {
            // This call dispatches to DurableEmbeddingActivities.GenerateAsync
            // because Workflow.InWorkflow == true. On the worker side, the
            // activity resolves IEmbeddingGenerator<string, Embedding<float>>
            // from DI and calls the real embeddings endpoint.
            var result = await generator.GenerateAsync([input.Chunks[i]], embeddingOptions);

            // Indexing an empty result would throw a plain exception, which only fails the
            // workflow task and leaves the workflow retrying forever.
            if (result.Count == 0)
            {
                throw new ApplicationFailureException(
                    $"Embedding activity returned no embedding for chunk {i}.",
                    errorType: "EmptyEmbeddingResult",
                    nonRetryable: true);
            }

            embeddings.Add(result[0]);
        }

        // All vectors must share one length. Otherwise the reported Dimensions would be
        // misleading and the dot product below would be meaningless.
        var dimensions = embeddings.Count > 0 ? embeddings[0].Vector.Length : 0;
        for (var i = 1; i < embeddings.Count; i++)
        {
            var length = embeddings[i].Vector.Length;
            if (length != dimensions)
            {
                throw new ApplicationFailureException(
                    $"Embedding for chunk {i} has {length} dimensions; expected {dimensions}.",
                    errorType: "EmbeddingDimensionMismatch",
                    nonRetryable: true);
            }
        }

        // Report the dot product of the first two embeddings as a quick similarity signal
        // (higher means more semantically similar).
        double? similarity = null;
        if (embeddings.Count >= 2)
        {
            similarity = DotProduct(embeddings[0].Vector.Span, embeddings[1].Vector.Span);
        }

        return new DocumentIndexingResult
        {
            Chunks = input.Chunks,
            Dimensions = dimensions,
            FirstPairSimilarity = similarity,
        };
    }

    /// <summary>
    /// Dot product of two equal-length float vectors, accumulated in double precision.
    /// This equals cosine similarity when both vectors are unit-normalised.
    /// </summary>
    /// <param name="a">First vector.</param>
    /// <param name="b">Second vector; must have the same length as <paramref name="a"/>.</param>
    /// <returns>The sum of element-wise products.</returns>
    /// <exception cref="ArgumentException">The vectors differ in length.</exception>
    private static double DotProduct(ReadOnlySpan<float> a, ReadOnlySpan<float> b)
    {
        if (a.Length != b.Length)
        {
            throw new ArgumentException(
                $"Vectors must have the same length ({a.Length} vs {b.Length}).", nameof(b));
        }

        double sum = 0;
        for (var i = 0; i < a.Length; i++)
        {
            // Widen before multiplying so each product keeps double precision.
            sum += (double)a[i] * b[i];
        }

        return sum;
    }
}
/// <summary>
/// Placeholder inner generator handed to <see cref="DurableEmbeddingGenerator"/> inside a
/// Temporal workflow. There, the durable wrapper routes every call to
/// <c>DurableEmbeddingActivities</c> before this type is ever reached.
/// </summary>
/// <remarks>
/// <see cref="GenerateAsync"/> throws unconditionally, so an accidental call surfaces
/// immediately. <see cref="GetService"/> never exposes any service, and
/// <see cref="Dispose"/> has nothing to release.
/// </remarks>
internal sealed class NullEmbeddingGenerator : IEmbeddingGenerator<string, Embedding<float>>
{
    private const string NeverCalledMessage =
        "NullEmbeddingGenerator should never be called inside a Temporal workflow. " +
        "DurableEmbeddingGenerator dispatches to DurableEmbeddingActivities when " +
        "Workflow.InWorkflow == true.";

    /// <summary>
    /// Placeholder metadata: provider name <c>"null"</c>, with every other constructor argument null.
    /// </summary>
    public EmbeddingGeneratorMetadata Metadata { get; } = new("null", null, null, null);

    /// <summary>
    /// Always throws <see cref="InvalidOperationException"/> synchronously.
    /// </summary>
    public Task<GeneratedEmbeddings<Embedding<float>>> GenerateAsync(
        IEnumerable<string> values,
        EmbeddingGenerationOptions? options = null,
        CancellationToken cancellationToken = default) =>
        throw new InvalidOperationException(NeverCalledMessage);

    /// <summary>
    /// Returns <see langword="null"/> for every service request.
    /// </summary>
    public object? GetService(Type serviceType, object? serviceKey = null) => null;

    /// <summary>
    /// No-op; the stub holds no resources.
    /// </summary>
    public void Dispose()
    {
    }
}