Skip to content

Commit e84a53c

Browse files
author
Noah Adams
committed
Added pipelining_strategy: Serial|FanOut option to support serial partition consumption
1 parent c7e2e03 commit e84a53c

File tree

12 files changed

+494
-75
lines changed

12 files changed

+494
-75
lines changed

go.work.sum

Lines changed: 146 additions & 2 deletions
Large diffs are not rendered by default.

kafkajobs/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,10 @@ func (c *config) InitDefault(l *zap.Logger) ([]kgo.Opt, error) {
309309
kgo.ConsumePartitions(partitions)
310310
}
311311
}
312+
313+
if c.ConsumerOpts.PipeliningStrategy == "" {
314+
c.ConsumerOpts.PipeliningStrategy = FanOutPipelining
315+
}
312316
}
313317

314318
if c.Ping == nil {

kafkajobs/item.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package kafkajobs
22

33
import (
44
"maps"
5+
"sync"
56
"sync/atomic"
67

78
"github.com/goccy/go-json"
@@ -34,6 +35,7 @@ type Item struct {
3435
commitsCh chan *kgo.Record
3536
requeueCh chan *Item
3637
record *kgo.Record
38+
doneWg *sync.WaitGroup
3739
}
3840

3941
// Options carry information about how to handle a given job.
@@ -115,6 +117,11 @@ func (i *Item) Ack() error {
115117
if atomic.LoadUint64(i.stopped) == 1 {
116118
return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
117119
}
120+
121+
if i.doneWg != nil {
122+
i.doneWg.Done()
123+
}
124+
118125
select {
119126
case i.commitsCh <- i.record:
120127
return nil
@@ -124,14 +131,18 @@ func (i *Item) Ack() error {
124131
}
125132

126133
func (i *Item) Nack() error {
127-
return nil
134+
return i.NackWithOptions(false, 0)
128135
}
129136

130137
func (i *Item) NackWithOptions(requeue bool, _ int) error {
131138
if atomic.LoadUint64(i.stopped) == 1 {
132139
return errors.Str("failed to NackWithOptions the JOB, the pipeline is probably stopped")
133140
}
134141

142+
if i.doneWg != nil {
143+
i.doneWg.Done()
144+
}
145+
135146
if requeue {
136147
err := i.Requeue(nil, 0)
137148
if err != nil {

kafkajobs/listener.go

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@ import (
44
"context"
55
"encoding/binary"
66
"errors"
7+
"go.opentelemetry.io/otel"
8+
"go.opentelemetry.io/otel/propagation"
9+
"sync"
710
"sync/atomic"
811

912
"github.com/roadrunner-server/api/v4/plugins/v3/jobs"
1013
"github.com/roadrunner-server/events"
1114
"github.com/twmb/franz-go/pkg/kerr"
1215
"github.com/twmb/franz-go/pkg/kgo"
13-
"go.opentelemetry.io/otel"
14-
"go.opentelemetry.io/otel/propagation"
1516
"go.uber.org/zap"
1617
)
1718

@@ -137,24 +138,50 @@ func (d *Driver) listen() error {
137138
}
138139
}
139140

140-
fetches.EachRecord(func(r *kgo.Record) {
141-
item := fromConsumer(r, d.requeueCh, d.recordsCh, &d.stopped)
142-
143-
ctxT, span := d.tracer.Tracer(tracerName).Start(otel.GetTextMapPropagator().Extract(context.Background(), propagation.HeaderCarrier(item.headers)), "kafka_listener")
144-
d.prop.Inject(ctxT, propagation.HeaderCarrier(item.headers))
145-
146-
d.pq.Insert(item)
147-
148-
span.End()
149-
})
141+
d.log.Warn("handling fetch errors")
142+
switch d.cfg.ConsumerOpts.PipeliningStrategy {
143+
case SerialPipelining:
144+
fetchWg := &sync.WaitGroup{}
145+
fetches.EachPartition(func(partition kgo.FetchTopicPartition) {
146+
fetchWg.Add(1)
147+
itemWg := &sync.WaitGroup{}
148+
149+
go func() {
150+
defer fetchWg.Done()
151+
partition.EachRecord(func(r *kgo.Record) {
152+
itemWg.Add(1)
153+
item := fromConsumer(r, d.requeueCh, d.recordsCh, itemWg, &d.stopped)
154+
d.insertTracedItem(item)
155+
itemWg.Wait()
156+
})
157+
}()
158+
})
159+
fetchWg.Wait()
160+
case FanOutPipelining:
161+
fetches.EachRecord(func(r *kgo.Record) {
162+
item := fromConsumer(r, d.requeueCh, d.recordsCh, nil, &d.stopped)
163+
d.insertTracedItem(item)
164+
})
165+
default:
166+
return errors.New("unknown consumer pipeliningStrategy")
167+
}
150168

151169
if d.cfg.GroupOpts != nil {
152170
d.kafkaClient.AllowRebalance()
153171
}
154172
}
155173
}
156174

157-
func fromConsumer(msg *kgo.Record, reqCh chan *Item, commCh chan *kgo.Record, stopped *uint64) *Item {
175+
func (d *Driver) insertTracedItem(item *Item) {
176+
ctxT, span := d.tracer.Tracer(tracerName).Start(otel.GetTextMapPropagator().Extract(context.Background(), propagation.HeaderCarrier(item.headers)), "kafka_listener")
177+
d.prop.Inject(ctxT, propagation.HeaderCarrier(item.headers))
178+
179+
d.pq.Insert(item)
180+
181+
span.End()
182+
}
183+
184+
func fromConsumer(msg *kgo.Record, reqCh chan *Item, commCh chan *kgo.Record, doneWg *sync.WaitGroup, stopped *uint64) *Item {
158185
/*
159186
RRJob string = "rr_job"
160187
RRHeaders string = "rr_headers"
@@ -203,6 +230,7 @@ func fromConsumer(msg *kgo.Record, reqCh chan *Item, commCh chan *kgo.Record, st
203230
stopped: stopped,
204231
requeueCh: reqCh,
205232
commitsCh: commCh,
233+
doneWg: doneWg,
206234
record: msg,
207235

208236
Options: &Options{

kafkajobs/opts.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,18 @@ type ConsumerOpts struct {
129129
ConsumeRegexp bool `mapstructure:"consume_regexp" json:"consume_regexp"`
130130
MaxFetchMessageSize int32 `mapstructure:"max_fetch_message_size" json:"max_fetch_message_size"`
131131
MinFetchMessageSize int32 `mapstructure:"min_fetch_message_size" json:"min_fetch_message_size"`
132+
PipeliningStrategy PipeliningStrategy `mapstructure:"pipelining_strategy" json:"pipelining_strategy"`
132133
ConsumePartitions map[string]map[int32]*Offset `mapstructure:"consume_partitions" json:"consume_partitions"`
133134
ConsumerOffset *Offset `mapstructure:"consumer_offset" json:"consumer_offset"`
134135
}
135136

137+
type PipeliningStrategy string
138+
139+
const (
140+
SerialPipelining PipeliningStrategy = "Serial"
141+
FanOutPipelining PipeliningStrategy = "FanOut"
142+
)
143+
136144
type ClientAuthType string
137145

138146
const (

schema.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,15 @@
132132
"consumer_offset": {
133133
"$ref": "#/definitions/Offset"
134134
},
135+
"pipelining_strategy": {
136+
"description": "The consumption strategy for messages within a single partition.",
137+
"type": "string",
138+
"enum": [
139+
"Serial",
140+
"FanOut"
141+
],
142+
"default": "FanOut"
143+
},
135144
"consume_partitions": {
136145
"type": "object",
137146
"minProperties": 1,
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
version: '3'
2+
3+
rpc:
4+
listen: tcp://127.0.0.1:6001
5+
6+
server:
7+
command: "php php_test_files/jobs/jobs_ok_log_item_rand_sleep.php"
8+
relay: "pipes"
9+
relay_timeout: "20s"
10+
11+
logs:
12+
level: debug
13+
encoding: console
14+
mode: development
15+
16+
kafka:
17+
brokers: ["127.0.0.1:9092"]
18+
19+
jobs:
20+
num_pollers: 10
21+
pipeline_size: 100000
22+
timeout: 100
23+
pool:
24+
num_workers: 10
25+
allocate_timeout: 60s
26+
destroy_timeout: 60s
27+
28+
pipelines:
29+
test-produce:
30+
driver: kafka
31+
config:
32+
priority: 1
33+
34+
auto_create_topics_enable: true
35+
36+
group_options:
37+
group_id: bar
38+
block_rebalance_on_poll: true
39+
40+
producer_options:
41+
max_message_bytes: 1000
42+
required_acks: LeaderAck
43+
compression_codec: snappy
44+
disable_idempotent: true
45+
46+
test-consume-1:
47+
driver: kafka
48+
config:
49+
priority: 1
50+
51+
group_options:
52+
group_id: bar
53+
block_rebalance_on_poll: true
54+
55+
consumer_options:
56+
topics: [ "serial-1", "serial-2" ]
57+
pipelining_strategy: Serial
58+
consumer_offset:
59+
type: AtStart
60+
test-consume-2:
61+
driver: kafka
62+
config:
63+
priority: 1
64+
65+
group_options:
66+
group_id: bar
67+
block_rebalance_on_poll: true
68+
69+
consumer_options:
70+
topics: [ "serial-3"]
71+
pipelining_strategy: Serial
72+
consumer_offset:
73+
type: AtStart
74+
consume: [ "test-consume-1", "test-consume-2" ]

0 commit comments

Comments
 (0)