-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathproducer.go
58 lines (44 loc) · 1.39 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package metamorphosis
import (
"fmt"
kafkaavro "github.com/elodina/go-kafka-avro"
"github.com/elodina/siesta"
siestaProducer "github.com/elodina/siesta-producer"
)
// Producer emits values to Kafka topics.
//
// Implementations are obtained through NewProducer.
type Producer interface {
	// Emit sends value to the named topic and returns a non-nil error if the
	// send did not succeed.
	Emit(topic string, value interface{}) error
}
// defaultProducer is the Producer implementation returned by NewProducer; it
// delegates every send to a siesta Kafka producer.
type defaultProducer struct {
	// kafkaProducer is the underlying siesta producer that records are sent through.
	kafkaProducer *siestaProducer.KafkaProducer
}
// NewProducer builds a Producer that sends Avro-encoded values to Kafka.
//
// broker is the single address placed in the connector's broker list, and
// schemaRegistry is the address passed to the Kafka Avro encoder. The returned
// Producer uses siestaProducer.ByteSerializer and the Avro encoder's Encode
// function as its serializers.
//
// An error is returned when the siesta connector cannot be created; in that
// case the returned Producer is nil.
func NewProducer(broker, schemaRegistry string) (Producer, error) {
	encoder := kafkaavro.NewKafkaAvroEncoder(schemaRegistry)

	config := siesta.NewConnectorConfig()
	config.BrokerList = []string{broker}

	connector, err := siesta.NewDefaultConnector(config)
	if err != nil {
		// Add the broker to the error so callers can tell which endpoint failed;
		// %w keeps the original error reachable through errors.Is / errors.As.
		return nil, fmt.Errorf("creating kafka connector for broker %q: %w", broker, err)
	}

	producerConfig := siestaProducer.NewProducerConfig()
	// NOTE(review): a batch size of 1 presumably makes every record go out on
	// its own rather than waiting for a batch to fill — confirm against the
	// siesta-producer documentation.
	producerConfig.BatchSize = 1

	p := siestaProducer.NewKafkaProducer(
		producerConfig,
		siestaProducer.ByteSerializer,
		encoder.Encode,
		connector,
	)
	return &defaultProducer{kafkaProducer: p}, nil
}
// Emit sends value to the given Kafka topic and blocks until the underlying
// producer delivers a result for the record on its result channel.
//
// It returns nil when the send succeeded, and the error carried by the result
// otherwise.
func (p *defaultProducer) Emit(topic string, value interface{}) error {
	// Disappointingly, the library we use under the hood reports success with
	// an error value carrying this message instead of nil, so success has to
	// be recognised by its text.
	const successMessage = "No error - it worked!"

	record := siestaProducer.ProducerRecord{Topic: topic, Value: value}
	res := <-p.kafkaProducer.Send(&record)

	// Check for nil before calling Error(): a result with no error set must be
	// treated as success rather than triggering a nil-dereference panic.
	if res.Error == nil || res.Error.Error() == successMessage {
		return nil
	}
	return res.Error
}