148 changes: 146 additions & 2 deletions go.work.sum

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions kafkajobs/config.go
@@ -309,7 +309,17 @@
kgo.ConsumePartitions(partitions)
}
}

switch c.ConsumerOpts.PipeliningStrategy {
case SerialPipelining:
case FanOutPipelining:
case "":
c.ConsumerOpts.PipeliningStrategy = FanOutPipelining
default:
return nil, errors.Errorf("unknown pipelining strategy: %s", c.ConsumerOpts.PipeliningStrategy)
}

}

Check failure on line 322 in kafkajobs/config.go: GitHub Actions / Golang-CI (lint): unnecessary trailing newline (whitespace)

if c.Ping == nil {
c.Ping = &Ping{
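For context, the validation added above normalizes an empty `pipelining_strategy` to `FanOut` and rejects unknown values. A minimal standalone sketch of that behavior (the `validateStrategy` helper is hypothetical; the actual code mutates `c.ConsumerOpts.PipeliningStrategy` in place during config initialization):

```go
package main

import "fmt"

type PipeliningStrategy string

const (
	SerialPipelining PipeliningStrategy = "Serial"
	FanOutPipelining PipeliningStrategy = "FanOut"
)

// validateStrategy mirrors the switch above: known values pass through,
// an empty value defaults to FanOut, anything else is an error.
func validateStrategy(s PipeliningStrategy) (PipeliningStrategy, error) {
	switch s {
	case SerialPipelining, FanOutPipelining:
		return s, nil
	case "":
		return FanOutPipelining, nil
	default:
		return "", fmt.Errorf("unknown pipelining strategy: %s", s)
	}
}

func main() {
	for _, s := range []PipeliningStrategy{"", "Serial", "bogus"} {
		v, err := validateStrategy(s)
		fmt.Printf("%q -> %q, err: %v\n", s, v, err)
	}
}
```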
16 changes: 15 additions & 1 deletion kafkajobs/item.go
@@ -2,6 +2,7 @@ package kafkajobs

import (
"maps"
"sync"
"sync/atomic"

"github.com/goccy/go-json"
@@ -34,6 +35,7 @@ type Item struct {
commitsCh chan *kgo.Record
requeueCh chan *Item
record *kgo.Record
doneWg *sync.WaitGroup
}

// Options carry information about how to handle a given job.
@@ -115,6 +117,11 @@ func (i *Item) Ack() error {
if atomic.LoadUint64(i.stopped) == 1 {
return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
}

if i.doneWg != nil {
i.doneWg.Done()
}

select {
case i.commitsCh <- i.record:
return nil
@@ -124,14 +131,18 @@
}

func (i *Item) Nack() error {
return nil
return i.NackWithOptions(false, 0)
}

func (i *Item) NackWithOptions(requeue bool, _ int) error {
if atomic.LoadUint64(i.stopped) == 1 {
return errors.Str("failed to NackWithOptions the JOB, the pipeline is probably stopped")
}

if i.doneWg != nil {
i.doneWg.Done()
}

if requeue {
err := i.Requeue(nil, 0)
if err != nil {
@@ -157,6 +168,9 @@
Offset: i.Options.Offset,
}

// Requeued items must get a fresh gating WaitGroup assigned by the caller.
item.doneWg = nil

return item
}

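The new `doneWg` field is the gate that makes `Serial` pipelining possible: the listener arms it before inserting an item into the priority queue, and `Ack`/`NackWithOptions` release it, so the next record of the same partition is dispatched only after the current one is settled. A minimal sketch of that handshake, using illustrative names rather than the plugin's actual API:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type item struct {
	payload string
	doneWg  *sync.WaitGroup // nil under FanOut; a one-shot gate under Serial
}

// settle mirrors Item.Ack / Item.NackWithOptions: releasing the gate
// unblocks the listener waiting on this record.
func (i *item) settle() {
	if i.doneWg != nil {
		i.doneWg.Done()
	}
}

func main() {
	for _, r := range []string{"r1", "r2", "r3"} {
		wg := &sync.WaitGroup{}
		wg.Add(1)
		it := &item{payload: r, doneWg: wg}

		// The worker settles the item asynchronously, as a PHP worker would.
		go func(it *item) {
			time.Sleep(10 * time.Millisecond) // simulated processing
			fmt.Println("settled", it.payload)
			it.settle()
		}(it)

		wg.Wait() // strict one-at-a-time ordering
	}
}
```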
53 changes: 40 additions & 13 deletions kafkajobs/listener.go
@@ -4,14 +4,15 @@
"context"
"encoding/binary"
"errors"
"go.opentelemetry.io/otel"

Check failure on line 7 in kafkajobs/listener.go: GitHub Actions / Golang-CI (lint): File is not properly formatted (goimports)
"go.opentelemetry.io/otel/propagation"
"sync"
"sync/atomic"

"github.com/roadrunner-server/api/v4/plugins/v3/jobs"
"github.com/roadrunner-server/events"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.uber.org/zap"
)

@@ -137,24 +138,49 @@
}
}

fetches.EachRecord(func(r *kgo.Record) {
item := fromConsumer(r, d.requeueCh, d.recordsCh, &d.stopped)

ctxT, span := d.tracer.Tracer(tracerName).Start(otel.GetTextMapPropagator().Extract(context.Background(), propagation.HeaderCarrier(item.headers)), "kafka_listener")
d.prop.Inject(ctxT, propagation.HeaderCarrier(item.headers))

d.pq.Insert(item)

span.End()
})
switch d.cfg.ConsumerOpts.PipeliningStrategy {
case SerialPipelining:
fetchWg := &sync.WaitGroup{}
fetches.EachPartition(func(partition kgo.FetchTopicPartition) {
fetchWg.Add(1)
itemWg := &sync.WaitGroup{}

go func() {
defer fetchWg.Done()
partition.EachRecord(func(r *kgo.Record) {
itemWg.Add(1)
item := fromConsumer(r, d.requeueCh, d.recordsCh, itemWg, &d.stopped)
d.insertTracedItem(item)
itemWg.Wait()
})
}()
})
fetchWg.Wait()
case FanOutPipelining:
fetches.EachRecord(func(r *kgo.Record) {
item := fromConsumer(r, d.requeueCh, d.recordsCh, nil, &d.stopped)
d.insertTracedItem(item)
})
default:
return errors.New("unknown consumer pipeliningStrategy")
}

if d.cfg.GroupOpts != nil {
d.kafkaClient.AllowRebalance()
}
}
}

func fromConsumer(msg *kgo.Record, reqCh chan *Item, commCh chan *kgo.Record, stopped *uint64) *Item {
func (d *Driver) insertTracedItem(item *Item) {
ctxT, span := d.tracer.Tracer(tracerName).Start(otel.GetTextMapPropagator().Extract(context.Background(), propagation.HeaderCarrier(item.headers)), "kafka_listener")
d.prop.Inject(ctxT, propagation.HeaderCarrier(item.headers))

d.pq.Insert(item)

span.End()
}

func fromConsumer(msg *kgo.Record, reqCh chan *Item, commCh chan *kgo.Record, doneWg *sync.WaitGroup, stopped *uint64) *Item {
/*
RRJob string = "rr_job"
RRHeaders string = "rr_headers"
Expand Down Expand Up @@ -203,6 +229,7 @@
stopped: stopped,
requeueCh: reqCh,
commitsCh: commCh,
doneWg: doneWg,
record: msg,

Options: &Options{
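The shape of the `Serial` branch above: partitions from a single fetch are drained concurrently (joined by `fetchWg`), while records inside each partition are handed over strictly one at a time (gated by `itemWg`). A self-contained sketch of that topology, with plain slices standing in for `kgo` fetches:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	partitions := map[string][]string{
		"p0": {"a0", "a1", "a2"},
		"p1": {"b0", "b1"},
	}

	fetchWg := &sync.WaitGroup{}
	for name, records := range partitions {
		fetchWg.Add(1)
		go func(name string, records []string) {
			defer fetchWg.Done()
			for _, r := range records {
				itemWg := &sync.WaitGroup{}
				itemWg.Add(1)
				// Stand-in for the worker ack path releasing the gate.
				go func(r string) {
					fmt.Println(name, "processed", r)
					itemWg.Done()
				}(r)
				itemWg.Wait() // serialize within this partition only
			}
		}(name, records)
	}
	fetchWg.Wait() // drain every partition before the next poll
}
```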
8 changes: 8 additions & 0 deletions kafkajobs/opts.go
@@ -129,10 +129,18 @@ type ConsumerOpts struct {
ConsumeRegexp bool `mapstructure:"consume_regexp" json:"consume_regexp"`
MaxFetchMessageSize int32 `mapstructure:"max_fetch_message_size" json:"max_fetch_message_size"`
MinFetchMessageSize int32 `mapstructure:"min_fetch_message_size" json:"min_fetch_message_size"`
PipeliningStrategy PipeliningStrategy `mapstructure:"pipelining_strategy" json:"pipelining_strategy"`
ConsumePartitions map[string]map[int32]*Offset `mapstructure:"consume_partitions" json:"consume_partitions"`
ConsumerOffset *Offset `mapstructure:"consumer_offset" json:"consumer_offset"`
}

type PipeliningStrategy string

const (
SerialPipelining PipeliningStrategy = "Serial"
FanOutPipelining PipeliningStrategy = "FanOut"
)

type ClientAuthType string

const (
9 changes: 9 additions & 0 deletions schema.json
@@ -132,6 +132,15 @@
"consumer_offset": {
"$ref": "#/definitions/Offset"
},
"pipelining_strategy": {
"description": "The consumption strategy for messages within a single partition.",
"type": "string",
"enum": [
"Serial",
"FanOut"
],
"default": "FanOut"
},
"consume_partitions": {
"type": "object",
"minProperties": 1,
75 changes: 75 additions & 0 deletions tests/configs/.rr-kafka-serial-consumption.yaml
@@ -0,0 +1,75 @@
version: '3'

rpc:
listen: tcp://127.0.0.1:6001

server:
command: "php php_test_files/jobs/jobs_ok_log_item_rand_sleep.php"
relay: "pipes"
relay_timeout: "20s"

logs:
level: debug
encoding: console
mode: development

kafka:
brokers: ["127.0.0.1:9092"]

jobs:
num_pollers: 10
pipeline_size: 100000
timeout: 100
pool:
num_workers: 10
allocate_timeout: 60s
destroy_timeout: 60s

pipelines:
test-produce:
driver: kafka
config:
priority: 1

auto_create_topics_enable: true

group_options:
group_id: bar
block_rebalance_on_poll: true

producer_options:
max_message_bytes: 1000
required_acks: LeaderAck
compression_codec: snappy
disable_idempotent: true

test-consume-1:
driver: kafka
config:
priority: 1

group_options:
group_id: bar
block_rebalance_on_poll: true

consumer_options:
topics: [ "serial-1", "serial-2" ]
pipelining_strategy: Serial
consumer_offset:
type: AtStart
test-consume-2:
driver: kafka
config:
priority: 1

group_options:
group_id: bar
block_rebalance_on_poll: true

consumer_options:
topics: [ "serial-3"]
pipelining_strategy: Serial
consumer_offset:
type: AtStart
consume: [ "test-consume-1", "test-consume-2" ]
