Skip to content

Commit e01000d

Browse files
authored
Merge pull request #202 from withchao/main
fix: kafka
2 parents 1fdc1d8 + f43c0d1 commit e01000d

File tree

6 files changed

+151
-291
lines changed

6 files changed

+151
-291
lines changed

mq/kafka/config.go

+84
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,90 @@
1414

1515
package kafka
1616

17+
import (
18+
"bytes"
19+
"strings"
20+
21+
"github.com/IBM/sarama"
22+
"github.com/openimsdk/tools/errs"
23+
)
24+
25+
func BuildConsumerGroupConfig(conf *Config, initial int64, autoCommitEnable bool) (*sarama.Config, error) {
26+
kfk := sarama.NewConfig()
27+
kfk.Version = sarama.V2_0_0_0
28+
kfk.Consumer.Offsets.Initial = initial
29+
kfk.Consumer.Offsets.AutoCommit.Enable = autoCommitEnable
30+
kfk.Consumer.Return.Errors = false
31+
if conf.Username != "" || conf.Password != "" {
32+
kfk.Net.SASL.Enable = true
33+
kfk.Net.SASL.User = conf.Username
34+
kfk.Net.SASL.Password = conf.Password
35+
}
36+
if conf.TLS.EnableTLS {
37+
tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify)
38+
if err != nil {
39+
return nil, err
40+
}
41+
kfk.Net.TLS.Config = tls
42+
kfk.Net.TLS.Enable = true
43+
}
44+
return kfk, nil
45+
}
46+
47+
func NewConsumerGroup(conf *sarama.Config, addr []string, groupID string) (sarama.ConsumerGroup, error) {
48+
cg, err := sarama.NewConsumerGroup(addr, groupID, conf)
49+
if err != nil {
50+
return nil, errs.WrapMsg(err, "NewConsumerGroup failed", "addr", addr, "groupID", groupID, "conf", *conf)
51+
}
52+
return cg, nil
53+
}
54+
55+
func BuildProducerConfig(conf Config) (*sarama.Config, error) {
56+
kfk := sarama.NewConfig()
57+
kfk.Producer.Return.Successes = true
58+
kfk.Producer.Return.Errors = true
59+
kfk.Producer.Partitioner = sarama.NewHashPartitioner
60+
if conf.Username != "" || conf.Password != "" {
61+
kfk.Net.SASL.Enable = true
62+
kfk.Net.SASL.User = conf.Username
63+
kfk.Net.SASL.Password = conf.Password
64+
}
65+
switch strings.ToLower(conf.ProducerAck) {
66+
case "no_response":
67+
kfk.Producer.RequiredAcks = sarama.NoResponse
68+
case "wait_for_local":
69+
kfk.Producer.RequiredAcks = sarama.WaitForLocal
70+
case "wait_for_all":
71+
kfk.Producer.RequiredAcks = sarama.WaitForAll
72+
default:
73+
kfk.Producer.RequiredAcks = sarama.WaitForAll
74+
}
75+
if conf.CompressType == "" {
76+
kfk.Producer.Compression = sarama.CompressionNone
77+
} else {
78+
if err := kfk.Producer.Compression.UnmarshalText(bytes.ToLower([]byte(conf.CompressType))); err != nil {
79+
return nil, errs.WrapMsg(err, "UnmarshalText failed", "compressType", conf.CompressType)
80+
}
81+
}
82+
if conf.TLS.EnableTLS {
83+
tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify)
84+
if err != nil {
85+
return nil, err
86+
}
87+
kfk.Net.TLS.Config = tls
88+
kfk.Net.TLS.Enable = true
89+
}
90+
return kfk, nil
91+
}
92+
93+
func NewProducer(conf *sarama.Config, addr []string) (sarama.SyncProducer, error) {
94+
producer, err := sarama.NewSyncProducer(addr, conf)
95+
if err != nil {
96+
return nil, errs.WrapMsg(err, "NewSyncProducer failed", "addr", addr, "conf", *conf)
97+
}
98+
return producer, nil
99+
}
100+
17101
type TLSConfig struct {
18102
EnableTLS bool `yaml:"enableTLS"`
19103
CACrt string `yaml:"caCrt"`

mq/kafka/consumer_group.go

-69
This file was deleted.

mq/kafka/kafka_test.go

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package kafka
2+
3+
import "testing"
4+
5+
func TestProducer(t *testing.T) {
6+
7+
}

mq/kafka/mq_consumer_group.go

+60-54
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,20 @@ package kafka
22

33
import (
44
"context"
5+
"errors"
6+
"fmt"
7+
"math/rand"
8+
"strings"
9+
"sync"
510

611
"github.com/IBM/sarama"
712
"github.com/openimsdk/tools/log"
13+
"github.com/openimsdk/tools/mcontext"
814
"github.com/openimsdk/tools/mq"
915
)
1016

1117
func NewMConsumerGroupV2(ctx context.Context, conf *Config, groupID string, topics []string, autoCommitEnable bool) (mq.Consumer, error) {
12-
config, err := BuildProducerConfig(*conf)
18+
config, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, autoCommitEnable)
1319
if err != nil {
1420
return nil, err
1521
}
@@ -21,27 +27,26 @@ func NewMConsumerGroupV2(ctx context.Context, conf *Config, groupID string, topi
2127
topics: topics,
2228
groupID: groupID,
2329
consumer: group,
24-
session: make(chan *sessionClaim, 1),
25-
idle: make(chan struct{}, 1),
30+
msg: make(chan *consumerMessage, 64),
2631
}
27-
mcg.idle <- struct{}{}
32+
mcg.ctx, mcg.cancel = context.WithCancel(ctx)
2833
mcg.loopConsume()
2934
return mcg, nil
3035
}
3136

32-
type sessionClaim struct {
33-
session sarama.ConsumerGroupSession
34-
claim sarama.ConsumerGroupClaim
37+
type consumerMessage struct {
38+
Msg *sarama.ConsumerMessage
39+
Session sarama.ConsumerGroupSession
3540
}
3641

3742
type mqConsumerGroup struct {
3843
topics []string
3944
groupID string
4045
consumer sarama.ConsumerGroup
46+
ctx context.Context
4147
cancel context.CancelFunc
42-
idle chan struct{}
43-
session chan *sessionClaim
44-
curr *sessionClaim
48+
msg chan *consumerMessage
49+
lock sync.Mutex
4550
}
4651

4752
func (*mqConsumerGroup) Setup(sarama.ConsumerGroupSession) error { return nil }
@@ -50,65 +55,66 @@ func (*mqConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error {
5055
return nil
5156
}
5257

58+
func (x *mqConsumerGroup) closeMsgChan() {
59+
select {
60+
case <-x.ctx.Done():
61+
if x.lock.TryLock() {
62+
close(x.msg)
63+
x.lock.Unlock()
64+
}
65+
default:
66+
}
67+
}
68+
5369
func (x *mqConsumerGroup) loopConsume() {
54-
var ctx context.Context
55-
ctx, x.cancel = context.WithCancel(context.Background())
5670
go func() {
57-
defer func() {
58-
close(x.session)
59-
}()
71+
defer x.closeMsgChan()
72+
ctx := mcontext.SetOperationID(x.ctx, fmt.Sprintf("consumer_group_%s_%s_%d", strings.Join(x.topics, "_"), x.groupID, rand.Uint32()))
6073
for {
61-
select {
62-
case <-ctx.Done():
63-
return
64-
case x.idle <- struct{}{}:
65-
}
66-
if err := x.consumer.Consume(ctx, x.topics, x); err != nil {
74+
if err := x.consumer.Consume(x.ctx, x.topics, x); err != nil {
75+
switch {
76+
case errors.Is(err, context.Canceled):
77+
return
78+
case errors.Is(err, sarama.ErrClosedConsumerGroup):
79+
return
80+
}
6781
log.ZWarn(ctx, "consume err", err, "topic", x.topics, "groupID", x.groupID)
6882
}
6983
}
7084
}()
7185
}
7286

7387
func (x *mqConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
74-
x.session <- &sessionClaim{session, claim}
75-
return nil
76-
}
77-
78-
func (x *mqConsumerGroup) Subscribe(ctx context.Context, fn mq.Handler) error {
88+
defer x.closeMsgChan()
89+
msg := claim.Messages()
7990
for {
80-
curr := x.curr
81-
if curr == nil {
82-
select {
83-
case <-ctx.Done():
84-
return ctx.Err()
85-
case val, ok := <-x.session:
86-
if !ok {
87-
return sarama.ErrClosedConsumerGroup
88-
}
89-
curr = val
90-
x.curr = val
91-
}
92-
}
9391
select {
94-
case <-ctx.Done():
95-
return ctx.Err()
96-
case val, ok := <-curr.claim.Messages():
92+
case <-x.ctx.Done():
93+
return context.Canceled
94+
case val, ok := <-msg:
9795
if !ok {
98-
x.curr = nil
99-
select {
100-
case <-x.idle:
101-
default:
102-
}
103-
continue
96+
return nil
10497
}
105-
ctx := GetContextWithMQHeader(val.Headers)
106-
if err := fn(ctx, string(val.Key), val.Value); err != nil {
107-
return err
108-
}
109-
curr.session.MarkMessage(val, "")
110-
curr.session.Commit()
98+
x.msg <- &consumerMessage{Msg: val, Session: session}
99+
}
100+
}
101+
}
102+
103+
func (x *mqConsumerGroup) Subscribe(ctx context.Context, fn mq.Handler) error {
104+
select {
105+
case <-ctx.Done():
106+
return context.Cause(ctx)
107+
case msg, ok := <-x.msg:
108+
if !ok {
109+
return sarama.ErrClosedConsumerGroup
110+
}
111+
ctx := GetContextWithMQHeader(msg.Msg.Headers)
112+
if err := fn(ctx, string(msg.Msg.Key), msg.Msg.Value); err != nil {
113+
return err
111114
}
115+
msg.Session.MarkMessage(msg.Msg, "")
116+
msg.Session.Commit()
117+
return nil
112118
}
113119
}
114120

0 commit comments

Comments
 (0)