Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/core/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ impl TaskActor {
/// restarting, even with `RestartPolicy::Always`.
pub async fn run(self, runtime_token: CancellationToken) -> ActorExitReason {
let task_name: Arc<str> = Arc::from(self.task.name().to_owned());
let mut prev_delay: Option<Duration> = None;
let mut attempt: u32 = 0;
let mut backoff_attempt: u32 = 0;

loop {
if runtime_token.is_cancelled() {
Expand Down Expand Up @@ -182,6 +182,7 @@ impl TaskActor {

let child = runtime_token.child_token();
attempt += 1;

self.bus.publish(
Event::new(EventKind::TaskStarting)
.with_task(task_name.clone())
Expand All @@ -199,7 +200,7 @@ impl TaskActor {
drop(permit);
match res {
Ok(()) => {
prev_delay = None;
backoff_attempt = 0;

match self.params.restart {
RestartPolicy::Always { interval } => {
Expand Down Expand Up @@ -257,8 +258,8 @@ impl TaskActor {
return ActorExitReason::PolicyExhausted;
}

let delay = self.params.backoff.next(prev_delay);
prev_delay = Some(delay);
let delay = self.params.backoff.next(backoff_attempt);
backoff_attempt += 1;

self.bus.publish(
Event::new(EventKind::BackoffScheduled)
Expand Down
2 changes: 1 addition & 1 deletion src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
//! Err(Fatal) β†’ publish ActorDead & exit
//! Err(Canceled) β†’ exit (cooperative shutdown)
//! Err(Timeout/Fail) β†’ if policy allows retry:
//! - delay = backoff.next(prev); publish BackoffScheduled{delay}; sleep
//! - delay = backoff.next(attempt); publish BackoffScheduled{delay}; sleep
//! - else publish ActorExhausted & exit
//! }
//! }
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
//! β”‚ └─ Err ──► publish TaskFailed{ task, error, attempt }
//! β”‚ β”œβ”€ RestartPolicy::Never ─► ActorExhausted, exit
//! β”‚ └─ RestartPolicy::OnFailure/Always:
//! β”‚ β”œβ”€ compute delay = backoff.next(prev_delay)
//! β”‚ β”œβ”€ compute delay = backoff.next(backoff_attempt)
//! β”‚ β”œβ”€ publish BackoffScheduled{ delay, attempt }
//! β”‚ β”œβ”€ sleep(delay) (cancellable)
//! β”‚ └─ continue
Expand Down
238 changes: 146 additions & 92 deletions src/policies/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
//! - [`BackoffPolicy::first`] the initial delay;
//! - [`BackoffPolicy::max`] the maximum delay cap.
//!
//! The delay for attempt `n` is computed as `first Γ— factor^n`, clamped to `max`,
//! then jitter is applied. Because the base delay is derived purely from the attempt
//! number, jitter output never feeds back into subsequent calculations β€” this prevents
//! the negative feedback loop that causes delays to shrink over time.
//!
//! # Example
//! ```rust
//! use std::time::Duration;
Expand All @@ -18,15 +23,14 @@
//! jitter: JitterPolicy::None,
//! };
//!
//! // First attempt - uses 'first' (clamped to max)
//! assert_eq!(backoff.next(None), Duration::from_millis(100));
//! // Attempt 0 β€” uses 'first' (100ms), clamped to max
//! assert_eq!(backoff.next(0), Duration::from_millis(100));
//!
//! // Second attempt - multiplied by factor (100ms * 2.0 = 200ms)
//! assert_eq!(backoff.next(Some(Duration::from_millis(100))), Duration::from_millis(200));
//! // Attempt 1 β€” first Γ— factor^1 = 200ms
//! assert_eq!(backoff.next(1), Duration::from_millis(200));
//!
//! // When previous delay exceeds max, result is capped at max
//! // (20s * 2.0 = 40s, but capped at max=10s)
//! assert_eq!(backoff.next(Some(Duration::from_secs(20))), Duration::from_secs(10));
//! // Attempt 10 β€” 100ms Γ— 2^10 = 102_400ms β†’ capped at max=10s
//! assert_eq!(backoff.next(10), Duration::from_secs(10));
//! ```

use std::time::Duration;
Expand Down Expand Up @@ -67,33 +71,28 @@ impl Default for BackoffPolicy {
}

impl BackoffPolicy {
/// Computes the next delay based on the previous one.
/// Computes the delay for the given attempt number (0-indexed).
///
/// - If `prev` is `None`, returns `first` **clamped to `max`**.
/// - Otherwise multiplies the previous delay by [`BackoffPolicy::factor`], and caps it at [`BackoffPolicy::max`].
/// The base delay is `first Γ— factor^attempt`, clamped to [`BackoffPolicy::max`].
/// Jitter is applied to the clamped base, but the result is **never** fed back
/// into subsequent calculations β€” each attempt derives its base independently.
///
/// # Notes
/// - If `factor` is less than 1.0, delays decrease over time (not typical).
/// - If `factor` is less than 1.0, delays decrease with higher attempts (not typical).
/// - If `factor` equals 1.0, delay remains constant at `first` (up to `max`).
/// - If `factor` is greater than 1.0, delays grow exponentially.
pub fn next(&self, prev: Option<Duration>) -> Duration {
let unclamped = match prev {
None => self.first,
Some(d) => {
let mul = d.as_secs_f64() * self.factor;
if !mul.is_finite() {
self.max
} else {
d.mul_f64(self.factor)
}
}
};
/// - If `factor` is greater than 1.0, delays grow exponentially up to `max`.
pub fn next(&self, attempt: u32) -> Duration {
let max_secs = self.max.as_secs_f64();
let clamped_exp = attempt.min(i32::MAX as u32) as i32;
let unclamped_secs = self.first.as_secs_f64() * self.factor.powi(clamped_exp);

let base =
if !unclamped_secs.is_finite() || unclamped_secs < 0.0 || unclamped_secs > max_secs {
self.max
} else {
Duration::from_secs_f64(unclamped_secs)
};

let base = if unclamped > self.max {
self.max
} else {
unclamped
};
match self.jitter {
JitterPolicy::Decorrelated => {
self.jitter
Expand All @@ -110,14 +109,14 @@ mod tests {
use std::time::Duration;

#[test]
fn test_first_delay_no_jitter() {
fn test_attempt_zero_returns_first() {
let policy = BackoffPolicy {
first: Duration::from_millis(100),
max: Duration::from_secs(30),
factor: 2.0,
jitter: JitterPolicy::None,
};
assert_eq!(policy.next(None), Duration::from_millis(100));
assert_eq!(policy.next(0), Duration::from_millis(100));
}

#[test]
Expand All @@ -129,17 +128,40 @@ mod tests {
jitter: JitterPolicy::None,
};

let d1 = policy.next(None);
assert_eq!(d1, Duration::from_millis(100));

let d2 = policy.next(Some(d1));
assert_eq!(d2, Duration::from_millis(200));
assert_eq!(policy.next(0), Duration::from_millis(100));
assert_eq!(policy.next(1), Duration::from_millis(200));
assert_eq!(policy.next(2), Duration::from_millis(400));
assert_eq!(policy.next(3), Duration::from_millis(800));
assert_eq!(policy.next(4), Duration::from_millis(1600));
}

let d3 = policy.next(Some(d2));
assert_eq!(d3, Duration::from_millis(400));
#[test]
fn test_constant_factor() {
let policy = BackoffPolicy {
first: Duration::from_millis(500),
max: Duration::from_secs(30),
factor: 1.0,
jitter: JitterPolicy::None,
};
for attempt in 0..10 {
assert_eq!(
policy.next(attempt),
Duration::from_millis(500),
"attempt {} should be constant at 500ms",
attempt
);
}
}

let d4 = policy.next(Some(d3));
assert_eq!(d4, Duration::from_millis(800));
#[test]
fn test_clamped_to_max() {
let policy = BackoffPolicy {
first: Duration::from_millis(100),
max: Duration::from_secs(1),
factor: 2.0,
jitter: JitterPolicy::None,
};
assert_eq!(policy.next(10), Duration::from_secs(1));
}

#[test]
Expand All @@ -150,77 +172,87 @@ mod tests {
factor: 2.0,
jitter: JitterPolicy::None,
};
let d1 = policy.next(None);
assert_eq!(d1, Duration::from_secs(5));
assert_eq!(policy.next(0), Duration::from_secs(5));
}

#[test]
fn test_monotonic_growth_with_equal_jitter() {
fn test_full_jitter_no_negative_feedback() {
let policy = BackoffPolicy {
first: Duration::from_millis(100),
max: Duration::from_secs(30),
factor: 2.0,
jitter: JitterPolicy::Equal,
jitter: JitterPolicy::Full,
};

let mut prev = None;
let mut prev_delay = Duration::ZERO;

for i in 0..20 {
let delay = policy.next(prev);
if i > 5 {
assert!(
delay >= Duration::from_millis(10),
"iteration {}: delay {:?} is suspiciously low (prev: {:?})",
i,
delay,
prev_delay
);
}
prev_delay = delay;
prev = Some(delay);
for attempt in 5..15 {
let base_ms = 100.0 * 2.0f64.powi(attempt as i32);
let delay = policy.next(attempt);
assert!(
delay <= Duration::from_millis(base_ms as u64),
"attempt {}: delay {:?} exceeds base {}ms",
attempt,
delay,
base_ms
);
}
}

#[test]
fn test_decorrelated_jitter_no_negative_feedback() {
fn test_equal_jitter_no_negative_feedback() {
let policy = BackoffPolicy {
first: Duration::from_millis(100),
max: Duration::from_secs(30),
factor: 2.0,
jitter: JitterPolicy::Decorrelated,
jitter: JitterPolicy::Equal,
};

let mut prev = None;
let mut min_seen = Duration::from_secs(999);
let mut max_seen = Duration::ZERO;
for i in 0..100 {
let delay = policy.next(prev);

min_seen = min_seen.min(delay);
max_seen = max_seen.max(delay);

for attempt in 0..15 {
let base_ms = (100.0 * 2.0f64.powi(attempt as i32)).min(30_000.0);
let half = base_ms / 2.0;
let delay = policy.next(attempt);
assert!(
delay >= Duration::from_millis(50), // с запасом на jitter
"iteration {}: delay {:?} too low (min_seen: {:?})",
i,
delay >= Duration::from_millis(half as u64),
"attempt {}: delay {:?} < half of base {}ms",
attempt,
delay,
min_seen
base_ms
);
assert!(
delay <= Duration::from_millis(base_ms as u64),
"attempt {}: delay {:?} > base {}ms",
attempt,
delay,
base_ms
);
if i > 10 {
assert!(
delay >= Duration::from_millis(200),
"iteration {}: delay {:?} suspiciously low after warmup",
i,
delay
);
}
prev = Some(delay);
}
println!("Decorrelated stats: min={:?}, max={:?}", min_seen, max_seen);
}

#[test]
fn test_decorrelated_jitter_grows_with_attempts() {
let policy = BackoffPolicy {
first: Duration::from_millis(100),
max: Duration::from_secs(30),
factor: 2.0,
jitter: JitterPolicy::Decorrelated,
};

let mut min_late = Duration::from_secs(999);
let mut max_late = Duration::ZERO;
for _ in 0..100 {
let delay = policy.next(8);
min_late = min_late.min(delay);
max_late = max_late.max(delay);
}

assert!(
min_late >= Duration::from_millis(100),
"min_late {:?} below floor",
min_late
);
assert!(
max_seen > min_seen * 3,
"Range too narrow for decorrelated jitter"
max_late >= Duration::from_secs(5),
"max_late {:?} suspiciously low, range too narrow",
max_late
);
}

Expand All @@ -232,8 +264,8 @@ mod tests {
factor: 1.0,
jitter: JitterPolicy::Full,
};
for _ in 0..50 {
let delay = policy.next(Some(Duration::from_millis(1000)));
for attempt in 0..50 {
let delay = policy.next(attempt);
assert!(delay <= Duration::from_millis(1000));
}
}
Expand All @@ -243,13 +275,35 @@ mod tests {
let policy = BackoffPolicy {
first: Duration::from_millis(1000),
max: Duration::from_secs(30),
factor: 1.0, // constant base
factor: 1.0,
jitter: JitterPolicy::Equal,
};
for _ in 0..50 {
let delay = policy.next(Some(Duration::from_millis(1000)));
for attempt in 0..50 {
let delay = policy.next(attempt);
assert!(delay >= Duration::from_millis(500));
assert!(delay <= Duration::from_millis(1000));
}
}

#[test]
fn test_huge_attempt_clamps_to_max() {
let policy = BackoffPolicy {
first: Duration::from_millis(100),
max: Duration::from_secs(60),
factor: 2.0,
jitter: JitterPolicy::None,
};
assert_eq!(policy.next(100), Duration::from_secs(60));
}

#[test]
fn test_non_finite_overflow_clamps_to_max() {
let policy = BackoffPolicy {
first: Duration::from_millis(100),
max: Duration::from_secs(10),
factor: 2.0,
jitter: JitterPolicy::None,
};
assert_eq!(policy.next(u32::MAX), Duration::from_secs(10));
}
}
Loading