Skip to content

Commit 936078a

Browse files
committed
rate: fix limiter becoming invalid under high rate and concurrency
The rate limiter could become ineffective when the request rate or concurrency were high. This is because we allowed time reverse jump. This change modifies the the time calculation of the limiter. If input an old time, we don't change time of limiter. Fixes golang/go#65508
1 parent 2b4e439 commit 936078a

File tree

2 files changed

+46
-5
lines changed

2 files changed

+46
-5
lines changed

rate/rate.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,9 @@ func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration)
372372
r.timeToAct = t.Add(waitDuration)
373373

374374
// Update state
375-
lim.last = t
375+
if t.After(lim.last) {
376+
lim.last = t
377+
}
376378
lim.tokens = tokens
377379
lim.lastEvent = r.timeToAct
378380
}

rate/rate_test.go

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ var (
6161
t3 = t0.Add(time.Duration(3) * d)
6262
t4 = t0.Add(time.Duration(4) * d)
6363
t5 = t0.Add(time.Duration(5) * d)
64+
t6 = t0.Add(time.Duration(6) * d)
6465
t9 = t0.Add(time.Duration(9) * d)
6566
)
6667

@@ -125,7 +126,7 @@ func TestLimiterJumpBackwards(t *testing.T) {
125126
{t0, 1, 1, true},
126127
{t0, 0, 1, false},
127128
{t0, 0, 1, false},
128-
{t1, 1, 1, true}, // got a token
129+
// {t1, 1, 1, true}, // got a token
129130
{t1, 0, 1, false},
130131
{t1, 0, 1, false},
131132
{t2, 1, 1, true}, // got another token
@@ -423,13 +424,13 @@ func TestReserveJumpBack(t *testing.T) {
423424

424425
runReserve(t, lim, request{t1, 2, t1, true}) // start at t1
425426
runReserve(t, lim, request{t0, 1, t1, true}) // should violate Limit,Burst
426-
runReserve(t, lim, request{t2, 2, t3, true})
427+
runReserve(t, lim, request{t2, 2, t4, true})
427428
// burst size is 2, so n=3 always fails, and the state of lim should not be changed
428429
runReserve(t, lim, request{t0, 3, time.Time{}, false})
429-
runReserve(t, lim, request{t2, 1, t4, true})
430+
runReserve(t, lim, request{t2, 1, t5, true})
430431
// the maxReserve is not enough so it fails, and the state of lim should not be changed
431432
runReserveMax(t, lim, request{t0, 2, time.Time{}, false}, d)
432-
runReserve(t, lim, request{t2, 1, t5, true})
433+
runReserve(t, lim, request{t2, 1, t6, true})
433434
}
434435

435436
func TestReserveJumpBackCancel(t *testing.T) {
@@ -655,3 +656,41 @@ func TestTinyLimit(t *testing.T) {
655656
t.Errorf("Limit(1e-10, 1) want false when already used")
656657
}
657658
}
659+
660+
func TestLimiter(t *testing.T) {
661+
runTestLimiter := func(concurrency int, totalRate float64, burst int, duration time.Duration) {
662+
limiter := NewLimiter(Limit(totalRate), burst)
663+
var allowed int64
664+
var wg sync.WaitGroup
665+
ctx, _ := context.WithTimeout(context.Background(), duration)
666+
667+
for range concurrency {
668+
wg.Add(1)
669+
go func() {
670+
defer wg.Done()
671+
for {
672+
select {
673+
case <-ctx.Done():
674+
return
675+
default:
676+
err := limiter.Wait(ctx)
677+
if err == nil {
678+
atomic.AddInt64(&allowed, 1)
679+
} else {
680+
return
681+
}
682+
}
683+
}
684+
}()
685+
}
686+
687+
wg.Wait()
688+
most := int64(totalRate*duration.Seconds()) + int64(burst)
689+
if allowed > most {
690+
t.Errorf("limit failed.At most, it should be:%d,but in fact:%d", most, allowed)
691+
}
692+
}
693+
runTestLimiter(1000, 100000, 100000, 5*time.Second)
694+
runTestLimiter(1000, 100000, 10000, 5*time.Second)
695+
runTestLimiter(100, 100000, 10000, 5*time.Second)
696+
}

0 commit comments

Comments
 (0)