75 commits
94383f9
sasl password key as a config parameter (#6)
rashi-chandola Dec 15, 2021
09df3ef
adapter and prometheus registry
Mar 17, 2022
20641fc
load from config
Mar 24, 2022
b3e7734
test on k8s
Mar 29, 2022
1b942be
check on flow
Apr 4, 2022
0bb0236
check on flow
Apr 5, 2022
b9b97e0
expose port
Apr 7, 2022
ec6405a
currentoffset populated
Apr 11, 2022
80eef9e
config from adapter template
Apr 18, 2022
2f79778
config from adapter template
Apr 19, 2022
00ab668
config from adapter template
Apr 19, 2022
820092f
remove vendor changes
Apr 20, 2022
7a3a4e3
remove vendor changes
Apr 20, 2022
46dc551
remove vendor changes
Apr 20, 2022
a83235c
pr changes
Apr 22, 2022
9b33121
PR changes 2
Apr 25, 2022
f9bfffa
cosmos metrics (#8)
karthik0708 Apr 25, 2022
97e4d41
zk optimization
Apr 28, 2022
e08fc84
offset at source and sink
Apr 29, 2022
194edee
Merge branch 'develop' of github.com:karthik0708/go-dmux into current…
Apr 29, 2022
017f9e5
offset at source and sink
Apr 29, 2022
51c28d0
offset at source and sink
Apr 29, 2022
ea15322
offset at source and sink
Apr 29, 2022
63b3a23
interfaces implementations
May 2, 2022
853b243
dependency injection
May 9, 2022
7eeb78b
log payload
May 12, 2022
42e1847
multitopic reading fix
May 18, 2022
4d102c3
comments
May 23, 2022
30631f8
comments
May 23, 2022
e3f5828
seperate metrics at registry
Jun 2, 2022
36f282f
seperate metrics at registry
Jun 2, 2022
e5d2494
structured prometheus implementation
Jun 2, 2022
bedb5e2
structured prometheus implementation
Jun 2, 2022
94f40d2
simplified arguments
Jun 3, 2022
ab54bb0
simplified arguments
Jun 3, 2022
00bc1ec
removed lastDetail map
Jun 7, 2022
b973a74
removed lastDetail map
Jun 7, 2022
ac92064
removed lastDetail map
Jun 7, 2022
41a60fb
removed fetching from collector
Jun 7, 2022
2b8248b
removed fetching from collector
Jun 8, 2022
d53a29d
collector and map implementation
Jun 10, 2022
1714af2
Merge pull request #1 from flipkart-incubator/main
karthik0708 Jun 20, 2022
39e8015
merged main
Jun 20, 2022
0dc6a13
merged main
Jun 20, 2022
2fa4ab8
merged main
Jun 21, 2022
26d22bf
merge fix
Jun 21, 2022
d89d674
edge cases
Jun 22, 2022
7511896
PR changes
Jun 29, 2022
17e8272
simplified map
Jun 30, 2022
746dbdf
metricConf
Jun 30, 2022
b3f0225
offsets with connection name
Jul 26, 2022
5dc02cf
producer offset from broker
Aug 2, 2022
a225431
producer offset from broker
Aug 2, 2022
697a7c9
connectionName and producer offset
Aug 4, 2022
f655d88
goroutine avoiding
Aug 4, 2022
16ba03d
producer offset
Aug 9, 2022
fefcfc4
graceful shutdown for reading offset
Aug 10, 2022
06e1977
lag
Sep 22, 2022
f6c836b
lag
Sep 23, 2022
486ad21
lag
Sep 23, 2022
1aec04d
current offset
Sep 24, 2022
52aabdb
current offset
Sep 25, 2022
9f801a4
throtlle
Sep 26, 2022
fd8e048
simpler passing of configs
Sep 26, 2022
fa5f7a2
simpler passing of configs
Sep 26, 2022
7ad02a4
throttle
Sep 26, 2022
a2f60c6
deadlock
Sep 27, 2022
ed0ee3f
throttle
Sep 27, 2022
8564804
throttle
Sep 27, 2022
1b1ea2f
default interval
Sep 27, 2022
63ff8a2
throttle
Sep 28, 2022
4494175
Merge branch 'currentOffset' into throttle
Sep 28, 2022
c40b14b
sinkOffset
Sep 28, 2022
7c507b1
throttle
Sep 28, 2022
cd49a9f
throttle
Sep 28, 2022
4 changes: 3 additions & 1 deletion conf.json
@@ -17,7 +17,8 @@
"topic": "sample-topic",
"kafka_version_major": 2,
"force_restart": true,
-"read_newest": true
+"read_newest": true,
+"throttle_threshold": 2
},
"pending_acks": 1000000,
"sink": {
@@ -40,6 +41,7 @@
}
],
"metric_port": 9999,
+"offset_polling_interval": "1s",
"logging": {
"type": "file",
"config": {
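The new `offset_polling_interval` value is a human-readable duration string ("1s"). A minimal sketch of how such a field can be decoded, assuming `core.Duration` follows the common pattern of wrapping `time.Duration` with a custom JSON unmarshaler (the actual go-dmux type may differ):

```go
package core

import (
	"encoding/json"
	"time"
)

// Duration wraps time.Duration so strings like "1s" or "500ms" can be
// decoded directly from the JSON config.
type Duration struct {
	time.Duration
}

// UnmarshalJSON parses a JSON string into a time.Duration.
func (d *Duration) UnmarshalJSON(b []byte) error {
	var s string
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}
	parsed, err := time.ParseDuration(s)
	if err != nil {
		return err
	}
	d.Duration = parsed
	return nil
}
```

With a wrapper like this, `DmuxConf` (below) can hand the interval to each connection as a plain `time.Duration`.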
21 changes: 14 additions & 7 deletions config.go
@@ -2,9 +2,11 @@ package main

import (
"encoding/json"
+"github.com/go-dmux/core"
"io/ioutil"
"log"
"os"
+"time"

"github.com/go-dmux/connection"
"github.com/go-dmux/logging"
@@ -37,19 +39,23 @@ func (c ConnectionType) getConfig(data []byte) interface{} {
}

//Start invokes Run of the respective connection in a goroutine
-func (c ConnectionType) Start(conf interface{}, enableDebug bool) {
+func (c ConnectionType) Start(conf interface{}, enableDebug bool, name string, offsetPollinginterval time.Duration) {
switch c {
case KafkaHTTP:
connObj := &connection.KafkaHTTPConn{
-EnableDebugLog: enableDebug,
-Conf: conf,
+Name: name,
+EnableDebugLog: enableDebug,
+Conf: conf,
+PollingInterval: offsetPollinginterval,
}
log.Println("Starting ", KafkaHTTP)
connObj.Run()
case KafkaFoxtrot:
connObj := &connection.KafkaFoxtrotConn{
-EnableDebugLog: enableDebug,
-Conf: conf,
+Name: name,
+EnableDebugLog: enableDebug,
+Conf: conf,
+PollingInterval: offsetPollinginterval,
}
log.Println("Starting ", KafkaFoxtrot)
connObj.Run()
Expand All @@ -70,8 +76,9 @@ type DmuxConf struct {
Name string `json:"name"`
DMuxItems []DmuxItem `json:"dmuxItems"`
// DMuxMap map[string]KafkaHTTPConnConfig `json:"dmuxMap"`
-MetricPort int `json:"metric_port"`
-Logging logging.LogConf `json:"logging"`
+MetricPort int `json:"metric_port"`
+OffsetPollingInterval core.Duration `json:"offset_polling_interval"`
+Logging logging.LogConf `json:"logging"`
}

//DmuxItem struct defines name and type of connection
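For illustration, a hypothetical call site that threads the connection name and polling interval through the new `Start` signature; the `DmuxItem` field names used here (`Name`, `ConnType`) and the raw-config plumbing are assumptions, not the actual go-dmux main loop:

```go
// startAll is a hypothetical helper, not part of this diff: it shows how
// the per-connection name and the configured polling interval could be
// passed into the updated Start signature.
func startAll(conf DmuxConf, rawConf []byte, enableDebug bool) {
	for _, item := range conf.DMuxItems {
		connConf := item.ConnType.getConfig(rawConf)
		go item.ConnType.Start(
			connConf,
			enableDebug,
			item.Name,                           // per-connection name used to qualify metrics
			conf.OffsetPollingInterval.Duration, // assumes core.Duration wraps time.Duration
		)
	}
}
```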
12 changes: 9 additions & 3 deletions connection/kafka_foxtrot_conn.go
@@ -6,6 +6,7 @@ import (
"os"
"strconv"
"strings"
+"time"

"github.com/Shopify/sarama"
"github.com/go-dmux/core"
@@ -22,8 +23,10 @@

//KafkaFoxtrotConn struct to abstract this connection's Run
type KafkaFoxtrotConn struct {
-EnableDebugLog bool
-Conf interface{}
+Name string
+EnableDebugLog bool
+Conf interface{}
+PollingInterval time.Duration
}

//CustomURLKey place holder name, which will be replaced by kafka key
@@ -47,11 +50,14 @@ func (c *KafkaFoxtrotConn) Run() {
}
kafkaMsgFactory := getKafkaFoxtrotFactory()
src := source.GetKafkaSource(conf.Source, kafkaMsgFactory)
-offsetTracker := source.GetKafkaOffsetTracker(conf.PendingAcks, src)
+offsetTracker := source.GetKafkaOffsetTracker(conf.PendingAcks, src, c.Name)
hook := GetKafkaHook(offsetTracker, c.EnableDebugLog)
sk := sink.GetHTTPSink(conf.Dmux.Size, conf.Sink)
sk.RegisterHook(hook)
+sk.SetConnectionName(c.Name)
src.RegisterHook(hook)
+src.SetConnectionName(c.Name)
+src.SetPollinginterval(c.PollingInterval)

//hash distribution
h := GetKafkaMsgHasher()
12 changes: 9 additions & 3 deletions connection/kafka_http_conn.go
@@ -8,6 +8,7 @@ import (
"os"
"strconv"
"strings"
+"time"

"github.com/Shopify/sarama"
"github.com/go-dmux/core"
@@ -27,8 +28,10 @@

//KafkaHTTPConn struct to abstract this connection's Run
type KafkaHTTPConn struct {
-EnableDebugLog bool
-Conf interface{}
+Name string
+EnableDebugLog bool
+Conf interface{}
+PollingInterval time.Duration
}

func (c *KafkaHTTPConn) getConfiguration() *KafkaHTTPConnConfig {
@@ -49,11 +52,14 @@ func (c *KafkaHTTPConn) Run() {
}
kafkaMsgFactory := getKafkaHTTPFactory()
src := source.GetKafkaSource(conf.Source, kafkaMsgFactory)
-offsetTracker := source.GetKafkaOffsetTracker(conf.PendingAcks, src)
+offsetTracker := source.GetKafkaOffsetTracker(conf.PendingAcks, src, c.Name)
hook := GetKafkaHook(offsetTracker, c.EnableDebugLog)
sk := sink.GetHTTPSink(conf.Dmux.Size, conf.Sink)
sk.RegisterHook(hook)
+sk.SetConnectionName(c.Name)
src.RegisterHook(hook)
+src.SetConnectionName(c.Name)
+src.SetPollinginterval(c.PollingInterval)

//hash distribution
h := GetKafkaMsgHasher()
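Both connections inject the interval into the source via `SetPollinginterval`. A sketch of how a source might consume it, assuming a plain ticker loop with a stop channel for the "graceful shutdown for reading offset" mentioned in the commit history (the real `source.KafkaSource` internals are not part of this diff):

```go
package source

import "time"

// pollOffsets is an illustrative sketch, not the actual go-dmux source:
// it fires the offset read on every tick and returns when the stop
// channel is closed.
func pollOffsets(interval time.Duration, readOffsets func(), stop <-chan struct{}) {
	if interval <= 0 {
		interval = time.Second // assumed fallback; conf.json above uses "1s"
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			readOffsets() // e.g. fetch consumer and producer offsets, ingest metrics
		case <-stop:
			return // graceful shutdown of the offset reader
		}
	}
}
```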
5 changes: 5 additions & 0 deletions http/http_sink.go
@@ -20,6 +20,7 @@ type HTTPSink struct {
client *http.Client
hook HTTPSinkHook
conf HTTPSinkConf
+name string
}

//HTTPSinkConf holds config to HTTPSink
@@ -92,6 +93,10 @@ func (h *HTTPSink) RegisterHook(hook HTTPSinkHook) {
h.hook = hook
}

+//SetConnectionName tags the sink with the name of the connection it serves
+func (h *HTTPSink) SetConnectionName(name string) {
+h.name = name
+}

//HTTPMsg is an interface which incoming data should implement for HTTPSink to
//work
type HTTPMsg interface {
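The stored name lets the sink qualify its metrics per connection (the commit history mentions "sinkOffset" and "offsets with connection name"). A hypothetical sketch of the kind of metric the name could feed, mirroring the `metrics.Reg.Ingest` pattern visible in the consumer-group diff below; the metric name and helper signature are assumptions:

```go
// reportSinkOffset is illustrative only, not part of this diff: it emits
// a sink-side offset gauge keyed by the connection name, using the same
// metrics.Metric shape seen in consumer_group.go. Assumes fmt, metrics,
// and prometheus are imported.
func (h *HTTPSink) reportSinkOffset(topic string, partition int32, offset int64) {
	metrics.Reg.Ingest(metrics.Metric{
		MetricType:  prometheus.GaugeValue,
		MetricName:  fmt.Sprintf("sink_offset.%s.%s.%d", h.name, topic, partition),
		MetricValue: offset,
	})
}
```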
37 changes: 30 additions & 7 deletions kafka/consumer-group/consumer_group.go
@@ -3,11 +3,13 @@ package consumergroup
import (
"errors"
"fmt"
+"github.com/go-dmux/metrics"
+"github.com/prometheus/client_golang/prometheus"
"sync"
"time"

-"github.com/go-dmux/kafka/kazoo-go"
"github.com/Shopify/sarama"
+"github.com/go-dmux/kafka/kazoo-go"
)

var (
@@ -83,7 +85,7 @@ type ConsumerGroup struct {
}

// Connects to a consumer group, using Zookeeper for auto-discovery
-func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error) {
+func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config, connectionName string, brokerList *[]string) (cg *ConsumerGroup, err error) {

if name == "" {
return nil, sarama.ConfigurationError("Empty consumergroup name")
@@ -116,6 +118,8 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config
if err != nil {
kz.Close()
return
-}
+} else {
+*brokerList = brokers
+}

group := kz.Consumergroup(name)
@@ -176,7 +180,7 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config
offsetConfig := OffsetManagerConfig{CommitInterval: config.Offsets.CommitInterval}
cg.offsetManager = NewZookeeperOffsetManager(cg, &offsetConfig)

-go cg.topicListConsumer(topics)
+go cg.topicListConsumer(connectionName, topics)

return
}
@@ -250,7 +254,7 @@ func (cg *ConsumerGroup) FlushOffsets() error {
return cg.offsetManager.Flush()
}

-func (cg *ConsumerGroup) topicListConsumer(topics []string) {
+func (cg *ConsumerGroup) topicListConsumer(connectionName string, topics []string) {
for {
select {
case <-cg.stopper:
@@ -271,7 +275,7 @@

for _, topic := range topics {
cg.wg.Add(1)
-go cg.topicConsumer(topic, cg.messages, cg.errors, stopper)
+go cg.topicConsumer(connectionName, topic, cg.messages, cg.errors, stopper)
}

select {
@@ -299,7 +303,7 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) {
}
}

-func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- error, stopper <-chan struct{}) {
+func (cg *ConsumerGroup) topicConsumer(connectionName string, topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- error, stopper <-chan struct{}) {
defer cg.wg.Done()

select {
@@ -340,7 +344,15 @@ func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.Con
// Consume all the assigned partitions
var wg sync.WaitGroup
for _, pid := range myPartitions {

+//Emit a gauge recording which partition this consumer instance owns.
+//On a re-balance this loop runs again and the latest ownership is reported.
+metricName := connectionName + "." + topic + "." + cg.instance.ID + "." + time.Now().Format(time.RFC850)
+metrics.Reg.Ingest(metrics.Metric{
+MetricType: prometheus.GaugeValue,
+MetricName: "partition_owned." + metricName,
+MetricValue: int64(pid.ID),
+})
wg.Add(1)
go cg.partitionConsumer(topic, pid.ID, messages, errors, &wg, stopper)
}
@@ -510,3 +522,14 @@
cg.Logf("%s/%d :: %s\n", topic, partition, err)
}
}

+//GetConsumerOffset returns the high-water mark the underlying sarama
+//consumer has recorded for the given topic partition
+func (cg *ConsumerGroup) GetConsumerOffset(topic string, partition int32) (int64, error) {
+marks := cg.consumer.HighWaterMarks()
+if partitions, ok := marks[topic]; ok {
+if offset, ok1 := partitions[partition]; ok1 {
+return offset, nil
+}
+}
+return -1, errors.New("could not get offset")
+}
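A hedged sketch of how the new helper supports the lag and throttle work in the commit history; `producerOffset` (the broker-side latest offset, which other commits fetch separately) is assumed to be supplied by the caller:

```go
// consumerLag is illustrative only, not part of this diff: it compares
// an externally obtained producer offset against the consumer's recorded
// high-water mark for one partition.
func consumerLag(cg *ConsumerGroup, topic string, partition int32, producerOffset int64) (int64, error) {
	consumed, err := cg.GetConsumerOffset(topic, partition)
	if err != nil {
		return 0, err // no high-water mark recorded yet for this partition
	}
	return producerOffset - consumed, nil
}
```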