Skip to content

Commit dbbb9a3

Browse files
feat(mq): add key field
1 parent 4a99c09 commit dbbb9a3

File tree

2 files changed

+22
-3
lines changed

2 files changed

+22
-3
lines changed

example/kq/producer/produce.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"context"
45
"encoding/json"
56
"fmt"
67
"log"
@@ -41,7 +42,12 @@ func main() {
4142
}
4243

4344
fmt.Println(string(body))
44-
if err := pusher.Push(string(body)); err != nil {
45+
if err := pusher.Push(context.Background(), string(body)); err != nil {
46+
log.Fatal(err)
47+
}
48+
49+
fmt.Println(string(body))
50+
if err := pusher.KPush(context.Background(), "test", string(body)); err != nil {
4551
log.Fatal(err)
4652
}
4753
}

kq/pusher.go

+15-2
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,29 @@ func (p *Pusher) Name() string {
8686
return p.topic
8787
}
8888

89+
// KPush sends a message to the Kafka topic.
90+
func (p *Pusher) KPush(ctx context.Context, k, v string) error {
91+
msg := kafka.Message{
92+
Key: []byte(k), // current timestamp
93+
Value: []byte(v),
94+
}
95+
if p.executor != nil {
96+
return p.executor.Add(msg, len(v))
97+
} else {
98+
return p.producer.WriteMessages(ctx, msg)
99+
}
100+
}
101+
89102
// Push sends a message to the Kafka topic.
90-
func (p *Pusher) Push(v string) error {
103+
func (p *Pusher) Push(ctx context.Context, v string) error {
91104
msg := kafka.Message{
92105
Key: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), // current timestamp
93106
Value: []byte(v),
94107
}
95108
if p.executor != nil {
96109
return p.executor.Add(msg, len(v))
97110
} else {
98-
return p.producer.WriteMessages(context.Background(), msg)
111+
return p.producer.WriteMessages(ctx, msg)
99112
}
100113
}
101114

0 commit comments

Comments
 (0)