-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaggregate.go
162 lines (133 loc) · 3.59 KB
/
aggregate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package stream
import (
"fmt"
"sync"
)
// Aggregate couples a Root with the Sequence that tracks its position in the
// event stream and with the batch of events that has not been written yet.
//
// Run executes a command against the Root and collects the events it produced,
// WriteTo hands those uncommitted events to an EventStore, and ReadFrom
// rebuilds the Root state by replaying events read from an EventStore.
type Aggregate[R Root] struct {
// mu serializes ReadFrom, Run and WriteTo, which mutate the fields below.
mu sync.Mutex
// root is the Root instance that commands run against and that events are
// committed to.
root R
// sequence identifies the stream and is advanced by one for every committed
// event.
sequence Sequence
// uncommitted holds the events collected by the last Run until WriteTo is
// called.
uncommitted Events
}
// NewAggregate builds an Aggregate for id.
//
// The sequence is seeded from id.Sequence(), the root is created by calling nr
// with the string form of id.UUID(), and definitions are registered in the
// package registry under id.Type().
//
// It returns an error when nr is nil, when nr fails, or when the registration
// of definitions fails; in all of those cases the returned aggregate is nil.
func NewAggregate[R Root](id ID, nr NewRoot[R], definitions []event) (*Aggregate[R], error) {
	agg := &Aggregate[R]{sequence: id.Sequence()}

	if nr == nil {
		return nil, Err("new aggregate requires non-nil NewRoot[R]")
	}

	var err error
	agg.root, err = nr(id.UUID().String())
	if err != nil {
		return nil, Err("new aggregate instance failed %w", err)
	}

	err = registry.set(definitions, id.Type())
	if err != nil {
		return nil, Err("new aggregate events definition registration failed %w", err)
	}

	return agg, nil
}
// MustAggregate is NewAggregate for callers that cannot handle an error: it
// returns the aggregate on success and panics with the error otherwise.
func MustAggregate[R Root](id ID, nr NewRoot[R], definitions []event) *Aggregate[R] {
	agg, err := NewAggregate(id, nr, definitions)
	if err == nil {
		return agg
	}
	panic(err)
}
// ID returns the string form of the UUID carried by the aggregate's sequence
// ID. The mutex is not taken here.
func (a *Aggregate[R]) ID() string {
	id := a.sequence.ID()
	return id.UUID().String()
}
// Version returns the current sequence number, read while holding the
// aggregate's mutex.
func (a *Aggregate[R]) Version() int64 {
	a.mu.Lock()
	defer a.mu.Unlock()

	number := a.sequence.Number()
	return number
}
// ReadFrom replays events from es into the root.
//
// Starting at the current sequence number it reads batches of up to 1024
// events, commits every batch to the root (which advances the sequence by one
// per event) and keeps reading until the store reports ErrEndOfStream, at
// which point it returns nil. Any other read error is wrapped and returned;
// a commit failure is returned unchanged. The mutex is held for the whole call.
//
// NOTE(review): the end-of-stream check uses ==, so a wrapped ErrEndOfStream
// would be reported as a read failure — confirm stores return the bare sentinel.
// NOTE(review): a store that keeps answering (0, nil) would make this loop spin
// while holding the mutex — confirm the ReadAt contract rules that out.
func (a *Aggregate[R]) ReadFrom(es EventStore) error {
	a.mu.Lock()
	defer a.mu.Unlock()

	var (
		read int
		err  error
	)
	stream := es.ReadWriter(a.sequence)
	batch := make(Events, 1024)

	for {
		read, err = stream.ReadAt(batch, a.sequence.Number())
		if err != nil && err != ErrEndOfStream {
			return Err("%s root read failed due %w", a.sequence, err)
		}

		// Events delivered together with ErrEndOfStream are still applied.
		if failed := a.commit(batch[:read]); failed != nil {
			return failed
		}

		if err != nil {
			// Only ErrEndOfStream can reach this point: the replay is complete.
			return nil
		}
	}
}
// Run executes command c against the root and stores the events the root
// reports through Uncommitted(true) as the aggregate's uncommitted batch.
//
// It returns an error, without invoking c, when a previously collected batch
// has not been handed to WriteTo yet or when c is nil. An error returned by c
// or by NewEvents is passed through unchanged, and the uncommitted batch is
// left untouched in that case. The mutex is held for the whole call.
//
// TODO: a panic raised inside c is not recovered here.
func (a *Aggregate[R]) Run(c Command[R]) error {
	a.mu.Lock()
	defer a.mu.Unlock()

	var err error

	if a.uncommitted.Size() > 0 {
		return Err("write events before another command in %s aggregate ", a.sequence)
	}

	// Reject a nil command with an error instead of panicking on the call
	// below, mirroring the nil NewRoot check in NewAggregate.
	if c == nil {
		return Err("%s aggregate requires non-nil Command[R]", a.sequence)
	}

	if err = c(a.root); err != nil {
		return err
	}

	var e Events
	if e, err = NewEvents(a.sequence, a.root.Uncommitted(true)...); err != nil {
		return err
	}

	a.uncommitted = e
	return nil
}
// Events returns the batch of events collected by Run that has not been
// written yet.
//
// The field is read while holding the aggregate's mutex because Run and
// WriteTo replace it under that same mutex; reading it unguarded would race
// with them. The returned slice is not copied.
func (a *Aggregate[R]) Events() Events {
	a.mu.Lock()
	defer a.mu.Unlock()

	return a.uncommitted
}
// WriteTo writes the uncommitted events to es at the current sequence number
// and, once the store accepted all of them, commits them to the root.
//
// It returns (nil, nil) when there is nothing to write. It returns an error
// when the registry does not know the events, when the events do not all
// belong to this aggregate's sequence ID, when the store write fails, or with
// ErrShortWrite when the store accepted fewer events than were offered. On a
// complete write it returns the written events together with the result of
// committing them to the root.
//
// The uncommitted batch is reset to an empty one on every return path,
// including the failing ones. The mutex is held for the whole call.
func (a *Aggregate[R]) WriteTo(es EventStore) (Events, error) {
	a.mu.Lock()
	defer a.mu.Unlock()
	// Registered after the unlock, so it runs before it: the reset still
	// happens under the mutex.
	defer func() { a.uncommitted = make(Events, 0) }()

	var (
		written int
		err     error
	)

	if a.uncommitted.Size() == 0 {
		return nil, nil
	}

	if !registry.exists(a.uncommitted) {
		return nil, Err("%s event schema not found, please register it in stream.Aggregates.Events", a.uncommitted)
	}

	if !a.uncommitted.IsUnique(a.sequence.ID()) {
		return nil, Err("aggregate %s events required to be from one root", a.sequence)
	}

	written, err = es.ReadWriter(a.sequence).WriteAt(a.uncommitted, a.sequence.Number())
	if err != nil {
		return nil, err
	}
	if written != len(a.uncommitted) {
		return nil, ErrShortWrite
	}

	return a.uncommitted, a.commit(a.uncommitted)
}
// String renders the aggregate's sequence. While uncommitted events are
// pending, the sequence is followed by "->" and the sequence number the
// aggregate would reach once those events are committed.
//
// The mutex is not taken here (the lock calls are disabled), so the result may
// be stale under concurrent use.
// NOTE(review): confirm that skipping the lock is intentional.
func (a *Aggregate[R]) String() string {
	pending := len(a.uncommitted)
	if pending == 0 {
		return a.sequence.String()
	}

	return fmt.Sprintf("%s->%d", a.sequence, a.sequence.Number()+int64(pending))
}
// Resource returns the Resource reported by the aggregate's sequence ID. The
// mutex is not taken here.
func (a *Aggregate[R]) Resource() Resource {
	id := a.sequence.ID()
	return id.Resource()
}
// GoString always returns the empty string. Because this method makes
// *Aggregate a fmt.GoStringer, formatting it with %#v yields that empty string
// instead of a dump of the struct.
// NOTE(review): presumably meant to keep internal state out of debug output —
// confirm.
func (a *Aggregate[R]) GoString() string {
	const hidden = ""
	return hidden
}
// commit applies the events in e to the root, in order, by passing each
// event's body and creation time to root.Commit, and advances the sequence by
// one after every event that was applied.
//
// It stops at the first event the root rejects and returns that error; the
// sequence then reflects only the events applied before the failure. The
// callers in this file (ReadFrom and WriteTo) invoke it with the mutex held.
func (a *Aggregate[R]) commit(e []Event) error {
	for idx := 0; idx < len(e); idx++ {
		err := a.root.Commit(e[idx].body, e[idx].createdAt)
		if err != nil {
			return err
		}
		a.sequence = a.sequence.Next()
	}
	return nil
}