Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update async processor tests #2577

Merged
merged 12 commits into from
Jan 22, 2025
105 changes: 62 additions & 43 deletions crates/services/src/async_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,46 +126,45 @@ mod tests {
};
use tokio::time::Instant;

#[test]
fn one_spawn_single_tasks_works() {
#[tokio::test]
async fn one_spawn_single_tasks_works() {
// Given
let number_of_pending_tasks = 1;
const NUMBER_OF_PENDING_TASKS: usize = 1;
let heavy_task_processor =
AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap();
AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

// When
let (sender, mut receiver) = tokio::sync::oneshot::channel();
let (sender, receiver) = tokio::sync::oneshot::channel();
let result = heavy_task_processor.try_spawn(async move {
sender.send(()).unwrap();
});

// Then
result.expect("Expected Ok result");
sleep(Duration::from_secs(1));
receiver.try_recv().unwrap();
tokio::time::timeout(Duration::from_secs(5), receiver)
.await
.unwrap()
.unwrap();
}

#[tokio::test]
async fn one_spawn_single_tasks_works__thread_id_is_different_than_main() {
// Given
let number_of_threads = 10;
let number_of_pending_tasks = 10000;
const MAX_NUMBER_OF_THREADS: usize = 10;
const NUMBER_OF_PENDING_TASKS: usize = 10000;
let heavy_task_processor =
AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks)
AsyncProcessor::new("Test", MAX_NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
.unwrap();
let main_handler = tokio::spawn(async move { std::thread::current().id() });
let main_id = main_handler.await.unwrap();

// When
let futures = iter::repeat_with(|| {
heavy_task_processor
.try_spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
std::thread::current().id()
})
.try_spawn(async move { std::thread::current().id() })
.unwrap()
})
.take(number_of_pending_tasks)
.take(NUMBER_OF_PENDING_TASKS)
.collect::<Vec<_>>();

// Then
Expand All @@ -175,16 +174,20 @@ mod tests {
.map(|r| r.unwrap())
.collect::<HashSet<_>>();

// Main thread was not used.
assert!(!unique_thread_ids.contains(&main_id));
assert_eq!(unique_thread_ids.len(), number_of_threads);
// There's been at least one worker thread used.
assert!(!unique_thread_ids.is_empty());
// There were no more worker threads above the threshold.
assert!(unique_thread_ids.len() <= MAX_NUMBER_OF_THREADS);
}

#[test]
fn second_spawn_fails_when_limit_is_one_and_first_in_progress() {
// Given
let number_of_pending_tasks = 1;
const NUMBER_OF_PENDING_TASKS: usize = 1;
let heavy_task_processor =
AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap();
AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();
let first_spawn_result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
});
Expand All @@ -196,15 +199,15 @@ mod tests {
});

// Then
let err = second_spawn_result.expect_err("Expected Ok result");
let err = second_spawn_result.expect_err("Should error");
assert_eq!(err, OutOfCapacity);
}

#[test]
fn second_spawn_works_when_first_is_finished() {
let number_of_pending_tasks = 1;
const NUMBER_OF_PENDING_TASKS: usize = 1;
let heavy_task_processor =
AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap();
AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

// Given
let (sender, receiver) = tokio::sync::oneshot::channel();
Expand All @@ -229,11 +232,11 @@ mod tests {
#[test]
fn can_spawn_10_tasks_when_limit_is_10() {
// Given
let number_of_pending_tasks = 10;
const NUMBER_OF_PENDING_TASKS: usize = 10;
let heavy_task_processor =
AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap();
AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

for _ in 0..number_of_pending_tasks {
for _ in 0..NUMBER_OF_PENDING_TASKS {
// When
let result = heavy_task_processor.try_spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
Expand All @@ -245,19 +248,19 @@ mod tests {
}

#[tokio::test]
async fn executes_10_tasks_for_10_seconds_with_one_thread() {
async fn executes_5_tasks_for_5_seconds_with_one_thread() {
// Given
let number_of_pending_tasks = 10;
let number_of_threads = 1;
const NUMBER_OF_PENDING_TASKS: usize = 5;
const NUMBER_OF_THREADS: usize = 1;
let heavy_task_processor =
AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks)
AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
.unwrap();

// When
let (broadcast_sender, mut broadcast_receiver) =
tokio::sync::broadcast::channel(1024);
let instant = Instant::now();
for _ in 0..number_of_pending_tasks {
for _ in 0..NUMBER_OF_PENDING_TASKS {
let broadcast_sender = broadcast_sender.clone();
let result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
Expand All @@ -269,29 +272,36 @@ mod tests {

// Then
while broadcast_receiver.recv().await.is_ok() {}
assert!(instant.elapsed() >= Duration::from_secs(10));
// 5 tasks running on 1 thread, each task taking 1 second,
// should complete in approximately 5 seconds overall.
// Allowing some LEEWAY to account for runtime overhead.
const LEEWAY: Duration = Duration::from_millis(300);
assert!(instant.elapsed() < Duration::from_secs(5) + LEEWAY);
// Make sure that the tasks were not executed in parallel.
assert!(instant.elapsed() >= Duration::from_secs(5));
// Wait for the metrics to be updated.
tokio::time::sleep(Duration::from_secs(1)).await;
let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
assert_eq!(duration.as_secs(), 10);
assert_eq!(duration.as_secs(), 5);
let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get());
assert_eq!(duration.as_secs(), 0);
}

#[tokio::test]
async fn executes_10_tasks_for_2_seconds_with_10_thread() {
async fn executes_10_blocking_tasks_for_1_second_with_10_threads__records_busy_time()
{
// Given
let number_of_pending_tasks = 10;
let number_of_threads = 10;
const NUMBER_OF_PENDING_TASKS: usize = 10;
const NUMBER_OF_THREADS: usize = 10;
let heavy_task_processor =
AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks)
AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
.unwrap();

// When
let (broadcast_sender, mut broadcast_receiver) =
tokio::sync::broadcast::channel(1024);
let instant = Instant::now();
for _ in 0..number_of_pending_tasks {
for _ in 0..NUMBER_OF_PENDING_TASKS {
let broadcast_sender = broadcast_sender.clone();
let result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
Expand All @@ -303,7 +313,11 @@ mod tests {

// Then
while broadcast_receiver.recv().await.is_ok() {}
assert!(instant.elapsed() <= Duration::from_secs(2));
// 10 blocking tasks running on 10 threads, each task taking 1 second,
// should complete in approximately 1 second overall.
// Allowing some LEEWAY to account for runtime overhead.
const LEEWAY: Duration = Duration::from_millis(300);
assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY);
// Wait for the metrics to be updated.
tokio::time::sleep(Duration::from_secs(1)).await;
let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
Expand All @@ -313,19 +327,20 @@ mod tests {
}

#[tokio::test]
async fn executes_10_tasks_for_2_seconds_with_1_thread() {
async fn executes_10_non_blocking_tasks_for_1_second_with_10_threads__records_idle_time(
) {
// Given
let number_of_pending_tasks = 10;
let number_of_threads = 10;
const NUMBER_OF_PENDING_TASKS: usize = 10;
const NUMBER_OF_THREADS: usize = 10;
let heavy_task_processor =
AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks)
AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
.unwrap();

// When
let (broadcast_sender, mut broadcast_receiver) =
tokio::sync::broadcast::channel(1024);
let instant = Instant::now();
for _ in 0..number_of_pending_tasks {
for _ in 0..NUMBER_OF_PENDING_TASKS {
let broadcast_sender = broadcast_sender.clone();
let result = heavy_task_processor.try_spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
Expand All @@ -337,7 +352,11 @@ mod tests {

// Then
while broadcast_receiver.recv().await.is_ok() {}
assert!(instant.elapsed() <= Duration::from_secs(2));
// 10 non-blocking tasks running on 10 threads, each task taking 1 second,
// should complete in approximately 1 second overall.
// Allowing some LEEWAY to account for runtime overhead.
const LEEWAY: Duration = Duration::from_millis(300);
assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY);
// Wait for the metrics to be updated.
tokio::time::sleep(Duration::from_secs(1)).await;
let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
Expand Down
Loading