1
+ using Akka . Actor ;
2
+ using Akka . Cluster . Sharding ;
3
+ using Akka . Event ;
4
+ using Akka . Persistence ;
5
+ using CqrsSqlServer . Shared ;
6
+ using CqrsSqlServer . Shared . Commands ;
7
+ using CqrsSqlServer . Shared . Events ;
8
+ using CqrsSqlServer . Shared . Queries ;
9
+
10
+ namespace CqrsSqlServer . Backend . Actors ;
11
+
12
+ /// <summary>
13
+ /// Manages the state for a given product.
14
+ /// </summary>
15
+ public sealed class ProductTotalsActor : ReceivePersistentActor
16
+ {
17
+ public static Props GetProps ( string persistenceId )
18
+ {
19
+ return Props . Create ( ( ) => new ProductTotalsActor ( persistenceId ) ) ;
20
+ }
21
+
22
+ /// <summary>
23
+ /// Used to help differentiate what type of entity this is inside Akka.Persistence's database
24
+ /// </summary>
25
+ public const string TotalsEntityNameConstant = "totals" ;
26
+
27
+ private readonly ILoggingAdapter _log = Context . GetLogger ( ) ;
28
+
29
+ public ProductTotalsActor ( string persistenceId )
30
+ {
31
+ PersistenceId = $ "{ TotalsEntityNameConstant } -" + persistenceId ;
32
+ State = new ProductState ( ) ;
33
+
34
+ Recover < SnapshotOffer > ( offer =>
35
+ {
36
+ if ( offer . Snapshot is ProductState state )
37
+ {
38
+ State = state ;
39
+ }
40
+ } ) ;
41
+
42
+ Recover < IProductEvent > ( productEvent =>
43
+ {
44
+ State = State . ProcessEvent ( productEvent ) ;
45
+ } ) ;
46
+
47
+ Command < IProductCommand > ( cmd =>
48
+ {
49
+ var response = State . ProcessCommand ( cmd ) ;
50
+ var sentResponse = false ;
51
+
52
+ if ( response . ResponseEvents . Any ( ) )
53
+ {
54
+ PersistAll ( response . ResponseEvents , productEvent =>
55
+ {
56
+ _log . Info ( "Processed: {0}" , productEvent ) ;
57
+
58
+ if ( productEvent is ProductInventoryWarningEvent warning )
59
+ {
60
+ _log . Warning ( warning . ToString ( ) ) ;
61
+ }
62
+ State = State . ProcessEvent ( productEvent ) ;
63
+
64
+ if ( ! sentResponse ) // otherwise we'll generate a response-per-event
65
+ {
66
+ sentResponse = true ;
67
+
68
+ async Task < ProductCommandResponse > ReplyToSender ( )
69
+ {
70
+ await Task . Delay ( 1 ) ;
71
+ return response ;
72
+ }
73
+ var sender = Sender ;
74
+ ReplyToSender ( ) . PipeTo ( sender , failure : ex => new Status . Failure ( ex ) ) ;
75
+
76
+ }
77
+
78
+ if ( LastSequenceNr % 10 == 0 )
79
+ SaveSnapshot ( State ) ;
80
+ } ) ;
81
+ }
82
+ else
83
+ {
84
+ Sender . Tell ( response ) ;
85
+ }
86
+ } ) ;
87
+
88
+
89
+ Command < SaveSnapshotSuccess > ( success =>
90
+ {
91
+
92
+ } ) ;
93
+
94
+ Command < FetchProduct > ( fetch =>
95
+ {
96
+ Sender . Tell ( new FetchResult ( State ) ) ;
97
+
98
+ if ( State . IsEmpty )
99
+ {
100
+ // we don't exist, so don't let `remember-entities` keep us alive
101
+ Context . Parent . Tell ( new Passivate ( PoisonPill . Instance ) ) ;
102
+ }
103
+ } ) ;
104
+ }
105
+
106
+ public override string PersistenceId { get ; }
107
+
108
+ public ProductState State { get ; set ; }
109
+ }
0 commit comments