diff --git a/src/core/actor.rs b/src/core/actor.rs index 0684cf5..9330c47 100644 --- a/src/core/actor.rs +++ b/src/core/actor.rs @@ -143,8 +143,8 @@ impl TaskActor { /// restarting, even with `RestartPolicy::Always`. pub async fn run(self, runtime_token: CancellationToken) -> ActorExitReason { let task_name: Arc = Arc::from(self.task.name().to_owned()); - let mut prev_delay: Option = None; let mut attempt: u32 = 0; + let mut backoff_attempt: u32 = 0; loop { if runtime_token.is_cancelled() { @@ -182,6 +182,7 @@ impl TaskActor { let child = runtime_token.child_token(); attempt += 1; + self.bus.publish( Event::new(EventKind::TaskStarting) .with_task(task_name.clone()) @@ -199,7 +200,7 @@ impl TaskActor { drop(permit); match res { Ok(()) => { - prev_delay = None; + backoff_attempt = 0; match self.params.restart { RestartPolicy::Always { interval } => { @@ -257,8 +258,8 @@ impl TaskActor { return ActorExitReason::PolicyExhausted; } - let delay = self.params.backoff.next(prev_delay); - prev_delay = Some(delay); + let delay = self.params.backoff.next(backoff_attempt); + backoff_attempt += 1; self.bus.publish( Event::new(EventKind::BackoffScheduled) diff --git a/src/core/mod.rs b/src/core/mod.rs index 34d9086..7fffee9 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -67,7 +67,7 @@ //! Err(Fatal) → publish ActorDead & exit //! Err(Canceled) → exit (cooperative shutdown) //! Err(Timeout/Fail) → if policy allows retry: -//! - delay = backoff.next(prev); publish BackoffScheduled{delay}; sleep +//! - delay = backoff.next(attempt); publish BackoffScheduled{delay}; sleep //! - else publish ActorExhausted & exit //! } //! } diff --git a/src/lib.rs b/src/lib.rs index 3981cc2..7e8b41b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,7 +72,7 @@ //! │ └─ Err ──► publish TaskFailed{ task, error, attempt } //! │ ├─ RestartPolicy::Never ─► ActorExhausted, exit //! │ └─ RestartPolicy::OnFailure/Always: -//! │ ├─ compute delay = backoff.next(prev_delay) +//! │ ├─ compute delay = backoff.next(backoff_attempt) //! │ ├─ publish BackoffScheduled{ delay, attempt } //! │ ├─ sleep(delay) (cancellable) //! │ └─ continue diff --git a/src/policies/backoff.rs b/src/policies/backoff.rs index 500c7a0..68529e8 100644 --- a/src/policies/backoff.rs +++ b/src/policies/backoff.rs @@ -6,6 +6,11 @@ //! - [`BackoffPolicy::first`] the initial delay; //! - [`BackoffPolicy::max`] the maximum delay cap. //! +//! The delay for attempt `n` is computed as `first × factor^n`, clamped to `max`, +//! then jitter is applied. Because the base delay is derived purely from the attempt +//! number, jitter output never feeds back into subsequent calculations — this prevents +//! the negative feedback loop that causes delays to shrink over time. +//! //! # Example //! ```rust //! use std::time::Duration; @@ -18,15 +23,14 @@ //! jitter: JitterPolicy::None, //! }; //! -//! // First attempt - uses 'first' (clamped to max) -//! assert_eq!(backoff.next(None), Duration::from_millis(100)); +//! // Attempt 0 — uses 'first' (100ms), clamped to max +//! assert_eq!(backoff.next(0), Duration::from_millis(100)); //! -//! // Second attempt - multiplied by factor (100ms * 2.0 = 200ms) -//! assert_eq!(backoff.next(Some(Duration::from_millis(100))), Duration::from_millis(200)); +//! // Attempt 1 — first × factor^1 = 200ms +//! assert_eq!(backoff.next(1), Duration::from_millis(200)); //! -//! // When previous delay exceeds max, result is capped at max -//! // (20s * 2.0 = 40s, but capped at max=10s) -//! assert_eq!(backoff.next(Some(Duration::from_secs(20))), Duration::from_secs(10)); +//! // Attempt 10 — 100ms × 2^10 = 102_400ms → capped at max=10s +//! assert_eq!(backoff.next(10), Duration::from_secs(10)); //! ``` use std::time::Duration; @@ -67,33 +71,28 @@ impl Default for BackoffPolicy { } impl BackoffPolicy { - /// Computes the next delay based on the previous one. + /// Computes the delay for the given attempt number (0-indexed). /// - /// - If `prev` is `None`, returns `first` **clamped to `max`**. - /// - Otherwise multiplies the previous delay by [`BackoffPolicy::factor`], and caps it at [`BackoffPolicy::max`]. + /// The base delay is `first × factor^attempt`, clamped to [`BackoffPolicy::max`]. + /// Jitter is applied to the clamped base, but the result is **never** fed back + /// into subsequent calculations — each attempt derives its base independently. /// /// # Notes - /// - If `factor` is less than 1.0, delays decrease over time (not typical). + /// - If `factor` is less than 1.0, delays decrease with higher attempts (not typical). /// - If `factor` equals 1.0, delay remains constant at `first` (up to `max`). - /// - If `factor` is greater than 1.0, delays grow exponentially. - pub fn next(&self, prev: Option) -> Duration { - let unclamped = match prev { - None => self.first, - Some(d) => { - let mul = d.as_secs_f64() * self.factor; - if !mul.is_finite() { - self.max - } else { - d.mul_f64(self.factor) - } - } - }; + /// - If `factor` is greater than 1.0, delays grow exponentially up to `max`. + pub fn next(&self, attempt: u32) -> Duration { + let max_secs = self.max.as_secs_f64(); + let clamped_exp = attempt.min(i32::MAX as u32) as i32; + let unclamped_secs = self.first.as_secs_f64() * self.factor.powi(clamped_exp); + + let base = + if !unclamped_secs.is_finite() || unclamped_secs < 0.0 || unclamped_secs > max_secs { + self.max + } else { + Duration::from_secs_f64(unclamped_secs) + }; - let base = if unclamped > self.max { - self.max - } else { - unclamped - }; match self.jitter { JitterPolicy::Decorrelated => { self.jitter @@ -110,14 +109,14 @@ mod tests { use std::time::Duration; #[test] - fn test_first_delay_no_jitter() { + fn test_attempt_zero_returns_first() { let policy = BackoffPolicy { first: Duration::from_millis(100), max: Duration::from_secs(30), factor: 2.0, jitter: JitterPolicy::None, }; - assert_eq!(policy.next(None), Duration::from_millis(100)); + assert_eq!(policy.next(0), Duration::from_millis(100)); } #[test] @@ -129,17 +128,40 @@ mod tests { jitter: JitterPolicy::None, }; - let d1 = policy.next(None); - assert_eq!(d1, Duration::from_millis(100)); - - let d2 = policy.next(Some(d1)); - assert_eq!(d2, Duration::from_millis(200)); + assert_eq!(policy.next(0), Duration::from_millis(100)); + assert_eq!(policy.next(1), Duration::from_millis(200)); + assert_eq!(policy.next(2), Duration::from_millis(400)); + assert_eq!(policy.next(3), Duration::from_millis(800)); + assert_eq!(policy.next(4), Duration::from_millis(1600)); + } - let d3 = policy.next(Some(d2)); - assert_eq!(d3, Duration::from_millis(400)); + #[test] + fn test_constant_factor() { + let policy = BackoffPolicy { + first: Duration::from_millis(500), + max: Duration::from_secs(30), + factor: 1.0, + jitter: JitterPolicy::None, + }; + for attempt in 0..10 { + assert_eq!( + policy.next(attempt), + Duration::from_millis(500), + "attempt {} should be constant at 500ms", + attempt + ); + } + } - let d4 = policy.next(Some(d3)); - assert_eq!(d4, Duration::from_millis(800)); + #[test] + fn test_clamped_to_max() { + let policy = BackoffPolicy { + first: Duration::from_millis(100), + max: Duration::from_secs(1), + factor: 2.0, + jitter: JitterPolicy::None, + }; + assert_eq!(policy.next(10), Duration::from_secs(1)); } #[test] @@ -150,77 +172,87 @@ mod tests { factor: 2.0, jitter: JitterPolicy::None, }; - let d1 = policy.next(None); - assert_eq!(d1, Duration::from_secs(5)); + assert_eq!(policy.next(0), Duration::from_secs(5)); } #[test] - fn test_monotonic_growth_with_equal_jitter() { + fn test_full_jitter_no_negative_feedback() { let policy = BackoffPolicy { first: Duration::from_millis(100), max: Duration::from_secs(30), factor: 2.0, - jitter: JitterPolicy::Equal, + jitter: JitterPolicy::Full, }; - let mut prev = None; - let mut prev_delay = Duration::ZERO; - - for i in 0..20 { - let delay = policy.next(prev); - if i > 5 { - assert!( - delay >= Duration::from_millis(10), - "iteration {}: delay {:?} is suspiciously low (prev: {:?})", - i, - delay, - prev_delay - ); - } - prev_delay = delay; - prev = Some(delay); + for attempt in 5..15 { + let base_ms = 100.0 * 2.0f64.powi(attempt as i32); + let delay = policy.next(attempt); + assert!( + delay <= Duration::from_millis(base_ms as u64), + "attempt {}: delay {:?} exceeds base {}ms", + attempt, + delay, + base_ms + ); } } #[test] - fn test_decorrelated_jitter_no_negative_feedback() { + fn test_equal_jitter_no_negative_feedback() { let policy = BackoffPolicy { first: Duration::from_millis(100), max: Duration::from_secs(30), factor: 2.0, - jitter: JitterPolicy::Decorrelated, + jitter: JitterPolicy::Equal, }; - let mut prev = None; - let mut min_seen = Duration::from_secs(999); - let mut max_seen = Duration::ZERO; - for i in 0..100 { - let delay = policy.next(prev); - - min_seen = min_seen.min(delay); - max_seen = max_seen.max(delay); - + for attempt in 0..15 { + let base_ms = (100.0 * 2.0f64.powi(attempt as i32)).min(30_000.0); + let half = base_ms / 2.0; + let delay = policy.next(attempt); assert!( - delay >= Duration::from_millis(50), // с запасом на jitter - "iteration {}: delay {:?} too low (min_seen: {:?})", - i, + delay >= Duration::from_millis(half as u64), + "attempt {}: delay {:?} < half of base {}ms", + attempt, delay, - min_seen + base_ms + ); + assert!( + delay <= Duration::from_millis(base_ms as u64), + "attempt {}: delay {:?} > base {}ms", + attempt, + delay, + base_ms ); - if i > 10 { - assert!( - delay >= Duration::from_millis(200), - "iteration {}: delay {:?} suspiciously low after warmup", - i, - delay - ); - } - prev = Some(delay); } - println!("Decorrelated stats: min={:?}, max={:?}", min_seen, max_seen); + } + + #[test] + fn test_decorrelated_jitter_grows_with_attempts() { + let policy = BackoffPolicy { + first: Duration::from_millis(100), + max: Duration::from_secs(30), + factor: 2.0, + jitter: JitterPolicy::Decorrelated, + }; + + let mut min_late = Duration::from_secs(999); + let mut max_late = Duration::ZERO; + for _ in 0..100 { + let delay = policy.next(8); + min_late = min_late.min(delay); + max_late = max_late.max(delay); + } + + assert!( + min_late >= Duration::from_millis(100), + "min_late {:?} below floor", + min_late + ); assert!( - max_seen > min_seen * 3, - "Range too narrow for decorrelated jitter" + max_late >= Duration::from_secs(5), + "max_late {:?} suspiciously low, range too narrow", + max_late ); } @@ -232,8 +264,8 @@ mod tests { factor: 1.0, jitter: JitterPolicy::Full, }; - for _ in 0..50 { - let delay = policy.next(Some(Duration::from_millis(1000))); + for attempt in 0..50 { + let delay = policy.next(attempt); assert!(delay <= Duration::from_millis(1000)); } } @@ -243,13 +275,35 @@ mod tests { let policy = BackoffPolicy { first: Duration::from_millis(1000), max: Duration::from_secs(30), - factor: 1.0, // constant base + factor: 1.0, jitter: JitterPolicy::Equal, }; - for _ in 0..50 { - let delay = policy.next(Some(Duration::from_millis(1000))); + for attempt in 0..50 { + let delay = policy.next(attempt); assert!(delay >= Duration::from_millis(500)); assert!(delay <= Duration::from_millis(1000)); } } + + #[test] + fn test_huge_attempt_clamps_to_max() { + let policy = BackoffPolicy { + first: Duration::from_millis(100), + max: Duration::from_secs(60), + factor: 2.0, + jitter: JitterPolicy::None, + }; + assert_eq!(policy.next(100), Duration::from_secs(60)); + } + + #[test] + fn test_non_finite_overflow_clamps_to_max() { + let policy = BackoffPolicy { + first: Duration::from_millis(100), + max: Duration::from_secs(10), + factor: 2.0, + jitter: JitterPolicy::None, + }; + assert_eq!(policy.next(u32::MAX), Duration::from_secs(10)); + } } diff --git a/src/policies/mod.rs b/src/policies/mod.rs index 3b403f6..fec7576 100644 --- a/src/policies/mod.rs +++ b/src/policies/mod.rs @@ -13,7 +13,7 @@ //! TaskSpec { restart: RestartPolicy, backoff: BackoffPolicy, timeout: Option } //! └─► core::actor::TaskActor uses: //! - restart to decide continue/exit -//! - backoff.next(prev_delay) to schedule the next attempt +//! - backoff.next(attempt) to schedule the next attempt //! ``` //! //! ## Defaults