feat: exponential backoff for clients (KIP-580) #3099
Conversation
Force-pushed from ac48413 to 2210250
//
// backoffFunc := sarama.NewExponentialBackoff(config.Producer.Retry.Backoff, 2*time.Second)
// config.Producer.Retry.BackoffFunc = backoffFunc
func NewExponentialBackoff(backoff time.Duration, maxBackoff time.Duration) func(retries, maxRetries int) time.Duration {
I suppose I would like to see a version that can take sarama.NewExponentialBackoff(config), so that the appropriate fields from the config will be extracted and used without me needing to dig them out. We probably also need a config.Producer.Retry.BackoffMax.
PS: I guess we cannot actually provide a simple config option, because each of the backoffs has its own sub-struct. 🤦♀️
That makes sense! I also considered making sarama.NewExponentialBackoff(config), but as you pointed out in the PS section, each backoff belongs to its own sub-struct, which makes a simple config-based approach impractical.
This is exactly why I made NewExponentialBackoff a standalone function: neither a method of Config nor a function that takes Config as an input parameter. This approach avoids ambiguity and ensures flexibility across different contexts (producer, metadata, transaction manager) without forcing unnecessary dependencies.
Regarding config.Producer.Retry.BackoffMax, I chose not to introduce it because it would require adding a similar field to every retry-related sub-struct (Metadata, TransactionManager), making the API more tedious to maintain. Keeping the backoff logic encapsulated in a separate function simplifies the design while still allowing explicit configuration per use case.
Let me know your thoughts!
I don’t know that there is a significantly greater maintenance burden in adding the BackoffMax. Is it a lot of “toil”? Yes, but as you note, there is already a retry struct in a lot of the other config structs. So, the choice has kind of already been made.
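For readers following the thread, here is a minimal usage sketch of the function under discussion, assembled from the doc comment in the diff above. The github.com/IBM/sarama import path and the 2*time.Second cap are assumptions taken from context rather than anything this PR prescribes.

package main

import (
	"time"

	"github.com/IBM/sarama" // assumed import path for this repository
)

func main() {
	config := sarama.NewConfig()

	// Build a backoff function that starts from the configured base backoff
	// and caps each retry delay at 2 seconds, mirroring the doc-comment example.
	backoffFunc := sarama.NewExponentialBackoff(config.Producer.Retry.Backoff, 2*time.Second)
	config.Producer.Retry.BackoffFunc = backoffFunc

	// config can now be passed to sarama.NewAsyncProducer or NewSyncProducer as usual.
	_ = config
}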
utils.go (outdated)
// NewExponentialBackoff returns a function that implements an exponential backoff strategy with jitter.
// It follows KIP-580, implementing the formula:
// MIN(retry.backoff.max.ms, (retry.backoff.ms * 2**(retries - 1)) * random(0.8, 1.2))
It’s extremely important that when copy-pasting from a standard, you copy-paste it exactly. My point: the correct formula is:
MIN(retry.backoff.max.ms, (retry.backoff.ms * 2**(failures - 1)) * random(0.8, 1.2))
That word failures rather than retries is important here, because (bad news) the func(retries, maxRetries int) time.Duration function that we return is being called with a decrementing retries, and not an incrementing failures. 😭
I’m not sure exactly how we can work around this generally… since the number of starting retries is potentially quite flexible (in particular: async_producer sets it to an initial pp.highWatermark - 1).
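To make the corrected formula concrete, here is a rough Go sketch of the computation being discussed. The function and variable names are illustrative only and are not taken from the PR; the key point is that the exponent is driven by an incrementing failure count.

package sketch

import (
	"math"
	"math/rand"
	"time"
)

// kip580Backoff sketches MIN(maxBackoff, backoff * 2**(failures-1) * random(0.8, 1.2)).
// failures counts failed attempts so far and starts at 1.
func kip580Backoff(backoff, maxBackoff time.Duration, failures int) time.Duration {
	if failures < 1 {
		failures = 1
	}
	// Exponential growth driven by the number of failures, not a decrementing retry counter.
	d := float64(backoff) * math.Pow(2, float64(failures-1))
	// Jitter in [0.8, 1.2) spreads retries from many clients apart.
	d *= 0.8 + 0.4*rand.Float64()
	if maxBackoff > 0 && d > float64(maxBackoff) {
		return maxBackoff
	}
	return time.Duration(d)
}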
Thanks for pointing this out! Here’s what I found:
Formula Correction
Updated the copied formula to exactly match KIP-580. Thanks for catching that!
Verification of retries Behavior in Sarama
Although pp.flushRetryBuffers flushes messages with decrementing retries, I ran tests and confirmed with additional logs that retries
- starts with 1 when pp.backoff(msg.retries) is called for the first time, and
- is incrementing,
at least in the producer case. Given this, the concern about decrementing retries shouldn’t apply in this context.
Line 1374 in 9ae475a
msg.retries++
Flexible retries in Special Cases
The scenario where retries is more dynamic (e.g., pp.highWatermark - 1) appears to only occur for flagged (fin) messages being sent back to the input channel.
Line 732 in 9ae475a
pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}
Since these messages serve a special purpose, precise backoff duration is less critical for them. That said, if keeping a consistent backoff behavior for all cases is desirable, I’m open to discussing potential adjustments.
Given these findings, I believe our current implementation correctly follows KIP-580 in most cases. Let me know if you think any further checks are needed!
🤦♀️ I appear to have also missed where the function is receiving the maxRetries anyway. So, even if the retries were decrementing, we would still be able to determine the correct incrementing failures.
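As a concrete illustration of that observation: because the returned func receives both arguments, an incrementing failure count can be recovered even from a decrementing retries counter. The helper below is a hedged sketch, not code from the PR.

// failuresFromRetries maps the (retries, maxRetries) pair passed to a
// BackoffFunc onto an incrementing failure count. If retries decrements
// from maxRetries (maxRetries, maxRetries-1, ...), this yields 1, 2, ...
func failuresFromRetries(retries, maxRetries int) int {
	failures := maxRetries - retries + 1
	if failures < 1 {
		failures = 1
	}
	return failures
}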
Force-pushed from 2cf05e9 to d8df714
Good improvements. I have some additional thoughts.
utils.go (outdated)
backoff = max(backoff, 0)
maxBackoff = max(maxBackoff, 0)
As noted from how KIP-580 says defaults should be set, this should probably be:
if backoff <= 0 {
backoff = defaultBackoff // 100 ms
}
if maxBackoff <= 0 {
maxBackoff = defaultMaxBackoff // 1000 ms
}
As a one-liner, this looks like:
backoff = cmp.Or(max(backoff, 0), defaultBackoff)
Where the max(backoff, 0) clamps negatives to the zero value, which cmp.Or then skips over. Unfortunately, cmp.Or was introduced in go1.22.0, so not something we can use without bumping the go.mod go version.
Makes sense. I chose the if statements.
Force-pushed from 50d5176 to 756051e
}
}

func TestExponentialBackoffDefaults(t *testing.T) {
👍
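For context, a rough sketch of what a defaults test of this shape could check, assuming it sits alongside the PR's test in the sarama package; the bounds and loop below are assumptions, not the assertions actually made in this PR.

func TestExponentialBackoffDefaultsSketch(t *testing.T) {
	// Non-positive inputs should fall back to the KIP-580 defaults
	// discussed earlier in the thread (100 ms base, 1000 ms cap).
	backoffFunc := NewExponentialBackoff(-1, -1)
	for retries := 1; retries <= 10; retries++ {
		d := backoffFunc(retries, 10)
		if d <= 0 {
			t.Fatalf("expected a positive backoff, got %v", d)
		}
		// With jitter bounded by 1.2, results should never exceed 1200 ms.
		if d > 1200*time.Millisecond {
			t.Fatalf("backoff %v exceeds the jittered default max", d)
		}
	}
}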
Force-pushed from 51b3117 to 6ddfd8b
Signed-off-by: Wenli Wan <[email protected]>
Force-pushed from 6ddfd8b to ff2dc5f
👍 I can no longer think of anything worth commenting about.
Thank you, @puellanivis. Can @dnwe review this PR?
Thanks to @puellanivis for already doing a thorough review against the KIP-580 spec here, that was really helpful! I've looked over the changes myself and they seem good. Really my only qualm is that we don't immediately have a clean way to match the Java client and make exponential backoff the default behaviour in config.go. I've also asked @prestona to review the changes prior to merging.
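On the question of defaulting: one hypothetical shape of that idea, written purely as a sketch inside the sarama package (this is not something the PR does, and the 1-second caps are illustrative only):

// newConfigWithExponentialDefaultsSketch shows how NewConfig could, in principle,
// pre-populate BackoffFunc fields so exponential backoff becomes the default.
func newConfigWithExponentialDefaultsSketch() *Config {
	c := NewConfig()
	// Each retry sub-struct would really want its own max (the BackoffMax
	// discussion above); time.Second is just a placeholder here.
	c.Producer.Retry.BackoffFunc = NewExponentialBackoff(c.Producer.Retry.Backoff, time.Second)
	c.Metadata.Retry.BackoffFunc = NewExponentialBackoff(c.Metadata.Retry.Backoff, time.Second)
	return c
}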
Configurable Exponential Backoff for Clients
This PR introduces a configurable exponential backoff function for the producer, admin client, and transaction manager, aligning with KIP-580 specifications.
Key Changes
- Implemented NewExponentialBackoff(initialBackoff, maxBackoff) as a static function, rather than a method of Config.
- Since Config contains multiple backoff settings (e.g., for the admin client, transaction manager, and producer), attaching the function to Config would introduce ambiguity.
- Added test coverage for exponential backoff in AsyncProducer.
Inspiration & References
KIP-580: Exponential Backoff for Kafka Clients
Testing
✅ Verified in TestAsyncProducerWithExponentialBackoffDurations, ensuring backoff durations stay within maxBackoff.
.Would love feedback! 🚀