Skip to content

Commit c421c5c

Browse files
Michal Tichákknopers8
authored andcommitted
[core] keeping created kafka writers in global object
1 parent 0f367c7 commit c421c5c

File tree

3 files changed

+40
-2
lines changed

3 files changed

+40
-2
lines changed

common/event/writer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ func NewWriterWithTopic(topic topic.Topic) *Writer {
5656
}
5757
}
5858

59+
func (w *Writer) Close() {
60+
w.Close()
61+
}
62+
5963
func (w *Writer) WriteEvent(e interface{}) {
6064
w.WriteEventWithTimestamp(e, time.Now())
6165
}

core/core.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ func Run() error {
112112

113113
// Ensure all workflow plugins are destroyed before core teardown
114114
integration.PluginsInstance().DestroyAll()
115+
the.ClearEventWriters()
115116

116117
return err
117118
}

core/the/eventwriter.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,47 @@
2525
package the
2626

2727
import (
28+
"sync"
29+
2830
"github.com/AliceO2Group/Control/common/event"
2931
"github.com/AliceO2Group/Control/common/event/topic"
32+
"github.com/AliceO2Group/Control/common/logger"
33+
"github.com/sirupsen/logrus"
34+
)
35+
36+
var (
37+
writers = make(map[topic.Topic]*event.Writer)
38+
mu sync.Mutex
39+
log = logger.New(logrus.StandardLogger(), "core")
3040
)
3141

42+
func createOrGetWriter(topic topic.Topic) *event.Writer {
43+
mu.Lock()
44+
defer mu.Unlock()
45+
46+
if writer, ok := writers[topic]; ok {
47+
return writer
48+
}
49+
50+
writers[topic] = event.NewWriterWithTopic(topic)
51+
return writers[topic]
52+
}
53+
3254
func EventWriter() *event.Writer {
33-
return EventWriterWithTopic(topic.Root)
55+
return createOrGetWriter(topic.Root)
3456
}
3557

3658
func EventWriterWithTopic(topic topic.Topic) *event.Writer {
37-
return event.NewWriterWithTopic(topic)
59+
return createOrGetWriter(topic)
60+
}
61+
62+
func ClearEventWriters() {
63+
mu.Lock()
64+
defer mu.Unlock()
65+
66+
log.Logf(logrus.InfoLevel, "Clearing %d kafka producers", len(writers))
67+
for _, writer := range writers {
68+
writer.Close()
69+
}
70+
clear(writers)
3871
}

0 commit comments

Comments
 (0)