Skip to content

Commit 2bbba59

Browse files
committed
#3 global Schema with encoder and decoder
1 parent 4d03534 commit 2bbba59

20 files changed

+365
-345
lines changed

aggregates.go

Lines changed: 52 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,29 @@ type Aggregate[R Root] struct {
1818
// It is kept in memory
1919
OnCreate func(string) (R, error)
2020

21-
// Events
22-
Events Schemas
23-
2421
// OnSession called on command dispatch when Session not exists
2522
//OnSession func(R) (Session, error)
2623

27-
// OnRead when all events are committed to R and state is rebuild from all previously persisted Event
28-
OnRead RootFunc[R]
24+
// OnLoad when all events are committed to R and state is rebuild from all previously persisted Event
25+
OnLoad RootFunc[R]
2926

3027
// OnRecall
3128
//OnRecall func(Session, R) error
3229

33-
// OnWrite called just before events are persisted to database
34-
OnWrite RootFunc[R]
30+
// OnSave called just after events are persisted to database
31+
OnSave RootFunc[R]
3532

36-
// OnCommit
33+
// OnCommit when new events are committed to a Root
3734
OnCommit func(R, Events) error // todo not able to deny it (error)
3835

36+
// OnRecall
37+
OnRecall func(R) time.Time
38+
3939
// OnCacheCleanup when aggregate is removed from memory
4040
OnCacheCleanup RootFunc[R]
4141

42-
// RecallAfter default
43-
//RecallAfter time.Duration
42+
// Events
43+
Events Schemas
4444

4545
CleanCacheAfter time.Duration
4646

@@ -56,7 +56,7 @@ type Aggregate[R Root] struct {
5656
Writer Writer
5757

5858
// Logger
59-
//Log Printer
59+
Log Printer
6060

6161
// memory keeps created Changelog of Aggregate in order to avoid rebuilding
6262
// state of each Aggregate everytime when Thread is called
@@ -89,24 +89,30 @@ func (a *Aggregate[R]) Execute(id string, command RootFunc[R]) error {
8989
}
9090

9191
func (a *Aggregate[R]) Get(id string) (R, error) {
92-
d, r, err := a.read(id)
93-
if err != nil {
92+
var found bool
93+
var d RootID
94+
var r R
95+
var err error
96+
97+
if err = a.init(); err != nil {
9498
return r, err
9599
}
96100

97-
if a.Store == nil {
98-
a.Store = NewEventStore()
101+
if r, found = a.memory.Get(id); !found {
102+
if r, err = a.OnCreate(id); err != nil {
103+
return r, err
104+
}
99105
}
100106

101-
rw, evs, m := a.Store.ReadWriter(d), make([]Event[any], a.LoadEventsInChunks), 0
107+
if d, err = NewRootID(r); err != nil {
108+
return r, err
109+
}
110+
111+
rw, evs, m := a.Store.ReadWriter(d), make(Events, a.LoadEventsInChunks), 0
102112
for {
103113
switch m, err = rw.ReadAt(evs, r.Version()); {
104114

105115
case err == ErrEndOfStream || err == nil:
106-
if m == 0 {
107-
return r, nil
108-
}
109-
110116
if failed := a.commit(r, evs[:m]); failed != nil {
111117
return r, failed
112118
}
@@ -115,13 +121,13 @@ func (a *Aggregate[R]) Get(id string) (R, error) {
115121
continue
116122
}
117123

118-
if a.OnRead != nil {
119-
if err = a.OnRead(r); err != nil {
124+
if a.OnLoad != nil {
125+
if err = a.OnLoad(r); err != nil {
120126
return r, err
121127
}
122128
}
123129

124-
return r, nil
130+
return r, a.memory.Set(r.ID(), r)
125131

126132
default:
127133
return r, Err("%s root read failed due %w", r, err)
@@ -135,7 +141,7 @@ func (a *Aggregate[R]) Set(r R) error {
135141
}
136142

137143
events, err := NewEvents(r)
138-
if err != nil || len(events) == 0 {
144+
if err = events.Extend(r); err != nil || len(events) == 0 {
139145
return err
140146
}
141147

@@ -158,67 +164,44 @@ func (a *Aggregate[R]) Set(r R) error {
158164
return err
159165
}
160166

161-
if a.OnWrite != nil {
162-
if err = a.OnWrite(r); err != nil {
163-
return err
164-
}
165-
}
166-
167167
if a.Writer != nil {
168168
if _, err = a.Writer.Write(events); err != nil {
169169
return err
170170
}
171171
}
172172

173-
return nil
174-
}
175-
176-
func (a *Aggregate[R]) read(id string) (RootID, R, error) {
177-
var r, ok = a.memory.Get(id)
178-
var d RootID
179-
var err error
180-
181-
if err = a.init(); err != nil {
182-
return d, r, err
183-
}
184-
185-
if !ok {
186-
if r, err = a.OnCreate(id); err != nil {
187-
return d, r, err
173+
if a.OnSave != nil {
174+
if err = a.OnSave(r); err != nil {
175+
return err
188176
}
189177
}
190178

191-
if d, err = NewRootID(r); err != nil {
192-
return d, r, err
193-
}
194-
195-
return d, r, nil
179+
return a.memory.Set(r.ID(), r)
196180
}
197181

198-
func (a *Aggregate[R]) commit(r R, e []Event[any]) error {
182+
func (a *Aggregate[R]) commit(r R, e []Event) error {
199183
if len(e) == 0 {
200184
return nil
201185
}
202186

203-
if a.OnCommit != nil {
204-
if err := a.OnCommit(r, e); err != nil {
205-
return err
206-
}
207-
}
208-
209187
// todo check if given slice of events has correct iteration of sequences, match it with current
210188
// version of R
211189
for i := range e {
212-
if err := r.Commit(e[i].Body, e[i].CreatedAt); err != nil {
190+
if err := r.Commit(e[i].body, e[i].createdAt); err != nil {
213191
return err
214192
}
215193

216194
//s += int64(len(e.Body))
217195
}
218196

197+
if a.OnCommit != nil {
198+
if err := a.OnCommit(r, e); err != nil {
199+
return err
200+
}
201+
}
219202
//a.version = events[len(events)-1].Sequence
220203
//a.size += s
221-
return a.memory.Set(r.ID(), r)
204+
return nil
222205
}
223206

224207
func (a *Aggregate[R]) init() (err error) {
@@ -229,6 +212,10 @@ func (a *Aggregate[R]) init() (err error) {
229212
}
230213
}
231214

215+
if a.Store == nil {
216+
a.Store = NewEventStore()
217+
}
218+
232219
if a.memory == nil {
233220
a.memory = NewCache[string, R](a.CleanCacheAfter)
234221
}
@@ -258,14 +245,18 @@ func (a *Aggregate[R]) Register(in *Domain) (err error) {
258245
return err
259246
}
260247

261-
if a.Store == nil {
248+
if _, ok := a.Store.(*store); ok {
262249
a.Store = in.store
263250
}
264251

265-
if err = in.schemas.Merge(a.Events); err != nil {
252+
if err = registry.merge(a.Events, a.Type); err != nil {
266253
return err
267254
}
268255

256+
if a.Log == nil {
257+
a.Log = in.logger(a.Type)
258+
}
259+
269260
if a.Writer != nil {
270261
in.writers.list = append(in.writers.list, a.Writer)
271262
}

aggregates_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ func TestAggregates(t *testing.T) {
3535

3636
func NewDomain(t *testing.T) *stream.Domain {
3737
return stream.NewDomain(&stream.Configuration{
38-
EventStore: func(s *stream.Schemas, l stream.Printer) stream.EventStore {
39-
es, err := mysql.NewEventsStore(os.Getenv("MYSQL_EVENT_STORE"), s, l)
38+
EventStore: func(l stream.Printer) stream.EventStore {
39+
es, err := mysql.NewEventsStore(os.Getenv("MYSQL_EVENT_STORE"), l)
4040
if err != nil {
4141
t.Fatal(err)
4242
}

domain.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,26 @@ package stream
22

33
import "os"
44

5-
type RR interface {
5+
type Registerer interface {
66
Register(*Domain) error
77
}
88

99
type Domain struct {
1010
store EventStore
11-
schemas *Schemas
1211
logger Logger
1312
writers *multiWriter
1413
}
1514

1615
type Configuration struct {
17-
EventStore EventStoreFunc
18-
Logger Logger
16+
// Logger
17+
Logger func(Type) Printer
18+
19+
// EventStore factory
20+
EventStore func(Printer) EventStore // todo func not needed
1921
}
2022

2123
func NewDomain(c *Configuration) *Domain {
2224
s := Domain{
23-
schemas: NewSchemas(),
2425
store: NewEventStore(),
2526
logger: NewLogger(os.Stdout, "stream", true).WithTag,
2627
writers: &multiWriter{},
@@ -30,13 +31,13 @@ func NewDomain(c *Configuration) *Domain {
3031
s.logger = c.Logger
3132
}
3233
if c.EventStore != nil {
33-
s.store = c.EventStore(s.schemas, s.logger("EventStore"))
34+
s.store = c.EventStore(s.logger("EventStore"))
3435
}
3536

3637
return &s
3738
}
3839

39-
func (s *Domain) Register(r ...RR) error {
40+
func (s *Domain) Register(r ...Registerer) error {
4041
for i := range r {
4142
if err := r[i].Register(s); err != nil {
4243
return err
@@ -45,6 +46,8 @@ func (s *Domain) Register(r ...RR) error {
4546
return nil
4647
}
4748

49+
func (s *Domain) Run() {}
50+
4851
// MultiWriter creates a writer that duplicates its writes to all the
4952
// provided writers, similar to the Unix tee(1) command.
5053
//

0 commit comments

Comments
 (0)