From 74d5d1fb8758df7f7b55ccbbc27123d1c4bb0825 Mon Sep 17 00:00:00 2001 From: testerxiaodong Date: Sun, 8 Oct 2023 20:50:58 +0800 Subject: [PATCH 1/2] feat(kq): support custom producer balancer and message key --- kq/pusher.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/kq/pusher.go b/kq/pusher.go index 2c56b2b..18eec48 100644 --- a/kq/pusher.go +++ b/kq/pusher.go @@ -99,6 +99,26 @@ func (p *Pusher) Push(v string) error { } } +// SetWriterBalancer set kafka-go custom writer balancer. +func (p *Pusher) SetWriterBalancer(balancer kafka.Balancer) { + if p.producer != nil { + p.producer.Balancer = balancer + } +} + +// PushWithKey sends a message to the Kafka topic with custom message key. +func (p *Pusher) PushWithKey(k, v string) error { + msg := kafka.Message{ + Key: []byte(k), // custom message key + Value: []byte(v), + } + if p.executor != nil { + return p.executor.Add(msg, len(v)) + } else { + return p.producer.WriteMessages(context.Background(), msg) + } +} + // WithChunkSize customizes the Pusher with the given chunk size. func WithChunkSize(chunkSize int) PushOption { return func(options *pushOptions) { From e86d965e4f51d9dd797db0acea5ec692146e4dd7 Mon Sep 17 00:00:00 2001 From: testerxiaodong Date: Sun, 8 Oct 2023 22:16:32 +0800 Subject: [PATCH 2/2] feat(kq): support custom producer balancer and message key --- kq/pusher.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/kq/pusher.go b/kq/pusher.go index 18eec48..d3c3b16 100644 --- a/kq/pusher.go +++ b/kq/pusher.go @@ -22,6 +22,7 @@ type ( pushOptions struct { // kafka.Writer options allowAutoTopicCreation bool + balancer kafka.Balancer // executors.ChunkExecutor options chunkSize int @@ -45,6 +46,9 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher { // apply kafka.Writer options producer.AllowAutoTopicCreation = options.allowAutoTopicCreation + if options.balancer != nil { + producer.Balancer = options.balancer + } // apply ChunkExecutor options var chunkOpts []executors.ChunkOption @@ -139,3 +143,10 @@ func WithAllowAutoTopicCreation() PushOption { options.allowAutoTopicCreation = true } } + +// WithBalancer customizes the Pusher with the given balancer. +func WithBalancer(balancer kafka.Balancer) PushOption { + return func(options *pushOptions) { + options.balancer = balancer + } +}