diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index cb1407c..5f65335 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -17,7 +17,10 @@ jobs: os: [ubuntu-latest, windows-latest, macos-latest] runs-on: ${{ matrix.os }} steps: - - uses: actions/checkout@v4 + - name: Checkout + uses: actions/checkout@v4 + - name: Rust Toolchain Setup + uses: dtolnay/rust-toolchain@1.87.0 - name: Install if: ${{ matrix.os == 'ubuntu-latest' }} @@ -26,18 +29,32 @@ jobs: - name: Test run: | + rustup component add clippy cargo clippy cargo test - + cargo test --features tick_event + cargo test --features timer_registration + cargo test --features tick_event,timer_registration + + - name: Bench + run: | + cargo bench + cargo bench --features tick_event + cargo bench --features timer_registration + cargo bench --features tick_event,timer_registration + - name: Build run: | cargo build cargo build --release + cargo build --features tick_event + cargo build --features timer_registration + cargo build --features tick_event,timer_registration - name: Generate Coverage Report if: ${{ matrix.os == 'ubuntu-latest' }} run: | - cargo tarpaulin --engine llvm --out xml --output-dir target + cargo tarpaulin --engine llvm --out xml --output-dir target --all-features - name: Upload coverage reports to Codecov if: ${{ matrix.os == 'ubuntu-latest' }} diff --git a/Cargo.toml b/Cargo.toml index 09821bb..404b36b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ticked_async_executor" -version = "0.2.1" +version = "0.3.0" authors = ["coder137"] edition = "2021" description = "Local executor that runs woken async tasks when it is ticked" @@ -9,6 +9,18 @@ repository = "https://github.com/coder137/ticked-async-executor" categories = ["asynchronous", "concurrency", "game-development", "simulation"] readme = "README.md" +[features] +# Provides a tick event in the form of a `tokio::sync::watch::Receiver` as per the `delta` provided +# to the `TickedAsyncExecutorTicker::tick` API +# Also provides a timer implementation: `TickedTimerFromTickEvent` +tick_event = [] + +# Timers can be registered with the `TickedAsyncExecutorTicker` via the `TickedAsyncExecutorSpawner` +# The timers count down with every call to `TickedAsyncExecutorTicker::tick` API as per the delta provided +# Once the timer has elapsed, the corresponding `tokio::sync::oneshot::*` channel is notified +# Also provides a timer implementation: `TickedTimerFromTimerRegistration` +timer_registration = [] + [dependencies] async-task = "4.7" pin-project = "1" @@ -16,3 +28,8 @@ tokio = { version = "1", default-features = false, features = ["sync"] } [dev-dependencies] tokio = { version = "1", features = ["full"] } +criterion = "0.5" + +[[bench]] +name = "benchmark" +harness = false diff --git a/README.md b/README.md index b433167..9fb83b8 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ Async Local Executor which executes woken tasks only when it is ticked +MSRV: 1.87 + # Usage ## Default Local Executor @@ -11,7 +13,7 @@ use ticked_async_executor::*; const DELTA: f64 = 1000.0 / 60.0; -let executor = TickedAsyncExecutor::default(); +let mut executor = TickedAsyncExecutor::default(); executor.spawn_local("MyIdentifier", async move {}).detach(); @@ -28,7 +30,7 @@ use ticked_async_executor::*; const DELTA: f64 = 1000.0 / 60.0; -let (spawner, ticker) = SplitTickedAsyncExecutor::default(); +let (spawner, mut ticker) = SplitTickedAsyncExecutor::default(); spawner.spawn_local("MyIdentifier", async move {}).detach(); @@ -45,7 +47,7 @@ use ticked_async_executor::*; const DELTA: f64 = 1000.0 / 60.0; -let executor = TickedAsyncExecutor::default(); +let mut executor = TickedAsyncExecutor::default(); executor.spawn_local("MyIdentifier1", async move {}).detach(); executor.spawn_local("MyIdentifier2", async move {}).detach(); diff --git a/benches/benchmark.rs b/benches/benchmark.rs new file mode 100644 index 0000000..4a01782 --- /dev/null +++ b/benches/benchmark.rs @@ -0,0 +1,148 @@ +use criterion::{criterion_group, criterion_main, Criterion}; + +use ticked_async_executor::TickedAsyncExecutor; + +fn ticked_async_executor_benchmark(c: &mut Criterion) { + spawn_tasks_benchmark(c); + + #[cfg(feature = "tick_event")] + timer_from_tick_event_benchmark(c); + + #[cfg(feature = "timer_registration")] + timer_from_timer_registration_benchmark(c); +} + +fn spawn_tasks_benchmark(c: &mut Criterion) { + c.bench_function("Spawn 1 task", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + executor.spawn_local("empty", async move {}).detach(); + executor.tick(0.1, None); + assert_eq!(executor.num_tasks(), 0); + }); + }); + + c.bench_function("Spawn 2 tasks", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + executor.spawn_local("empty1", async move {}).detach(); + executor.spawn_local("empty2", async move {}).detach(); + + executor.tick(0.1, None); + assert_eq!(executor.num_tasks(), 0); + }); + }); + + c.bench_function("Spawn 100 tasks", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + for _ in 0..100 { + executor.spawn_local("_", async move {}).detach(); + } + + executor.tick(0.1, None); + assert_eq!(executor.num_tasks(), 0); + }); + }); + + c.bench_function("Spawn 1000 tasks", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + for _ in 0..1000 { + executor.spawn_local("_", async move {}).detach(); + } + + executor.tick(0.1, None); + assert_eq!(executor.num_tasks(), 0); + }); + }); + + c.bench_function("Spawn 10000 tasks", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + for _ in 0..10000 { + executor.spawn_local("_", async move {}).detach(); + } + + executor.tick(0.1, None); + assert_eq!(executor.num_tasks(), 0); + }); + }); +} + +#[cfg(feature = "tick_event")] +fn timer_from_tick_event_benchmark(c: &mut Criterion) { + c.bench_function("Spawn 1 timer from tick event", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + + let timer = executor.create_timer_from_tick_event(); + executor + .spawn_local("empty", async move { + timer.sleep_for(1.0).await; + }) + .detach(); + + executor.wait_till_completed(0.1); + assert_eq!(executor.num_tasks(), 0); + }); + }); + + c.bench_function("Spawn 1000 timers from tick event", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + + for _ in 0..1000 { + let timer = executor.create_timer_from_tick_event(); + executor + .spawn_local("empty", async move { + timer.sleep_for(1.0).await; + }) + .detach(); + } + + executor.wait_till_completed(0.1); + assert_eq!(executor.num_tasks(), 0); + }); + }); +} + +#[cfg(feature = "timer_registration")] +fn timer_from_timer_registration_benchmark(c: &mut Criterion) { + c.bench_function("Spawn 1 timer from timer registration", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + + let timer = executor.create_timer_from_timer_registration(); + executor + .spawn_local("empty", async move { + timer.sleep_for(1.0).await; + }) + .detach(); + + executor.wait_till_completed(0.1); + assert_eq!(executor.num_tasks(), 0); + }); + }); + + c.bench_function("Spawn 1000 timers from timer registration", |b| { + b.iter_with_large_drop(|| { + let mut executor = TickedAsyncExecutor::default(); + + for _ in 0..1000 { + let timer = executor.create_timer_from_timer_registration(); + executor + .spawn_local("empty", async move { + timer.sleep_for(1.0).await; + }) + .detach(); + } + + executor.wait_till_completed(0.1); + assert_eq!(executor.num_tasks(), 0); + }); + }); +} + +criterion_group!(benches, ticked_async_executor_benchmark); +criterion_main!(benches); diff --git a/src/lib.rs b/src/lib.rs index 2aeddef..8c786da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,5 +12,12 @@ pub use split_ticked_async_executor::*; mod ticked_async_executor; pub use ticked_async_executor::*; -mod ticked_timer; -pub use ticked_timer::*; +#[cfg(feature = "tick_event")] +mod ticked_timer_from_tick_event; +#[cfg(feature = "tick_event")] +pub use ticked_timer_from_tick_event::*; + +#[cfg(feature = "timer_registration")] +mod ticked_timer_from_timer_registration; +#[cfg(feature = "timer_registration")] +pub use ticked_timer_from_timer_registration::*; diff --git a/src/split_ticked_async_executor.rs b/src/split_ticked_async_executor.rs index 5371057..967d883 100644 --- a/src/split_ticked_async_executor.rs +++ b/src/split_ticked_async_executor.rs @@ -6,7 +6,7 @@ use std::{ }, }; -use crate::{DroppableFuture, TaskIdentifier, TickedTimer}; +use crate::{DroppableFuture, TaskIdentifier}; #[derive(Debug)] pub enum TaskState { @@ -33,30 +33,44 @@ impl SplitTickedAsyncExecutor { where O: Fn(TaskState) + Clone + Send + Sync + 'static, { - let (tx_channel, rx_channel) = mpsc::channel(); + let (task_tx, task_rx) = mpsc::channel(); let num_woken_tasks = Arc::new(AtomicUsize::new(0)); let num_spawned_tasks = Arc::new(AtomicUsize::new(0)); - let (tx_tick_event, rx_tick_event) = tokio::sync::watch::channel(1.0); + + #[cfg(feature = "tick_event")] + let (tick_event_tx, tick_event_rx) = tokio::sync::watch::channel(1.0); + + #[cfg(feature = "timer_registration")] + let (timer_registration_tx, timer_registration_rx) = mpsc::channel(); + let spawner = TickedAsyncExecutorSpawner { - tx_channel, + task_tx, num_woken_tasks: num_woken_tasks.clone(), num_spawned_tasks: num_spawned_tasks.clone(), observer: observer.clone(), - rx_tick_event, + #[cfg(feature = "tick_event")] + tick_event_rx, + #[cfg(feature = "timer_registration")] + timer_registration_tx, }; let ticker = TickedAsyncExecutorTicker { - rx_channel, + task_rx, num_woken_tasks, num_spawned_tasks, observer, - tx_tick_event, + #[cfg(feature = "tick_event")] + tick_event_tx, + #[cfg(feature = "timer_registration")] + timer_registration_rx, + #[cfg(feature = "timer_registration")] + timers: Vec::new(), }; (spawner, ticker) } } pub struct TickedAsyncExecutorSpawner { - tx_channel: mpsc::Sender, + task_tx: mpsc::Sender, num_woken_tasks: Arc, num_spawned_tasks: Arc, @@ -64,7 +78,11 @@ pub struct TickedAsyncExecutorSpawner { // Broadcast recv channel should be notified when there are new messages in the queue // Broadcast channel must also be able to remove older/stale messages (like a RingBuffer) observer: O, - rx_tick_event: tokio::sync::watch::Receiver, + + #[cfg(feature = "tick_event")] + tick_event_rx: tokio::sync::watch::Receiver, + #[cfg(feature = "timer_registration")] + timer_registration_tx: mpsc::Sender<(f64, tokio::sync::oneshot::Sender<()>)>, } impl TickedAsyncExecutorSpawner @@ -87,13 +105,19 @@ where task } - pub fn create_timer(&self) -> TickedTimer { - let tick_recv = self.rx_tick_event.clone(); - TickedTimer::new(tick_recv) + #[cfg(feature = "tick_event")] + pub fn create_timer_from_tick_event(&self) -> crate::TickedTimerFromTickEvent { + crate::TickedTimerFromTickEvent::new(self.tick_event_rx.clone()) } + #[cfg(feature = "tick_event")] pub fn tick_channel(&self) -> tokio::sync::watch::Receiver { - self.rx_tick_event.clone() + self.tick_event_rx.clone() + } + + #[cfg(feature = "timer_registration")] + pub fn create_timer_from_timer_registration(&self) -> crate::TickedTimerFromTimerRegistration { + crate::TickedTimerFromTimerRegistration::new(self.timer_registration_tx.clone()) } pub fn num_tasks(&self) -> usize { @@ -123,7 +147,7 @@ where } fn runnable_schedule_cb(&self, identifier: TaskIdentifier) -> impl Fn(async_task::Runnable) { - let sender = self.tx_channel.clone(); + let sender = self.task_tx.clone(); let num_woken_tasks = self.num_woken_tasks.clone(); let observer = self.observer.clone(); move |runnable| { @@ -135,19 +159,30 @@ where } pub struct TickedAsyncExecutorTicker { - rx_channel: mpsc::Receiver, + task_rx: mpsc::Receiver, num_woken_tasks: Arc, num_spawned_tasks: Arc, observer: O, - tx_tick_event: tokio::sync::watch::Sender, + + #[cfg(feature = "tick_event")] + tick_event_tx: tokio::sync::watch::Sender, + + #[cfg(feature = "timer_registration")] + timer_registration_rx: mpsc::Receiver<(f64, tokio::sync::oneshot::Sender<()>)>, + #[cfg(feature = "timer_registration")] + timers: Vec<(f64, tokio::sync::oneshot::Sender<()>)>, } impl TickedAsyncExecutorTicker where O: Fn(TaskState), { - pub fn tick(&self, delta: f64, limit: Option) { - let _r = self.tx_tick_event.send(delta); + pub fn tick(&mut self, delta: f64, limit: Option) { + #[cfg(feature = "tick_event")] + let _r = self.tick_event_tx.send(delta); + + #[cfg(feature = "timer_registration")] + self.timer_registration_tick(delta); let mut num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed); if let Some(limit) = limit { @@ -155,7 +190,7 @@ where num_woken_tasks = num_woken_tasks.min(limit); } - self.rx_channel + self.task_rx .try_iter() .take(num_woken_tasks) .for_each(|(identifier, runnable)| { @@ -166,9 +201,33 @@ where .fetch_sub(num_woken_tasks, Ordering::Relaxed); } - pub fn wait_till_completed(&self, constant_delta: f64) { + pub fn wait_till_completed(&mut self, constant_delta: f64) { while self.num_spawned_tasks.load(Ordering::Relaxed) != 0 { self.tick(constant_delta, None); } } + + #[cfg(feature = "timer_registration")] + fn timer_registration_tick(&mut self, delta: f64) { + // Get new timers + self.timer_registration_rx.try_iter().for_each(|timer| { + self.timers.push(timer); + }); + + // Countdown timers + if self.timers.is_empty() { + return; + } + self.timers.iter_mut().for_each(|(elapsed, _)| { + *elapsed -= delta; + }); + + // Extract timers that have elapsed + // Notify corresponding channels + self.timers + .extract_if(.., |(elapsed, _)| *elapsed <= 0.0) + .for_each(|(_, rx)| { + let _ignore = rx.send(()); + }); + } } diff --git a/src/ticked_async_executor.rs b/src/ticked_async_executor.rs index a014246..df683dc 100644 --- a/src/ticked_async_executor.rs +++ b/src/ticked_async_executor.rs @@ -2,7 +2,7 @@ use std::future::Future; use crate::{ SplitTickedAsyncExecutor, Task, TaskIdentifier, TaskState, TickedAsyncExecutorSpawner, - TickedAsyncExecutorTicker, TickedTimer, + TickedAsyncExecutorTicker, }; pub struct TickedAsyncExecutor { @@ -53,19 +53,26 @@ where /// Tick is !Sync i.e cannot be invoked from multiple threads /// /// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run` - pub fn tick(&self, delta: f64, limit: Option) { + pub fn tick(&mut self, delta: f64, limit: Option) { self.ticker.tick(delta, limit); } - pub fn create_timer(&self) -> TickedTimer { - self.spawner.create_timer() + #[cfg(feature = "tick_event")] + pub fn create_timer_from_tick_event(&self) -> crate::TickedTimerFromTickEvent { + self.spawner.create_timer_from_tick_event() } + #[cfg(feature = "tick_event")] pub fn tick_channel(&self) -> tokio::sync::watch::Receiver { self.spawner.tick_channel() } - pub fn wait_till_completed(&self, delta: f64) { + #[cfg(feature = "timer_registration")] + pub fn create_timer_from_timer_registration(&self) -> crate::TickedTimerFromTimerRegistration { + self.spawner.create_timer_from_timer_registration() + } + + pub fn wait_till_completed(&mut self, delta: f64) { self.ticker.wait_till_completed(delta); } } @@ -73,7 +80,6 @@ where #[cfg(test)] mod tests { use super::*; - use std::time::{Duration, Instant}; const DELTA: f64 = 1000.0 / 60.0; @@ -82,7 +88,7 @@ mod tests { const DELTA: f64 = 1.0 / 60.0; const LIMIT: Option = None; - let executor = TickedAsyncExecutor::default(); + let mut executor = TickedAsyncExecutor::default(); executor.spawn_local("MyIdentifier", async move {}).detach(); @@ -93,7 +99,7 @@ mod tests { #[test] fn test_multiple_tasks() { - let executor = TickedAsyncExecutor::default(); + let mut executor = TickedAsyncExecutor::default(); executor .spawn_local("A", async move { tokio::task::yield_now().await; @@ -115,7 +121,7 @@ mod tests { #[test] fn test_task_cancellation() { - let executor = TickedAsyncExecutor::new(|_state| println!("{_state:?}")); + let mut executor = TickedAsyncExecutor::new(|_state| println!("{_state:?}")); let task1 = executor.spawn_local("A", async move { loop { tokio::task::yield_now().await; @@ -143,12 +149,15 @@ mod tests { executor.wait_till_completed(DELTA); } + #[cfg(feature = "tick_event")] #[test] - fn test_ticked_timer() { - let executor = TickedAsyncExecutor::default(); + fn test_ticked_timer_from_tick_event() { + use std::time::{Duration, Instant}; + + let mut executor = TickedAsyncExecutor::default(); for _ in 0..10 { - let timer = executor.create_timer(); + let timer = executor.create_timer_from_tick_event(); executor .spawn_local("LocalTimer", async move { timer.sleep_for(256.0).await; @@ -174,14 +183,14 @@ mod tests { ); // Test Timer cancellation - let timer = executor.create_timer(); + let timer = executor.create_timer_from_tick_event(); executor .spawn_local("LocalFuture1", async move { timer.sleep_for(1000.0).await; }) .detach(); - let timer = executor.create_timer(); + let timer = executor.create_timer_from_tick_event(); executor .spawn_local("LocalFuture2", async move { timer.sleep_for(1000.0).await; @@ -217,9 +226,43 @@ mod tests { drop(executor); } + #[cfg(feature = "timer_registration")] + #[test] + fn test_ticked_timer_from_timer_registration() { + use std::time::{Duration, Instant}; + + let mut executor = TickedAsyncExecutor::default(); + + for _ in 0..10 { + let timer = executor.create_timer_from_timer_registration(); + executor + .spawn_local("LocalTimer", async move { + timer.sleep_for(256.0).await; + }) + .detach(); + } + + let now = Instant::now(); + let mut instances = vec![]; + while executor.num_tasks() != 0 { + let current = Instant::now(); + executor.tick(DELTA, None); + instances.push(current.elapsed()); + std::thread::sleep(Duration::from_millis(16)); + } + let elapsed = now.elapsed(); + println!("Elapsed: {:?}", elapsed); + println!("Total: {:?}", instances); + println!( + "Min: {:?}, Max: {:?}", + instances.iter().min(), + instances.iter().max() + ); + } + #[test] fn test_limit() { - let executor = TickedAsyncExecutor::default(); + let mut executor = TickedAsyncExecutor::default(); for i in 0..10 { executor .spawn_local(format!("{i}"), async move { diff --git a/src/ticked_timer.rs b/src/ticked_timer_from_tick_event.rs similarity index 54% rename from src/ticked_timer.rs rename to src/ticked_timer_from_tick_event.rs index ed108fe..87f242e 100644 --- a/src/ticked_timer.rs +++ b/src/ticked_timer_from_tick_event.rs @@ -1,21 +1,21 @@ -pub struct TickedTimer { - tick_recv: tokio::sync::watch::Receiver, +pub struct TickedTimerFromTickEvent { + tick_event_rx: tokio::sync::watch::Receiver, } -impl TickedTimer { - pub fn new(tick_recv: tokio::sync::watch::Receiver) -> Self { - Self { tick_recv } +impl TickedTimerFromTickEvent { + pub fn new(tick_event_rx: tokio::sync::watch::Receiver) -> Self { + Self { tick_event_rx } } pub async fn sleep_for(mut self, mut duration_in_ms: f64) { loop { - let _r = self.tick_recv.changed().await; + let _r = self.tick_event_rx.changed().await; if _r.is_err() { // This means that the executor supplying the delta channel has shutdown // We must stop waiting gracefully break; } - let current_dt = *self.tick_recv.borrow_and_update(); + let current_dt = *self.tick_event_rx.borrow_and_update(); duration_in_ms -= current_dt; if duration_in_ms <= 0.0 { break; diff --git a/src/ticked_timer_from_timer_registration.rs b/src/ticked_timer_from_timer_registration.rs new file mode 100644 index 0000000..b3da528 --- /dev/null +++ b/src/ticked_timer_from_timer_registration.rs @@ -0,0 +1,27 @@ +use std::sync::mpsc; + +pub struct TickedTimerFromTimerRegistration { + timer_registration_tx: mpsc::Sender<(f64, tokio::sync::oneshot::Sender<()>)>, +} + +impl TickedTimerFromTimerRegistration { + pub fn new( + timer_registration_tx: mpsc::Sender<(f64, tokio::sync::oneshot::Sender<()>)>, + ) -> Self { + Self { + timer_registration_tx, + } + } + + pub async fn sleep_for(&self, duration_in_ms: f64) { + let (tx, rx) = tokio::sync::oneshot::channel(); + let _ignore = async { + self.timer_registration_tx + .send((duration_in_ms, tx)) + .map_err(|_| ())?; + rx.await.map_err(|_| ())?; + Ok::<(), ()>(()) + } + .await; + } +} diff --git a/tests/tokio_tests.rs b/tests/tokio_tests.rs index 1b2be1b..05c2ac7 100644 --- a/tests/tokio_tests.rs +++ b/tests/tokio_tests.rs @@ -4,7 +4,7 @@ const DELTA: f64 = 1000.0 / 60.0; #[test] fn test_tokio_join() { - let executor = TickedAsyncExecutor::default(); + let mut executor = TickedAsyncExecutor::default(); let (tx1, mut rx1) = tokio::sync::mpsc::channel::(1); let (tx2, mut rx2) = tokio::sync::mpsc::channel::(1); @@ -41,7 +41,7 @@ fn test_tokio_join() { #[test] fn test_tokio_select() { - let executor = TickedAsyncExecutor::default(); + let mut executor = TickedAsyncExecutor::default(); let (tx1, mut rx1) = tokio::sync::mpsc::channel::(1); let (_tx2, mut rx2) = tokio::sync::mpsc::channel::(1);