File tree Expand file tree Collapse file tree 1 file changed +20
-0
lines changed Expand file tree Collapse file tree 1 file changed +20
-0
lines changed Original file line number Diff line number Diff line change @@ -19,10 +19,22 @@ type MessageSender interface {
1919 SendMessages (msgs []* sarama.ProducerMessage ) error
2020}
2121
22+ type AsyncMessageSender struct {
23+ sarama.AsyncProducer
24+ }
25+
26+ func (a * AsyncMessageSender ) SendMessages (msgs []* sarama.ProducerMessage ) error {
27+ for _ , msg := range msgs {
28+ a .AsyncProducer .Input () <- msg
29+ }
30+ return nil
31+ }
32+
2233// Settings contains Kafka-specific configurations needed for message creation
2334type Settings struct {
2435 Topic string `json:"topic"`
2536 Addresses []string `json:"addresses"`
37+ Async bool `json:"async"`
2638 * sarama.Config
2739}
2840
@@ -90,6 +102,14 @@ func (e *Exporter) initializeProducer() error {
90102 if e .dialer == nil {
91103 e .dialer = func (addrs []string , config * sarama.Config ) (MessageSender , error ) {
92104 // Adapter for the function to comply with the MessageSender interface return
105+ if e .Settings .Async {
106+ asyncProducer , err := sarama .NewAsyncProducer (addrs , config )
107+ if err != nil {
108+ return nil , err
109+ }
110+ e .Settings .Config .Producer .Return .Errors = false //TODO these should be read
111+ return & AsyncMessageSender {AsyncProducer : asyncProducer }, nil
112+ }
93113 return sarama .NewSyncProducer (addrs , config )
94114 }
95115 }
You can’t perform that action at this time.
0 commit comments