Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/core/alive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,12 @@ impl AliveTracker {
     /// during graceful shutdown (tasks that didn't stop within grace period).
     pub async fn snapshot(&self) -> Vec<String> {
         let state = self.state.read().await;
-        let alive: Vec<String> = state
+        let mut alive: Vec<String> = state
             .iter()
             .filter(|(_, ts)| ts.alive)
             .map(|(name, _)| name.clone())
             .collect();
+        alive.sort_unstable();
         alive
     }

Expand Down
36 changes: 13 additions & 23 deletions src/core/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,15 @@ impl Registry {
     /// Spawns an actor and registers its handle.
     async fn spawn_and_register(&self, spec: TaskSpec) {
         let task_name = spec.task().name().to_string();
-        {
-            let tasks = self.tasks.read().await;
-            if tasks.contains_key(&task_name) {
-                self.bus.publish(
-                    Event::new(EventKind::TaskFailed)
-                        .with_task(task_name)
-                        .with_reason("task_already_exists"),
-                );
-                return;
-            }
+
+        let mut tasks = self.tasks.write().await;
+        if tasks.contains_key(&task_name) {
+            self.bus.publish(
+                Event::new(EventKind::TaskFailed)
+                    .with_task(task_name)
+                    .with_reason("task_already_exists"),
+            );
+            return;
         }
 
         let task_token = self.runtime_token.child_token();
Expand All @@ -210,23 +209,14 @@ impl Registry {
             cancel: task_token,
         };
 
-        let mut tasks = self.tasks.write().await;
         let was_empty = tasks.is_empty();
-        let inserted = tasks.insert(task_name.clone(), handle).is_none();
+        tasks.insert(task_name.clone(), handle);
         let len_after = tasks.len();
         drop(tasks);
 
-        if inserted {
-            self.notify_after_insert(was_empty, len_after);
-            self.bus
-                .publish(Event::new(EventKind::TaskAdded).with_task(task_name));
-        } else {
-            self.bus.publish(
-                Event::new(EventKind::TaskFailed)
-                    .with_task(task_name)
-                    .with_reason("task_already_exists_race"),
-            );
-        }
+        self.notify_after_insert(was_empty, len_after);
+        self.bus
+            .publish(Event::new(EventKind::TaskAdded).with_task(task_name));
     }
 
     /// Removes a task and cancels its token.
Expand Down
66 changes: 27 additions & 39 deletions src/core/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,50 +390,38 @@ impl Supervisor {
         wait_for: Duration,
     ) -> Result<bool, RuntimeError> {
         let target = name.to_string();
-        let start = tokio::time::Instant::now();
-        let mut last_poll = tokio::time::Instant::now();
-        let poll_interval = Duration::from_millis(100);
 
-        loop {
-            if start.elapsed() >= wait_for {
-                return Err(RuntimeError::TaskRemoveTimeout {
-                    name: target,
-                    timeout: wait_for,
-                });
-            }
-            if last_poll.elapsed() >= poll_interval {
-                let tasks = self.registry.list().await;
-                if !tasks.contains(&target) {
-                    return Ok(true);
-                }
-                last_poll = tokio::time::Instant::now();
-            }
-
-            let recv_timeout = poll_interval
-                .checked_sub(last_poll.elapsed())
-                .unwrap_or(Duration::from_millis(10));
-
-            match tokio::time::timeout(recv_timeout, rx.recv()).await {
-                Ok(Ok(ev))
-                    if matches!(ev.kind, EventKind::TaskRemoved)
-                        && ev.task.as_deref() == Some(&target) =>
-                {
-                    return Ok(true);
-                }
-                Ok(Ok(_)) => {}
-                Ok(Err(broadcast::error::RecvError::Closed)) => {
-                    let tasks = self.registry.list().await;
-                    return Ok(!tasks.contains(&target));
-                }
-                Ok(Err(broadcast::error::RecvError::Lagged(_))) => {
-                    let tasks = self.registry.list().await;
-                    if !tasks.contains(&target) {
+        let wait_for_event = async {
+            loop {
+                match rx.recv().await {
+                    Ok(ev)
+                        if matches!(ev.kind, EventKind::TaskRemoved)
+                            && ev.task.as_deref() == Some(target.as_str()) =>
+                    {
                         return Ok(true);
                     }
-                    last_poll = tokio::time::Instant::now();
+                    Ok(_) => {}
+                    Err(broadcast::error::RecvError::Lagged(_)) => {
+                        // We may have missed the TaskRemoved event; check the registry.
+                        let tasks = self.registry.list().await;
+                        if !tasks.contains(&target) {
+                            return Ok(true);
+                        }
+                    }
+                    Err(broadcast::error::RecvError::Closed) => {
+                        let tasks = self.registry.list().await;
+                        return Ok(!tasks.contains(&target));
+                    }
                 }
-                Err(_elapsed) => {}
             }
+        };
+
+        match timeout(wait_for, wait_for_event).await {
+            Ok(result) => result,
+            Err(_) => Err(RuntimeError::TaskRemoveTimeout {
+                name: target,
+                timeout: wait_for,
+            }),
         }
     }
 }