
feat(kafka): expose more kafka_franz parameters#4028

Open
dyurchanka wants to merge 2 commits into redpanda-data:main from dyurchanka:kafka/hardcoded-limits
Conversation

@dyurchanka

@dyurchanka dyurchanka commented Feb 26, 2026

Hi, I would like to introduce several changes to the kafka_franz output (shared with the redpanda output) to expose more Go client parameters in Redpanda Connect. My main use case is the low throughput of the Kafka outputs (I tried them all: sarama, franz, redpanda). I have settled on kafka_franz for now (I know it is deprecated, but redpanda uses franz under the hood). The main issue is the default 10k record buffer in kafka_franz, which limits throughput over high-latency networks. There is also a minor concurrency bug in the redpanda output. The default values for all new parameters are unchanged (equal to the kafka_franz defaults).

1: Add configurable acks field to redpanda/kafka_franz output

File: internal/impl/kafka/franz_writer.go

Problem: The redpanda and kafka_franz outputs hardcode acks=all (all ISR replicas) with no way to configure it. When idempotent_write: false, users may want acks=leader (acks=1) for higher throughput at the cost of durability, or acks=none (acks=0) for fire-and-forget scenarios. Previously, the franz-go default of acks=all was always used regardless of the idempotent_write setting.

Fix: Add a new acks field to FranzProducerFields() with three options:

| Value | Kafka equivalent | Behavior |
| --- | --- | --- |
| `all` | `acks=-1` | Wait for all in-sync replicas (default; required for idempotent writes) |
| `leader` | `acks=1` | Wait for the leader only (available when `idempotent_write: false`) |
| `none` | `acks=0` | Fire-and-forget (available when `idempotent_write: false`) |

Validation: If idempotent_write: true and acks is set to anything other than all, the config fails with an error at both lint time and runtime. This matches the Kafka protocol requirement that idempotent producers must use acks=all.

Config example:

output:
  redpanda:
    seed_brokers: ["localhost:9092"]
    topic: my-topic
    idempotent_write: false
    acks: leader  # only valid when idempotent_write is false

Changes:

  1. Added kfwFieldAcks constant and StringAnnotatedEnumField with all, leader, none options (default: all)
  2. Added validation in FranzProducerOptsFromConfig: error if idempotent_write: true && acks != "all"
  3. Maps config values to franz-go options: kgo.AllISRAcks(), kgo.LeaderAck(), kgo.NoAck()
  4. Added lint rule: this.idempotent_write == true && this.acks.or("all") != "all" for early config validation

Backward compatible: Default is all, matching the previous implicit behavior.
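
The validation rule described above can be sketched as a standalone function. This is a minimal illustration, not the plugin's actual code; the function name and error messages are assumptions:

```go
package main

import "fmt"

// validateAcks mirrors the rule described above: idempotent writes
// require acks=all, while leader/none are only allowed when
// idempotent_write is false. Illustrative only.
func validateAcks(idempotentWrite bool, acks string) error {
	switch acks {
	case "all", "leader", "none":
	default:
		return fmt.Errorf("unknown acks value %q", acks)
	}
	if idempotentWrite && acks != "all" {
		return fmt.Errorf("acks must be all when idempotent_write is true, got %q", acks)
	}
	return nil
}

func main() {
	fmt.Println(validateAcks(false, "leader")) // allowed: <nil>
	fmt.Println(validateAcks(true, "leader"))  // rejected: non-nil error
}
```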


2: Expose producer buffer and in-flight tuning fields

File: internal/impl/kafka/franz_writer.go

Problem: The franz-go producer has several critical tuning parameters hardcoded to defaults and not exposed in the redpanda/kafka_franz output config:

| Parameter | franz-go default | Issue |
| --- | --- | --- |
| `MaxBufferedRecords` | 10,000 | `Produce()` blocks when the limit is hit. In high-throughput pipelines records arrive faster than they drain, stalling the caller goroutine. |
| `MaxBufferedBytes` | 0 (unlimited) | No byte-level memory guard: 10k large records could consume gigabytes. |
| `MaxProduceRequestsInflightPerBroker` | 1 | Only one produce request per broker TCP connection at a time. The pipeline idles for the full broker round trip instead of pipelining requests. |
| `RecordRetries` | unlimited | On persistent broker failure (i/o timeout), franz-go retries every record forever. `WriteBatch` hangs because callbacks never fire, and the process becomes a zombie: alive but non-functional. |
| `RecordDeliveryTimeout` | unlimited | The same zombie problem as above, but time-based. Equivalent to Kafka's `delivery.timeout.ms`. |

Fix: Add five new fields to FranzProducerLimitsFields():

output:
  redpanda:
    # New fields (shown with defaults):
    max_buffered_records: 10000       # max records buffered before Produce() blocks
    max_buffered_bytes: "0"           # max bytes buffered (0 = unlimited, only record limit applies)
    max_in_flight_requests: 1         # max produce requests per broker TCP connection
    record_retries: 0                 # max retries per record (0 = unlimited)
    record_delivery_timeout: "0s"     # max time a record can sit in buffer (0s = unlimited)

Validation:

  • max_buffered_records must be >= 1
  • max_buffered_bytes accepts human-readable sizes ("256MB", "50mib"), 0 disables
  • max_in_flight_requests must be >= 1; with idempotent_write: true, franz-go internally caps at 5 (Kafka v1+) or 1 (Kafka < v1)
  • record_retries 0 = unlimited (default), > 0 = fail after N retries
  • record_delivery_timeout 0s = unlimited (default), > 0 = fail after duration
  • With idempotent_write: true, both retry/timeout options are only enforced when safe (no invalid sequence numbers)
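
As a rough sketch of the runtime checks listed above, assuming a hypothetical struct holding the five fields (in the real plugin these values feed the franz-go options kgo.MaxBufferedRecords, kgo.MaxBufferedBytes, kgo.MaxProduceRequestsInflightPerBroker, kgo.RecordRetries, and kgo.RecordDeliveryTimeout; the struct and method names here are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// producerLimits groups the five new fields; zero values mean
// "unlimited" where noted. Names are illustrative, not the plugin's.
type producerLimits struct {
	MaxBufferedRecords    int
	MaxBufferedBytes      uint64        // 0 = unlimited
	MaxInFlightRequests   int
	RecordRetries         int           // 0 = unlimited
	RecordDeliveryTimeout time.Duration // 0s = unlimited
}

// validate applies the two hard lower bounds described above.
func (l producerLimits) validate() error {
	if l.MaxBufferedRecords < 1 {
		return fmt.Errorf("max_buffered_records must be >= 1, got %d", l.MaxBufferedRecords)
	}
	if l.MaxInFlightRequests < 1 {
		return fmt.Errorf("max_in_flight_requests must be >= 1, got %d", l.MaxInFlightRequests)
	}
	return nil
}

func main() {
	defaults := producerLimits{MaxBufferedRecords: 10000, MaxInFlightRequests: 1}
	fmt.Println(defaults.validate()) // <nil>
}
```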

Thanks

@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@mmatczuk
Contributor

Commits

  1. feat(kafka): expose more kafka_franz parameters — format violation: feat(kafka): uses a conventional commit prefix instead of a system name. Should be kafka: or kafka(franz): per commit policy (system name first, not feat/fix/etc. prefixes).
  2. Granularity: the commit mixes two independent changes: new config fields (acks, max_buffered_records, max_buffered_bytes, max_in_flight_requests, record_retries, record_delivery_timeout) and a sync.Mutex to sync.RWMutex refactor with double-checked locking in output_redpanda.go. These should be separate commits.
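
For readers unfamiliar with the pattern mentioned in the second point, a minimal double-checked locking sketch with sync.RWMutex might look like this (type and field names are hypothetical, not the PR's code):

```go
package main

import (
	"fmt"
	"sync"
)

// lazyClient sketches double-checked locking: a cheap RLock fast
// path, then a write lock that re-checks before initializing.
// Type and field names are hypothetical.
type lazyClient struct {
	mu     sync.RWMutex
	client *string // stand-in for a real Kafka client handle
}

func (l *lazyClient) get() *string {
	l.mu.RLock()
	c := l.client
	l.mu.RUnlock()
	if c != nil {
		return c // fast path: already initialized, readers don't block each other
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.client == nil { // re-check: another goroutine may have initialized it
		s := "connected"
		l.client = &s
	}
	return l.client
}

func main() {
	var lc lazyClient
	fmt.Println(*lc.get()) // connected
}
```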

Review
One bug found: missing overflow guard on max_buffered_bytes.

  1. Missing overflow check on the max_buffered_bytes uint64-to-int cast: humanize.ParseBytes returns uint64, which is cast to int without upper-bound validation. The existing fields max_message_bytes and broker_write_max_bytes both validate before casting. Add an equivalent check.

@dyurchanka force-pushed the kafka/hardcoded-limits branch from 3e8e30e to e9b5df1 on February 26, 2026 at 14:16
@dyurchanka
Author

Replaced the commit message with the suggested semantics, moved the redpanda plugin changes out of this PR, and edited the PR description.

@dyurchanka
Author

ah, some other changes were pushed, give me a sec, will fix

@dyurchanka force-pushed the kafka/hardcoded-limits branch from e9b5df1 to c915715 on February 26, 2026 at 14:38
…max_in_flight_requests, record_retries, record_delivery_timeout
@dyurchanka force-pushed the kafka/hardcoded-limits branch from c915715 to f977a0b on February 26, 2026 at 14:46
@dyurchanka
Author

Must be OK for review now :) Sorry

@mmatczuk
Contributor

Commits

  1. 2c8c7e18a163 (kafka: fix linting errors and regenerate docs) mixes unrelated work (linting fix in franz_writer.go + doc regeneration across 6 files) and is effectively a fixup of f977a0b0 since the linting errors were introduced by that commit. Should be squashed into commit 1, with the result being a single commit containing the implementation, tests, and regenerated docs.

Review
Clean implementation. Constants, enum field type, validation, and kgo option mapping are all correct. No bugs or security issues found.

  1. Missing test coverage for runtime validation: The new runtime validation in FranzProducerLimitsOptsFromConfig (max_buffered_records >= 1, max_in_flight_requests >= 1, max_buffered_bytes overflow) is not exercised by the existing lint-level tests.


@mmatczuk mmatczuk left a comment


LGTM

@dyurchanka
Author

> LGTM

So I'm confused. It's approved, but do I need to push changes based on your last comment?

@mmatczuk
Contributor

@dyurchanka all good from my end, I'd like to get others to review that before merging

