Skip to content

Commit 7dcba06

Browse files
authored
Merge pull request #70 from MarkJoyMa/feat/kq_trace
feat: kq support trace
2 parents 9dfbd48 + ad61cc3 commit 7dcba06

File tree

10 files changed

+266
-21
lines changed

10 files changed

+266
-21
lines changed

example/kq/consumer/queue.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"context"
45
"fmt"
56

67
"github.com/zeromicro/go-queue/kq"
@@ -11,7 +12,7 @@ func main() {
1112
var c kq.KqConf
1213
conf.MustLoad("config.yaml", &c)
1314

14-
q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
15+
q := kq.MustNewQueue(c, kq.WithHandle(func(ctx context.Context, k, v string) error {
1516
fmt.Printf("=> %s\n", v)
1617
return nil
1718
}))

example/kq/producer/produce.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"context"
45
"encoding/json"
56
"fmt"
67
"log"
@@ -41,7 +42,7 @@ func main() {
4142
}
4243

4344
fmt.Println(string(body))
44-
if err := pusher.Push(string(body)); err != nil {
45+
if err := pusher.Push(context.Background(), string(body)); err != nil {
4546
log.Fatal(err)
4647
}
4748
}

go.mod

+5-1
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ require (
77
github.com/nats-io/stan.go v0.10.4
88
github.com/rabbitmq/amqp091-go v1.9.0
99
github.com/segmentio/kafka-go v0.4.38
10+
github.com/stretchr/testify v1.9.0
1011
github.com/zeromicro/go-zero v1.6.3
12+
go.opentelemetry.io/otel v1.19.0
1113
)
1214

1315
require (
1416
github.com/beorn7/perks v1.0.1 // indirect
1517
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
1618
github.com/cespare/xxhash/v2 v2.2.0 // indirect
19+
github.com/davecgh/go-spew v1.1.1 // indirect
1720
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
1821
github.com/fatih/color v1.16.0 // indirect
1922
github.com/go-logr/logr v1.3.0 // indirect
@@ -33,13 +36,13 @@ require (
3336
github.com/openzipkin/zipkin-go v0.4.2 // indirect
3437
github.com/pelletier/go-toml/v2 v2.1.1 // indirect
3538
github.com/pierrec/lz4/v4 v4.1.17 // indirect
39+
github.com/pmezard/go-difflib v1.0.0 // indirect
3640
github.com/prometheus/client_golang v1.18.0 // indirect
3741
github.com/prometheus/client_model v0.5.0 // indirect
3842
github.com/prometheus/common v0.45.0 // indirect
3943
github.com/prometheus/procfs v0.12.0 // indirect
4044
github.com/redis/go-redis/v9 v9.4.0 // indirect
4145
github.com/spaolacci/murmur3 v1.1.0 // indirect
42-
go.opentelemetry.io/otel v1.19.0 // indirect
4346
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
4447
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
4548
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect
@@ -60,4 +63,5 @@ require (
6063
google.golang.org/grpc v1.62.0 // indirect
6164
google.golang.org/protobuf v1.32.0 // indirect
6265
gopkg.in/yaml.v2 v2.4.0 // indirect
66+
gopkg.in/yaml.v3 v3.0.1 // indirect
6367
)

go.sum

+2-1
Original file line numberDiff line numberDiff line change
@@ -148,15 +148,16 @@ github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0b
148148
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
149149
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
150150
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
151-
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
152151
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
152+
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
153153
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
154154
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
155155
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
156156
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
157157
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
158158
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
159159
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
160+
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
160161
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
161162
github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw=
162163
github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=

kq/internal/message.go

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package internal
2+
3+
import "github.com/segmentio/kafka-go"
4+
5+
type Message struct {
6+
*kafka.Message
7+
}
8+
9+
func NewMessage(msg *kafka.Message) *Message {
10+
return &Message{Message: msg}
11+
}
12+
13+
func (m *Message) GetHeader(key string) string {
14+
for _, h := range m.Headers {
15+
if h.Key == key {
16+
return string(h.Value)
17+
}
18+
}
19+
return ""
20+
}
21+
22+
func (m *Message) SetHeader(key, val string) {
23+
// Ensure uniqueness of keys
24+
for i := 0; i < len(m.Headers); i++ {
25+
if m.Headers[i].Key == key {
26+
m.Headers = append(m.Headers[:i], m.Headers[i+1:]...)
27+
i--
28+
}
29+
}
30+
m.Headers = append(m.Headers, kafka.Header{
31+
Key: key,
32+
Value: []byte(val),
33+
})
34+
}

kq/internal/message_test.go

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package internal
2+
3+
import (
4+
"testing"
5+
6+
"github.com/segmentio/kafka-go"
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestMessageGetHeader(t *testing.T) {
11+
testCases := []struct {
12+
name string
13+
msg *Message
14+
key string
15+
expected string
16+
}{
17+
{
18+
name: "exists",
19+
msg: &Message{
20+
Message: &kafka.Message{Headers: []kafka.Header{
21+
{Key: "foo", Value: []byte("bar")},
22+
}}},
23+
key: "foo",
24+
expected: "bar",
25+
},
26+
{
27+
name: "not exists",
28+
msg: &Message{Message: &kafka.Message{Headers: []kafka.Header{}}},
29+
key: "foo",
30+
expected: "",
31+
},
32+
}
33+
34+
for _, tc := range testCases {
35+
t.Run(tc.name, func(t *testing.T) {
36+
result := tc.msg.GetHeader(tc.key)
37+
assert.Equal(t, tc.expected, result)
38+
})
39+
}
40+
}
41+
42+
func TestMessageSetHeader(t *testing.T) {
43+
msg := &Message{Message: &kafka.Message{Headers: []kafka.Header{
44+
{Key: "foo", Value: []byte("bar")}},
45+
}}
46+
47+
msg.SetHeader("foo", "bar2")
48+
msg.SetHeader("foo2", "bar2")
49+
msg.SetHeader("foo2", "bar3")
50+
msg.SetHeader("foo3", "bar4")
51+
52+
assert.ElementsMatch(t, msg.Headers, []kafka.Header{
53+
{Key: "foo", Value: []byte("bar2")},
54+
{Key: "foo2", Value: []byte("bar3")},
55+
{Key: "foo3", Value: []byte("bar4")},
56+
})
57+
}

kq/internal/trace.go

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package internal
2+
3+
import "go.opentelemetry.io/otel/propagation"
4+
5+
var _ propagation.TextMapCarrier = (*MessageCarrier)(nil)
6+
7+
// MessageCarrier injects and extracts traces from a types.Message.
8+
type MessageCarrier struct {
9+
msg *Message
10+
}
11+
12+
// NewMessageCarrier returns a new MessageCarrier.
13+
func NewMessageCarrier(msg *Message) MessageCarrier {
14+
return MessageCarrier{msg: msg}
15+
}
16+
17+
// Get returns the value associated with the passed key.
18+
func (m MessageCarrier) Get(key string) string {
19+
return m.msg.GetHeader(key)
20+
}
21+
22+
// Set stores the key-value pair.
23+
func (m MessageCarrier) Set(key string, value string) {
24+
m.msg.SetHeader(key, value)
25+
}
26+
27+
// Keys lists the keys stored in this carrier.
28+
func (m MessageCarrier) Keys() []string {
29+
out := make([]string, len(m.msg.Headers))
30+
for i, h := range m.msg.Headers {
31+
out[i] = h.Key
32+
}
33+
34+
return out
35+
}

kq/internal/trace_test.go

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package internal
2+
3+
import (
4+
"testing"
5+
6+
"github.com/segmentio/kafka-go"
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestMessageCarrierGet(t *testing.T) {
11+
testCases := []struct {
12+
name string
13+
carrier MessageCarrier
14+
key string
15+
expected string
16+
}{
17+
{
18+
name: "exists",
19+
carrier: NewMessageCarrier(&Message{&kafka.Message{Headers: []kafka.Header{
20+
{Key: "foo", Value: []byte("bar")},
21+
}}}),
22+
key: "foo",
23+
expected: "bar",
24+
},
25+
{
26+
name: "not exists",
27+
carrier: NewMessageCarrier(&Message{&kafka.Message{Headers: []kafka.Header{}}}),
28+
key: "foo",
29+
expected: "",
30+
},
31+
}
32+
33+
for _, tc := range testCases {
34+
t.Run(tc.name, func(t *testing.T) {
35+
result := tc.carrier.Get(tc.key)
36+
assert.Equal(t, tc.expected, result)
37+
})
38+
}
39+
}
40+
41+
func TestMessageCarrierSet(t *testing.T) {
42+
msg := Message{&kafka.Message{Headers: []kafka.Header{
43+
{Key: "foo", Value: []byte("bar")},
44+
}}}
45+
carrier := MessageCarrier{msg: &msg}
46+
47+
carrier.Set("foo", "bar2")
48+
carrier.Set("foo2", "bar2")
49+
carrier.Set("foo2", "bar3")
50+
carrier.Set("foo3", "bar4")
51+
52+
assert.ElementsMatch(t, carrier.msg.Headers, []kafka.Header{
53+
{Key: "foo", Value: []byte("bar2")},
54+
{Key: "foo2", Value: []byte("bar3")},
55+
{Key: "foo3", Value: []byte("bar4")},
56+
})
57+
}
58+
59+
func TestMessageCarrierKeys(t *testing.T) {
60+
testCases := []struct {
61+
name string
62+
carrier MessageCarrier
63+
expected []string
64+
}{
65+
{
66+
name: "one",
67+
carrier: MessageCarrier{msg: &Message{&kafka.Message{Headers: []kafka.Header{
68+
{Key: "foo", Value: []byte("bar")},
69+
}}}},
70+
expected: []string{"foo"},
71+
},
72+
{
73+
name: "none",
74+
carrier: MessageCarrier{msg: &Message{&kafka.Message{Headers: []kafka.Header{}}}},
75+
expected: []string{},
76+
},
77+
{
78+
name: "many",
79+
carrier: MessageCarrier{msg: &Message{&kafka.Message{Headers: []kafka.Header{
80+
{Key: "foo", Value: []byte("bar")},
81+
{Key: "baz", Value: []byte("quux")},
82+
}}}},
83+
expected: []string{"foo", "baz"},
84+
},
85+
}
86+
87+
for _, tc := range testCases {
88+
t.Run(tc.name, func(t *testing.T) {
89+
result := tc.carrier.Keys()
90+
assert.Equal(t, tc.expected, result)
91+
})
92+
}
93+
}

kq/pusher.go

+12-4
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import (
66
"time"
77

88
"github.com/segmentio/kafka-go"
9+
"github.com/zeromicro/go-queue/kq/internal"
910
"github.com/zeromicro/go-zero/core/executors"
1011
"github.com/zeromicro/go-zero/core/logx"
12+
"go.opentelemetry.io/otel"
1113
)
1214

1315
type (
@@ -96,15 +98,21 @@ func (p *Pusher) Name() string {
9698
}
9799

98100
// Push sends a message to the Kafka topic.
99-
func (p *Pusher) Push(v string) error {
101+
func (p *Pusher) Push(ctx context.Context, v string) error {
100102
msg := kafka.Message{
101103
Key: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), // current timestamp
102104
Value: []byte(v),
103105
}
106+
107+
// wrap message into message carrier
108+
mc := internal.NewMessageCarrier(internal.NewMessage(&msg))
109+
// inject trace context into message
110+
otel.GetTextMapPropagator().Inject(ctx, mc)
111+
104112
if p.executor != nil {
105113
return p.executor.Add(msg, len(v))
106114
} else {
107-
return p.producer.WriteMessages(context.Background(), msg)
115+
return p.producer.WriteMessages(ctx, msg)
108116
}
109117
}
110118

@@ -129,8 +137,8 @@ func WithAllowAutoTopicCreation() PushOption {
129137
}
130138
}
131139

132-
// WithEnableSyncPush enables sync push.
133-
func WithEnableSyncPush() PushOption {
140+
// WithSyncPush enables the Pusher to push messages synchronously.
141+
func WithSyncPush() PushOption {
134142
return func(options *pushOptions) {
135143
options.enableSyncPush = true
136144
}

0 commit comments

Comments
 (0)