-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaggregates_test.go
121 lines (106 loc) · 2.75 KB
/
aggregates_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package stream_test
import (
"os"
"strings"
"testing"
"time"
"github.com/icrowley/fake"
"github.com/sokool/stream"
"github.com/sokool/stream/example/chat"
"github.com/sokool/stream/example/chat/threads"
"github.com/sokool/stream/store/sql"
)
func TestAggregatesWithProjection(t *testing.T) {
sp := &Person{}
chats, err := chat.New(NewEngine(t))
chats.Threads.OnWrite(func(s stream.Session, e stream.Event) error {
return s.Grant(
e.Role("1", "Customer"),
e.Role("4", "Provider"),
e.Role("", "Inflect"),
)
})
if err != nil {
t.Fatal(err)
}
id, ch := fake.CharactersN(6), "#"+strings.ReplaceAll(strings.ToLower(fake.Street()), " ", "-")
type Thread = threads.Thread
if err = chats.Threads.Execute(sp, id, func(t *Thread) error { return t.Start(ch, "[email protected]") }); err != nil {
t.Fatal(err)
}
if err = chats.Threads.Execute(sp, id, func(t *Thread) error { return t.Message("[email protected]", "hi there") }); err != nil {
t.Fatal(err)
}
if err = chats.Threads.Execute(sp, id, func(t *Thread) error { return t.Join("[email protected]") }); err != nil {
t.Fatal(err)
}
if err = chats.Threads.Execute(sp, id, func(t *Thread) error { return t.Message("[email protected]", "crusher!") }); err != nil {
t.Fatal(err)
}
if err = chats.Threads.Execute(sp, id, func(t *Thread) error { return t.Join("[email protected]") }); err != nil {
t.Fatal(err)
}
if err = chats.Threads.Execute(sp, id, func(t *Thread) error { return t.Message("[email protected]", "fine, thx!") }); err != nil {
t.Fatal(err)
}
if err = chats.Threads.Execute(sp, id, func(t *Thread) error { return t.Leave("[email protected]") }); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second * 7)
}
// TestAggregates_SetGet checks the Get / Run / Set cycle on chats.Threads:
// two Get calls with the same id compare equal, a fresh thread reports
// version 0 with no events, Run leaves two events pending without changing
// the version, and Set moves the version to 2 and leaves no pending events.
func TestAggregates_SetGet(t *testing.T) {
	sp := &Person{}
	id, se := fake.CharactersN(8), NewEngine(t)

	chats, err := chat.New(se)
	// The constructor error was previously overwritten unchecked by the
	// following Get call.
	if err != nil {
		t.Fatal(err)
	}

	t1, err := chats.Threads.Get(sp, id)
	if err != nil {
		t.Fatal(err)
	}
	t2, err := chats.Threads.Get(sp, id)
	if err != nil {
		t.Fatal(err)
	}
	if t1 != t2 {
		t.Fatal("two Get calls with the same id returned values that do not compare equal")
	}

	// State of a thread that has not run any command yet.
	if t1.Version() != 0 {
		t.Fatalf("fresh thread: Version() = %v, want 0", t1.Version())
	}
	if len(t1.Events()) != 0 {
		t.Fatalf("fresh thread: len(Events()) = %d, want 0", len(t1.Events()))
	}

	// Running a command records events but does not change the version.
	if err = t1.Run(func(th *threads.Thread) error { return th.Start("#general", "tom") }); err != nil {
		t.Fatal(err)
	}
	if t1.Version() != 0 {
		t.Fatalf("after Run: Version() = %v, want 0", t1.Version())
	}
	if t1.Events().Size() != 2 {
		t.Fatalf("after Run: Events().Size() = %v, want 2", t1.Events().Size())
	}

	// After Set the version reflects the two events and none remain pending.
	if err = chats.Threads.Set(sp, t1); err != nil {
		t.Fatal(err)
	}
	if t1.Version() != 2 {
		t.Fatalf("after Set: Version() = %v, want 2", t1.Version())
	}
	if t1.Events().Size() != 0 {
		t.Fatalf("after Set: Events().Size() = %v, want 0", t1.Events().Size())
	}
}
// NewEngine is a test helper that builds a stream.Engine named
// "MyCoolTestDomain".
//
// The event store is chosen from the MYSQL_EVENT_STORE environment variable:
// when it is empty an in-memory store is used, otherwise its value is passed
// to sql.NewEventsStore together with the engine's logger. A failure to
// create the SQL store fails the test via t.Fatal.
func NewEngine(t *testing.T) *stream.Engine {
	// Attribute failures to the calling test rather than to this helper.
	t.Helper()

	return stream.New(&stream.Configuration{
		Name: "MyCoolTestDomain",
		EventStore: func(l stream.Logger) stream.EventStore {
			t.Helper()

			host := os.Getenv("MYSQL_EVENT_STORE")
			if host == "" {
				return stream.NewMemoryEventStore()
			}

			// NOTE(review): t.Fatal must run on the test goroutine; this assumes
			// stream.New invokes the factory synchronously — confirm.
			es, err := sql.NewEventsStore(host, l)
			if err != nil {
				t.Fatal(err)
			}
			return es
		},
	})
}