@@ -2,21 +2,29 @@ package kq
2
2
3
3
import (
4
4
"context"
5
+ "crypto/tls"
6
+ "crypto/x509"
7
+ "errors"
5
8
"io"
6
9
"log"
10
+ "os"
7
11
"time"
8
12
9
13
"github.com/segmentio/kafka-go"
10
14
_ "github.com/segmentio/kafka-go/gzip"
11
15
_ "github.com/segmentio/kafka-go/lz4"
12
16
"github.com/segmentio/kafka-go/sasl/plain"
13
17
_ "github.com/segmentio/kafka-go/snappy"
18
+ "github.com/zeromicro/go-queue/kq/internal"
19
+ "github.com/zeromicro/go-zero/core/contextx"
20
+ "github.com/zeromicro/go-zero/core/logc"
14
21
"github.com/zeromicro/go-zero/core/logx"
15
22
"github.com/zeromicro/go-zero/core/queue"
16
23
"github.com/zeromicro/go-zero/core/service"
17
24
"github.com/zeromicro/go-zero/core/stat"
18
25
"github.com/zeromicro/go-zero/core/threading"
19
26
"github.com/zeromicro/go-zero/core/timex"
27
+ "go.opentelemetry.io/otel"
20
28
)
21
29
22
30
const (
@@ -26,12 +34,12 @@ const (
26
34
)
27
35
28
36
type (
29
- ConsumeHandle func (key , value string ) error
37
+ ConsumeHandle func (ctx context. Context , key , value string ) error
30
38
31
- ConsumeErrorHandler func (msg kafka.Message , err error )
39
+ ConsumeErrorHandler func (ctx context. Context , msg kafka.Message , err error )
32
40
33
41
ConsumeHandler interface {
34
- Consume (key , value string ) error
42
+ Consume (ctx context. Context , key , value string ) error
35
43
}
36
44
37
45
queueOptions struct {
51
59
channel chan kafka.Message
52
60
producerRoutines * threading.RoutineGroup
53
61
consumerRoutines * threading.RoutineGroup
62
+ commitRunner * threading.StableRunner [kafka.Message , kafka.Message ]
54
63
metrics * stat.Metrics
55
64
errorHandler ConsumeErrorHandler
56
65
}
@@ -121,9 +130,26 @@ func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue
121
130
},
122
131
}
123
132
}
133
+ if len (c .CaFile ) > 0 {
134
+ caCert , err := os .ReadFile (c .CaFile )
135
+ if err != nil {
136
+ log .Fatal (err )
137
+ }
138
+
139
+ caCertPool := x509 .NewCertPool ()
140
+ ok := caCertPool .AppendCertsFromPEM (caCert )
141
+ if ! ok {
142
+ log .Fatal (err )
143
+ }
144
+
145
+ readerConfig .Dialer .TLS = & tls.Config {
146
+ RootCAs : caCertPool ,
147
+ InsecureSkipVerify : true ,
148
+ }
149
+ }
124
150
consumer := kafka .NewReader (readerConfig )
125
151
126
- return & kafkaQueue {
152
+ q := & kafkaQueue {
127
153
c : c ,
128
154
consumer : consumer ,
129
155
handler : handler ,
@@ -133,25 +159,51 @@ func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue
133
159
metrics : options .metrics ,
134
160
errorHandler : options .errorHandler ,
135
161
}
162
+ if c .CommitInOrder {
163
+ q .commitRunner = threading .NewStableRunner (func (msg kafka.Message ) kafka.Message {
164
+ if err := q .consumeOne (context .Background (), string (msg .Key ), string (msg .Value )); err != nil {
165
+ if q .errorHandler != nil {
166
+ q .errorHandler (context .Background (), msg , err )
167
+ }
168
+ }
169
+
170
+ return msg
171
+ })
172
+ }
173
+
174
+ return q
136
175
}
137
176
138
177
func (q * kafkaQueue ) Start () {
139
- q . startConsumers ()
140
- q . startProducers ()
178
+ if q . c . CommitInOrder {
179
+ go q . commitInOrder ()
141
180
142
- q .producerRoutines .Wait ()
143
- close (q .channel )
144
- q .consumerRoutines .Wait ()
181
+ if err := q .consume (func (msg kafka.Message ) {
182
+ if e := q .commitRunner .Push (msg ); e != nil {
183
+ logx .Error (e )
184
+ }
185
+ }); err != nil {
186
+ logx .Error (err )
187
+ }
188
+ } else {
189
+ q .startConsumers ()
190
+ q .startProducers ()
191
+ q .producerRoutines .Wait ()
192
+ close (q .channel )
193
+ q .consumerRoutines .Wait ()
194
+
195
+ logx .Infof ("Consumer %s is closed" , q .c .Name )
196
+ }
145
197
}
146
198
147
199
func (q * kafkaQueue ) Stop () {
148
200
q .consumer .Close ()
149
201
logx .Close ()
150
202
}
151
203
152
- func (q * kafkaQueue ) consumeOne (key , val string ) error {
204
+ func (q * kafkaQueue ) consumeOne (ctx context. Context , key , val string ) error {
153
205
startTime := timex .Now ()
154
- err := q .handler .Consume (key , val )
206
+ err := q .handler .Consume (ctx , key , val )
155
207
q .metrics .Add (stat.Task {
156
208
Duration : timex .Since (startTime ),
157
209
})
@@ -162,18 +214,25 @@ func (q *kafkaQueue) startConsumers() {
162
214
for i := 0 ; i < q .c .Processors ; i ++ {
163
215
q .consumerRoutines .Run (func () {
164
216
for msg := range q .channel {
165
- if err := q .consumeOne (string (msg .Key ), string (msg .Value )); err != nil {
217
+ // wrap message into message carrier
218
+ mc := internal .NewMessageCarrier (internal .NewMessage (& msg ))
219
+ // extract trace context from message
220
+ ctx := otel .GetTextMapPropagator ().Extract (context .Background (), mc )
221
+ // remove deadline and error control
222
+ ctx = contextx .ValueOnlyFrom (ctx )
223
+
224
+ if err := q .consumeOne (ctx , string (msg .Key ), string (msg .Value )); err != nil {
166
225
if q .errorHandler != nil {
167
- q .errorHandler (msg , err )
226
+ q .errorHandler (ctx , msg , err )
168
227
}
169
228
170
229
if ! q .c .ForceCommit {
171
230
continue
172
231
}
173
232
}
174
233
175
- if err := q .consumer .CommitMessages (context . Background () , msg ); err != nil {
176
- logx .Errorf ("commit failed, error: %v" , err )
234
+ if err := q .consumer .CommitMessages (ctx , msg ); err != nil {
235
+ logc .Errorf (ctx , "commit failed, error: %v" , err )
177
236
}
178
237
}
179
238
})
@@ -182,25 +241,50 @@ func (q *kafkaQueue) startConsumers() {
182
241
183
242
func (q * kafkaQueue ) startProducers () {
184
243
for i := 0 ; i < q .c .Consumers ; i ++ {
244
+ i := i
185
245
q .producerRoutines .Run (func () {
186
- for {
187
- msg , err := q .consumer .FetchMessage (context .Background ())
188
- // io.EOF means consumer closed
189
- // io.ErrClosedPipe means committing messages on the consumer,
190
- // kafka will refire the messages on uncommitted messages, ignore
191
- if err == io .EOF || err == io .ErrClosedPipe {
192
- return
193
- }
194
- if err != nil {
195
- logx .Errorf ("Error on reading message, %q" , err .Error ())
196
- continue
197
- }
246
+ if err := q .consume (func (msg kafka.Message ) {
198
247
q .channel <- msg
248
+ }); err != nil {
249
+ logx .Infof ("Consumer %s-%d is closed, error: %q" , q .c .Name , i , err .Error ())
250
+ return
199
251
}
200
252
})
201
253
}
202
254
}
203
255
256
+ func (q * kafkaQueue ) consume (handle func (msg kafka.Message )) error {
257
+ for {
258
+ msg , err := q .consumer .FetchMessage (context .Background ())
259
+ // io.EOF means consumer closed
260
+ // io.ErrClosedPipe means committing messages on the consumer,
261
+ // kafka will refire the messages on uncommitted messages, ignore
262
+ if err == io .EOF || errors .Is (err , io .ErrClosedPipe ) {
263
+ return err
264
+ }
265
+ if err != nil {
266
+ logx .Errorf ("Error on reading message, %q" , err .Error ())
267
+ continue
268
+ }
269
+
270
+ handle (msg )
271
+ }
272
+ }
273
+
274
+ func (q * kafkaQueue ) commitInOrder () {
275
+ for {
276
+ msg , err := q .commitRunner .Get ()
277
+ if err != nil {
278
+ logx .Error (err )
279
+ return
280
+ }
281
+
282
+ if err := q .consumer .CommitMessages (context .Background (), msg ); err != nil {
283
+ logx .Errorf ("commit failed, error: %v" , err )
284
+ }
285
+ }
286
+ }
287
+
204
288
func (q kafkaQueues ) Start () {
205
289
for _ , each := range q .queues {
206
290
q .group .Add (each )
@@ -252,8 +336,8 @@ type innerConsumeHandler struct {
252
336
handle ConsumeHandle
253
337
}
254
338
255
- func (ch innerConsumeHandler ) Consume (k , v string ) error {
256
- return ch .handle (k , v )
339
+ func (ch innerConsumeHandler ) Consume (ctx context. Context , k , v string ) error {
340
+ return ch .handle (ctx , k , v )
257
341
}
258
342
259
343
func ensureQueueOptions (c KqConf , options * queueOptions ) {
@@ -270,8 +354,8 @@ func ensureQueueOptions(c KqConf, options *queueOptions) {
270
354
options .metrics = stat .NewMetrics (c .Name )
271
355
}
272
356
if options .errorHandler == nil {
273
- options .errorHandler = func (msg kafka.Message , err error ) {
274
- logx .Errorf ("consume: %s, error: %v" , string (msg .Value ), err )
357
+ options .errorHandler = func (ctx context. Context , msg kafka.Message , err error ) {
358
+ logc .Errorf (ctx , "consume: %s, error: %v" , string (msg .Value ), err )
275
359
}
276
360
}
277
361
}
0 commit comments