Skip to content

Commit f977a0b

Browse files
kafka(franz): expose acks, max_buffered_records, max_buffered_bytes, max_in_flight_requests, record_retries, record_delivery_timeout
1 parent 641fabf commit f977a0b

2 files changed

Lines changed: 202 additions & 13 deletions

File tree

internal/impl/kafka/franz_writer.go

Lines changed: 136 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,17 @@ const (
3737
// Producer fields
3838
kfwFieldPartitioner = "partitioner"
3939
kfwFieldIdempotentWrite = "idempotent_write"
40+
kfwFieldAcks = "acks"
4041
kfwFieldCompression = "compression"
4142
kfwFieldAllowAutoTopicCreation = "allow_auto_topic_creation"
42-
kfwFieldTimeout = "timeout"
43-
kfwFieldMaxMessageBytes = "max_message_bytes"
44-
kfwFieldBrokerWriteMaxBytes = "broker_write_max_bytes"
43+
kfwFieldTimeout = "timeout"
44+
kfwFieldMaxMessageBytes = "max_message_bytes"
45+
kfwFieldBrokerWriteMaxBytes = "broker_write_max_bytes"
46+
kfwFieldMaxBufferedRecords = "max_buffered_records"
47+
kfwFieldMaxBufferedBytes = "max_buffered_bytes"
48+
kfwFieldMaxInFlightRequestsPerBrkr = "max_in_flight_requests"
49+
kfwFieldRecordRetries = "record_retries"
50+
kfwFieldRecordDeliveryTimeout = "record_delivery_timeout"
4551
)
4652

4753
// FranzProducerLimitsFields returns a slice of fields specifically for
@@ -68,6 +74,42 @@ func FranzProducerLimitsFields() []*service.ConfigField {
6874
Default("100MiB").
6975
Example("128MB").
7076
Example("50mib"),
77+
service.NewIntField(kfwFieldMaxBufferedRecords).
78+
Description("The maximum number of records the client will buffer in memory before blocking. " +
79+
"When this limit is reached, `Produce()` calls will block until buffered records are delivered and space frees up. " +
80+
"Increase this value for high-throughput pipelines to avoid back-pressure stalls.").
81+
Advanced().
82+
Default(10000),
83+
service.NewStringField(kfwFieldMaxBufferedBytes).
84+
Description("The maximum number of bytes the client will buffer in memory before blocking. " +
85+
"When this limit is reached, `Produce()` calls will block until buffered records are delivered. " +
86+
"Set to `0` to disable the byte-level limit (only `max_buffered_records` applies). " +
87+
"This limit is checked after `max_buffered_records`.").
88+
Advanced().
89+
Default("0").
90+
Example("256MB").
91+
Example("50mib"),
92+
service.NewIntField(kfwFieldMaxInFlightRequestsPerBrkr).
93+
Description("The maximum number of produce requests in flight per broker connection. " +
94+
"When `idempotent_write` is enabled, this is capped at 5 by the Kafka protocol (and at 1 for Kafka < v1.0.0). " +
95+
"When `idempotent_write` is disabled, higher values improve throughput by pipelining requests but may cause out-of-order delivery.").
96+
Advanced().
97+
Default(1),
98+
service.NewIntField(kfwFieldRecordRetries).
99+
Description("The maximum number of times a record produce is retried on failure before the record is failed. " +
100+
"When a record fails, all records buffered in the same partition are also failed to preserve gapless ordering. " +
101+
"Set to `0` for unlimited retries (the default). " +
102+
"With `idempotent_write` enabled, retries are only enforced when safe to do so without creating invalid sequence numbers.").
103+
Advanced().
104+
Default(0),
105+
service.NewDurationField(kfwFieldRecordDeliveryTimeout).
106+
Description("The maximum time a record can sit in the producer buffer before it is failed, roughly equivalent to Kafka's `delivery.timeout.ms`. " +
107+
"This is evaluated before writing a request or after a produce response. " +
108+
"When a record times out, all records in the same partition are also failed. " +
109+
"Set to `0s` for no timeout (the default). " +
110+
"With `idempotent_write` enabled, timeouts are only enforced when safe to do so without creating invalid sequence numbers.").
111+
Advanced().
112+
Default("0s"),
71113
}
72114
}
73115

@@ -84,16 +126,25 @@ func FranzProducerFields() []*service.ConfigField {
84126
}).
85127
Description("Override the default murmur2 hashing partitioner.").
86128
Advanced().Optional(),
87-
service.NewBoolField(kfwFieldIdempotentWrite).
88-
Description("Enable the idempotent write producer option. " +
89-
"When enabled, the producer initializes a producer ID and uses it to guarantee exactly-once semantics per partition (no duplicates on retries). " +
90-
"This requires the `IDEMPOTENT_WRITE` permission on the `CLUSTER` resource. " +
91-
"If your cluster does not grant this permission or uses ACLs restrictively, disable this option. " +
92-
"Note: Idempotent writes are strictly a win for data integrity but may be unavailable in restricted environments " +
93-
"(e.g., some managed Kafka services, Redpanda with strict ACLs). " +
94-
"Disabling this option is safe and only affects retry behavior—duplicates may occur on producer retries, but the pipeline will continue to function normally.").
95-
Default(true).
96-
Advanced(),
129+
service.NewBoolField(kfwFieldIdempotentWrite).
130+
Description("Enable the idempotent write producer option. " +
131+
"When enabled, the producer initializes a producer ID and uses it to guarantee exactly-once semantics per partition (no duplicates on retries). " +
132+
"This requires the `IDEMPOTENT_WRITE` permission on the `CLUSTER` resource. " +
133+
"If your cluster does not grant this permission or uses ACLs restrictively, disable this option. " +
134+
"Note: Idempotent writes are strictly a win for data integrity but may be unavailable in restricted environments " +
135+
"(e.g., some managed Kafka services, Redpanda with strict ACLs). " +
136+
"Disabling this option is safe and only affects retry behavior—duplicates may occur on producer retries, but the pipeline will continue to function normally.").
137+
Default(true).
138+
Advanced(),
139+
service.NewStringAnnotatedEnumField(kfwFieldAcks, map[string]string{
140+
"all": "Wait for all in-sync replicas to acknowledge (acks=-1). Required when idempotent_write is enabled.",
141+
"none": "Do not wait for any acknowledgement (acks=0). Highest throughput but messages may be lost.",
142+
"leader": "Wait for the leader broker to acknowledge (acks=1). Messages are lost if the leader fails before replication.",
143+
}).
144+
Description("The number of acknowledgements the leader broker must receive from ISR brokers before responding to the produce request. " +
145+
"When `idempotent_write` is enabled this must be set to `all`.").
146+
Default("all").
147+
Advanced(),
97148
service.NewStringEnumField(kfwFieldCompression, "lz4", "snappy", "gzip", "none", "zstd").
98149
Description("Optionally set an explicit compression type. The default preference is to use snappy when the broker supports it, and fall back to none if not.").
99150
Optional().
@@ -144,6 +195,56 @@ func FranzProducerLimitsOptsFromConfig(conf *service.ParsedConfig) ([]kgo.Opt, e
144195
}
145196
opts = append(opts, kgo.ProduceRequestTimeout(timeout))
146197

198+
maxBufferedRecords, err := conf.FieldInt(kfwFieldMaxBufferedRecords)
199+
if err != nil {
200+
return nil, err
201+
}
202+
if maxBufferedRecords < 1 {
203+
return nil, fmt.Errorf("invalid max_buffered_records %d, must be at least 1", maxBufferedRecords)
204+
}
205+
opts = append(opts, kgo.MaxBufferedRecords(maxBufferedRecords))
206+
207+
maxBufferedBytesStr, err := conf.FieldString(kfwFieldMaxBufferedBytes)
208+
if err != nil {
209+
return nil, err
210+
}
211+
var maxBufferedBytes uint64
212+
maxBufferedBytes, err = humanize.ParseBytes(maxBufferedBytesStr)
213+
if err != nil {
214+
return nil, fmt.Errorf("failed to parse max_buffered_bytes: %w", err)
215+
}
216+
if maxBufferedBytes > uint64(math.MaxInt) {
217+
return nil, fmt.Errorf("invalid max_buffered_bytes, must not exceed %v", math.MaxInt)
218+
}
219+
if maxBufferedBytes > 0 {
220+
opts = append(opts, kgo.MaxBufferedBytes(int(maxBufferedBytes)))
221+
}
222+
223+
maxInFlightRequests, err := conf.FieldInt(kfwFieldMaxInFlightRequestsPerBrkr)
224+
if err != nil {
225+
return nil, err
226+
}
227+
if maxInFlightRequests < 1 {
228+
return nil, fmt.Errorf("invalid max_in_flight_requests %d, must be at least 1", maxInFlightRequests)
229+
}
230+
opts = append(opts, kgo.MaxProduceRequestsInflightPerBroker(maxInFlightRequests))
231+
232+
recordRetries, err := conf.FieldInt(kfwFieldRecordRetries)
233+
if err != nil {
234+
return nil, err
235+
}
236+
if recordRetries > 0 {
237+
opts = append(opts, kgo.RecordRetries(recordRetries))
238+
}
239+
240+
recordDeliveryTimeout, err := conf.FieldDuration(kfwFieldRecordDeliveryTimeout)
241+
if err != nil {
242+
return nil, err
243+
}
244+
if recordDeliveryTimeout > 0 {
245+
opts = append(opts, kgo.RecordDeliveryTimeout(recordDeliveryTimeout))
246+
}
247+
147248
return opts, nil
148249
}
149250

@@ -211,10 +312,31 @@ func FranzProducerOptsFromConfig(conf *service.ParsedConfig) ([]kgo.Opt, error)
211312
if err != nil {
212313
return nil, err
213314
}
315+
316+
acksStr, err := conf.FieldString(kfwFieldAcks)
317+
if err != nil {
318+
return nil, err
319+
}
320+
321+
if idempotentWrite && acksStr != "all" {
322+
return nil, fmt.Errorf("idempotent_write requires acks to be \"all\", got %q", acksStr)
323+
}
324+
214325
if !idempotentWrite {
215326
opts = append(opts, kgo.DisableIdempotentWrite())
216327
}
217328

329+
switch acksStr {
330+
case "all":
331+
opts = append(opts, kgo.RequiredAcks(kgo.AllISRAcks()))
332+
case "leader":
333+
opts = append(opts, kgo.RequiredAcks(kgo.LeaderAck()))
334+
case "none":
335+
opts = append(opts, kgo.RequiredAcks(kgo.NoAck()))
336+
default:
337+
return nil, fmt.Errorf("unknown acks value: %q", acksStr)
338+
}
339+
218340
allowAutoTopicCreation, err := conf.FieldBool(kfwFieldAllowAutoTopicCreation)
219341
if err != nil {
220342
return nil, err
@@ -275,6 +397,7 @@ func FranzWriterConfigLints() string {
275397
this.partitioner == "manual" && this.partition.or("") == "" => "a partition must be specified when the partitioner is set to manual"
276398
this.partitioner != "manual" && this.partition.or("") != "" => "a partition cannot be specified unless the partitioner is set to manual"
277399
this.timestamp.or("") != "" && this.timestamp_ms.or("") != "" => "both timestamp and timestamp_ms cannot be specified simultaneously"
400+
this.idempotent_write == true && this.acks.or("all") != "all" => "idempotent_write requires acks to be set to all"
278401
}`
279402
}
280403

internal/impl/kafka/output_kafka_franz_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,72 @@ kafka_franz:
6767
`,
6868
errContains: "a partition cannot be specified unless the partitioner is set to manual",
6969
},
70+
{
71+
name: "idempotent write with acks all",
72+
conf: `
73+
kafka_franz:
74+
seed_brokers: [ foo:1234 ]
75+
topic: foo
76+
idempotent_write: true
77+
acks: all
78+
`,
79+
},
80+
{
81+
name: "idempotent write with acks leader",
82+
conf: `
83+
kafka_franz:
84+
seed_brokers: [ foo:1234 ]
85+
topic: foo
86+
idempotent_write: true
87+
acks: leader
88+
`,
89+
errContains: "idempotent_write requires acks to be set to all",
90+
},
91+
{
92+
name: "idempotent write with acks none",
93+
conf: `
94+
kafka_franz:
95+
seed_brokers: [ foo:1234 ]
96+
topic: foo
97+
idempotent_write: true
98+
acks: none
99+
`,
100+
errContains: "idempotent_write requires acks to be set to all",
101+
},
102+
{
103+
name: "non-idempotent with acks leader",
104+
conf: `
105+
kafka_franz:
106+
seed_brokers: [ foo:1234 ]
107+
topic: foo
108+
idempotent_write: false
109+
acks: leader
110+
`,
111+
},
112+
{
113+
name: "non-idempotent with acks none",
114+
conf: `
115+
kafka_franz:
116+
seed_brokers: [ foo:1234 ]
117+
topic: foo
118+
idempotent_write: false
119+
acks: none
120+
`,
121+
},
122+
{
123+
name: "custom producer limits",
124+
conf: `
125+
kafka_franz:
126+
seed_brokers: [ foo:1234 ]
127+
topic: foo
128+
idempotent_write: false
129+
max_buffered_records: 50000
130+
max_buffered_bytes: "128MB"
131+
max_in_flight_requests: 5
132+
record_retries: 10
133+
record_delivery_timeout: "30s"
134+
`,
135+
},
70136
}
71137

72138
for _, test := range testCases {

0 commit comments

Comments
 (0)