Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions glommio/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4133,6 +4133,19 @@ mod test {
});
}

#[test]
fn wake_refcount_overflow() {
LocalExecutor::default().run(async {
const NUM_CLONES: usize = u16::MAX as usize;

crate::spawn_local(poll_fn::<(), _>(move |cx| {
let _wakers = Vec::from_iter((0..NUM_CLONES).map(|_| cx.waker().clone()));
Poll::Ready(())
}))
.await;
})
}

#[test]
fn blocking_function() {
LocalExecutor::default().run(async {
Expand Down
7 changes: 5 additions & 2 deletions glommio/src/task/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use core::{fmt, task::Waker};
#[cfg(feature = "debugging")]
use std::cell::Cell;
use std::sync::{
atomic::{AtomicI16, Ordering},
atomic::{AtomicI32, Ordering},
Arc,
};

Expand All @@ -16,6 +16,9 @@ use crate::{
task::{raw::TaskVTable, state::*, utils::abort_on_panic},
};

pub(crate) type RefCount = i32;
pub(crate) type AtomicRefCount = AtomicI32;

/// The header of a task.
///
/// This header is stored right at the beginning of every heap-allocated task.
Expand All @@ -31,7 +34,7 @@ pub(crate) struct Header {
pub(crate) latency_matters: bool,

/// Current reference count of the task.
pub(crate) references: AtomicI16,
pub(crate) references: AtomicRefCount,

/// The task that is blocked on the `JoinHandle`.
///
Expand Down
9 changes: 6 additions & 3 deletions glommio/src/task/join_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use core::{
use crate::task::debugging::TaskDebugger;
use crate::{
dbg_context,
task::{header::Header, state::*},
task::{
header::{Header, RefCount},
state::*,
},
};
use std::sync::atomic::Ordering;

Expand Down Expand Up @@ -70,7 +73,7 @@ impl<R> JoinHandle<R> {
// If we schedule it, need to bump the reference count, since after run() we
// decrement it.
let refs = (*header).references.fetch_add(1, Ordering::Relaxed);
assert_ne!(refs, i16::MAX);
assert_ne!(refs, RefCount::MAX);

((*header).vtable.schedule)(ptr);
}
Expand Down Expand Up @@ -132,7 +135,7 @@ impl<R> Drop for JoinHandle<R> {
if refs == 0 {
if state & CLOSED == 0 {
let refs = (*header).references.fetch_add(1, Ordering::Relaxed);
assert_ne!(refs, i16::MAX);
assert_ne!(refs, RefCount::MAX);
((*header).vtable.schedule)(ptr);
} else {
((*header).vtable.destroy)(ptr);
Expand Down
10 changes: 5 additions & 5 deletions glommio/src/task/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ use core::{
};
#[cfg(feature = "debugging")]
use std::cell::Cell;
use std::sync::atomic::{AtomicI16, Ordering};
use std::sync::atomic::Ordering;

#[cfg(feature = "debugging")]
use crate::task::debugging::TaskDebugger;
use crate::{
dbg_context, sys,
task::{
header::Header,
header::{AtomicRefCount, Header, RefCount},
state::*,
utils::{abort, abort_on_panic, extend},
Task,
Expand Down Expand Up @@ -132,7 +132,7 @@ where
notifier: sys::get_sleep_notifier_for(executor_id).unwrap(),
state: SCHEDULED | HANDLE,
latency_matters,
references: AtomicI16::new(0),
references: AtomicRefCount::new(0),
awaiter: None,
vtable: &TaskVTable {
schedule: Self::schedule,
Expand Down Expand Up @@ -276,12 +276,12 @@ where
#[track_caller]
fn increment_references(header: &Header) {
let refs = header.references.fetch_add(1, Ordering::Relaxed);
assert_ne!(refs, i16::MAX, "Waker invariant broken: {header:?}");
assert_ne!(refs, RefCount::MAX, "Waker invariant broken: {header:?}");
}

#[inline]
#[track_caller]
fn decrement_references(header: &Header) -> i16 {
fn decrement_references(header: &Header) -> RefCount {
let refs = header.references.fetch_sub(1, Ordering::Relaxed);
assert_ne!(refs, 0, "Waker invariant broken: {header:?}");
refs - 1
Expand Down
9 changes: 7 additions & 2 deletions glommio/src/task/task_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ use core::{fmt, future::Future, marker::PhantomData, mem, ptr::NonNull};
use crate::task::debugging::TaskDebugger;
use crate::{
dbg_context,
task::{header::Header, raw::RawTask, state::*, JoinHandle},
task::{
header::{Header, RefCount},
raw::RawTask,
state::*,
JoinHandle,
},
};

use std::sync::atomic::Ordering;
Expand Down Expand Up @@ -133,7 +138,7 @@ impl Task {

unsafe {
let refs = (*header).references.fetch_add(1, Ordering::Relaxed);
assert_ne!(refs, i16::MAX);
assert_ne!(refs, RefCount::MAX);
((*header).vtable.run)(ptr)
}
}
Expand Down
Loading