Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions src/core/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,115 @@ fn publish_timeout(bus: &Bus, name: &str, dur: Duration, attempt: u32) {
.with_attempt(attempt),
);
}

#[cfg(test)]
mod tests {
use super::*;
use std::future::Future;
use std::pin::Pin;

type BoxFut = Pin<Box<dyn Future<Output = Result<(), TaskError>> + Send + 'static>>;

struct SlowTask;

impl Task for SlowTask {
fn name(&self) -> &str {
"slow-task"
}

fn spawn(&self, _ctx: CancellationToken) -> BoxFut {
Box::pin(async {
tokio::time::sleep(Duration::from_secs(3600)).await;
Ok(())
})
}
}

struct OkTask;

impl Task for OkTask {
fn name(&self) -> &str {
"ok-task"
}

fn spawn(&self, _ctx: CancellationToken) -> BoxFut {
Box::pin(async { Ok(()) })
}
}

struct FailTask;

impl Task for FailTask {
fn name(&self) -> &str {
"fail-task"
}

fn spawn(&self, _ctx: CancellationToken) -> BoxFut {
Box::pin(async {
Err(TaskError::Fail {
reason: "boom".into(),
})
})
}
}

#[tokio::test]
async fn test_timeout_returns_timeout_variant_not_fail() {
let bus = Bus::new(16);
let parent = CancellationToken::new();
let timeout = Some(Duration::from_millis(50));

let result = run_once(&SlowTask, &parent, timeout, 1, &bus).await;

match result {
Err(TaskError::Timeout { timeout: dur }) => {
assert_eq!(dur, Duration::from_millis(50));
}
Err(TaskError::Fail { reason }) => {
panic!("timeout should return TaskError::Timeout, not TaskError::Fail: {reason}");
}
other => {
panic!("expected TaskError::Timeout, got: {other:?}");
}
}
}

#[tokio::test]
async fn test_success_returns_ok() {
let bus = Bus::new(16);
let parent = CancellationToken::new();

let result = run_once(&OkTask, &parent, None, 1, &bus).await;
assert!(result.is_ok());
}

#[tokio::test]
async fn test_failure_returns_fail_variant() {
let bus = Bus::new(16);
let parent = CancellationToken::new();

let result = run_once(&FailTask, &parent, None, 1, &bus).await;

assert!(
matches!(result, Err(TaskError::Fail { .. })),
"expected TaskError::Fail, got: {result:?}"
);
}

#[tokio::test]
async fn test_timeout_publishes_timeout_hit_event() {
let bus = Bus::new(16);
let mut rx = bus.subscribe();
let parent = CancellationToken::new();

let _ = run_once(&SlowTask, &parent, Some(Duration::from_millis(50)), 1, &bus).await;

let mut saw_timeout_hit = false;
while let Ok(ev) = rx.try_recv() {
if matches!(ev.kind, EventKind::TimeoutHit) {
saw_timeout_hit = true;
}
}
assert!(saw_timeout_hit, "expected TimeoutHit event to be published");
}
}
8 changes: 0 additions & 8 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,3 @@ impl TaskError {
matches!(self, TaskError::Fatal { .. })
}
}

impl From<tokio::time::error::Elapsed> for TaskError {
fn from(e: tokio::time::error::Elapsed) -> Self {
TaskError::Fail {
reason: e.to_string(),
}
}
}