Skip to content

Commit b329e8d

Browse files
committed
finished first pass of projector
1 parent ab5d8f4 commit b329e8d

File tree

6 files changed

+277
-12
lines changed

6 files changed

+277
-12
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// -----------------------------------------------------------------------
2+
// <copyright file="ExponentialBackoffTimeout.cs" company="Akka.NET Project">
3+
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
4+
// </copyright>
5+
// -----------------------------------------------------------------------
6+
7+
namespace CqrsSqlServer.Backend.Actors;
8+
9+
/// <summary>
10+
/// Utility class for retries / exponential backoff
11+
/// </summary>
12+
public static class ExponentialBackoffTimeout
13+
{
14+
public static TimeSpan BackoffTimeout(int attemptCount, TimeSpan initialTimeout)
15+
{
16+
// going to keep the values small for now
17+
return initialTimeout + attemptCount * TimeSpan.FromSeconds(1);
18+
}
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
// -----------------------------------------------------------------------
2+
// <copyright file="ProductProjectorActor.cs" company="Akka.NET Project">
3+
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
4+
// </copyright>
5+
// -----------------------------------------------------------------------
6+
7+
using Akka.Actor;
8+
using Akka.Event;
9+
using Akka.Persistence;
10+
using Akka.Persistence.Query;
11+
using Akka.Persistence.Query.Sql;
12+
using Akka.Streams;
13+
using Akka.Streams.Dsl;
14+
using CqrsSqlServer.DataModel;
15+
using CqrsSqlServer.DataModel.Entities;
16+
using CqrsSqlServer.Shared;
17+
using CqrsSqlServer.Shared.Events;
18+
using Microsoft.Extensions.DependencyInjection;
19+
using static CqrsSqlServer.Backend.Actors.ExponentialBackoffTimeout;
20+
21+
namespace CqrsSqlServer.Backend.Actors;
22+
23+
public sealed record ProjectionFailed(Exception Cause);
24+
25+
public sealed class ProjectionCompleted
26+
{
27+
public static readonly ProjectionCompleted Instance = new();
28+
}
29+
30+
public sealed class ProjectionAck
31+
{
32+
public static readonly ProjectionAck Instance = new();
33+
}
34+
35+
public sealed class ProjectionStarting
36+
{
37+
public static readonly ProjectionStarting Instance = new();
38+
}
39+
40+
public record MaterializedViewState(Offset LastOffset);
41+
42+
public sealed class ProductProjectorActor : ReceivePersistentActor
43+
{
44+
public override string PersistenceId { get; }
45+
46+
public MaterializedViewState CurrentState { get; set; }
47+
private const int MaxRetryAttempts = 3;
48+
private readonly ILoggingAdapter _log = Context.GetLogger();
49+
private readonly IServiceProvider _serviceProvider;
50+
51+
public ProductProjectorActor(IServiceProvider serviceProvider)
52+
{
53+
_serviceProvider = serviceProvider;
54+
PersistenceId = "product-projector";
55+
56+
Recovers();
57+
}
58+
59+
private void Recovers()
60+
{
61+
Recover<MaterializedViewState>(s => { CurrentState = s; });
62+
63+
Recover<SnapshotOffer>(offer =>
64+
{
65+
if (offer.Snapshot is MaterializedViewState state)
66+
{
67+
CurrentState = state;
68+
}
69+
});
70+
}
71+
72+
protected override void OnReplaySuccess()
73+
{
74+
var readJournal = PersistenceQuery.Get(Context.System)
75+
.ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
76+
var self = Self;
77+
var sink = Sink.ActorRefWithAck<EventEnvelope>(self, ProjectionStarting.Instance, ProjectionAck.Instance,
78+
ProjectionCompleted.Instance, ex => new ProjectionFailed(ex));
79+
80+
readJournal.EventsByTag(MessageTagger.ProductEventTag, CurrentState.LastOffset)
81+
.RunWith(sink, Context.Materializer());
82+
}
83+
84+
private void Commands()
85+
{
86+
CommandAsync<EventEnvelope>(async e =>
87+
{
88+
_log.Info("Received envelope with offset [{0}]", e.Offset);
89+
var currentOffset = e.Offset;
90+
91+
if (e.Event is IProductEvent publisherEvent)
92+
{
93+
var initialTime = TimeSpan.FromSeconds(3);
94+
var attempts = 0;
95+
while (attempts < MaxRetryAttempts)
96+
{
97+
var timeout = BackoffTimeout(attempts, initialTime);
98+
var ex = await TryProcess(publisherEvent, timeout);
99+
if (ex.HasValue)
100+
{
101+
_log.Warning(ex.Value,
102+
"Failed to project event [{0}] on attempt #[{1}] after [{2}] seconds. Retrying in [{3}] with [{4}] attempts remaining.",
103+
publisherEvent, attempts, timeout, BackoffTimeout(attempts + 1, initialTime),
104+
MaxRetryAttempts - attempts);
105+
attempts++;
106+
}
107+
else
108+
{
109+
// success
110+
PersistAndAck(currentOffset, publisherEvent);
111+
return;
112+
}
113+
}
114+
115+
if (attempts == MaxRetryAttempts)
116+
throw new ApplicationException(
117+
$"Unable to process [{publisherEvent}] after [{MaxRetryAttempts}] - crashing projection process");
118+
}
119+
else
120+
{
121+
_log.Warning(
122+
"Unsupported event [{0}] at offset [{1}] found by projector. Maybe this was tagged incorrectly?",
123+
e.Event, e.Offset);
124+
125+
// don't bother persisting here - move onto the next events in the buffer.
126+
Sender.Tell(ProjectionAck.Instance);
127+
}
128+
});
129+
130+
Command<ProjectionStarting>(_ =>
131+
{
132+
_log.Info("Projection for Tag [{0}] is starting from Offset [{1}]", MessageTagger.ProductEventTag,
133+
CurrentState.LastOffset);
134+
Sender.Tell(ProjectionAck.Instance);
135+
});
136+
137+
Command<ProjectionCompleted>(_ =>
138+
{
139+
_log.Info("Projection completed for Tag [{0}] at Offset [{1}]", MessageTagger.ProductEventTag,
140+
CurrentState.LastOffset);
141+
});
142+
143+
Command<ProjectionFailed>(failed =>
144+
{
145+
var val = 0L;
146+
if (CurrentState.LastOffset is Sequence seq)
147+
val = seq.Value;
148+
_log.Error(failed.Cause, "Projection FAILED for Tag [{0}] at Offset [{1}]",
149+
MessageTagger.ProductEventTag, val);
150+
throw new ApplicationException("Projection failed due to error. See InnerException for details.",
151+
failed.Cause);
152+
});
153+
154+
Command<SaveSnapshotSuccess>(success =>
155+
{
156+
// purge older snapshots and messages
157+
DeleteMessages(success.Metadata.SequenceNr);
158+
DeleteSnapshots(new SnapshotSelectionCriteria(success.Metadata.SequenceNr - 1));
159+
});
160+
}
161+
162+
private async Task<Akka.Util.Option<Exception>> TryProcess(IProductEvent pve,
163+
TimeSpan timeout)
164+
{
165+
try
166+
{
167+
using var cts = new CancellationTokenSource(timeout);
168+
using var scope = _serviceProvider.CreateScope();
169+
await using var context = scope.ServiceProvider.GetRequiredService<CqrsSqlServerContext>();
170+
171+
switch (pve)
172+
{
173+
case ProductCreated created:
174+
await UpdateProductDefinitionAsync(created, context, cts.Token);
175+
return Akka.Util.Option<Exception>.None;
176+
case ProductSold sold:
177+
await UpdateProductSoldAsync(sold, context, cts.Token);
178+
return Akka.Util.Option<Exception>.None;
179+
case ProductInventoryChanged changed:
180+
await UpdateProductInventoryAsync(changed, context, cts.Token);
181+
return Akka.Util.Option<Exception>.None;
182+
}
183+
184+
return Akka.Util.Option<Exception>.None;
185+
}
186+
catch (Exception ex)
187+
{
188+
return ex;
189+
}
190+
}
191+
192+
private async Task UpdateProductInventoryAsync(ProductInventoryChanged changed, CqrsSqlServerContext context, CancellationToken ctsToken)
193+
{
194+
var productListing = await context.Products.FindAsync([changed.ProductId], cancellationToken: ctsToken);
195+
if (productListing != null)
196+
{
197+
productListing.AllInventory += changed.Quantity;
198+
productListing.LastModified = changed.Timestamp;
199+
await context.SaveChangesAsync(ctsToken);
200+
}
201+
}
202+
203+
private static async Task UpdateProductSoldAsync(ProductSold sold, CqrsSqlServerContext context, CancellationToken ctsToken)
204+
{
205+
var productListing = await context.Products.FindAsync([sold.ProductId], cancellationToken: ctsToken);
206+
if (productListing != null)
207+
{
208+
productListing.SoldUnits += sold.Order.Quantity;
209+
productListing.TotalRevenue += sold.TotalPrice;
210+
productListing.LastModified = sold.Order.Timestamp;
211+
await context.SaveChangesAsync(ctsToken);
212+
}
213+
}
214+
215+
private static async Task UpdateProductDefinitionAsync(ProductCreated created, CqrsSqlServerContext context,
216+
CancellationToken ct = default)
217+
{
218+
var productListing = new ProductListing
219+
{
220+
ProductId = created.ProductId,
221+
ProductName = created.ProductName,
222+
Price = created.Price,
223+
Created = DateTime.UtcNow
224+
};
225+
226+
var existing = await context.Products.FindAsync([created.ProductId], cancellationToken: ct);
227+
if (existing == null)
228+
{
229+
await context.AddAsync(productListing, ct);
230+
await context.SaveChangesAsync(ct);
231+
}
232+
}
233+
234+
private void PersistAndAck(Offset currentOffset, IProductEvent pve)
235+
{
236+
var nextState = new MaterializedViewState(LastOffset: currentOffset);
237+
Persist(nextState, state =>
238+
{
239+
CurrentState = state;
240+
_log.Info("Successfully processed event [{0}] - projection state updated to [{1}]", pve,
241+
currentOffset);
242+
Sender.Tell(ProjectionAck.Instance);
243+
244+
if (LastSequenceNr % 10 == 0)
245+
{
246+
SaveSnapshot(CurrentState);
247+
}
248+
});
249+
}
250+
}

src/cqrs/cqrs-sqlserver/CqrsSqlServer.DataModel/Entities/ProductListing.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public class ProductListing
1414

1515
public decimal Price { get; set; }
1616

17-
public int RemainingInventory { get; set; }
17+
public int AllInventory { get; set; }
1818

1919
public int SoldUnits { get; set; }
2020

src/cqrs/cqrs-sqlserver/CqrsSqlServer.DataModel/Migrations/CqrsSqlServerContextModelSnapshot.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ protected override void BuildModel(ModelBuilder modelBuilder)
4242
.HasMaxLength(256)
4343
.HasColumnType("nvarchar(256)");
4444

45-
b.Property<int>("RemainingInventory")
45+
b.Property<int>("AllInventory")
4646
.HasColumnType("int");
4747

4848
b.Property<int>("SoldUnits")

src/cqrs/cqrs-sqlserver/CqrsSqlServer.Shared/MessageTagger.cs

+5-4
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33

44
namespace CqrsSqlServer.Shared;
55

6-
public class MessageTagger : IWriteEventAdapter
6+
public sealed class MessageTagger : IWriteEventAdapter
77
{
88
public const string ChangedEventTag = "Changed";
99
public const string SoldEventTag = "Sold";
1010
public const string WarningEventTag = "Warning";
11+
public const string ProductEventTag = "ProductEvent";
1112

1213
public string Manifest(object evt)
1314
{
@@ -18,9 +19,9 @@ public object ToJournal(object evt)
1819
{
1920
return evt switch
2021
{
21-
ProductInventoryChanged pic => new Tagged(pic, new[] { ChangedEventTag, pic.Reason.ToString() }),
22-
ProductSold sold => new Tagged(sold, new[] { SoldEventTag }),
23-
ProductInventoryWarningEvent warning => new Tagged(warning, new [] { WarningEventTag }),
22+
ProductInventoryChanged pic => new Tagged(pic, new[] { ProductEventTag, ChangedEventTag, pic.Reason.ToString() }),
23+
ProductSold sold => new Tagged(sold, new[] {ProductEventTag, SoldEventTag }),
24+
ProductInventoryWarningEvent warning => new Tagged(warning, new [] { ProductEventTag, WarningEventTag }),
2425
_ => evt
2526
};
2627
}

src/cqrs/cqrs-sqlserver/CqrsSqlServer.Shared/ProductState.cs

+1-6
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,7 @@ public static ProductState ProcessEvent(this ProductState productState, IProduct
133133
{
134134
return productState with
135135
{
136-
Data = productState.Data with
137-
{
138-
ProductId = productId,
139-
CurrentPrice = price,
140-
ProductName = productName
141-
}
136+
Data = new ProductData(ProductId: productId, CurrentPrice: price, ProductName: productName)
142137
};
143138
}
144139
case ProductInventoryChanged(var productId, var quantity, var timestamp, var inventoryChangeReason) @event:

0 commit comments

Comments
 (0)