From eb3f94db023a9d737375a8d18f496bdd06a8bdfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 15 Jan 2025 17:14:22 +0100 Subject: [PATCH 01/10] Avoid sleeping in test --- crates/services/src/async_processor.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/crates/services/src/async_processor.rs b/crates/services/src/async_processor.rs index db929176483..3ddbd8182f7 100644 --- a/crates/services/src/async_processor.rs +++ b/crates/services/src/async_processor.rs @@ -126,23 +126,25 @@ mod tests { }; use tokio::time::Instant; - #[test] - fn one_spawn_single_tasks_works() { + #[tokio::test] + async fn one_spawn_single_tasks_works() { // Given let number_of_pending_tasks = 1; let heavy_task_processor = AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap(); // When - let (sender, mut receiver) = tokio::sync::oneshot::channel(); + let (sender, receiver) = tokio::sync::oneshot::channel(); let result = heavy_task_processor.try_spawn(async move { sender.send(()).unwrap(); }); // Then result.expect("Expected Ok result"); - sleep(Duration::from_secs(1)); - receiver.try_recv().unwrap(); + tokio::time::timeout(Duration::from_secs(5), receiver) + .await + .unwrap() + .unwrap(); } #[tokio::test] From ea7519b3cef27a401ff07a6cec0fbd04b54a1e72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 15 Jan 2025 17:20:04 +0100 Subject: [PATCH 02/10] Update assertions --- crates/services/src/async_processor.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/crates/services/src/async_processor.rs b/crates/services/src/async_processor.rs index 3ddbd8182f7..6f7d936e491 100644 --- a/crates/services/src/async_processor.rs +++ b/crates/services/src/async_processor.rs @@ -150,10 +150,10 @@ mod tests { #[tokio::test] async fn one_spawn_single_tasks_works__thread_id_is_different_than_main() { // Given - let number_of_threads = 10; + let max_number_of_threads = 10; let number_of_pending_tasks = 10000; let heavy_task_processor = - AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks) + AsyncProcessor::new("Test", max_number_of_threads, number_of_pending_tasks) .unwrap(); let main_handler = tokio::spawn(async move { std::thread::current().id() }); let main_id = main_handler.await.unwrap(); @@ -161,10 +161,7 @@ mod tests { // When let futures = iter::repeat_with(|| { heavy_task_processor - .try_spawn(async move { - tokio::time::sleep(Duration::from_secs(1)).await; - std::thread::current().id() - }) + .try_spawn(async move { std::thread::current().id() }) .unwrap() }) .take(number_of_pending_tasks) @@ -177,8 +174,12 @@ mod tests { .map(|r| r.unwrap()) .collect::>(); + // Main thread was not used. assert!(!unique_thread_ids.contains(&main_id)); - assert_eq!(unique_thread_ids.len(), number_of_threads); + // There's been at least one worker thread used. + assert!(unique_thread_ids.len() > 1); + // There were no more worker threads above the threshold. + assert!(unique_thread_ids.len() <= max_number_of_threads); } #[test] @@ -198,7 +199,7 @@ mod tests { }); // Then - let err = second_spawn_result.expect_err("Expected Ok result"); + let err = second_spawn_result.expect_err("Should error"); assert_eq!(err, OutOfCapacity); } From 89f80512b49c8b0e083d6f06e2d338e878dc6ec5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 15 Jan 2025 17:34:52 +0100 Subject: [PATCH 03/10] Reduce the arbitrary number of tasks --- crates/services/src/async_processor.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/crates/services/src/async_processor.rs b/crates/services/src/async_processor.rs index 6f7d936e491..1bb8f2bbbc4 100644 --- a/crates/services/src/async_processor.rs +++ b/crates/services/src/async_processor.rs @@ -248,9 +248,9 @@ mod tests { } #[tokio::test] - async fn executes_10_tasks_for_10_seconds_with_one_thread() { + async fn executes_5_tasks_for_5_seconds_with_one_thread() { // Given - let number_of_pending_tasks = 10; + let number_of_pending_tasks = 5; let number_of_threads = 1; let heavy_task_processor = AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks) @@ -272,11 +272,14 @@ mod tests { // Then while broadcast_receiver.recv().await.is_ok() {} - assert!(instant.elapsed() >= Duration::from_secs(10)); + // 5 tasks running on 1 thread, each task taking 1 second, should complete in approximately 5 seconds overall. + // Allowing some LEEWAY to account for runtime overhead. + const LEEWAY: Duration = Duration::from_millis(300); + assert!(instant.elapsed() < Duration::from_secs(5) + LEEWAY); // Wait for the metrics to be updated. tokio::time::sleep(Duration::from_secs(1)).await; let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get()); - assert_eq!(duration.as_secs(), 10); + assert_eq!(duration.as_secs(), 5); let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get()); assert_eq!(duration.as_secs(), 0); } From d21ae4b534c5f93f7a325486da0c3bf14d209dfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 15 Jan 2025 17:39:27 +0100 Subject: [PATCH 04/10] Clean-up comment --- crates/services/src/async_processor.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/services/src/async_processor.rs b/crates/services/src/async_processor.rs index 1bb8f2bbbc4..355af109e96 100644 --- a/crates/services/src/async_processor.rs +++ b/crates/services/src/async_processor.rs @@ -272,7 +272,8 @@ mod tests { // Then while broadcast_receiver.recv().await.is_ok() {} - // 5 tasks running on 1 thread, each task taking 1 second, should complete in approximately 5 seconds overall. + // 5 tasks running on 1 thread, each task taking 1 second, + // should complete in approximately 5 seconds overall. // Allowing some LEEWAY to account for runtime overhead. const LEEWAY: Duration = Duration::from_millis(300); assert!(instant.elapsed() < Duration::from_secs(5) + LEEWAY); From 15cc5c3688ceaa432cbb881279eff46201138ace Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 15 Jan 2025 17:40:43 +0100 Subject: [PATCH 05/10] Rename some tests for clarity and update some assertions --- crates/services/src/async_processor.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/crates/services/src/async_processor.rs b/crates/services/src/async_processor.rs index 355af109e96..0778e5d570a 100644 --- a/crates/services/src/async_processor.rs +++ b/crates/services/src/async_processor.rs @@ -286,7 +286,8 @@ mod tests { } #[tokio::test] - async fn executes_10_tasks_for_2_seconds_with_10_thread() { + async fn executes_10_blocking_tasks_for_1_second_with_10_threads__records_busy_time() + { // Given let number_of_pending_tasks = 10; let number_of_threads = 10; @@ -310,7 +311,11 @@ mod tests { // Then while broadcast_receiver.recv().await.is_ok() {} - assert!(instant.elapsed() <= Duration::from_secs(2)); + // 10 blocking tasks running on 10 threads, each task taking 1 second, + // should complete in approximately 1 second overall. + // Allowing some LEEWAY to account for runtime overhead. + const LEEWAY: Duration = Duration::from_millis(300); + assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY); // Wait for the metrics to be updated. tokio::time::sleep(Duration::from_secs(1)).await; let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get()); @@ -320,7 +325,8 @@ mod tests { } #[tokio::test] - async fn executes_10_tasks_for_2_seconds_with_1_thread() { + async fn executes_10_non_blocking_tasks_for_1_second_with_10_threads__records_idle_time( + ) { // Given let number_of_pending_tasks = 10; let number_of_threads = 10; @@ -344,7 +350,11 @@ mod tests { // Then while broadcast_receiver.recv().await.is_ok() {} - assert!(instant.elapsed() <= Duration::from_secs(2)); + // 10 blocking tasks running on 10 threads, each task taking 1 second, + // should complete in approximately 1 second overall. + // Allowing some LEEWAY to account for runtime overhead. + const LEEWAY: Duration = Duration::from_millis(300); + assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY); // Wait for the metrics to be updated. tokio::time::sleep(Duration::from_secs(1)).await; let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get()); From 073bffa2a03e5cfe743f1ef8bcd7b662cec2b99e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 15 Jan 2025 18:16:40 +0100 Subject: [PATCH 06/10] Convert some lets into consts --- crates/services/src/async_processor.rs | 52 +++++++++++++------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/crates/services/src/async_processor.rs b/crates/services/src/async_processor.rs index 0778e5d570a..a59ba5b34cf 100644 --- a/crates/services/src/async_processor.rs +++ b/crates/services/src/async_processor.rs @@ -129,9 +129,9 @@ mod tests { #[tokio::test] async fn one_spawn_single_tasks_works() { // Given - let number_of_pending_tasks = 1; + const NUMBER_OF_PENDING_TASKS: usize = 1; let heavy_task_processor = - AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap(); + AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap(); // When let (sender, receiver) = tokio::sync::oneshot::channel(); @@ -150,10 +150,10 @@ mod tests { #[tokio::test] async fn one_spawn_single_tasks_works__thread_id_is_different_than_main() { // Given - let max_number_of_threads = 10; - let number_of_pending_tasks = 10000; + const MAX_NUMBER_OF_THREADS: usize = 10; + const NUMBER_OF_PENDING_TASKS: usize = 10000; let heavy_task_processor = - AsyncProcessor::new("Test", max_number_of_threads, number_of_pending_tasks) + AsyncProcessor::new("Test", MAX_NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS) .unwrap(); let main_handler = tokio::spawn(async move { std::thread::current().id() }); let main_id = main_handler.await.unwrap(); @@ -164,7 +164,7 @@ mod tests { .try_spawn(async move { std::thread::current().id() }) .unwrap() }) - .take(number_of_pending_tasks) + .take(NUMBER_OF_PENDING_TASKS) .collect::>(); // Then @@ -179,15 +179,15 @@ mod tests { // There's been at least one worker thread used. assert!(unique_thread_ids.len() > 1); // There were no more worker threads above the threshold. - assert!(unique_thread_ids.len() <= max_number_of_threads); + assert!(unique_thread_ids.len() <= MAX_NUMBER_OF_THREADS); } #[test] fn second_spawn_fails_when_limit_is_one_and_first_in_progress() { // Given - let number_of_pending_tasks = 1; + const NUMBER_OF_PENDING_TASKS: usize = 1; let heavy_task_processor = - AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap(); + AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap(); let first_spawn_result = heavy_task_processor.try_spawn(async move { sleep(Duration::from_secs(1)); }); @@ -205,9 +205,9 @@ mod tests { #[test] fn second_spawn_works_when_first_is_finished() { - let number_of_pending_tasks = 1; + const NUMBER_OF_PENDING_TASKS: usize = 1; let heavy_task_processor = - AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap(); + AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap(); // Given let (sender, receiver) = tokio::sync::oneshot::channel(); @@ -232,11 +232,11 @@ mod tests { #[test] fn can_spawn_10_tasks_when_limit_is_10() { // Given - let number_of_pending_tasks = 10; + const NUMBER_OF_PENDING_TASKS: usize = 10; let heavy_task_processor = - AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap(); + AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap(); - for _ in 0..number_of_pending_tasks { + for _ in 0..NUMBER_OF_PENDING_TASKS { // When let result = heavy_task_processor.try_spawn(async move { tokio::time::sleep(Duration::from_secs(1)).await; @@ -250,17 +250,17 @@ mod tests { #[tokio::test] async fn executes_5_tasks_for_5_seconds_with_one_thread() { // Given - let number_of_pending_tasks = 5; - let number_of_threads = 1; + const NUMBER_OF_PENDING_TASKS: usize = 5; + const NUMBER_OF_THREADS: usize = 1; let heavy_task_processor = - AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks) + AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS) .unwrap(); // When let (broadcast_sender, mut broadcast_receiver) = tokio::sync::broadcast::channel(1024); let instant = Instant::now(); - for _ in 0..number_of_pending_tasks { + for _ in 0..NUMBER_OF_PENDING_TASKS { let broadcast_sender = broadcast_sender.clone(); let result = heavy_task_processor.try_spawn(async move { sleep(Duration::from_secs(1)); @@ -289,17 +289,17 @@ mod tests { async fn executes_10_blocking_tasks_for_1_second_with_10_threads__records_busy_time() { // Given - let number_of_pending_tasks = 10; - let number_of_threads = 10; + const NUMBER_OF_PENDING_TASKS: usize = 10; + const NUMBER_OF_THREADS: usize = 10; let heavy_task_processor = - AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks) + AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS) .unwrap(); // When let (broadcast_sender, mut broadcast_receiver) = tokio::sync::broadcast::channel(1024); let instant = Instant::now(); - for _ in 0..number_of_pending_tasks { + for _ in 0..NUMBER_OF_PENDING_TASKS { let broadcast_sender = broadcast_sender.clone(); let result = heavy_task_processor.try_spawn(async move { sleep(Duration::from_secs(1)); @@ -328,17 +328,17 @@ mod tests { async fn executes_10_non_blocking_tasks_for_1_second_with_10_threads__records_idle_time( ) { // Given - let number_of_pending_tasks = 10; - let number_of_threads = 10; + const NUMBER_OF_PENDING_TASKS: usize = 10; + const NUMBER_OF_THREADS: usize = 10; let heavy_task_processor = - AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks) + AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS) .unwrap(); // When let (broadcast_sender, mut broadcast_receiver) = tokio::sync::broadcast::channel(1024); let instant = Instant::now(); - for _ in 0..number_of_pending_tasks { + for _ in 0..NUMBER_OF_PENDING_TASKS { let broadcast_sender = broadcast_sender.clone(); let result = heavy_task_processor.try_spawn(async move { tokio::time::sleep(Duration::from_secs(1)).await; From 75838d6ec633850f23b172aa8da9479b297ace41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 15 Jan 2025 18:30:25 +0100 Subject: [PATCH 07/10] Fix assertion --- crates/services/src/async_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/services/src/async_processor.rs b/crates/services/src/async_processor.rs index a59ba5b34cf..a8703d1a41f 100644 --- a/crates/services/src/async_processor.rs +++ b/crates/services/src/async_processor.rs @@ -177,7 +177,7 @@ mod tests { // Main thread was not used. assert!(!unique_thread_ids.contains(&main_id)); // There's been at least one worker thread used. - assert!(unique_thread_ids.len() > 1); + assert!(unique_thread_ids.len() >= 1); // There were no more worker threads above the threshold. assert!(unique_thread_ids.len() <= MAX_NUMBER_OF_THREADS); } From c61c44225c4a38322f2cc9c8ab94533408dd18b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Wed, 15 Jan 2025 18:32:45 +0100 Subject: [PATCH 08/10] Update comment --- crates/services/src/async_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/services/src/async_processor.rs b/crates/services/src/async_processor.rs index a8703d1a41f..13ca51f22e5 100644 --- a/crates/services/src/async_processor.rs +++ b/crates/services/src/async_processor.rs @@ -350,7 +350,7 @@ mod tests { // Then while broadcast_receiver.recv().await.is_ok() {} - // 10 blocking tasks running on 10 threads, each task taking 1 second, + // 10 non-blocking tasks running on 10 threads, each task taking 1 second, // should complete in approximately 1 second overall. // Allowing some LEEWAY to account for runtime overhead. const LEEWAY: Duration = Duration::from_millis(300); From 2825e52588470608ea6ca3805beec8056befd207 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Thu, 16 Jan 2025 12:55:08 +0100 Subject: [PATCH 09/10] Satisfy Clippy --- crates/services/src/async_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/services/src/async_processor.rs b/crates/services/src/async_processor.rs index 13ca51f22e5..e354e42e269 100644 --- a/crates/services/src/async_processor.rs +++ b/crates/services/src/async_processor.rs @@ -177,7 +177,7 @@ mod tests { // Main thread was not used. assert!(!unique_thread_ids.contains(&main_id)); // There's been at least one worker thread used. - assert!(unique_thread_ids.len() >= 1); + assert!(!unique_thread_ids.is_empty()); // There were no more worker threads above the threshold. assert!(unique_thread_ids.len() <= MAX_NUMBER_OF_THREADS); } From 8443c00eccaef13b6e607685ceb102b0d8a4fad4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Chabowski?= Date: Thu, 16 Jan 2025 19:36:26 +0100 Subject: [PATCH 10/10] Add assert to verify that tasks are not executed in parallel when they're not supposed to --- crates/services/src/async_processor.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/services/src/async_processor.rs b/crates/services/src/async_processor.rs index e354e42e269..01446a7136b 100644 --- a/crates/services/src/async_processor.rs +++ b/crates/services/src/async_processor.rs @@ -277,6 +277,8 @@ mod tests { // Allowing some LEEWAY to account for runtime overhead. const LEEWAY: Duration = Duration::from_millis(300); assert!(instant.elapsed() < Duration::from_secs(5) + LEEWAY); + // Make sure that the tasks were not executed in parallel. + assert!(instant.elapsed() >= Duration::from_secs(5)); // Wait for the metrics to be updated. tokio::time::sleep(Duration::from_secs(1)).await; let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());