
Commit 1ff5d46 (parent 3c92a92)

Add requeue + edge-case worker response tests for waitgroup stability in serial pipelining
Reuse waitgroups within the listener to reduce allocations

File tree: 7 files changed, +325 −356 lines

kafkajobs/config.go

Lines changed: 0 additions & 1 deletion

@@ -318,7 +318,6 @@ func (c *config) InitDefault(l *zap.Logger) ([]kgo.Opt, error) {
 		default:
 			return nil, errors.Errorf("unknown pipelining strategy: %s", c.ConsumerOpts.PipeliningStrategy)
 		}
-
 	}

 	if c.Ping == nil {

kafkajobs/item.go

Lines changed: 9 additions & 6 deletions

@@ -118,9 +118,7 @@ func (i *Item) Ack() error {
 		return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
 	}

-	if i.doneWg != nil {
-		i.doneWg.Done()
-	}
+	i.done()

 	select {
 	case i.commitsCh <- i.record:

@@ -139,9 +137,7 @@ func (i *Item) NackWithOptions(requeue bool, _ int) error {
 		return errors.Str("failed to NackWithOptions the JOB, the pipeline is probably stopped")
 	}

-	if i.doneWg != nil {
-		i.doneWg.Done()
-	}
+	i.done()

 	if requeue {
 		err := i.Requeue(nil, 0)

@@ -204,6 +200,13 @@ func (i *Item) Respond(_ []byte, _ string) error {
 	return nil
 }

+func (i *Item) done() {
+	if i.doneWg != nil {
+		i.doneWg.Done()
+		i.doneWg = nil
+	}
+}
+
 func fromJob(job jobs.Message) *Item {
 	return &Item{
 		Job: job.Name(),

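The new done() helper is what makes repeated worker responses safe in serial pipelining: the first Ack or Nack decrements the shared WaitGroup and clears the pointer, so a second response becomes a no-op instead of driving the counter negative and panicking with "sync: negative WaitGroup counter". A minimal, self-contained sketch of the same guard follows; the job type and its fields are illustrative stand-ins for the driver's Item, not its actual definition:

package main

import (
	"fmt"
	"sync"
)

// job mirrors the idea behind kafkajobs.Item: it carries a pointer to the
// per-record WaitGroup used by the serial pipeline.
type job struct {
	doneWg *sync.WaitGroup
}

// done releases the WaitGroup on the first call and becomes a no-op after
// that, so a worker that responds more than once (repeat ACK, or ACK
// followed by a requeue) cannot panic the listener. As in the commit, the
// guard itself is not synchronized: it assumes a single worker goroutine
// responds for a given record.
func (j *job) done() {
	if j.doneWg != nil {
		j.doneWg.Done()
		j.doneWg = nil
	}
}

func main() {
	wg := &sync.WaitGroup{}
	wg.Add(1)

	j := &job{doneWg: wg}
	j.done() // first response releases the listener's wait
	j.done() // repeated response is ignored instead of panicking

	wg.Wait()
	fmt.Println("waitgroup released exactly once")
}
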
kafkajobs/listener.go

Lines changed: 12 additions & 6 deletions

@@ -4,15 +4,15 @@ import (
 	"context"
 	"encoding/binary"
 	"errors"
-	"go.opentelemetry.io/otel"
-	"go.opentelemetry.io/otel/propagation"
 	"sync"
 	"sync/atomic"

 	"github.com/roadrunner-server/api/v4/plugins/v3/jobs"
 	"github.com/roadrunner-server/events"
 	"github.com/twmb/franz-go/pkg/kerr"
 	"github.com/twmb/franz-go/pkg/kgo"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/propagation"
 	"go.uber.org/zap"
 )

@@ -27,6 +27,12 @@ func (d *Driver) listen() error {
 	ctx, d.kafkaCancelCtx = context.WithCancel(context.Background())
 	d.mu.Unlock()

+	// only for serial pipelining
+	pipelineWg := &sync.WaitGroup{}
+	partitionWgPool := &sync.Pool{New: func() interface{} {
+		return &sync.WaitGroup{}
+	}}
+
 	defer func() {
 		d.log.Debug("kafka listener stopped")
 	}()

@@ -140,20 +146,20 @@ func (d *Driver) listen() error {

 	switch d.cfg.ConsumerOpts.PipeliningStrategy {
 	case SerialPipelining:
-		fetchWg := &sync.WaitGroup{}
 		fetches.EachPartition(func(partition kgo.FetchTopicPartition) {
-			itemWg := &sync.WaitGroup{}
+			itemWg := partitionWgPool.Get().(*sync.WaitGroup)

-			fetchWg.Go(func() {
+			pipelineWg.Go(func() {
 				partition.EachRecord(func(r *kgo.Record) {
 					itemWg.Add(1)
 					item := fromConsumer(r, d.requeueCh, d.recordsCh, itemWg, &d.stopped)
 					d.insertTracedItem(item)
 					itemWg.Wait()
+					partitionWgPool.Put(itemWg)
 				})
 			})
 		})
-		fetchWg.Wait()
+		pipelineWg.Wait()
 	case FanOutPipelining:
 		fetches.EachRecord(func(r *kgo.Record) {
 			item := fromConsumer(r, d.requeueCh, d.recordsCh, nil, &d.stopped)
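
Here the per-fetch fetchWg is replaced by a single pipelineWg that lives for the whole listener loop, and the per-partition WaitGroups are drawn from a sync.Pool instead of being allocated for every partition of every fetch, which is the allocation saving the commit message refers to. Below is a minimal standalone sketch of the same pattern, under stated assumptions: the partition/record slices are illustrative and not the franz-go fetch API, the Put happens after a partition's records are drained, and the classic Add/Done/Wait form is used in place of the newer sync.WaitGroup.Go helper the driver calls:

package main

import (
	"fmt"
	"sync"
)

func main() {
	// Pool of reusable WaitGroups, mirroring partitionWgPool in the listener;
	// New only runs when the pool has nothing to hand out.
	wgPool := &sync.Pool{New: func() interface{} {
		return &sync.WaitGroup{}
	}}

	// Long-lived WaitGroup for the whole batch, mirroring pipelineWg.
	batchWg := &sync.WaitGroup{}

	// Illustrative stand-in for the partitions and records of one fetch.
	partitions := [][]string{
		{"rec-1", "rec-2"},
		{"rec-3"},
	}

	for _, p := range partitions {
		// Reuse a pooled WaitGroup per partition instead of allocating one.
		itemWg := wgPool.Get().(*sync.WaitGroup)

		batchWg.Add(1)
		go func(records []string, itemWg *sync.WaitGroup) {
			defer batchWg.Done()
			for _, r := range records {
				itemWg.Add(1)
				go func(rec string) {
					// Stands in for the worker acknowledging the record.
					fmt.Println("processed", rec)
					itemWg.Done()
				}(r)
				// Serial pipelining: block until this record is acknowledged
				// before handing out the next one.
				itemWg.Wait()
			}
			wgPool.Put(itemWg) // return the WaitGroup for reuse on later fetches
		}(p, itemWg)
	}

	batchWg.Wait()
}
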
Lines changed: 51 additions & 0 deletions

@@ -0,0 +1,51 @@
+version: '3'
+
+rpc:
+  listen: tcp://127.0.0.1:6001
+
+server:
+  command: "php php_test_files/jobs/jobs_repeat_acknowledgements.php"
+  relay: "pipes"
+  relay_timeout: "20s"
+
+logs:
+  level: debug
+  encoding: console
+  mode: development
+
+kafka:
+  brokers: ["127.0.0.1:9092"]
+
+jobs:
+  num_pollers: 2
+  pipeline_size: 100000
+  timeout: 100
+  pool:
+    num_workers: 2
+    allocate_timeout: 60s
+    destroy_timeout: 60s
+
+  pipelines:
+    test:
+      driver: kafka
+      config:
+        priority: 1
+        auto_create_topics_enable: true
+
+        group_options:
+          group_id: bar
+          block_rebalance_on_poll: true
+
+        producer_options:
+          max_message_bytes: 1000
+          required_acks: LeaderAck
+          compression_codec: snappy
+          disable_idempotent: true
+
+        consumer_options:
+          topics: [ "serial-repeats-1" ]
+          pipelining_strategy: Serial
+          consumer_offset:
+            type: AtStart
+  consume: [ "test" ]
Lines changed: 52 additions & 0 deletions

@@ -0,0 +1,52 @@
+version: '3'
+
+rpc:
+  listen: tcp://127.0.0.1:6001
+
+server:
+  command: "php php_test_files/jobs/jobs_err.php"
+  relay: "pipes"
+  relay_timeout: "20s"
+
+logs:
+  level: debug
+  encoding: console
+  mode: development
+
+kafka:
+  brokers: ["127.0.0.1:9092"]
+
+jobs:
+  num_pollers: 2
+  pipeline_size: 100000
+  timeout: 100
+  pool:
+    num_workers: 2
+    allocate_timeout: 60s
+    destroy_timeout: 60s
+
+  pipelines:
+    test:
+      driver: kafka
+      config:
+        priority: 1
+        auto_create_topics_enable: true
+
+        group_options:
+          group_id: bar
+          block_rebalance_on_poll: true
+
+        producer_options:
+          max_message_bytes: 1000
+          required_acks: LeaderAck
+          compression_codec: snappy
+          disable_idempotent: true
+
+        consumer_options:
+          topics: [ "serial-requeue-1" ]
+          pipelining_strategy: Serial
+          consumer_offset:
+            type: AtStart
+
+  consume: [ "test" ]
