Skip to content

Commit 9dfbd48

Browse files
authored
Merge pull request #69 from MarkJoyMa/feat/kq_sync_push
feat: kq supports synchronous push
2 parents 328f46c + f05c60e commit 9dfbd48

File tree

1 file changed

+20
-4
lines changed

1 file changed

+20
-4
lines changed

kq/pusher.go

+20-4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ type (
2626
// executors.ChunkExecutor options
2727
chunkSize int
2828
flushInterval time.Duration
29+
30+
// enableSyncPush is used to enable sync push
31+
enableSyncPush bool
2932
}
3033
)
3134

@@ -46,6 +49,16 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
4649
// apply kafka.Writer options
4750
producer.AllowAutoTopicCreation = options.allowAutoTopicCreation
4851

52+
pusher := &Pusher{
53+
producer: producer,
54+
topic: topic,
55+
}
56+
57+
// if enableSyncPush is true, return the pusher directly
58+
if options.enableSyncPush {
59+
return pusher
60+
}
61+
4962
// apply ChunkExecutor options
5063
var chunkOpts []executors.ChunkOption
5164
if options.chunkSize > 0 {
@@ -55,10 +68,6 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
5568
chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
5669
}
5770

58-
pusher := &Pusher{
59-
producer: producer,
60-
topic: topic,
61-
}
6271
pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
6372
chunk := make([]kafka.Message, len(tasks))
6473
for i := range tasks {
@@ -119,3 +128,10 @@ func WithAllowAutoTopicCreation() PushOption {
119128
options.allowAutoTopicCreation = true
120129
}
121130
}
131+
132+
// WithEnableSyncPush enables sync push.
133+
func WithEnableSyncPush() PushOption {
134+
return func(options *pushOptions) {
135+
options.enableSyncPush = true
136+
}
137+
}

0 commit comments

Comments
 (0)