Skip to content

Commit

Permalink
feat: exponential backoff for clients
Browse files Browse the repository at this point in the history
Signed-off-by: Wenli Wan <[email protected]>
  • Loading branch information
wanwenli committed Feb 13, 2025
1 parent 9ae475a commit 2210250
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 1 deletion.
60 changes: 60 additions & 0 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,66 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
}
}

// TestAsyncProducerWithExponentialBackoffDurations produces a single message
// against a mock broker that rejects it twice with ErrNotLeaderForPartition
// before accepting it, and records every duration handed out by the
// exponential backoff function. It then verifies that the recorded durations
// never decrease and never exceed the configured maximum.
func TestAsyncProducerWithExponentialBackoffDurations(t *testing.T) {
	var backoffDurations []time.Duration
	var mu sync.Mutex

	topic := "my_topic"
	maxBackoff := 2 * time.Second
	config := NewTestConfig()

	// Build the generator once; constructing it inside the closure would
	// allocate and re-seed a new random source on every retry.
	exponentialBackoff := NewExponentialBackoff(config.Producer.Retry.Backoff, maxBackoff)

	// Wrap the generator so each duration is recorded. The mutex guards the
	// slice because the producer invokes the function from its own goroutines.
	backoffFunc := func(retries, maxRetries int) time.Duration {
		duration := exponentialBackoff(retries, maxRetries)
		mu.Lock()
		backoffDurations = append(backoffDurations, duration)
		mu.Unlock()
		return duration
	}

	config.Producer.Flush.MaxMessages = 1
	config.Producer.Return.Successes = true
	config.Producer.Retry.Max = 4
	config.Producer.Retry.BackoffFunc = backoffFunc

	broker := NewMockBroker(t, 1)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
	metadataResponse.AddTopicPartition(topic, 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
	broker.Returns(metadataResponse)

	producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	failResponse := new(ProduceResponse)
	failResponse.AddTopicPartition(topic, 0, ErrNotLeaderForPartition)
	successResponse := new(ProduceResponse)
	successResponse.AddTopicPartition(topic, 0, ErrNoError)

	// Two failed produce attempts (each followed by a metadata refresh),
	// then a successful one.
	broker.Returns(failResponse)
	broker.Returns(metadataResponse)
	broker.Returns(failResponse)
	broker.Returns(metadataResponse)
	broker.Returns(successResponse)

	producer.Input() <- &ProducerMessage{Topic: topic, Value: StringEncoder("test")}

	expectResults(t, producer, 1, 0)
	closeProducer(t, producer)
	broker.Close()

	// Snapshot the recorded durations under the lock before inspecting them.
	mu.Lock()
	recorded := append([]time.Duration(nil), backoffDurations...)
	mu.Unlock()

	// Without this guard the assertions below would pass vacuously.
	if len(recorded) == 0 {
		t.Fatal("expected at least one backoff to be recorded")
	}

	for i, duration := range recorded {
		// Every duration, including the first one, must respect the cap.
		if duration > maxBackoff {
			t.Errorf("backoff exceeded max: %v", duration)
		}
		if i == 0 {
			continue
		}
		if duration < recorded[i-1] {
			t.Errorf("expected backoff[%d] >= backoff[%d], got %v < %v", i, i-1, duration, recorded[i-1])
		}
	}
}

// https://github.com/IBM/sarama/issues/2129
func TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) {
// Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
Expand Down
36 changes: 36 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package sarama
import (
"bufio"
"fmt"
"math/rand"
"net"
"regexp"
"sync"
"time"
)

type none struct{}
Expand Down Expand Up @@ -344,3 +347,36 @@ func (v KafkaVersion) String() string {

return fmt.Sprintf("%d.%d.%d", v.version[0], v.version[1], v.version[2])
}

// NewExponentialBackoff returns a function that implements an exponential backoff strategy with jitter.
// It follows KIP-580, implementing the formula:
// MIN(retry.backoff.max.ms, (retry.backoff.ms * 2**(retries - 1)) * random(0.8, 1.2))
// This ensures retries start with `backoff` and exponentially increase until `maxBackoff`, with added jitter.
//
// When retries is zero or negative, or when backoff is not positive, no
// exponential growth or jitter is applied and the function returns backoff,
// capped at maxBackoff. The maxRetries argument is accepted so the returned
// function matches the BackoffFunc signature; it does not affect the result.
//
// The returned function is safe for concurrent use.
//
// Example usage:
//
//	backoffFunc := sarama.NewExponentialBackoff(config.Producer.Retry.Backoff, 2*time.Second)
//	config.Producer.Retry.BackoffFunc = backoffFunc
func NewExponentialBackoff(backoff time.Duration, maxBackoff time.Duration) func(retries, maxRetries int) time.Duration {
	// Largest exponent for which 1<<shift still fits in a positive int64.
	// Larger retry counts are clamped to it; the result is far beyond any
	// realistic maxBackoff anyway, so the cap below still applies.
	const maxShift = 62

	// *rand.Rand is not safe for concurrent use, so access is serialized.
	var rngMu sync.Mutex
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	return func(retries, maxRetries int) time.Duration {
		if retries <= 0 || backoff <= 0 {
			if backoff > maxBackoff {
				return maxBackoff
			}
			return backoff
		}

		shift := retries - 1
		if shift > maxShift {
			shift = maxShift
		}

		rngMu.Lock()
		jitter := 0.8 + 0.4*rng.Float64()
		rngMu.Unlock()

		// Scale in floating point: the integer product backoff * 2**shift can
		// overflow int64 and wrap to zero or a negative duration, which would
		// slip past the maxBackoff cap.
		scaled := float64(backoff) * float64(uint64(1)<<uint(shift)) * jitter
		if scaled >= float64(maxBackoff) {
			return maxBackoff
		}
		return time.Duration(scaled)
	}
}
43 changes: 42 additions & 1 deletion utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

package sarama

import "testing"
import (
"sync"
"testing"
"time"
)

func TestVersionCompare(t *testing.T) {
if V0_8_2_0.IsAtLeast(V0_8_2_1) {
Expand Down Expand Up @@ -95,3 +99,40 @@ func TestVersionParsing(t *testing.T) {
}
}
}

// TestExponentialBackoffCorrectness checks that, for a 100ms base and a 2s
// cap, the duration produced for a given retry count lies inside the window
// allowed by the jitter factor (0.8 to 1.2) and never exceeds the cap.
func TestExponentialBackoffCorrectness(t *testing.T) {
	type expectation struct {
		retries    int
		maxRetries int
		lower      time.Duration
		upper      time.Duration
	}

	expectations := []expectation{
		// No retries yet: the base backoff is returned without jitter.
		{retries: 0, maxRetries: 5, lower: 100 * time.Millisecond, upper: 100 * time.Millisecond},
		{retries: 1, maxRetries: 5, lower: 80 * time.Millisecond, upper: 120 * time.Millisecond},
		{retries: 3, maxRetries: 5, lower: 320 * time.Millisecond, upper: 480 * time.Millisecond},
		// Upper bound is the configured cap rather than the jittered maximum.
		{retries: 5, maxRetries: 5, lower: 1280 * time.Millisecond, upper: 2 * time.Second},
	}

	next := NewExponentialBackoff(100*time.Millisecond, 2*time.Second)

	for idx := range expectations {
		want := expectations[idx]
		got := next(want.retries, want.maxRetries)
		if got >= want.lower && got <= want.upper {
			continue
		}
		t.Errorf("retries=%d: expected backoff between %v and %v, got %v", want.retries, want.lower, want.upper, got)
	}
}

// TestExponentialBackoffRaceDetection calls one shared backoff function from
// 1000 goroutines at once. It makes no assertions on the results; it exists
// so that running the suite with the race detector exercises concurrent use
// of the generator.
func TestExponentialBackoffRaceDetection(t *testing.T) {
	const workers = 1000

	next := NewExponentialBackoff(100*time.Millisecond, 2*time.Second)

	var pending sync.WaitGroup
	for n := 0; n < workers; n++ {
		pending.Add(1)
		// Cycle the retry count through 0..9 so several exponents are hit.
		go func(retries int) {
			defer pending.Done()
			_ = next(retries, 5)
		}(n % 10)
	}
	pending.Wait()
}

0 comments on commit 2210250

Please sign in to comment.