Skip to content

Commit 00dd2b0

Browse files
committed
feat: commit in order
1 parent 328f46c commit 00dd2b0

File tree

4 files changed

+97
-39
lines changed

4 files changed

+97
-39
lines changed

go.mod

+8-8
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ require (
77
github.com/nats-io/stan.go v0.10.4
88
github.com/rabbitmq/amqp091-go v1.9.0
99
github.com/segmentio/kafka-go v0.4.38
10-
github.com/zeromicro/go-zero v1.6.3
10+
github.com/zeromicro/go-zero v1.6.4
1111
)
1212

1313
require (
@@ -19,7 +19,7 @@ require (
1919
github.com/go-logr/logr v1.3.0 // indirect
2020
github.com/go-logr/stdr v1.2.2 // indirect
2121
github.com/gogo/protobuf v1.3.2 // indirect
22-
github.com/golang/protobuf v1.5.3 // indirect
22+
github.com/golang/protobuf v1.5.4 // indirect
2323
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 // indirect
2424
github.com/klauspost/compress v1.17.7 // indirect
2525
github.com/mattn/go-colorable v0.1.13 // indirect
@@ -31,7 +31,7 @@ require (
3131
github.com/nats-io/nkeys v0.4.7 // indirect
3232
github.com/nats-io/nuid v1.0.1 // indirect
3333
github.com/openzipkin/zipkin-go v0.4.2 // indirect
34-
github.com/pelletier/go-toml/v2 v2.1.1 // indirect
34+
github.com/pelletier/go-toml/v2 v2.2.0 // indirect
3535
github.com/pierrec/lz4/v4 v4.1.17 // indirect
3636
github.com/prometheus/client_golang v1.18.0 // indirect
3737
github.com/prometheus/client_model v0.5.0 // indirect
@@ -52,12 +52,12 @@ require (
5252
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
5353
go.uber.org/automaxprocs v1.5.3 // indirect
5454
golang.org/x/crypto v0.22.0 // indirect
55-
golang.org/x/net v0.21.0 // indirect
55+
golang.org/x/net v0.24.0 // indirect
5656
golang.org/x/sys v0.19.0 // indirect
5757
golang.org/x/text v0.14.0 // indirect
58-
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect
59-
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
60-
google.golang.org/grpc v1.62.0 // indirect
61-
google.golang.org/protobuf v1.32.0 // indirect
58+
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect
59+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect
60+
google.golang.org/grpc v1.63.0 // indirect
61+
google.golang.org/protobuf v1.33.0 // indirect
6262
gopkg.in/yaml.v2 v2.4.0 // indirect
6363
)

go.sum

+10
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
4343
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
4444
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
4545
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
46+
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
4647
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
4748
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
4849
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
@@ -116,6 +117,7 @@ github.com/openzipkin/zipkin-go v0.4.2/go.mod h1:ZeVkFjuuBiSy13y8vpSDCjMi9GoI3hP
116117
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
117118
github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI=
118119
github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
120+
github.com/pelletier/go-toml/v2 v2.2.0/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
119121
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
120122
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
121123
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
@@ -150,13 +152,15 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
150152
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
151153
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
152154
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
155+
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
153156
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
154157
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
155158
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
156159
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
157160
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
158161
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
159162
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
163+
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
160164
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
161165
github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw=
162166
github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
@@ -168,6 +172,7 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t
168172
github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE=
169173
github.com/zeromicro/go-zero v1.6.3 h1:OL0NnHD5LdRNDolfcK9vUkJt7K8TcBE3RkzfM8poOVw=
170174
github.com/zeromicro/go-zero v1.6.3/go.mod h1:XZL435ZxVi9MSXXtw2MRQhHgx6OoX3++MRMOE9xU70c=
175+
github.com/zeromicro/go-zero v1.6.4/go.mod h1:dQ39Zoz20/6x/SUhFXyEEg8lWjl+CO3dzg8Je2xG63Q=
171176
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
172177
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
173178
go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs=
@@ -222,6 +227,7 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
222227
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
223228
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
224229
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
230+
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
225231
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
226232
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
227233
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -271,10 +277,13 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T
271277
google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 h1:KAeGQVN3M9nD0/bQXnr/ClcEMJ968gUXJQ9pwfSynuQ=
272278
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 h1:Lj5rbfG876hIAYFjqiJnPHfhXbv+nzTWfm04Fg/XSVU=
273279
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80/go.mod h1:4jWUdICTdgc3Ibxmr8nAJiiLHwQBY0UI0XZcEMaFKaA=
280+
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:5iCWqnniDlqZHrd3neWVTOwvh/v6s3232omMecelax8=
274281
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM=
275282
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s=
283+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY=
276284
google.golang.org/grpc v1.62.0 h1:HQKZ/fa1bXkX1oFOvSjmZEUL8wLSaZTjCcLAlmZRtdk=
277285
google.golang.org/grpc v1.62.0/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
286+
google.golang.org/grpc v1.63.0/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
278287
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
279288
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
280289
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -285,6 +294,7 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
285294
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
286295
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
287296
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
297+
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
288298
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
289299
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
290300
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=

kq/config.go

+14-13
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,18 @@ const (
99

1010
type KqConf struct {
1111
service.ServiceConf
12-
Brokers []string
13-
Group string
14-
Topic string
15-
CaFile string `json:",optional"`
16-
Offset string `json:",options=first|last,default=last"`
17-
Conns int `json:",default=1"`
18-
Consumers int `json:",default=8"`
19-
Processors int `json:",default=8"`
20-
MinBytes int `json:",default=10240"` // 10K
21-
MaxBytes int `json:",default=10485760"` // 10M
22-
Username string `json:",optional"`
23-
Password string `json:",optional"`
24-
ForceCommit bool `json:",default=true"`
12+
Brokers []string
13+
Group string
14+
Topic string
15+
CaFile string `json:",optional"`
16+
Offset string `json:",options=first|last,default=last"`
17+
Conns int `json:",default=1"`
18+
Consumers int `json:",default=8"`
19+
Processors int `json:",default=8"`
20+
MinBytes int `json:",default=10240"` // 10K
21+
MaxBytes int `json:",default=10485760"` // 10M
22+
Username string `json:",optional"`
23+
Password string `json:",optional"`
24+
ForceCommit bool `json:",default=true"`
25+
CommitInOrder bool `json:",default=false"`
2526
}

kq/queue.go

+65-18
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/tls"
66
"crypto/x509"
7+
"errors"
78
"io"
89
"log"
910
"os"
@@ -54,6 +55,7 @@ type (
5455
channel chan kafka.Message
5556
producerRoutines *threading.RoutineGroup
5657
consumerRoutines *threading.RoutineGroup
58+
commitRunner *threading.StableRunner[kafka.Message, kafka.Message]
5759
metrics *stat.Metrics
5860
errorHandler ConsumeErrorHandler
5961
}
@@ -143,7 +145,7 @@ func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue
143145
}
144146
consumer := kafka.NewReader(readerConfig)
145147

146-
return &kafkaQueue{
148+
q := &kafkaQueue{
147149
c: c,
148150
consumer: consumer,
149151
handler: handler,
@@ -153,15 +155,37 @@ func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue
153155
metrics: options.metrics,
154156
errorHandler: options.errorHandler,
155157
}
158+
q.commitRunner = threading.NewStableRunner(func(msg kafka.Message) kafka.Message {
159+
if err := q.consumeOne(string(msg.Key), string(msg.Value)); err != nil {
160+
if q.errorHandler != nil {
161+
q.errorHandler(msg, err)
162+
}
163+
}
164+
165+
return msg
166+
})
167+
168+
return q
156169
}
157170

158171
func (q *kafkaQueue) Start() {
159-
q.startConsumers()
160-
q.startProducers()
172+
if q.c.CommitInOrder {
173+
go q.commitInOrder()
161174

162-
q.producerRoutines.Wait()
163-
close(q.channel)
164-
q.consumerRoutines.Wait()
175+
if err := q.consume(func(msg kafka.Message) {
176+
if e := q.commitRunner.Push(msg); e != nil {
177+
logx.Error(e)
178+
}
179+
}); err != nil {
180+
logx.Error(err)
181+
}
182+
} else {
183+
q.startConsumers()
184+
q.startProducers()
185+
q.producerRoutines.Wait()
186+
close(q.channel)
187+
q.consumerRoutines.Wait()
188+
}
165189
}
166190

167191
func (q *kafkaQueue) Stop() {
@@ -203,24 +227,47 @@ func (q *kafkaQueue) startConsumers() {
203227
func (q *kafkaQueue) startProducers() {
204228
for i := 0; i < q.c.Consumers; i++ {
205229
q.producerRoutines.Run(func() {
206-
for {
207-
msg, err := q.consumer.FetchMessage(context.Background())
208-
// io.EOF means consumer closed
209-
// io.ErrClosedPipe means committing messages on the consumer,
210-
// kafka will refire the messages on uncommitted messages, ignore
211-
if err == io.EOF || err == io.ErrClosedPipe {
212-
return
213-
}
214-
if err != nil {
215-
logx.Errorf("Error on reading message, %q", err.Error())
216-
continue
217-
}
230+
if err := q.consume(func(msg kafka.Message) {
218231
q.channel <- msg
232+
}); err != nil {
233+
return
219234
}
220235
})
221236
}
222237
}
223238

239+
func (q *kafkaQueue) consume(handle func(msg kafka.Message)) error {
240+
for {
241+
msg, err := q.consumer.FetchMessage(context.Background())
242+
// io.EOF means consumer closed
243+
// io.ErrClosedPipe means committing messages on the consumer,
244+
// kafka will refire the messages on uncommitted messages, ignore
245+
if err == io.EOF || errors.Is(err, io.ErrClosedPipe) {
246+
return err
247+
}
248+
if err != nil {
249+
logx.Errorf("Error on reading message, %q", err.Error())
250+
continue
251+
}
252+
253+
handle(msg)
254+
}
255+
}
256+
257+
func (q *kafkaQueue) commitInOrder() {
258+
for {
259+
msg, err := q.commitRunner.Get()
260+
if err != nil {
261+
logx.Error(err)
262+
return
263+
}
264+
265+
if err := q.consumer.CommitMessages(context.Background(), msg); err != nil {
266+
logx.Errorf("commit failed, error: %v", err)
267+
}
268+
}
269+
}
270+
224271
func (q kafkaQueues) Start() {
225272
for _, each := range q.queues {
226273
q.group.Add(each)

0 commit comments

Comments
 (0)