Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: exponential backoff for clients (KIP-580) #3099

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package sarama

import (
"errors"
"github.com/stretchr/testify/assert"
"log"
"math"
"os"
Expand All @@ -18,6 +17,7 @@ import (

"github.com/fortytw2/leaktest"
"github.com/rcrowley/go-metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -638,6 +638,68 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
}
}

func TestAsyncProducerWithExponentialBackoffDurations(t *testing.T) {
var backoffDurations []time.Duration
var mu sync.Mutex

topic := "my_topic"
maxBackoff := 2 * time.Second
config := NewTestConfig()

innerBackoffFunc := NewExponentialBackoff(defaultRetryBackoff, maxBackoff)
backoffFunc := func(retries, maxRetries int) time.Duration {
duration := innerBackoffFunc(retries, maxRetries)
mu.Lock()
backoffDurations = append(backoffDurations, duration)
mu.Unlock()
return duration
}

config.Producer.Flush.Messages = 5
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 3
config.Producer.Retry.BackoffFunc = backoffFunc

broker := NewMockBroker(t, 1)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
metadataResponse.AddTopicPartition(topic, 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
broker.Returns(metadataResponse)

producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

failResponse := new(ProduceResponse)
failResponse.AddTopicPartition(topic, 0, ErrNotLeaderForPartition)
successResponse := new(ProduceResponse)
successResponse.AddTopicPartition(topic, 0, ErrNoError)

broker.Returns(failResponse)
broker.Returns(metadataResponse)
broker.Returns(failResponse)
broker.Returns(metadataResponse)
broker.Returns(successResponse)

for i := 0; i < 5; i++ {
producer.Input() <- &ProducerMessage{Topic: topic, Value: StringEncoder("test")}
}

expectResults(t, producer, 5, 0)
closeProducer(t, producer)
broker.Close()

assert.Greater(t, backoffDurations[0], time.Duration(0),
"Expected first backoff duration to be greater than 0")
for i := 1; i < len(backoffDurations); i++ {
assert.Greater(t, backoffDurations[i], time.Duration(0))
assert.GreaterOrEqual(t, backoffDurations[i], backoffDurations[i-1])
assert.LessOrEqual(t, backoffDurations[i], maxBackoff)
}
}

// https://github.com/IBM/sarama/issues/2129
func TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) {
// Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
Expand Down
43 changes: 43 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,15 @@ package sarama
import (
"bufio"
"fmt"
"math/rand"
"net"
"regexp"
"time"
)

const (
defaultRetryBackoff = 100 * time.Millisecond
defaultRetryMaxBackoff = 1000 * time.Millisecond
)

type none struct{}
Expand Down Expand Up @@ -344,3 +351,39 @@ func (v KafkaVersion) String() string {

return fmt.Sprintf("%d.%d.%d", v.version[0], v.version[1], v.version[2])
}

// NewExponentialBackoff returns a function that implements an exponential backoff strategy with jitter.
// It follows KIP-580, implementing the formula:
// MIN(retry.backoff.max.ms, (retry.backoff.ms * 2**(failures - 1)) * random(0.8, 1.2))
// This ensures retries start with `backoff` and exponentially increase until `maxBackoff`, with added jitter.
// The behavior when `failures = 0` is not explicitly defined in KIP-580 and is left to implementation discretion.
//
// Example usage:
//
// backoffFunc := sarama.NewExponentialBackoff(config.Producer.Retry.Backoff, 2*time.Second)
// config.Producer.Retry.BackoffFunc = backoffFunc
func NewExponentialBackoff(backoff time.Duration, maxBackoff time.Duration) func(retries, maxRetries int) time.Duration {
Copy link
Contributor

@puellanivis puellanivis Feb 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose, I would like to see a version that can take sarama.NewExponentialBackoff(config) so that the appropriate fields from the config will be extracted and used without me needing to dig them out.

We probably also need a config.Producer.Retry.BackoffMax.

PS: I guess we cannot actually provide a simple config option, because each of the backoffs has its own sub-struct. 🤦‍♀️

Copy link
Contributor Author

@wanwenli wanwenli Feb 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense! I also considered making sarama.NewExponentialBackoff(config), but as you pointed out in the PS section, each backoff belongs to its own sub-struct, which makes a simple config-based approach impractical.

This is exactly why I made NewExponentialBackoff a standalone function—neither a method of Config nor a function that takes Config as an input parameter. This approach avoids ambiguity and ensures flexibility across different contexts (producer, metadata, transaction manager) without forcing unnecessary dependencies.

Regarding config.Producer.Retry.BackoffMax, I chose not to introduce it because it would require adding a similar field to every retry-related sub-struct (Metadata, TransactionManager), making the API more tedious to maintain. Keeping the backoff logic encapsulated in a separate function simplifies the design while still allowing explicit configuration per use case.

Let me know your thoughts!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t know that there is a significantly stronger maintenance in adding the BackoffMax. Is it a lot of “toil”? Yes, but as you note, there is already a retry struct in a lot of the other config structs. So, the choice has kind of already been made.

if backoff <= 0 {
backoff = defaultRetryBackoff
}
if maxBackoff <= 0 {
maxBackoff = defaultRetryMaxBackoff
}

if backoff > maxBackoff {
Logger.Println("Warning: backoff is greater than maxBackoff, using maxBackoff instead.")
backoff = maxBackoff
}

return func(retries, maxRetries int) time.Duration {
if retries <= 0 {
return backoff
}

calculatedBackoff := backoff * time.Duration(1<<(retries-1))
jitter := 0.8 + 0.4*rand.Float64()
calculatedBackoff = time.Duration(float64(calculatedBackoff) * jitter)

return min(calculatedBackoff, maxBackoff)
}
}
49 changes: 48 additions & 1 deletion utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

package sarama

import "testing"
import (
"testing"
"time"
)

func TestVersionCompare(t *testing.T) {
if V0_8_2_0.IsAtLeast(V0_8_2_1) {
Expand Down Expand Up @@ -95,3 +98,47 @@ func TestVersionParsing(t *testing.T) {
}
}
}

func TestExponentialBackoffValidCases(t *testing.T) {
testCases := []struct {
retries int
maxRetries int
minBackoff time.Duration
maxBackoffExpected time.Duration
}{
{1, 5, 80 * time.Millisecond, 120 * time.Millisecond},
{3, 5, 320 * time.Millisecond, 480 * time.Millisecond},
{5, 5, 1280 * time.Millisecond, 1920 * time.Millisecond},
}

for _, tc := range testCases {
backoffFunc := NewExponentialBackoff(100*time.Millisecond, 2*time.Second)
backoff := backoffFunc(tc.retries, tc.maxRetries)
if backoff < tc.minBackoff || backoff > tc.maxBackoffExpected {
t.Errorf("backoff(%d, %d): expected between %v and %v, got %v", tc.retries, tc.maxRetries, tc.minBackoff, tc.maxBackoffExpected, backoff)
}
}
}

func TestExponentialBackoffDefaults(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

testCases := []struct {
backoff time.Duration
maxBackoff time.Duration
}{
{-100 * time.Millisecond, 2 * time.Second},
{100 * time.Millisecond, -2 * time.Second},
{-100 * time.Millisecond, -2 * time.Second},
{0 * time.Millisecond, 2 * time.Second},
{100 * time.Millisecond, 0 * time.Second},
{0 * time.Millisecond, 0 * time.Second},
}

for _, tc := range testCases {
backoffFunc := NewExponentialBackoff(tc.backoff, tc.maxBackoff)
backoff := backoffFunc(2, 5)
if backoff < defaultRetryBackoff || backoff > defaultRetryMaxBackoff {
t.Errorf("backoff(%v, %v): expected between %v and %v, got %v",
tc.backoff, tc.maxBackoff, defaultRetryBackoff, defaultRetryMaxBackoff, backoff)
}
}
}
Loading