-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathservice.go
121 lines (95 loc) · 1.9 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package stream
import (
"os"
"sync"
"github.com/sokool/log"
)
// Component is anything that can attach itself to an Engine.
//
// Engine.Compose calls Compose on every component it is given, passing the
// engine itself, and stops at the first non-nil error.
type Component interface {
// Compose wires the component into the given Engine and reports any failure.
Compose(*Engine) error
}
// Configuration carries the options New reads when building an Engine.
type Configuration struct {
// Name is copied into the Engine; its String() form is used as the tag of
// the Engine's own logger.
Name Type
// Logger is an optional logger factory. When nil, New falls back to the
// package's newLogger.
Logger NewLogger
// EventStore is an optional factory for the Engine's event store. New calls
// it with a Logger tagged "EventStore"; when nil, New uses MemoryEventStore.
EventStore func(Logger) EventStore // todo func not needed
}
// Engine holds the registered writers and dispatches events to them.
// Instances are created with New.
type Engine struct {
// name is taken from Configuration.Name.
name Type
// store is MemoryEventStore unless Configuration.EventStore supplies another.
store EventStore
// logger is the factory used to create tagged loggers.
logger NewLogger
// log is the Engine's own logger, created by logger with name.String() as tag.
log Logger
// writers maps a Type to its Writer; entries are added by register and
// iterated by Write.
writers map[Type]Writer
// mu is locked by Write for the whole duration of a call.
mu sync.RWMutex
}
// New builds an Engine from the given Configuration.
//
// Defaults are MemoryEventStore for the event store and newLogger for the
// logger factory; non-nil Logger and EventStore fields in c override them.
// The EventStore factory receives a Logger tagged "EventStore", and the
// Engine's own logger is tagged with the string form of c.Name.
//
// A nil c is treated as an empty Configuration, so every default applies.
func New(c *Configuration) *Engine {
	// Guard against a nil configuration instead of panicking on c.Name below.
	if c == nil {
		c = &Configuration{}
	}

	s := Engine{
		name:    c.Name,
		store:   MemoryEventStore,
		logger:  newLogger,
		writers: map[Type]Writer{},
	}

	if c.Logger != nil {
		s.logger = c.Logger
	}

	// The store factory must run after the logger override so that it gets a
	// logger produced by the configured factory.
	if c.EventStore != nil {
		s.store = c.EventStore(s.logger("EventStore"))
	}

	s.log = s.logger(s.name.String())

	return &s
}
// Compose hands the Engine to each given Component in order by calling its
// Compose method. It returns the first error encountered, skipping the
// remaining components, or nil when all of them succeed.
//
// s.mu is not acquired here; the lock calls below are left disabled.
func (s *Engine) Compose(c ...Component) error {
	//s.mu.Lock()
	//defer s.mu.Unlock()
	for _, component := range c {
		err := component.Compose(s)
		if err != nil {
			return err
		}
	}

	return nil
}
// Write fans the events out to every registered writer, one goroutine per
// writer, and returns len(e) together with the first error reported by a
// coupled writer (nil when none of them failed).
//
// registry.isCoupled decides, per writer type, whether the writer is coupled
// to the events:
//   - coupled writers are waited for and their errors are returned;
//   - uncoupled writers are not waited for — Write may return while they are
//     still running — and their failures are only logged.
//
// s.mu is held exclusively until the coupled writers have finished.
func (s *Engine) Write(e Events) (n int, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	var swg sync.WaitGroup

	// Sized under the lock, with room for one error per writer, so a sending
	// goroutine can never block.
	che := make(chan error, len(s.writers))

	for t, w := range s.writers {
		swg.Add(1)
		go func(t Type, w Writer, e Events) {
			coupled := registry.isCoupled(t, e)
			if coupled {
				// Released only after the write (and any error send) is done.
				defer swg.Done()
			} else {
				// Uncoupled writers must not delay the caller: release now.
				swg.Done()
			}

			// werr is deliberately goroutine-local. Assigning to the named
			// result err from here would race with other writers and with
			// the return statement of Write itself.
			if _, werr := w.Write(e); werr != nil {
				if coupled {
					che <- werr
					return
				}
				s.log("write:err %s %s failed due `%s` error", e, t, werr)
			}
		}(t, w, e)
	}

	swg.Wait()
	// Only coupled writers send on che and all of them have finished, so
	// closing is safe; receiving yields the first error or nil when empty.
	close(che)

	return len(e), <-che
}
// register stores w as the writer for type t. It returns an error when a
// writer has already been stored under t, leaving the existing one in place.
//
// The writers map is modified without taking s.mu.
func (s *Engine) register(w Writer, t Type) error {
	_, taken := s.writers[t]
	if taken {
		return Err("%s already registered", t)
	}

	s.writers[t] = w

	return nil
}
// Run currently does nothing; its body is empty.
func (s *Engine) Run() {}
type (
	// Logger is a printf-style logging function: a format string followed by
	// the values to format.
	Logger = func(string, ...any)

	// NewLogger is a factory producing a Logger from optional string
	// arguments; the package's newLogger uses the first one as a tag.
	NewLogger func(...string) Logger
)
// newLogger is the default NewLogger. It creates a logger that writes to
// os.Stdout with the Date, Time, Levels, Tags and Colors flags and returns its
// Printf method. When at least one tag is supplied, the first is applied to
// the logger; any further tags are ignored.
func newLogger(tag ...string) Logger {
	flags := log.Date | log.Time | log.Levels | log.Tags | log.Colors

	l := log.New(os.Stdout, flags)
	if len(tag) > 0 {
		l = l.Tag(tag[0])
	}

	return l.Printf
}