Skip to content

Commit 5c398cf

Browse files
author
Stjepan Glavina
committed
Reschedule if last waker is dropped
1 parent 8b8c8a1 commit 5c398cf

File tree

7 files changed

+213
-81
lines changed

7 files changed

+213
-81
lines changed

src/header.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub(crate) struct Header {
2121

2222
/// The task that is blocked on the `JoinHandle`.
2323
///
24-
/// This waker needs to be woken once the task completes or is closed.
24+
/// This waker needs to be woken up once the task completes or is closed.
2525
pub(crate) awaiter: Cell<Option<Waker>>,
2626

2727
/// The virtual table.
@@ -34,8 +34,8 @@ pub(crate) struct Header {
3434
impl Header {
3535
/// Cancels the task.
3636
///
37-
/// This method will only mark the task as closed and will notify the awaiter, but it won't
38-
/// reschedule the task if it's not completed.
37+
/// This method will mark the task as closed and notify the awaiter, but it won't reschedule
38+
/// the task if it's not completed.
3939
pub(crate) fn cancel(&self) {
4040
let mut state = self.state.load(Ordering::Acquire);
4141

@@ -65,9 +65,9 @@ impl Header {
6565
}
6666
}
6767

68-
/// Notifies the task blocked on the task.
68+
/// Notifies the awaiter blocked on this task.
6969
///
70-
/// If there is a registered waker, it will be removed from the header and woken.
70+
/// If there is a registered waker, it will be removed from the header and woken up.
7171
#[inline]
7272
pub(crate) fn notify(&self) {
7373
if let Some(waker) = self.swap_awaiter(None) {
@@ -78,9 +78,9 @@ impl Header {
7878
}
7979
}
8080

81-
/// Notifies the task blocked on the task unless its waker matches `current`.
81+
/// Notifies the awaiter blocked on this task, unless its waker matches `current`.
8282
///
83-
/// If there is a registered waker, it will be removed from the header.
83+
/// If there is a registered waker, it will be removed from the header in any case.
8484
#[inline]
8585
pub(crate) fn notify_unless(&self, current: &Waker) {
8686
if let Some(waker) = self.swap_awaiter(None) {
@@ -93,7 +93,7 @@ impl Header {
9393
}
9494
}
9595

96-
/// Swaps the awaiter and returns the previous value.
96+
/// Swaps the awaiter for a new waker and returns the previous value.
9797
#[inline]
9898
pub(crate) fn swap_awaiter(&self, new: Option<Waker>) -> Option<Waker> {
9999
let new_is_none = new.is_none();

src/join_handle.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ use crate::utils::abort_on_panic;
1414
///
1515
/// This type is a future that resolves to an `Option<R>` where:
1616
///
17-
/// * `None` indicates the task has panicked or was cancelled
18-
/// * `Some(res)` indicates the task has completed with `res`
17+
/// * `None` indicates the task has panicked or was cancelled.
18+
/// * `Some(result)` indicates the task has completed with `result` of type `R`.
1919
pub struct JoinHandle<R, T> {
2020
/// A raw task pointer.
2121
pub(crate) raw_task: NonNull<()>,
2222

23-
/// A marker capturing the generic type `R`.
23+
/// A marker capturing generic types `R` and `T`.
2424
pub(crate) _marker: PhantomData<(R, T)>,
2525
}
2626

@@ -34,7 +34,7 @@ impl<R, T> JoinHandle<R, T> {
3434
///
3535
/// If the task has already completed, calling this method will have no effect.
3636
///
37-
/// When a task is cancelled, its future cannot be polled again and will be dropped instead.
37+
/// When a task is cancelled, its future will not be polled again.
3838
pub fn cancel(&self) {
3939
let ptr = self.raw_task.as_ptr();
4040
let header = ptr as *const Header;
@@ -63,8 +63,8 @@ impl<R, T> JoinHandle<R, T> {
6363
Ordering::Acquire,
6464
) {
6565
Ok(_) => {
66-
// If the task is not scheduled nor running, schedule it so that its future
67-
// gets dropped by the executor.
66+
// If the task is not scheduled nor running, schedule it one more time so
67+
// that its future gets dropped by the executor.
6868
if state & (SCHEDULED | RUNNING) == 0 {
6969
((*header).vtable.schedule)(ptr);
7070
}

src/lib.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -77,17 +77,17 @@
7777
//!
7878
//! # Cancellation
7979
//!
80-
//! Both [`Task`] and [`JoinHandle`] have a method that cancels the task. When cancelled, the
81-
//! task's future will not be polled again and will get dropped instead.
80+
//! Both [`Task`] and [`JoinHandle`] have methods that cancel the task. When cancelled, the task's
81+
//! future will not be polled again and will get dropped instead.
8282
//!
8383
//! If cancelled by the [`Task`] instance, the task is destroyed immediately. If cancelled by the
8484
//! [`JoinHandle`] instance, it will be scheduled one more time and the next attempt to run it will
8585
//! simply destroy it.
8686
//!
8787
//! # Performance
8888
//!
89-
//! Task construction incurs a single allocation only that holds its state, the schedule function,
90-
//! and the future or the result of the future if completed.
89+
//! Task construction incurs a single allocation that holds its state, the schedule function, and
90+
//! the future or the result of the future if completed.
9191
//!
9292
//! The layout of a task is equivalent to 4 words followed by the schedule function, and then by a
9393
//! union of the future and its output.

src/raw.rs

+62-33
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ pub(crate) struct TaskVTable {
2727
/// Returns a pointer to the output stored after completion.
2828
pub(crate) get_output: unsafe fn(*const ()) -> *const (),
2929

30-
/// Drops a waker or a task.
31-
pub(crate) decrement: unsafe fn(ptr: *const ()),
30+
/// Drops the task.
31+
pub(crate) drop_task: unsafe fn(ptr: *const ()),
3232

3333
/// Destroys the task.
3434
pub(crate) destroy: unsafe fn(*const ()),
@@ -39,7 +39,7 @@ pub(crate) struct TaskVTable {
3939

4040
/// Memory layout of a task.
4141
///
42-
/// This struct contains the information on:
42+
/// This struct contains the following information:
4343
///
4444
/// 1. How to allocate and deallocate the task.
4545
/// 2. How to access the fields inside the task.
@@ -61,7 +61,7 @@ pub(crate) struct TaskLayout {
6161
pub(crate) offset_r: usize,
6262
}
6363

64-
/// Raw pointers to the fields of a task.
64+
/// Raw pointers to the fields inside a task.
6565
pub(crate) struct RawTask<F, R, S, T> {
6666
/// The task header.
6767
pub(crate) header: *const Header,
@@ -102,7 +102,7 @@ where
102102
{
103103
/// Allocates a task with the given `future` and `schedule` function.
104104
///
105-
/// It is assumed there are initially only the `Task` reference and the `JoinHandle`.
105+
/// It is assumed that initially only the `Task` reference and the `JoinHandle` exist.
106106
pub(crate) fn allocate(tag: T, future: F, schedule: S) -> NonNull<()> {
107107
// Compute the layout of the task for allocation. Abort if the computation fails.
108108
let task_layout = abort_on_panic(|| Self::task_layout());
@@ -125,12 +125,12 @@ where
125125
Self::clone_waker,
126126
Self::wake,
127127
Self::wake_by_ref,
128-
Self::decrement,
128+
Self::drop_waker,
129129
),
130130
schedule: Self::schedule,
131131
drop_future: Self::drop_future,
132132
get_output: Self::get_output,
133-
decrement: Self::decrement,
133+
drop_task: Self::drop_task,
134134
destroy: Self::destroy,
135135
run: Self::run,
136136
},
@@ -181,7 +181,7 @@ where
181181
let align_union = layout_f.align().max(layout_r.align());
182182
let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) };
183183

184-
// Compute the layout for `Header` followed by `T`, then `S`, then `union { F, R }`.
184+
// Compute the layout for `Header` followed by `T`, then `S`, and finally `union { F, R }`.
185185
let layout = layout_header;
186186
let (layout, offset_t) = extend(layout, layout_t);
187187
let (layout, offset_s) = extend(layout, layout_s);
@@ -205,10 +205,10 @@ where
205205
let mut state = (*raw.header).state.load(Ordering::Acquire);
206206

207207
loop {
208-
// If the task is completed or closed, it can't be woken.
208+
// If the task is completed or closed, it can't be woken up.
209209
if state & (COMPLETED | CLOSED) != 0 {
210210
// Drop the waker.
211-
Self::decrement(ptr);
211+
Self::drop_waker(ptr);
212212
break;
213213
}
214214

@@ -224,7 +224,7 @@ where
224224
) {
225225
Ok(_) => {
226226
// Drop the waker.
227-
Self::decrement(ptr);
227+
Self::drop_waker(ptr);
228228
break;
229229
}
230230
Err(s) => state = s,
@@ -249,7 +249,7 @@ where
249249
(*raw.schedule)(task);
250250
} else {
251251
// Drop the waker.
252-
Self::decrement(ptr);
252+
Self::drop_waker(ptr);
253253
}
254254

255255
break;
@@ -267,7 +267,7 @@ where
267267
let mut state = (*raw.header).state.load(Ordering::Acquire);
268268

269269
loop {
270-
// If the task is completed or closed, it can't be woken.
270+
// If the task is completed or closed, it can't be woken up.
271271
if state & (COMPLETED | CLOSED) != 0 {
272272
break;
273273
}
@@ -330,7 +330,7 @@ where
330330
let raw_waker = &(*raw.header).vtable.raw_waker;
331331

332332
// Increment the reference count. With any kind of reference-counted data structure,
333-
// relaxed ordering is fine when the reference is being cloned.
333+
// relaxed ordering is appropriate when incrementing the counter.
334334
let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);
335335

336336
// If the reference count overflowed, abort.
@@ -341,19 +341,48 @@ where
341341
RawWaker::new(ptr, raw_waker)
342342
}
343343

344-
/// Drops a waker or a task.
344+
/// Drops a waker.
345+
///
346+
/// This function will decrement the reference count. If it drops down to zero, the associated
347+
/// join handle has been dropped too, and the task has not been completed, then it will get
348+
/// scheduled one more time so that its future gets dropped by the executor.
349+
#[inline]
350+
unsafe fn drop_waker(ptr: *const ()) {
351+
let raw = Self::from_ptr(ptr);
352+
353+
// Decrement the reference count.
354+
let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
355+
356+
// If this was the last reference to the task and the `JoinHandle` has been dropped too,
357+
// then we need to decide how to destroy the task.
358+
if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
359+
if new & (COMPLETED | CLOSED) == 0 {
360+
// If the task was not completed nor closed, close it and schedule one more time so
361+
// that its future gets dropped by the executor.
362+
(*raw.header)
363+
.state
364+
.store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
365+
((*raw.header).vtable.schedule)(ptr);
366+
} else {
367+
// Otherwise, destroy the task right away.
368+
Self::destroy(ptr);
369+
}
370+
}
371+
}
372+
373+
/// Drops a task.
345374
///
346375
/// This function will decrement the reference count. If it drops down to zero and the
347376
/// associated join handle has been dropped too, then the task gets destroyed.
348377
#[inline]
349-
unsafe fn decrement(ptr: *const ()) {
378+
unsafe fn drop_task(ptr: *const ()) {
350379
let raw = Self::from_ptr(ptr);
351380

352381
// Decrement the reference count.
353382
let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
354383

355-
// If this was the last reference to the task and the `JoinHandle` has been dropped as
356-
// well, then destroy the task.
384+
// If this was the last reference to the task and the `JoinHandle` has been dropped too,
385+
// then destroy the task.
357386
if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
358387
Self::destroy(ptr);
359388
}
@@ -391,8 +420,8 @@ where
391420

392421
/// Cleans up task's resources and deallocates it.
393422
///
394-
/// If the task has not been closed, then its future or the output will be dropped. The
395-
/// schedule function and the tag get dropped too.
423+
/// The schedule function and the tag will be dropped, and the task will then get deallocated.
424+
/// The task must be closed before this function is called.
396425
#[inline]
397426
unsafe fn destroy(ptr: *const ()) {
398427
let raw = Self::from_ptr(ptr);
@@ -413,8 +442,8 @@ where
413442

414443
/// Runs a task.
415444
///
416-
/// If polling its future panics, the task will be closed and the panic propagated into the
417-
/// caller.
445+
/// If polling its future panics, the task will be closed and the panic will be propagated into
446+
/// the caller.
418447
unsafe fn run(ptr: *const ()) {
419448
let raw = Self::from_ptr(ptr);
420449

@@ -429,7 +458,7 @@ where
429458

430459
// Update the task's state before polling its future.
431460
loop {
432-
// If the task has been closed, drop the task reference and return.
461+
// If the task has already been closed, drop the task reference and return.
433462
if state & CLOSED != 0 {
434463
// Notify the awaiter that the task has been closed.
435464
if state & AWAITER != 0 {
@@ -440,7 +469,7 @@ where
440469
Self::drop_future(ptr);
441470

442471
// Drop the task reference.
443-
Self::decrement(ptr);
472+
Self::drop_task(ptr);
444473
return;
445474
}
446475

@@ -505,7 +534,7 @@ where
505534
}
506535

507536
// Drop the task reference.
508-
Self::decrement(ptr);
537+
Self::drop_task(ptr);
509538
break;
510539
}
511540
Err(s) => state = s,
@@ -519,7 +548,7 @@ where
519548
// The task is still not completed.
520549
loop {
521550
// If the task was closed while running, we'll need to unschedule in case it
522-
// was woken and then clean up its resources.
551+
// was woken up and then destroy it.
523552
let new = if state & CLOSED != 0 {
524553
state & !RUNNING & !SCHEDULED
525554
} else {
@@ -535,22 +564,22 @@ where
535564
) {
536565
Ok(state) => {
537566
// If the task was closed while running, we need to drop its future.
538-
// If the task was woken while running, we need to schedule it.
567+
// If the task was woken up while running, we need to schedule it.
539568
// Otherwise, we just drop the task reference.
540569
if state & CLOSED != 0 {
541570
// The thread that closed the task didn't drop the future because
542571
// it was running so now it's our responsibility to do so.
543572
Self::drop_future(ptr);
544573

545574
// Drop the task reference.
546-
Self::decrement(ptr);
575+
Self::drop_task(ptr);
547576
} else if state & SCHEDULED != 0 {
548-
// The thread that has woken the task didn't reschedule it because
577+
// The thread that woke the task up didn't reschedule it because
549578
// it was running so now it's our responsibility to do so.
550579
Self::schedule(ptr);
551580
} else {
552581
// Drop the task reference.
553-
Self::decrement(ptr);
582+
Self::drop_task(ptr);
554583
}
555584
break;
556585
}
@@ -587,15 +616,15 @@ where
587616
// future, and drop the task reference.
588617
if state & CLOSED != 0 {
589618
// We still need to unschedule the task because it is possible it was
590-
// woken while running.
619+
// woken up while running.
591620
(*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
592621

593622
// The thread that closed the task didn't drop the future because it
594623
// was running so now it's our responsibility to do so.
595624
RawTask::<F, R, S, T>::drop_future(ptr);
596625

597626
// Drop the task reference.
598-
RawTask::<F, R, S, T>::decrement(ptr);
627+
RawTask::<F, R, S, T>::drop_task(ptr);
599628
break;
600629
}
601630

@@ -616,7 +645,7 @@ where
616645
}
617646

618647
// Drop the task reference.
619-
RawTask::<F, R, S, T>::decrement(ptr);
648+
RawTask::<F, R, S, T>::drop_task(ptr);
620649
break;
621650
}
622651
Err(s) => state = s,

0 commit comments

Comments
 (0)