Skip to content

Commit 4a99c09

Browse files
authored
Merge pull request #56 from ShaPoHun/SASL_SSL
support kafka connection type: SASL_SSL
2 parents c223788 + 418acc8 commit 4a99c09

File tree

2 files changed

+21
-0
lines changed

2 files changed

+21
-0
lines changed

Diff for: kq/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type KqConf struct {
1212
Brokers []string
1313
Group string
1414
Topic string
15+
CaFile string `json:",optional"`
1516
Offset string `json:",options=first|last,default=last"`
1617
Conns int `json:",default=1"`
1718
Consumers int `json:",default=8"`

Diff for: kq/queue.go

+20
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ package kq
22

33
import (
44
"context"
5+
"crypto/tls"
6+
"crypto/x509"
57
"io"
68
"log"
9+
"os"
710
"time"
811

912
"github.com/segmentio/kafka-go"
@@ -121,6 +124,23 @@ func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue
121124
},
122125
}
123126
}
127+
if len(c.CaFile) > 0 {
128+
caCert, err := os.ReadFile(c.CaFile)
129+
if err != nil {
130+
log.Fatal(err)
131+
}
132+
133+
caCertPool := x509.NewCertPool()
134+
ok := caCertPool.AppendCertsFromPEM(caCert)
135+
if !ok {
136+
log.Fatal(err)
137+
}
138+
139+
readerConfig.Dialer.TLS = &tls.Config{
140+
RootCAs: caCertPool,
141+
InsecureSkipVerify: true,
142+
}
143+
}
124144
consumer := kafka.NewReader(readerConfig)
125145

126146
return &kafkaQueue{

0 commit comments

Comments
 (0)