Skip to content

Commit e86d965

Browse files
feat(kq): support custom producer balancer and message key
1 parent 74d5d1f commit e86d965

1 file changed

Lines changed: 11 additions & 0 deletions

File tree

kq/pusher.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type (
2222
pushOptions struct {
2323
// kafka.Writer options
2424
allowAutoTopicCreation bool
25+
balancer kafka.Balancer
2526

2627
// executors.ChunkExecutor options
2728
chunkSize int
@@ -45,6 +46,9 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
4546

4647
// apply kafka.Writer options
4748
producer.AllowAutoTopicCreation = options.allowAutoTopicCreation
49+
if options.balancer != nil {
50+
producer.Balancer = options.balancer
51+
}
4852

4953
// apply ChunkExecutor options
5054
var chunkOpts []executors.ChunkOption
@@ -139,3 +143,10 @@ func WithAllowAutoTopicCreation() PushOption {
139143
options.allowAutoTopicCreation = true
140144
}
141145
}
146+
147+
// WithBalancer customizes the Pusher with the given balancer.
148+
func WithBalancer(balancer kafka.Balancer) PushOption {
149+
return func(options *pushOptions) {
150+
options.balancer = balancer
151+
}
152+
}

0 commit comments

Comments
 (0)