Skip to content

Commit c851bec

Browse files
authored
Merge pull request #396 from klowdo/eventstore-snapshot
Eventstore snapshot
2 parents a7a482f + 4f802e8 commit c851bec

File tree

16 files changed

+783
-38
lines changed

16 files changed

+783
-38
lines changed

aggregate.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"errors"
2020
"fmt"
2121
"sync"
22+
"time"
2223

2324
"github.com/looplab/eventhorizon/uuid"
2425
)
@@ -58,6 +59,13 @@ type AggregateStore interface {
5859
Save(context.Context, Aggregate) error
5960
}
6061

62+
// SnapshotStrategy determines if a snapshot should be taken or not.
63+
type SnapshotStrategy interface {
64+
ShouldTakeSnapshot(lastSnapshotVersion int,
65+
lastSnapshotTimestamp time.Time,
66+
event Event) bool
67+
}
68+
6169
var (
6270
// ErrAggregateNotFound is when no aggregate can be found.
6371
ErrAggregateNotFound = errors.New("aggregate not found")
@@ -148,7 +156,8 @@ func (e *AggregateError) Cause() error {
148156
// used to create concrete aggregate types when loading from the database.
149157
//
150158
// An example would be:
151-
// RegisterAggregate(func(id UUID) Aggregate { return &MyAggregate{id} })
159+
//
160+
// RegisterAggregate(func(id UUID) Aggregate { return &MyAggregate{id} })
152161
func RegisterAggregate(factory func(uuid.UUID) Aggregate) {
153162
// Check that the created aggregate matches the registered type.
154163
// TODO: Explore the use of reflect/gob for creating concrete types without

aggregatestore/events/aggregatestore.go

+87-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"errors"
2020
"fmt"
21+
"time"
2122

2223
eh "github.com/looplab/eventhorizon"
2324
"github.com/looplab/eventhorizon/uuid"
@@ -27,7 +28,10 @@ import (
2728
// uses an event store for loading and saving events used to build the aggregate
2829
// and an event handler to handle resulting events.
2930
type AggregateStore struct {
30-
store eh.EventStore
31+
store eh.EventStore
32+
snapshotStore eh.SnapshotStore
33+
isSnapshotStore bool
34+
snapshotStrategy eh.SnapshotStrategy
3135
}
3236

3337
var (
@@ -39,10 +43,10 @@ var (
3943
ErrMismatchedEventType = errors.New("mismatched event type and aggregate type")
4044
)
4145

42-
// NewAggregateStore creates a aggregate store with an event store and an event
46+
// NewAggregateStore creates an aggregate store with an event store and an event
4347
// handler that will handle resulting events (for example by publishing them
4448
// on an event bus).
45-
func NewAggregateStore(store eh.EventStore) (*AggregateStore, error) {
49+
func NewAggregateStore(store eh.EventStore, options ...Option) (*AggregateStore, error) {
4650
if store == nil {
4751
return nil, ErrInvalidEventStore
4852
}
@@ -51,9 +55,31 @@ func NewAggregateStore(store eh.EventStore) (*AggregateStore, error) {
5155
store: store,
5256
}
5357

58+
d.snapshotStrategy = &NoSnapshotStrategy{}
59+
60+
for _, option := range options {
61+
if err := option(d); err != nil {
62+
return nil, fmt.Errorf("error while applying option: %w", err)
63+
}
64+
}
65+
66+
d.snapshotStore, d.isSnapshotStore = store.(eh.SnapshotStore)
67+
5468
return d, nil
5569
}
5670

71+
// Option is an option setter used to configure creation.
72+
type Option func(*AggregateStore) error
73+
74+
// WithSnapshotStrategy add the strategy to use when determining if a snapshot should be taken
75+
func WithSnapshotStrategy(s eh.SnapshotStrategy) Option {
76+
return func(as *AggregateStore) error {
77+
as.snapshotStrategy = s
78+
79+
return nil
80+
}
81+
}
82+
5783
// Load implements the Load method of the eventhorizon.AggregateStore interface.
5884
// It loads an aggregate from the event store by creating a new aggregate of the
5985
// type with the ID and then applies all events to it, thus making it the most
@@ -79,7 +105,26 @@ func (r *AggregateStore) Load(ctx context.Context, aggregateType eh.AggregateTyp
79105
}
80106
}
81107

82-
events, err := r.store.Load(ctx, a.EntityID())
108+
fromVersion := 1
109+
110+
if sa, ok := a.(eh.Snapshotable); ok && r.isSnapshotStore {
111+
snapshot, err := r.snapshotStore.LoadSnapshot(ctx, id)
112+
if err != nil {
113+
return nil, &eh.AggregateStoreError{
114+
Err: err,
115+
Op: eh.AggregateStoreOpLoad,
116+
AggregateType: aggregateType,
117+
AggregateID: id,
118+
}
119+
}
120+
121+
if snapshot != nil {
122+
sa.ApplySnapshot(snapshot)
123+
fromVersion = snapshot.Version + 1
124+
}
125+
}
126+
127+
events, err := r.store.LoadFrom(ctx, a.EntityID(), fromVersion)
83128
if err != nil && !errors.Is(err, eh.ErrAggregateNotFound) {
84129
return nil, &eh.AggregateStoreError{
85130
Err: err,
@@ -142,6 +187,44 @@ func (r *AggregateStore) Save(ctx context.Context, agg eh.Aggregate) error {
142187
}
143188
}
144189

190+
return r.takeSnapshot(ctx, agg, events[len(events)-1])
191+
}
192+
193+
func (r *AggregateStore) takeSnapshot(ctx context.Context, agg eh.Aggregate, lastEvent eh.Event) error {
194+
a, ok := agg.(eh.Snapshotable)
195+
if !ok || !r.isSnapshotStore {
196+
return nil
197+
}
198+
199+
s, err := r.snapshotStore.LoadSnapshot(ctx, agg.EntityID())
200+
if err != nil {
201+
return &eh.AggregateStoreError{
202+
Err: err,
203+
Op: eh.AggregateStoreOpSave,
204+
AggregateType: agg.AggregateType(),
205+
AggregateID: agg.EntityID(),
206+
}
207+
}
208+
209+
version := 0
210+
timestamp := time.Now()
211+
212+
if s != nil {
213+
version = s.Version
214+
timestamp = s.Timestamp
215+
}
216+
217+
if res := r.snapshotStrategy.ShouldTakeSnapshot(version, timestamp, lastEvent); res {
218+
if err = r.snapshotStore.SaveSnapshot(ctx, agg.EntityID(), *a.CreateSnapshot()); err != nil {
219+
return &eh.AggregateStoreError{
220+
Err: err,
221+
Op: eh.AggregateStoreOpSave,
222+
AggregateType: agg.AggregateType(),
223+
AggregateID: agg.EntityID(),
224+
}
225+
}
226+
}
227+
145228
return nil
146229
}
147230

aggregatestore/events/aggregatestore_test.go

+60-1
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@ package events
1717
import (
1818
"context"
1919
"errors"
20+
"fmt"
2021
"reflect"
2122
"testing"
2223
"time"
2324

2425
eh "github.com/looplab/eventhorizon"
2526
"github.com/looplab/eventhorizon/mocks"
2627
"github.com/looplab/eventhorizon/uuid"
28+
"github.com/stretchr/testify/assert"
2729
)
2830

2931
func TestNewAggregateStore(t *testing.T) {
@@ -253,6 +255,46 @@ func TestAggregateStore_SaveEvents(t *testing.T) {
253255
agg.err = nil
254256
}
255257

258+
func TestAggregateStore_TakeSnapshot(t *testing.T) {
259+
eventStore := &mocks.EventStore{
260+
Events: make([]eh.Event, 0),
261+
}
262+
263+
store, err := NewAggregateStore(eventStore, WithSnapshotStrategy(NewEveryNumberEventSnapshotStrategy(2)))
264+
if err != nil {
265+
t.Fatal("there should be no error:", err)
266+
}
267+
268+
ctx := context.Background()
269+
270+
id := uuid.New()
271+
agg := NewTestAggregateOther(id)
272+
273+
timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
274+
275+
for i := 0; i < 3; i++ {
276+
agg.AppendEvent(mocks.EventType, &mocks.EventData{Content: fmt.Sprintf("event%d", i)}, timestamp)
277+
278+
if err := store.Save(ctx, agg); err != nil {
279+
t.Error("should not be an error")
280+
}
281+
}
282+
283+
assert.NotNil(t, eventStore.Snapshot, "snapshot should be taken")
284+
285+
agg2, err := store.Load(ctx, agg.AggregateType(), agg.EntityID())
286+
if err != nil {
287+
t.Error("should not be an error")
288+
}
289+
290+
a, ok := agg2.(*TestAggregateOther)
291+
if !ok {
292+
t.Error("wrong aggregate type")
293+
}
294+
295+
assert.Equal(t, 1, a.appliedEvents)
296+
}
297+
256298
func TestAggregateStore_AggregateNotRegistered(t *testing.T) {
257299
store, _ := createStore(t)
258300

@@ -296,7 +338,8 @@ const TestAggregateOtherType eh.AggregateType = "TestAggregateOther"
296338

297339
type TestAggregateOther struct {
298340
*AggregateBase
299-
err error
341+
err error
342+
appliedEvents int
300343
}
301344

302345
var _ = VersionedAggregate(&TestAggregateOther{})
@@ -316,5 +359,21 @@ func (a *TestAggregateOther) ApplyEvent(ctx context.Context, event eh.Event) err
316359
return a.err
317360
}
318361

362+
a.appliedEvents++
363+
319364
return nil
320365
}
366+
367+
func (a *TestAggregateOther) CreateSnapshot() *eh.Snapshot {
368+
return &eh.Snapshot{
369+
Version: a.AggregateVersion(),
370+
Timestamp: time.Now(),
371+
AggregateType: TestAggregateType,
372+
State: a,
373+
}
374+
}
375+
376+
func (a *TestAggregateOther) ApplySnapshot(snapshot *eh.Snapshot) {
377+
agg := snapshot.State.(*TestAggregateOther)
378+
a.id = agg.id
379+
}
+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright (c) 2021 - The Event Horizon authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package events
16+
17+
import (
18+
"time"
19+
20+
eh "github.com/looplab/eventhorizon"
21+
)
22+
23+
// NoSnapshotStrategy no snapshot should be taken.
24+
type NoSnapshotStrategy struct {
25+
}
26+
27+
func (s *NoSnapshotStrategy) ShouldTakeSnapshot(_ int,
28+
_ time.Time,
29+
_ eh.Event) bool {
30+
return false
31+
}
32+
33+
// EveryNumberEventSnapshotStrategy use to take a snapshot every n number of events.
34+
type EveryNumberEventSnapshotStrategy struct {
35+
snapshotThreshold int
36+
}
37+
38+
func NewEveryNumberEventSnapshotStrategy(threshold int) *EveryNumberEventSnapshotStrategy {
39+
return &EveryNumberEventSnapshotStrategy{
40+
snapshotThreshold: threshold,
41+
}
42+
}
43+
44+
func (s *EveryNumberEventSnapshotStrategy) ShouldTakeSnapshot(lastSnapshotVersion int,
45+
_ time.Time,
46+
event eh.Event) bool {
47+
return event.Version()-lastSnapshotVersion >= s.snapshotThreshold
48+
}
49+
50+
// PeriodSnapshotStrategy use to take a snapshot every time a period has elapsed, for example every hour.
51+
type PeriodSnapshotStrategy struct {
52+
snapshotThreshold time.Duration
53+
}
54+
55+
func NewPeriodSnapshotStrategy(threshold time.Duration) *PeriodSnapshotStrategy {
56+
return &PeriodSnapshotStrategy{
57+
snapshotThreshold: threshold,
58+
}
59+
}
60+
61+
func (s *PeriodSnapshotStrategy) ShouldTakeSnapshot(_ int,
62+
lastSnapshotTimestamp time.Time,
63+
event eh.Event) bool {
64+
return event.Timestamp().Sub(lastSnapshotTimestamp) >= s.snapshotThreshold
65+
}

0 commit comments

Comments
 (0)