Skip to content

Commit f2c9e37

Browse files
committed
throttler jitter little copy over bad delayer abstraction
1 parent e903a2e commit f2c9e37

File tree

5 files changed

+35
-44
lines changed

5 files changed

+35
-44
lines changed

README.MD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ thr := NewThrottlerAll( // throttles only if all children throttle
133133
In gohalt v0.4.0 breaking change is introduced to replace all untyped errors with two major error types:
134134
- `ErrorThreshold` which defines error type that occurs if throttler reaches specified threshold.
135135
- `ErrorInternal` which defines error type that occurs if throttler internal error happens.
136-
You can find list of returning error types for all existing throttlers in throttlers table bellow or in documentation.
136+
You can find list of returning error types for all existing throttlers in throttlers table bellow or in documentation.
137137
**Note:** not every gohalt throttler must return error; some throttlers might cause different side effects like logging or call to `time.Sleep` instead.
138138

139139
## Throttlers

delayers.go

Lines changed: 0 additions & 15 deletions
This file was deleted.

executors.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ func loop(period time.Duration, run Runnable) Runnable {
3939

4040
func delayed(after time.Duration, run Runnable) Runnable {
4141
return func(ctx context.Context) error {
42-
sleep(ctx, after)
42+
if err := sleep(ctx, after); err != nil {
43+
return err
44+
}
4345
return run(ctx)
4446
}
4547
}
@@ -138,3 +140,8 @@ func gorun(ctx context.Context, r Runnable) {
138140
_ = r(ctx)
139141
}()
140142
}
143+
144+
func sleep(_ context.Context, dur time.Duration) error {
145+
time.Sleep(dur)
146+
return nil
147+
}

throttlers.go

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -62,21 +62,18 @@ func NewThrottlerWait(duration time.Duration) Throttler {
6262
}
6363

6464
func (thr twait) Acquire(ctx context.Context) error {
65-
sleep(ctx, thr.duration)
66-
return nil
65+
return sleep(ctx, thr.duration)
6766
}
6867

6968
func (thr twait) Release(context.Context) error {
7069
return nil
7170
}
7271

7372
type tsquare struct {
74-
delayer delayer
75-
duration uint64
76-
initial time.Duration
77-
limit time.Duration
78-
current uint64
79-
reset bool
73+
initial time.Duration
74+
limit time.Duration
75+
current uint64
76+
reset bool
8077
}
8178

8279
// NewThrottlerSquare creates new throttler instance that
@@ -85,7 +82,7 @@ type tsquare struct {
8582
// If reset is set then after throttler riches the specified duration limit
8683
// next multiplier value will be reseted.
8784
func NewThrottlerSquare(initial time.Duration, limit time.Duration, reset bool) Throttler {
88-
return &tsquare{delayer: sleep, initial: initial, limit: limit, reset: reset}
85+
return &tsquare{initial: initial, limit: limit, reset: reset}
8986
}
9087

9188
func (thr *tsquare) Acquire(ctx context.Context) error {
@@ -97,9 +94,7 @@ func (thr *tsquare) Acquire(ctx context.Context) error {
9794
atomicSet(&thr.current, 0)
9895
}
9996
}
100-
atomicSet(&thr.duration, uint64(duration))
101-
thr.delayer(ctx, duration)
102-
return nil
97+
return sleep(ctx, duration)
10398
}
10499

105100
func (thr *tsquare) Release(context.Context) error {
@@ -108,8 +103,11 @@ func (thr *tsquare) Release(context.Context) error {
108103
}
109104

110105
type tjitter struct {
111-
*tsquare
112-
jitter float64
106+
initial time.Duration
107+
limit time.Duration
108+
current uint64
109+
reset bool
110+
jitter float64
113111
}
114112

115113
// NewThrottlerJitter creates new throttler instance that
@@ -124,25 +122,26 @@ func NewThrottlerJitter(initial time.Duration, limit time.Duration, reset bool,
124122
jitter = 1.0
125123
}
126124
jitter = 1.0 - jitter
127-
thr := &tjitter{jitter: jitter}
128-
thr.tsquare = NewThrottlerSquare(initial, limit, reset).(*tsquare)
129-
// we need to patch parent square to avoid sleeping
130-
// and use jittered calculated duration instead
131-
thr.delayer = vigil
125+
thr := &tjitter{initial: initial, limit: limit, reset: reset, jitter: jitter}
132126
return thr
133127
}
134128

135129
func (thr *tjitter) Acquire(ctx context.Context) error {
136-
_ = thr.tsquare.Acquire(ctx)
137-
duration := float64(atomicGet(&thr.duration))
138-
base := duration * thr.jitter
139-
side := (duration - base) * rand.Float64()
140-
sleep(ctx, time.Duration(base+side))
141-
return nil
130+
current := atomicBIncr(&thr.current)
131+
duration := thr.initial * time.Duration(current*current)
132+
if thr.limit > 0 && duration > thr.limit {
133+
duration = thr.limit
134+
if thr.reset {
135+
atomicSet(&thr.current, 0)
136+
}
137+
}
138+
base := float64(duration) * thr.jitter
139+
side := (float64(duration) - base) * rand.Float64()
140+
return sleep(ctx, time.Duration(base+side))
142141
}
143142

144143
func (thr *tjitter) Release(ctx context.Context) error {
145-
_ = thr.tsquare.Release(ctx)
144+
atomicBDecr(&thr.current)
146145
return nil
147146
}
148147

throttlers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (t *tcase) run(index int) (dur time.Duration, err error) {
6868
ts = time.Now()
6969
// force strict acquire order
7070
for index != int(atomicGet(&t.idx)) {
71-
sleep(ctx, time.Microsecond)
71+
_ = sleep(ctx, time.Microsecond)
7272
}
7373
// set additional timestamp only if present
7474
if index < len(t.tss) {

0 commit comments

Comments
 (0)