@@ -19,10 +19,28 @@ type MessageSender interface {
1919 SendMessages (msgs []* sarama.ProducerMessage ) error
2020}
2121
22+ type AsyncMessageSender struct {
23+ sarama.AsyncProducer
24+ logger * fflog.FFLogger
25+ }
26+
27+ func (a * AsyncMessageSender ) SendMessages (msgs []* sarama.ProducerMessage ) error {
28+ for len (msgs ) > 0 {
29+ select {
30+ case err := <- a .AsyncProducer .Errors ():
31+ a .logger .Warn ("Failed to produce message: %w" , err )
32+ case a .AsyncProducer .Input () <- msgs [0 ]:
33+ msgs = msgs [1 :]
34+ }
35+ }
36+ return nil
37+ }
38+
2239// Settings contains Kafka-specific configurations needed for message creation
2340type Settings struct {
2441 Topic string `json:"topic"`
2542 Addresses []string `json:"addresses"`
43+ Async bool `json:"async"`
2644 * sarama.Config
2745}
2846
@@ -41,18 +59,24 @@ type Exporter struct {
4159 // dialer will create the producer. This field is added for dependency injection during testing as sarama
4260 // has the annoying tendency to dial as soon as a producer is created.
4361 dialer func (addrs []string , config * sarama.Config ) (MessageSender , error )
62+
63+ logger * fflog.FFLogger
4464}
4565
4666// Export will produce a message to the Kafka topic. The message's value will contain the event encoded in the
4767// selected format. Messages are published synchronously and will error immediately on failure.
48- func (e * Exporter ) Export (_ context.Context , _ * fflog.FFLogger , featureEvents []exporter.FeatureEvent ) error {
68+ func (e * Exporter ) Export (_ context.Context , logger * fflog.FFLogger , featureEvents []exporter.FeatureEvent ) error {
4969 if e .sender == nil {
5070 err := e .initializeProducer ()
5171 if err != nil {
5272 return fmt .Errorf ("writer: %w" , err )
5373 }
5474 }
5575
76+ if e .logger == nil {
77+ e .logger = logger
78+ }
79+
5680 messages := make ([]* sarama.ProducerMessage , 0 , len (featureEvents ))
5781 for _ , event := range featureEvents {
5882 data , err := e .formatMessage (event )
@@ -90,6 +114,13 @@ func (e *Exporter) initializeProducer() error {
90114 if e .dialer == nil {
91115 e .dialer = func (addrs []string , config * sarama.Config ) (MessageSender , error ) {
92116 // Adapter for the function to comply with the MessageSender interface return
117+ if e .Settings .Async {
118+ asyncProducer , err := sarama .NewAsyncProducer (addrs , config )
119+ if err != nil {
120+ return nil , err
121+ }
122+ return & AsyncMessageSender {AsyncProducer : asyncProducer , logger : e .logger }, nil //TODO Close should be called on shutdown
123+ }
93124 return sarama .NewSyncProducer (addrs , config )
94125 }
95126 }
0 commit comments