Skip to content

Commit 319b750

Browse files
authored
Merge pull request #55 from testerxiaodong/master
feat(kq): support custom producer balancer and message key
2 parents f2ee5c0 + ce1549c commit 319b750

File tree

1 file changed

+31
-0
lines changed

1 file changed

+31
-0
lines changed

kq/pusher.go

+31
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type (
2424
pushOptions struct {
2525
// kafka.Writer options
2626
allowAutoTopicCreation bool
27+
balancer kafka.Balancer
2728

2829
// executors.ChunkExecutor options
2930
chunkSize int
@@ -50,6 +51,9 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
5051

5152
// apply kafka.Writer options
5253
producer.AllowAutoTopicCreation = options.allowAutoTopicCreation
54+
if options.balancer != nil {
55+
producer.Balancer = options.balancer
56+
}
5357

5458
pusher := &Pusher{
5559
producer: producer,
@@ -134,6 +138,26 @@ func (p *Pusher) PushWithKey(ctx context.Context, key, v string) error {
134138
}
135139
}
136140

141+
// SetWriterBalancer set kafka-go custom writer balancer.
142+
func (p *Pusher) SetWriterBalancer(balancer kafka.Balancer) {
143+
if p.producer != nil {
144+
p.producer.Balancer = balancer
145+
}
146+
}
147+
148+
// PushWithKey sends a message to the Kafka topic with custom message key.
149+
func (p *Pusher) PushWithKey(k, v string) error {
150+
msg := kafka.Message{
151+
Key: []byte(k), // custom message key
152+
Value: []byte(v),
153+
}
154+
if p.executor != nil {
155+
return p.executor.Add(msg, len(v))
156+
} else {
157+
return p.producer.WriteMessages(context.Background(), msg)
158+
}
159+
}
160+
137161
// WithChunkSize customizes the Pusher with the given chunk size.
138162
func WithChunkSize(chunkSize int) PushOption {
139163
return func(options *pushOptions) {
@@ -155,6 +179,13 @@ func WithAllowAutoTopicCreation() PushOption {
155179
}
156180
}
157181

182+
// WithBalancer customizes the Pusher with the given balancer.
183+
func WithBalancer(balancer kafka.Balancer) PushOption {
184+
return func(options *pushOptions) {
185+
options.balancer = balancer
186+
}
187+
}
188+
158189
// WithSyncPush enables the Pusher to push messages synchronously.
159190
func WithSyncPush() PushOption {
160191
return func(options *pushOptions) {

0 commit comments

Comments
 (0)