Commit 19862e9

Noah Adams (adamsnoah98) authored and committed
Added pipelining_strategy: Serial|FanOut option to support partition consumption
1 parent c7e2e03 commit 19862e9

File tree: 12 files changed, +504 −75 lines


go.work.sum

Lines changed: 146 additions & 2 deletions
Large diffs are not rendered by default.

kafkajobs/config.go

Lines changed: 10 additions & 0 deletions
@@ -309,6 +309,16 @@ func (c *config) InitDefault(l *zap.Logger) ([]kgo.Opt, error) {
 				kgo.ConsumePartitions(partitions)
 			}
 		}
+
+		switch c.ConsumerOpts.PipeliningStrategy {
+		case SerialPipelining:
+		case FanOutPipelining:
+		case "":
+			c.ConsumerOpts.PipeliningStrategy = FanOutPipelining
+		default:
+			return nil, errors.Errorf("unknown pipelining strategy: %s", c.ConsumerOpts.PipeliningStrategy)
+		}
 	}

 	if c.Ping == nil {
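For readers skimming the diff: the switch above is a plain validate-or-default pass over the new enum, run once during config initialization. A minimal standalone sketch of the same pattern (the normalize helper is hypothetical, not part of the plugin):

package main

import "fmt"

type PipeliningStrategy string

const (
	SerialPipelining PipeliningStrategy = "Serial"
	FanOutPipelining PipeliningStrategy = "FanOut"
)

// normalize mirrors the InitDefault logic: known values pass through,
// the empty string falls back to FanOut, anything else is an error.
func normalize(s PipeliningStrategy) (PipeliningStrategy, error) {
	switch s {
	case SerialPipelining, FanOutPipelining:
		return s, nil
	case "":
		return FanOutPipelining, nil // unset: keep the previous fan-out behavior
	default:
		return "", fmt.Errorf("unknown pipelining strategy: %s", s)
	}
}

func main() {
	for _, s := range []PipeliningStrategy{"", "Serial", "Bogus"} {
		fmt.Println(normalize(s))
	}
}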

kafkajobs/item.go

Lines changed: 15 additions & 1 deletion
@@ -2,6 +2,7 @@ package kafkajobs

 import (
 	"maps"
+	"sync"
 	"sync/atomic"

 	"github.com/goccy/go-json"
@@ -34,6 +35,7 @@ type Item struct {
 	commitsCh chan *kgo.Record
 	requeueCh chan *Item
 	record    *kgo.Record
+	doneWg    *sync.WaitGroup
 }

 // Options carry information about how to handle a given job.
@@ -115,6 +117,11 @@ func (i *Item) Ack() error {
 	if atomic.LoadUint64(i.stopped) == 1 {
 		return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
 	}
+
+	if i.doneWg != nil {
+		i.doneWg.Done()
+	}
+
 	select {
 	case i.commitsCh <- i.record:
 		return nil
@@ -124,14 +131,18 @@ func (i *Item) Ack() error {
 	}
 }

 func (i *Item) Nack() error {
-	return nil
+	return i.NackWithOptions(false, 0)
 }

 func (i *Item) NackWithOptions(requeue bool, _ int) error {
 	if atomic.LoadUint64(i.stopped) == 1 {
 		return errors.Str("failed to NackWithOptions the JOB, the pipeline is probably stopped")
 	}

+	if i.doneWg != nil {
+		i.doneWg.Done()
+	}
+
 	if requeue {
 		err := i.Requeue(nil, 0)
 		if err != nil {
@@ -157,6 +168,9 @@ func (i *Item) Copy() *Item {
 		Offset: i.Options.Offset,
 	}

+	// Requeued items must get a fresh gating WaitGroup assigned by the caller.
+	item.doneWg = nil
+
 	return item
 }
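The doneWg field is the hand-off point between a job and the listener: under the Serial strategy, Ack and Nack call Done() so the partition loop can release the next record. A toy sketch of that gating, with a simplified job type standing in for the plugin's Item:

package main

import (
	"fmt"
	"sync"
	"time"
)

// job stands in for kafkajobs.Item: doneWg is nil under FanOut and
// is set by the listener under Serial.
type job struct {
	payload string
	doneWg  *sync.WaitGroup
}

func (j *job) Ack() {
	if j.doneWg != nil {
		j.doneWg.Done() // unblocks the partition loop below
	}
}

func main() {
	wg := &sync.WaitGroup{}
	for _, p := range []string{"r0", "r1", "r2"} { // one partition's records, in order
		wg.Add(1)
		j := &job{payload: p, doneWg: wg}
		go func() { // a worker acknowledges some time later
			time.Sleep(10 * time.Millisecond)
			j.Ack()
		}()
		wg.Wait() // serial: do not hand out the next record until Ack/Nack
		fmt.Println("released", p)
	}
}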

kafkajobs/listener.go

Lines changed: 40 additions & 13 deletions
@@ -4,14 +4,15 @@ import (
 	"context"
 	"encoding/binary"
 	"errors"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/propagation"
+	"sync"
 	"sync/atomic"

 	"github.com/roadrunner-server/api/v4/plugins/v3/jobs"
 	"github.com/roadrunner-server/events"
 	"github.com/twmb/franz-go/pkg/kerr"
 	"github.com/twmb/franz-go/pkg/kgo"
-	"go.opentelemetry.io/otel"
-	"go.opentelemetry.io/otel/propagation"
 	"go.uber.org/zap"
 )

@@ -137,24 +138,49 @@ func (d *Driver) listen() error {
 			}
 		}

-		fetches.EachRecord(func(r *kgo.Record) {
-			item := fromConsumer(r, d.requeueCh, d.recordsCh, &d.stopped)
-
-			ctxT, span := d.tracer.Tracer(tracerName).Start(otel.GetTextMapPropagator().Extract(context.Background(), propagation.HeaderCarrier(item.headers)), "kafka_listener")
-			d.prop.Inject(ctxT, propagation.HeaderCarrier(item.headers))
-
-			d.pq.Insert(item)
-
-			span.End()
-		})
+		switch d.cfg.ConsumerOpts.PipeliningStrategy {
+		case SerialPipelining:
+			fetchWg := &sync.WaitGroup{}
+			fetches.EachPartition(func(partition kgo.FetchTopicPartition) {
+				fetchWg.Add(1)
+				itemWg := &sync.WaitGroup{}
+
+				go func() {
+					defer fetchWg.Done()
+					partition.EachRecord(func(r *kgo.Record) {
+						itemWg.Add(1)
+						item := fromConsumer(r, d.requeueCh, d.recordsCh, itemWg, &d.stopped)
+						d.insertTracedItem(item)
+						itemWg.Wait()
+					})
+				}()
+			})
+			fetchWg.Wait()
+		case FanOutPipelining:
+			fetches.EachRecord(func(r *kgo.Record) {
+				item := fromConsumer(r, d.requeueCh, d.recordsCh, nil, &d.stopped)
+				d.insertTracedItem(item)
+			})
+		default:
+			return errors.New("unknown consumer pipeliningStrategy")
+		}

 		if d.cfg.GroupOpts != nil {
 			d.kafkaClient.AllowRebalance()
 		}
 	}
 }

-func fromConsumer(msg *kgo.Record, reqCh chan *Item, commCh chan *kgo.Record, stopped *uint64) *Item {
+func (d *Driver) insertTracedItem(item *Item) {
+	ctxT, span := d.tracer.Tracer(tracerName).Start(otel.GetTextMapPropagator().Extract(context.Background(), propagation.HeaderCarrier(item.headers)), "kafka_listener")
+	d.prop.Inject(ctxT, propagation.HeaderCarrier(item.headers))
+
+	d.pq.Insert(item)
+
+	span.End()
+}
+
+func fromConsumer(msg *kgo.Record, reqCh chan *Item, commCh chan *kgo.Record, doneWg *sync.WaitGroup, stopped *uint64) *Item {
 	/*
 		RRJob string = "rr_job"
 		RRHeaders string = "rr_headers"
@@ -203,6 +229,7 @@ func fromConsumer(msg *kgo.Record, reqCh chan *Item, commCh chan *kgo.Record, st
 		stopped:   stopped,
 		requeueCh: reqCh,
 		commitsCh: commCh,
+		doneWg:    doneWg,
 		record:    msg,

 		Options: &Options{
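The structural difference between the two branches, stripped of the kgo and tracing machinery (partitions modeled here as plain slices, and handle standing in for the insert-and-wait step; not the franz-go API):

package main

import (
	"fmt"
	"sync"
)

// serial: partitions are processed concurrently with each other, but
// records within one partition are handled strictly one at a time.
func serial(partitions map[int32][]string, handle func(string)) {
	fetchWg := &sync.WaitGroup{}
	for _, records := range partitions {
		fetchWg.Add(1)
		go func(records []string) { // one goroutine per partition
			defer fetchWg.Done()
			for _, r := range records {
				handle(r) // returns only once the record is acked/nacked
			}
		}(records)
	}
	fetchWg.Wait()
}

// fanOut: every record is dispatched immediately, with no ordering
// guarantees even within a single partition.
func fanOut(partitions map[int32][]string, handle func(string)) {
	wg := &sync.WaitGroup{}
	for _, records := range partitions {
		for _, r := range records {
			wg.Add(1)
			go func(r string) {
				defer wg.Done()
				handle(r)
			}(r)
		}
	}
	wg.Wait()
}

func main() {
	parts := map[int32][]string{0: {"a0", "a1", "a2"}, 1: {"b0", "b1"}}
	serial(parts, func(r string) { fmt.Println("serial", r) })
	fanOut(parts, func(r string) { fmt.Println("fanout", r) })
}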

kafkajobs/opts.go

Lines changed: 8 additions & 0 deletions
@@ -129,10 +129,18 @@ type ConsumerOpts struct {
 	ConsumeRegexp       bool                         `mapstructure:"consume_regexp" json:"consume_regexp"`
 	MaxFetchMessageSize int32                        `mapstructure:"max_fetch_message_size" json:"max_fetch_message_size"`
 	MinFetchMessageSize int32                        `mapstructure:"min_fetch_message_size" json:"min_fetch_message_size"`
+	PipeliningStrategy  PipeliningStrategy           `mapstructure:"pipelining_strategy" json:"pipelining_strategy"`
 	ConsumePartitions   map[string]map[int32]*Offset `mapstructure:"consume_partitions" json:"consume_partitions"`
 	ConsumerOffset      *Offset                      `mapstructure:"consumer_offset" json:"consumer_offset"`
 }

+type PipeliningStrategy string
+
+const (
+	SerialPipelining PipeliningStrategy = "Serial"
+	FanOutPipelining PipeliningStrategy = "FanOut"
+)
+
 type ClientAuthType string

 const (
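Because PipeliningStrategy is a named string type, it rides the existing mapstructure tags with no custom decode hook. A minimal sketch, assuming github.com/mitchellh/mapstructure (the library the struct tags above target; ConsumerOpts is trimmed to the one new field for illustration):

package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

type PipeliningStrategy string

// ConsumerOpts reduced to the new field only (illustrative).
type ConsumerOpts struct {
	PipeliningStrategy PipeliningStrategy `mapstructure:"pipelining_strategy"`
}

func main() {
	// Roughly what a YAML parser would hand over for:
	//   consumer_options:
	//     pipelining_strategy: Serial
	raw := map[string]any{"pipelining_strategy": "Serial"}

	var opts ConsumerOpts
	if err := mapstructure.Decode(raw, &opts); err != nil {
		panic(err)
	}
	fmt.Println(opts.PipeliningStrategy) // Serial
}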

schema.json

Lines changed: 9 additions & 0 deletions
@@ -132,6 +132,15 @@
       "consumer_offset": {
         "$ref": "#/definitions/Offset"
       },
+      "pipelining_strategy": {
+        "description": "The consumption strategy for messages within a single partition.",
+        "type": "string",
+        "enum": [
+          "Serial",
+          "FanOut"
+        ],
+        "default": "FanOut"
+      },
       "consume_partitions": {
         "type": "object",
         "minProperties": 1,
Lines changed: 75 additions & 0 deletions

@@ -0,0 +1,75 @@
+version: '3'
+
+rpc:
+  listen: tcp://127.0.0.1:6001
+
+server:
+  command: "php php_test_files/jobs/jobs_ok_log_item_rand_sleep.php"
+  relay: "pipes"
+  relay_timeout: "20s"
+
+logs:
+  level: debug
+  encoding: console
+  mode: development
+
+kafka:
+  brokers: ["127.0.0.1:9092"]
+
+jobs:
+  num_pollers: 10
+  pipeline_size: 100000
+  timeout: 100
+  pool:
+    num_workers: 10
+    allocate_timeout: 60s
+    destroy_timeout: 60s
+
+  pipelines:
+    test-produce:
+      driver: kafka
+      config:
+        priority: 1
+
+        auto_create_topics_enable: true
+
+        group_options:
+          group_id: bar
+          block_rebalance_on_poll: true
+
+        producer_options:
+          max_message_bytes: 1000
+          required_acks: LeaderAck
+          compression_codec: snappy
+          disable_idempotent: true
+
+    test-consume-1:
+      driver: kafka
+      config:
+        priority: 1
+
+        group_options:
+          group_id: bar
+          block_rebalance_on_poll: true
+
+        consumer_options:
+          topics: [ "serial-1", "serial-2" ]
+          pipelining_strategy: Serial
+          consumer_offset:
+            type: AtStart
+    test-consume-2:
+      driver: kafka
+      config:
+        priority: 1
+
+        group_options:
+          group_id: bar
+          block_rebalance_on_poll: true
+
+        consumer_options:
+          topics: [ "serial-3" ]
+          pipelining_strategy: Serial
+          consumer_offset:
+            type: AtStart
+
+  consume: [ "test-consume-1", "test-consume-2" ]
