From febc18ea7a701d7f15f783558334ed6c38680124 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 16 May 2025 00:03:36 -0700 Subject: [PATCH 01/13] Added tick_event feature --- Cargo.toml | 5 ++- README.md | 8 ++-- src/lib.rs | 6 ++- src/split_ticked_async_executor.rs | 38 ++++++++++++------- src/ticked_async_executor.rs | 21 +++++----- ...mer.rs => ticked_timer_from_tick_event.rs} | 14 +++---- tests/tokio_tests.rs | 4 +- 7 files changed, 59 insertions(+), 37 deletions(-) rename src/{ticked_timer.rs => ticked_timer_from_tick_event.rs} (54%) diff --git a/Cargo.toml b/Cargo.toml index 09821bb..ed90d56 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ticked_async_executor" -version = "0.2.1" +version = "0.3.0" authors = ["coder137"] edition = "2021" description = "Local executor that runs woken async tasks when it is ticked" @@ -9,6 +9,9 @@ repository = "https://github.com/coder137/ticked-async-executor" categories = ["asynchronous", "concurrency", "game-development", "simulation"] readme = "README.md" +[features] +tick_event = [] + [dependencies] async-task = "4.7" pin-project = "1" diff --git a/README.md b/README.md index b433167..9fb83b8 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ Async Local Executor which executes woken tasks only when it is ticked +MSRV: 1.87 + # Usage ## Default Local Executor @@ -11,7 +13,7 @@ use ticked_async_executor::*; const DELTA: f64 = 1000.0 / 60.0; -let executor = TickedAsyncExecutor::default(); +let mut executor = TickedAsyncExecutor::default(); executor.spawn_local("MyIdentifier", async move {}).detach(); @@ -28,7 +30,7 @@ use ticked_async_executor::*; const DELTA: f64 = 1000.0 / 60.0; -let (spawner, ticker) = SplitTickedAsyncExecutor::default(); +let (spawner, mut ticker) = SplitTickedAsyncExecutor::default(); spawner.spawn_local("MyIdentifier", async move {}).detach(); @@ -45,7 +47,7 @@ use ticked_async_executor::*; const DELTA: f64 = 1000.0 / 60.0; -let executor = TickedAsyncExecutor::default(); +let mut executor = TickedAsyncExecutor::default(); executor.spawn_local("MyIdentifier1", async move {}).detach(); executor.spawn_local("MyIdentifier2", async move {}).detach(); diff --git a/src/lib.rs b/src/lib.rs index 2aeddef..695d86e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,5 +12,7 @@ pub use split_ticked_async_executor::*; mod ticked_async_executor; pub use ticked_async_executor::*; -mod ticked_timer; -pub use ticked_timer::*; +#[cfg(feature = "tick_event")] +mod ticked_timer_from_tick_event; +#[cfg(feature = "tick_event")] +pub use ticked_timer_from_tick_event::*; diff --git a/src/split_ticked_async_executor.rs b/src/split_ticked_async_executor.rs index 5371057..19396cd 100644 --- a/src/split_ticked_async_executor.rs +++ b/src/split_ticked_async_executor.rs @@ -6,7 +6,7 @@ use std::{ }, }; -use crate::{DroppableFuture, TaskIdentifier, TickedTimer}; +use crate::{DroppableFuture, TaskIdentifier}; #[derive(Debug)] pub enum TaskState { @@ -36,20 +36,25 @@ impl SplitTickedAsyncExecutor { let (tx_channel, rx_channel) = mpsc::channel(); let num_woken_tasks = Arc::new(AtomicUsize::new(0)); let num_spawned_tasks = Arc::new(AtomicUsize::new(0)); - let (tx_tick_event, rx_tick_event) = tokio::sync::watch::channel(1.0); + + #[cfg(feature = "tick_event")] + let (tick_event_tx, tick_event_rx) = tokio::sync::watch::channel(1.0); + let spawner = TickedAsyncExecutorSpawner { tx_channel, num_woken_tasks: num_woken_tasks.clone(), num_spawned_tasks: num_spawned_tasks.clone(), observer: observer.clone(), - rx_tick_event, + #[cfg(feature = "tick_event")] + tick_event_rx, }; let ticker = TickedAsyncExecutorTicker { rx_channel, num_woken_tasks, num_spawned_tasks, observer, - tx_tick_event, + #[cfg(feature = "tick_event")] + tick_event_tx, }; (spawner, ticker) } @@ -64,7 +69,9 @@ pub struct TickedAsyncExecutorSpawner { // Broadcast recv channel should be notified when there are new messages in the queue // Broadcast channel must also be able to remove older/stale messages (like a RingBuffer) observer: O, - rx_tick_event: tokio::sync::watch::Receiver, + + #[cfg(feature = "tick_event")] + tick_event_rx: tokio::sync::watch::Receiver, } impl TickedAsyncExecutorSpawner @@ -87,13 +94,15 @@ where task } - pub fn create_timer(&self) -> TickedTimer { - let tick_recv = self.rx_tick_event.clone(); - TickedTimer::new(tick_recv) + #[cfg(feature = "tick_event")] + pub fn create_timer(&self) -> crate::TickedTimerFromTickEvent { + let tick_recv = self.tick_event_rx.clone(); + crate::TickedTimerFromTickEvent::new(tick_recv) } + #[cfg(feature = "tick_event")] pub fn tick_channel(&self) -> tokio::sync::watch::Receiver { - self.rx_tick_event.clone() + self.tick_event_rx.clone() } pub fn num_tasks(&self) -> usize { @@ -139,15 +148,18 @@ pub struct TickedAsyncExecutorTicker { num_woken_tasks: Arc, num_spawned_tasks: Arc, observer: O, - tx_tick_event: tokio::sync::watch::Sender, + + #[cfg(feature = "tick_event")] + tick_event_tx: tokio::sync::watch::Sender, } impl TickedAsyncExecutorTicker where O: Fn(TaskState), { - pub fn tick(&self, delta: f64, limit: Option) { - let _r = self.tx_tick_event.send(delta); + pub fn tick(&mut self, delta: f64, limit: Option) { + #[cfg(feature = "tick_event")] + let _r = self.tick_event_tx.send(delta); let mut num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed); if let Some(limit) = limit { @@ -166,7 +178,7 @@ where .fetch_sub(num_woken_tasks, Ordering::Relaxed); } - pub fn wait_till_completed(&self, constant_delta: f64) { + pub fn wait_till_completed(&mut self, constant_delta: f64) { while self.num_spawned_tasks.load(Ordering::Relaxed) != 0 { self.tick(constant_delta, None); } diff --git a/src/ticked_async_executor.rs b/src/ticked_async_executor.rs index a014246..9d9a29e 100644 --- a/src/ticked_async_executor.rs +++ b/src/ticked_async_executor.rs @@ -2,7 +2,7 @@ use std::future::Future; use crate::{ SplitTickedAsyncExecutor, Task, TaskIdentifier, TaskState, TickedAsyncExecutorSpawner, - TickedAsyncExecutorTicker, TickedTimer, + TickedAsyncExecutorTicker, }; pub struct TickedAsyncExecutor { @@ -53,19 +53,21 @@ where /// Tick is !Sync i.e cannot be invoked from multiple threads /// /// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run` - pub fn tick(&self, delta: f64, limit: Option) { + pub fn tick(&mut self, delta: f64, limit: Option) { self.ticker.tick(delta, limit); } - pub fn create_timer(&self) -> TickedTimer { + #[cfg(feature = "tick_event")] + pub fn create_timer(&self) -> crate::TickedTimerFromTickEvent { self.spawner.create_timer() } + #[cfg(feature = "tick_event")] pub fn tick_channel(&self) -> tokio::sync::watch::Receiver { self.spawner.tick_channel() } - pub fn wait_till_completed(&self, delta: f64) { + pub fn wait_till_completed(&mut self, delta: f64) { self.ticker.wait_till_completed(delta); } } @@ -82,7 +84,7 @@ mod tests { const DELTA: f64 = 1.0 / 60.0; const LIMIT: Option = None; - let executor = TickedAsyncExecutor::default(); + let mut executor = TickedAsyncExecutor::default(); executor.spawn_local("MyIdentifier", async move {}).detach(); @@ -93,7 +95,7 @@ mod tests { #[test] fn test_multiple_tasks() { - let executor = TickedAsyncExecutor::default(); + let mut executor = TickedAsyncExecutor::default(); executor .spawn_local("A", async move { tokio::task::yield_now().await; @@ -115,7 +117,7 @@ mod tests { #[test] fn test_task_cancellation() { - let executor = TickedAsyncExecutor::new(|_state| println!("{_state:?}")); + let mut executor = TickedAsyncExecutor::new(|_state| println!("{_state:?}")); let task1 = executor.spawn_local("A", async move { loop { tokio::task::yield_now().await; @@ -143,9 +145,10 @@ mod tests { executor.wait_till_completed(DELTA); } + #[cfg(feature = "tick_event")] #[test] fn test_ticked_timer() { - let executor = TickedAsyncExecutor::default(); + let mut executor = TickedAsyncExecutor::default(); for _ in 0..10 { let timer = executor.create_timer(); @@ -219,7 +222,7 @@ mod tests { #[test] fn test_limit() { - let executor = TickedAsyncExecutor::default(); + let mut executor = TickedAsyncExecutor::default(); for i in 0..10 { executor .spawn_local(format!("{i}"), async move { diff --git a/src/ticked_timer.rs b/src/ticked_timer_from_tick_event.rs similarity index 54% rename from src/ticked_timer.rs rename to src/ticked_timer_from_tick_event.rs index ed108fe..87f242e 100644 --- a/src/ticked_timer.rs +++ b/src/ticked_timer_from_tick_event.rs @@ -1,21 +1,21 @@ -pub struct TickedTimer { - tick_recv: tokio::sync::watch::Receiver, +pub struct TickedTimerFromTickEvent { + tick_event_rx: tokio::sync::watch::Receiver, } -impl TickedTimer { - pub fn new(tick_recv: tokio::sync::watch::Receiver) -> Self { - Self { tick_recv } +impl TickedTimerFromTickEvent { + pub fn new(tick_event_rx: tokio::sync::watch::Receiver) -> Self { + Self { tick_event_rx } } pub async fn sleep_for(mut self, mut duration_in_ms: f64) { loop { - let _r = self.tick_recv.changed().await; + let _r = self.tick_event_rx.changed().await; if _r.is_err() { // This means that the executor supplying the delta channel has shutdown // We must stop waiting gracefully break; } - let current_dt = *self.tick_recv.borrow_and_update(); + let current_dt = *self.tick_event_rx.borrow_and_update(); duration_in_ms -= current_dt; if duration_in_ms <= 0.0 { break; diff --git a/tests/tokio_tests.rs b/tests/tokio_tests.rs index 1b2be1b..05c2ac7 100644 --- a/tests/tokio_tests.rs +++ b/tests/tokio_tests.rs @@ -4,7 +4,7 @@ const DELTA: f64 = 1000.0 / 60.0; #[test] fn test_tokio_join() { - let executor = TickedAsyncExecutor::default(); + let mut executor = TickedAsyncExecutor::default(); let (tx1, mut rx1) = tokio::sync::mpsc::channel::(1); let (tx2, mut rx2) = tokio::sync::mpsc::channel::(1); @@ -41,7 +41,7 @@ fn test_tokio_join() { #[test] fn test_tokio_select() { - let executor = TickedAsyncExecutor::default(); + let mut executor = TickedAsyncExecutor::default(); let (tx1, mut rx1) = tokio::sync::mpsc::channel::(1); let (_tx2, mut rx2) = tokio::sync::mpsc::channel::(1); From c37135fc7ce9769ca7aead41b3ff92125edeabfc Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 16 May 2025 00:06:28 -0700 Subject: [PATCH 02/13] Minor update --- src/ticked_async_executor.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ticked_async_executor.rs b/src/ticked_async_executor.rs index 9d9a29e..e95f41b 100644 --- a/src/ticked_async_executor.rs +++ b/src/ticked_async_executor.rs @@ -75,7 +75,6 @@ where #[cfg(test)] mod tests { use super::*; - use std::time::{Duration, Instant}; const DELTA: f64 = 1000.0 / 60.0; @@ -148,6 +147,8 @@ mod tests { #[cfg(feature = "tick_event")] #[test] fn test_ticked_timer() { + use std::time::{Duration, Instant}; + let mut executor = TickedAsyncExecutor::default(); for _ in 0..10 { From c8ff8c6d4341fffdcbe7fe0a8e5b1ff15ba0758d Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 16 May 2025 00:09:17 -0700 Subject: [PATCH 03/13] Updated naming from `create_timer` to `create_timer_from_tick_event` --- src/split_ticked_async_executor.rs | 2 +- src/ticked_async_executor.rs | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/split_ticked_async_executor.rs b/src/split_ticked_async_executor.rs index 19396cd..188a9f1 100644 --- a/src/split_ticked_async_executor.rs +++ b/src/split_ticked_async_executor.rs @@ -95,7 +95,7 @@ where } #[cfg(feature = "tick_event")] - pub fn create_timer(&self) -> crate::TickedTimerFromTickEvent { + pub fn create_timer_from_tick_event(&self) -> crate::TickedTimerFromTickEvent { let tick_recv = self.tick_event_rx.clone(); crate::TickedTimerFromTickEvent::new(tick_recv) } diff --git a/src/ticked_async_executor.rs b/src/ticked_async_executor.rs index e95f41b..f40a443 100644 --- a/src/ticked_async_executor.rs +++ b/src/ticked_async_executor.rs @@ -58,8 +58,8 @@ where } #[cfg(feature = "tick_event")] - pub fn create_timer(&self) -> crate::TickedTimerFromTickEvent { - self.spawner.create_timer() + pub fn create_timer_from_tick_event(&self) -> crate::TickedTimerFromTickEvent { + self.spawner.create_timer_from_tick_event() } #[cfg(feature = "tick_event")] @@ -152,7 +152,7 @@ mod tests { let mut executor = TickedAsyncExecutor::default(); for _ in 0..10 { - let timer = executor.create_timer(); + let timer = executor.create_timer_from_tick_event(); executor .spawn_local("LocalTimer", async move { timer.sleep_for(256.0).await; @@ -178,14 +178,14 @@ mod tests { ); // Test Timer cancellation - let timer = executor.create_timer(); + let timer = executor.create_timer_from_tick_event(); executor .spawn_local("LocalFuture1", async move { timer.sleep_for(1000.0).await; }) .detach(); - let timer = executor.create_timer(); + let timer = executor.create_timer_from_tick_event(); executor .spawn_local("LocalFuture2", async move { timer.sleep_for(1000.0).await; From f6fb0bbca16dcf049e17b955e1741c5f3c13e710 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 16 May 2025 00:14:16 -0700 Subject: [PATCH 04/13] Added benchmark --- Cargo.toml | 5 +++++ benches/benchmark.rs | 28 ++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) create mode 100644 benches/benchmark.rs diff --git a/Cargo.toml b/Cargo.toml index ed90d56..a0edb6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,3 +19,8 @@ tokio = { version = "1", default-features = false, features = ["sync"] } [dev-dependencies] tokio = { version = "1", features = ["full"] } +criterion = "0.5" + +[[bench]] +name = "benchmark" +harness = false diff --git a/benches/benchmark.rs b/benches/benchmark.rs new file mode 100644 index 0000000..35a51c3 --- /dev/null +++ b/benches/benchmark.rs @@ -0,0 +1,28 @@ +use criterion::{criterion_group, criterion_main, Criterion}; + +use ticked_async_executor::TickedAsyncExecutor; + +fn spawn_tasks_benchmark(c: &mut Criterion) { + c.bench_function("1 task", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + executor.spawn_local("empty", async move {}).detach(); + executor.tick(0.1, None); + assert_eq!(executor.num_tasks(), 0); + }); + }); + + c.bench_function("2 tasks", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + executor.spawn_local("empty1", async move {}).detach(); + executor.spawn_local("empty2", async move {}).detach(); + + executor.tick(0.1, None); + assert_eq!(executor.num_tasks(), 0); + }); + }); +} + +criterion_group!(benches, spawn_tasks_benchmark); +criterion_main!(benches); From 9b9dda4340cdc7d70940b1b17255c50b65c075bb Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 16 May 2025 00:35:00 -0700 Subject: [PATCH 05/13] Added criterion benchmark --- benches/benchmark.rs | 88 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 85 insertions(+), 3 deletions(-) diff --git a/benches/benchmark.rs b/benches/benchmark.rs index 35a51c3..ff0c40d 100644 --- a/benches/benchmark.rs +++ b/benches/benchmark.rs @@ -2,8 +2,15 @@ use criterion::{criterion_group, criterion_main, Criterion}; use ticked_async_executor::TickedAsyncExecutor; +fn ticked_async_executor_benchmark(c: &mut Criterion) { + spawn_tasks_benchmark(c); + + #[cfg(feature = "tick_event")] + timer_from_tick_event_benchmark(c); +} + fn spawn_tasks_benchmark(c: &mut Criterion) { - c.bench_function("1 task", |b| { + c.bench_function("Spawn 1 task", |b| { b.iter_with_large_drop(|| { let mut executor = TickedAsyncExecutor::default(); executor.spawn_local("empty", async move {}).detach(); @@ -12,7 +19,7 @@ fn spawn_tasks_benchmark(c: &mut Criterion) { }); }); - c.bench_function("2 tasks", |b| { + c.bench_function("Spawn 2 tasks", |b| { b.iter_with_large_drop(|| { let mut executor = TickedAsyncExecutor::default(); executor.spawn_local("empty1", async move {}).detach(); @@ -22,7 +29,82 @@ fn spawn_tasks_benchmark(c: &mut Criterion) { assert_eq!(executor.num_tasks(), 0); }); }); + + c.bench_function("Spawn 100 tasks", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + for _ in 0..100 { + executor.spawn_local("_", async move {}).detach(); + } + + executor.tick(0.1, None); + assert_eq!(executor.num_tasks(), 0); + }); + }); + + c.bench_function("Spawn 1000 tasks", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + for _ in 0..1000 { + executor.spawn_local("_", async move {}).detach(); + } + + executor.tick(0.1, None); + assert_eq!(executor.num_tasks(), 0); + }); + }); + + c.bench_function("Spawn 10000 tasks", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + for _ in 0..10000 { + executor.spawn_local("_", async move {}).detach(); + } + + executor.tick(0.1, None); + assert_eq!(executor.num_tasks(), 0); + }); + }); +} + +#[cfg(feature = "tick_event")] +fn timer_from_tick_event_benchmark(c: &mut Criterion) { + c.bench_function("Spawn 1 timer from tick event", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + let timer = executor.create_timer_from_tick_event(); + + executor + .spawn_local("empty", async move { + timer.sleep_for(1.0).await; + }) + .detach(); + + executor.wait_till_completed(0.1); + assert_eq!(executor.num_tasks(), 0); + }); + }); + + c.bench_function("Spawn 1000 timers from tick event", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + + for _ in 0..1000 { + let timer = executor.create_timer_from_tick_event(); + executor + .spawn_local("empty", async move { + timer.sleep_for(1.0).await; + }) + .detach(); + } + + for _ in 0..11 { + executor.tick(0.1, None); + } + assert_eq!(executor.num_tasks(), 0); + }); + }); } -criterion_group!(benches, spawn_tasks_benchmark); +criterion_group!(benches, ticked_async_executor_benchmark); criterion_main!(benches); From 412eadb7f892457149e28f5d0f48a80baa052e73 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 16 May 2025 00:41:56 -0700 Subject: [PATCH 06/13] Naming updates --- src/split_ticked_async_executor.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/split_ticked_async_executor.rs b/src/split_ticked_async_executor.rs index 188a9f1..9cb1fb5 100644 --- a/src/split_ticked_async_executor.rs +++ b/src/split_ticked_async_executor.rs @@ -33,7 +33,7 @@ impl SplitTickedAsyncExecutor { where O: Fn(TaskState) + Clone + Send + Sync + 'static, { - let (tx_channel, rx_channel) = mpsc::channel(); + let (task_tx, task_rx) = mpsc::channel(); let num_woken_tasks = Arc::new(AtomicUsize::new(0)); let num_spawned_tasks = Arc::new(AtomicUsize::new(0)); @@ -41,7 +41,7 @@ impl SplitTickedAsyncExecutor { let (tick_event_tx, tick_event_rx) = tokio::sync::watch::channel(1.0); let spawner = TickedAsyncExecutorSpawner { - tx_channel, + task_tx, num_woken_tasks: num_woken_tasks.clone(), num_spawned_tasks: num_spawned_tasks.clone(), observer: observer.clone(), @@ -49,7 +49,7 @@ impl SplitTickedAsyncExecutor { tick_event_rx, }; let ticker = TickedAsyncExecutorTicker { - rx_channel, + task_rx, num_woken_tasks, num_spawned_tasks, observer, @@ -61,7 +61,7 @@ impl SplitTickedAsyncExecutor { } pub struct TickedAsyncExecutorSpawner { - tx_channel: mpsc::Sender, + task_tx: mpsc::Sender, num_woken_tasks: Arc, num_spawned_tasks: Arc, @@ -132,7 +132,7 @@ where } fn runnable_schedule_cb(&self, identifier: TaskIdentifier) -> impl Fn(async_task::Runnable) { - let sender = self.tx_channel.clone(); + let sender = self.task_tx.clone(); let num_woken_tasks = self.num_woken_tasks.clone(); let observer = self.observer.clone(); move |runnable| { @@ -144,7 +144,7 @@ where } pub struct TickedAsyncExecutorTicker { - rx_channel: mpsc::Receiver, + task_rx: mpsc::Receiver, num_woken_tasks: Arc, num_spawned_tasks: Arc, observer: O, @@ -167,7 +167,7 @@ where num_woken_tasks = num_woken_tasks.min(limit); } - self.rx_channel + self.task_rx .try_iter() .take(num_woken_tasks) .for_each(|(identifier, runnable)| { From 641869dba85e7a3dcd9a95ae0dbd1f0de42dc3b4 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 16 May 2025 01:17:55 -0700 Subject: [PATCH 07/13] Added feature timer_registration --- Cargo.toml | 9 ++++ src/lib.rs | 5 +++ src/split_ticked_async_executor.rs | 47 ++++++++++++++++++++- src/ticked_async_executor.rs | 41 +++++++++++++++++- src/ticked_timer_from_timer_registration.rs | 27 ++++++++++++ 5 files changed, 126 insertions(+), 3 deletions(-) create mode 100644 src/ticked_timer_from_timer_registration.rs diff --git a/Cargo.toml b/Cargo.toml index a0edb6b..404b36b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,8 +10,17 @@ categories = ["asynchronous", "concurrency", "game-development", "simulation"] readme = "README.md" [features] +# Provides a tick event in the form of a `tokio::sync::watch::Receiver` as per the `delta` provided +# to the `TickedAsyncExecutorTicker::tick` API +# Also provides a timer implementation: `TickedTimerFromTickEvent` tick_event = [] +# Timers can be registered with the `TickedAsyncExecutorTicker` via the `TickedAsyncExecutorSpawner` +# The timers count down with every call to `TickedAsyncExecutorTicker::tick` API as per the delta provided +# Once the timer has elapsed, the corresponding `tokio::sync::oneshot::*` channel is notified +# Also provides a timer implementation: `TickedTimerFromTimerRegistration` +timer_registration = [] + [dependencies] async-task = "4.7" pin-project = "1" diff --git a/src/lib.rs b/src/lib.rs index 695d86e..8c786da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,3 +16,8 @@ pub use ticked_async_executor::*; mod ticked_timer_from_tick_event; #[cfg(feature = "tick_event")] pub use ticked_timer_from_tick_event::*; + +#[cfg(feature = "timer_registration")] +mod ticked_timer_from_timer_registration; +#[cfg(feature = "timer_registration")] +pub use ticked_timer_from_timer_registration::*; diff --git a/src/split_ticked_async_executor.rs b/src/split_ticked_async_executor.rs index 9cb1fb5..6ae2153 100644 --- a/src/split_ticked_async_executor.rs +++ b/src/split_ticked_async_executor.rs @@ -40,6 +40,9 @@ impl SplitTickedAsyncExecutor { #[cfg(feature = "tick_event")] let (tick_event_tx, tick_event_rx) = tokio::sync::watch::channel(1.0); + #[cfg(feature = "timer_registration")] + let (timer_registration_tx, timer_registration_rx) = mpsc::channel(); + let spawner = TickedAsyncExecutorSpawner { task_tx, num_woken_tasks: num_woken_tasks.clone(), @@ -47,6 +50,8 @@ impl SplitTickedAsyncExecutor { observer: observer.clone(), #[cfg(feature = "tick_event")] tick_event_rx, + #[cfg(feature = "timer_registration")] + timer_registration_tx, }; let ticker = TickedAsyncExecutorTicker { task_rx, @@ -55,6 +60,10 @@ impl SplitTickedAsyncExecutor { observer, #[cfg(feature = "tick_event")] tick_event_tx, + #[cfg(feature = "timer_registration")] + timer_registration_rx, + #[cfg(feature = "timer_registration")] + timers: Vec::new(), }; (spawner, ticker) } @@ -72,6 +81,8 @@ pub struct TickedAsyncExecutorSpawner { #[cfg(feature = "tick_event")] tick_event_rx: tokio::sync::watch::Receiver, + #[cfg(feature = "timer_registration")] + timer_registration_tx: mpsc::Sender<(f64, tokio::sync::oneshot::Sender<()>)>, } impl TickedAsyncExecutorSpawner @@ -96,8 +107,7 @@ where #[cfg(feature = "tick_event")] pub fn create_timer_from_tick_event(&self) -> crate::TickedTimerFromTickEvent { - let tick_recv = self.tick_event_rx.clone(); - crate::TickedTimerFromTickEvent::new(tick_recv) + crate::TickedTimerFromTickEvent::new(self.tick_event_rx.clone()) } #[cfg(feature = "tick_event")] @@ -105,6 +115,11 @@ where self.tick_event_rx.clone() } + #[cfg(feature = "timer_registration")] + pub fn create_timer_from_timer_registration(&self) -> crate::TickedTimerFromTimerRegistration { + crate::TickedTimerFromTimerRegistration::new(self.timer_registration_tx.clone()) + } + pub fn num_tasks(&self) -> usize { self.num_spawned_tasks.load(Ordering::Relaxed) } @@ -151,6 +166,11 @@ pub struct TickedAsyncExecutorTicker { #[cfg(feature = "tick_event")] tick_event_tx: tokio::sync::watch::Sender, + + #[cfg(feature = "timer_registration")] + timer_registration_rx: mpsc::Receiver<(f64, tokio::sync::oneshot::Sender<()>)>, + #[cfg(feature = "timer_registration")] + timers: Vec<(f64, tokio::sync::oneshot::Sender<()>)>, } impl TickedAsyncExecutorTicker @@ -161,6 +181,9 @@ where #[cfg(feature = "tick_event")] let _r = self.tick_event_tx.send(delta); + #[cfg(feature = "timer_registration")] + self.timer_registration_tick(delta); + let mut num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed); if let Some(limit) = limit { // Woken tasks should not exceed the allowed limit @@ -183,4 +206,24 @@ where self.tick(constant_delta, None); } } + + #[cfg(feature = "timer_registration")] + fn timer_registration_tick(&mut self, delta: f64) { + // Get new timers + let mut new_timers = self.timer_registration_rx.try_iter().collect::>(); + self.timers.append(&mut new_timers); + + // Countdown timers + self.timers.iter_mut().for_each(|(elapsed, _)| { + *elapsed -= delta; + }); + + // Extract timers that have elapsed + // Notify corresponding channels + self.timers + .extract_if(.., |(elapsed, _)| *elapsed <= 0.0) + .for_each(|(_, rx)| { + let _ignore = rx.send(()); + }); + } } diff --git a/src/ticked_async_executor.rs b/src/ticked_async_executor.rs index f40a443..df683dc 100644 --- a/src/ticked_async_executor.rs +++ b/src/ticked_async_executor.rs @@ -67,6 +67,11 @@ where self.spawner.tick_channel() } + #[cfg(feature = "timer_registration")] + pub fn create_timer_from_timer_registration(&self) -> crate::TickedTimerFromTimerRegistration { + self.spawner.create_timer_from_timer_registration() + } + pub fn wait_till_completed(&mut self, delta: f64) { self.ticker.wait_till_completed(delta); } @@ -146,7 +151,7 @@ mod tests { #[cfg(feature = "tick_event")] #[test] - fn test_ticked_timer() { + fn test_ticked_timer_from_tick_event() { use std::time::{Duration, Instant}; let mut executor = TickedAsyncExecutor::default(); @@ -221,6 +226,40 @@ mod tests { drop(executor); } + #[cfg(feature = "timer_registration")] + #[test] + fn test_ticked_timer_from_timer_registration() { + use std::time::{Duration, Instant}; + + let mut executor = TickedAsyncExecutor::default(); + + for _ in 0..10 { + let timer = executor.create_timer_from_timer_registration(); + executor + .spawn_local("LocalTimer", async move { + timer.sleep_for(256.0).await; + }) + .detach(); + } + + let now = Instant::now(); + let mut instances = vec![]; + while executor.num_tasks() != 0 { + let current = Instant::now(); + executor.tick(DELTA, None); + instances.push(current.elapsed()); + std::thread::sleep(Duration::from_millis(16)); + } + let elapsed = now.elapsed(); + println!("Elapsed: {:?}", elapsed); + println!("Total: {:?}", instances); + println!( + "Min: {:?}, Max: {:?}", + instances.iter().min(), + instances.iter().max() + ); + } + #[test] fn test_limit() { let mut executor = TickedAsyncExecutor::default(); diff --git a/src/ticked_timer_from_timer_registration.rs b/src/ticked_timer_from_timer_registration.rs new file mode 100644 index 0000000..b3da528 --- /dev/null +++ b/src/ticked_timer_from_timer_registration.rs @@ -0,0 +1,27 @@ +use std::sync::mpsc; + +pub struct TickedTimerFromTimerRegistration { + timer_registration_tx: mpsc::Sender<(f64, tokio::sync::oneshot::Sender<()>)>, +} + +impl TickedTimerFromTimerRegistration { + pub fn new( + timer_registration_tx: mpsc::Sender<(f64, tokio::sync::oneshot::Sender<()>)>, + ) -> Self { + Self { + timer_registration_tx, + } + } + + pub async fn sleep_for(&self, duration_in_ms: f64) { + let (tx, rx) = tokio::sync::oneshot::channel(); + let _ignore = async { + self.timer_registration_tx + .send((duration_in_ms, tx)) + .map_err(|_| ())?; + rx.await.map_err(|_| ())?; + Ok::<(), ()>(()) + } + .await; + } +} From 3f62e67807e5594c97f3c37a0007ebac4111f86b Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 16 May 2025 01:21:19 -0700 Subject: [PATCH 08/13] Updated CI/CD with test and build feature combinations --- .github/workflows/rust.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index cb1407c..4f850e2 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -28,11 +28,17 @@ jobs: run: | cargo clippy cargo test + cargo test --features tick_event + cargo test --features timer_registration + cargo test --features tick_event,timer_registration - name: Build run: | cargo build cargo build --release + cargo build --features tick_event + cargo build --features timer_registration + cargo build --features tick_event,timer_registration - name: Generate Coverage Report if: ${{ matrix.os == 'ubuntu-latest' }} From 8bb7b3c817348c3e94f6a8132ed320bac00cfe11 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 16 May 2025 02:23:45 -0700 Subject: [PATCH 09/13] Updated benchmark code for timer registration --- benches/benchmark.rs | 44 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/benches/benchmark.rs b/benches/benchmark.rs index ff0c40d..4a01782 100644 --- a/benches/benchmark.rs +++ b/benches/benchmark.rs @@ -7,6 +7,9 @@ fn ticked_async_executor_benchmark(c: &mut Criterion) { #[cfg(feature = "tick_event")] timer_from_tick_event_benchmark(c); + + #[cfg(feature = "timer_registration")] + timer_from_timer_registration_benchmark(c); } fn spawn_tasks_benchmark(c: &mut Criterion) { @@ -72,8 +75,8 @@ fn timer_from_tick_event_benchmark(c: &mut Criterion) { c.bench_function("Spawn 1 timer from tick event", |b| { b.iter_with_large_drop(|| { let mut executor = TickedAsyncExecutor::default(); - let timer = executor.create_timer_from_tick_event(); + let timer = executor.create_timer_from_tick_event(); executor .spawn_local("empty", async move { timer.sleep_for(1.0).await; @@ -98,9 +101,44 @@ fn timer_from_tick_event_benchmark(c: &mut Criterion) { .detach(); } - for _ in 0..11 { - executor.tick(0.1, None); + executor.wait_till_completed(0.1); + assert_eq!(executor.num_tasks(), 0); + }); + }); +} + +#[cfg(feature = "timer_registration")] +fn timer_from_timer_registration_benchmark(c: &mut Criterion) { + c.bench_function("Spawn 1 timer from timer registration", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + + let timer = executor.create_timer_from_timer_registration(); + executor + .spawn_local("empty", async move { + timer.sleep_for(1.0).await; + }) + .detach(); + + executor.wait_till_completed(0.1); + assert_eq!(executor.num_tasks(), 0); + }); + }); + + c.bench_function("Spawn 1000 timers from timer registration", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + + for _ in 0..1000 { + let timer = executor.create_timer_from_timer_registration(); + executor + .spawn_local("empty", async move { + timer.sleep_for(1.0).await; + }) + .detach(); } + + executor.wait_till_completed(0.1); assert_eq!(executor.num_tasks(), 0); }); }); From bca2e2c66d6538d9f19072741257a02017f0b2e7 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 16 May 2025 02:23:59 -0700 Subject: [PATCH 10/13] Updated timer_registration_tick code --- src/split_ticked_async_executor.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/split_ticked_async_executor.rs b/src/split_ticked_async_executor.rs index 6ae2153..967d883 100644 --- a/src/split_ticked_async_executor.rs +++ b/src/split_ticked_async_executor.rs @@ -210,10 +210,14 @@ where #[cfg(feature = "timer_registration")] fn timer_registration_tick(&mut self, delta: f64) { // Get new timers - let mut new_timers = self.timer_registration_rx.try_iter().collect::>(); - self.timers.append(&mut new_timers); + self.timer_registration_rx.try_iter().for_each(|timer| { + self.timers.push(timer); + }); // Countdown timers + if self.timers.is_empty() { + return; + } self.timers.iter_mut().for_each(|(elapsed, _)| { *elapsed -= delta; }); From ab2b7a20017aac5c86a635022e0fb69709ddb44d Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 16 May 2025 19:38:15 -0700 Subject: [PATCH 11/13] Updated CI/CD --- .github/workflows/rust.yml | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 4f850e2..ed26683 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -31,7 +31,14 @@ jobs: cargo test --features tick_event cargo test --features timer_registration cargo test --features tick_event,timer_registration - + + - name: Bench + run: | + cargo bench + cargo bench --features tick_event + cargo bench --features timer_registration + cargo bench --features tick_event,timer_registration + - name: Build run: | cargo build @@ -43,7 +50,7 @@ jobs: - name: Generate Coverage Report if: ${{ matrix.os == 'ubuntu-latest' }} run: | - cargo tarpaulin --engine llvm --out xml --output-dir target + cargo tarpaulin --engine llvm --out xml --output-dir target --all-features - name: Upload coverage reports to Codecov if: ${{ matrix.os == 'ubuntu-latest' }} From c6cf3a26aa0c274d3b0aa74c95c30d68acf393cb Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 16 May 2025 19:45:30 -0700 Subject: [PATCH 12/13] Added rust-toolchain to 1.87 --- .github/workflows/rust.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index ed26683..2d2d629 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -17,7 +17,10 @@ jobs: os: [ubuntu-latest, windows-latest, macos-latest] runs-on: ${{ matrix.os }} steps: - - uses: actions/checkout@v4 + - name: Checkout + uses: actions/checkout@v4 + - name: Rust Toolchain Setup + uses: dtolnay/rust-toolchain@1.87.0 - name: Install if: ${{ matrix.os == 'ubuntu-latest' }} From 047c1b253759c781d89490a3e08195db939a9d68 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 16 May 2025 19:48:06 -0700 Subject: [PATCH 13/13] Install cargo clippy --- .github/workflows/rust.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 2d2d629..5f65335 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -29,6 +29,7 @@ jobs: - name: Test run: | + rustup component add clippy cargo clippy cargo test cargo test --features tick_event