Skip to content

Commit a94d2f4

Browse files
author
Stjepan Glavina
authored
Add waker_fn (#18)
* Add waker_fn * Add a waker_fn test * Double sleep times * More benches * Prohibit recursive block_on calls * Reformat code
1 parent 4a94b05 commit a94d2f4

13 files changed

+298
-64
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,4 @@ crossbeam = "0.7.3"
2323
crossbeam-utils = "0.7.0"
2424
futures = "0.3.1"
2525
lazy_static = "1.4.0"
26+
pin-utils = "0.1.0-alpha.4"
File renamed without changes.

benches/waker_fn.rs

+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#![feature(test)]
2+
3+
extern crate test;
4+
5+
use std::cell::RefCell;
6+
use std::future::Future;
7+
use std::pin::Pin;
8+
use std::task::{Context, Poll, Waker};
9+
10+
use crossbeam::sync::Parker;
11+
use test::Bencher;
12+
13+
/// Runs a future to completion on the current thread.
14+
fn block_on<F: Future>(future: F) -> F::Output {
15+
// Pin the future on the stack.
16+
pin_utils::pin_mut!(future);
17+
18+
thread_local! {
19+
// Parker and waker associated with the current thread.
20+
static CACHE: RefCell<(Parker, Waker)> = {
21+
let parker = Parker::new();
22+
let unparker = parker.unparker().clone();
23+
let waker = async_task::waker_fn(move || unparker.unpark());
24+
RefCell::new((parker, waker))
25+
};
26+
}
27+
28+
CACHE.with(|cache| {
29+
// Panic if `block_on()` is called recursively.
30+
let (parker, waker) = &mut *cache.try_borrow_mut().ok().expect("recursive `block_on`");
31+
32+
// Create the task context.
33+
let cx = &mut Context::from_waker(&waker);
34+
35+
// Keep polling the future until completion.
36+
loop {
37+
match future.as_mut().poll(cx) {
38+
Poll::Ready(output) => return output,
39+
Poll::Pending => parker.park(),
40+
}
41+
}
42+
})
43+
}
44+
45+
#[bench]
46+
fn custom_block_on_0_yields(b: &mut Bencher) {
47+
b.iter(|| block_on(Yields(0)));
48+
}
49+
50+
#[bench]
51+
fn custom_block_on_10_yields(b: &mut Bencher) {
52+
b.iter(|| block_on(Yields(10)));
53+
}
54+
55+
#[bench]
56+
fn custom_block_on_50_yields(b: &mut Bencher) {
57+
b.iter(|| block_on(Yields(50)));
58+
}
59+
60+
#[bench]
61+
fn futures_block_on_0_yields(b: &mut Bencher) {
62+
b.iter(|| futures::executor::block_on(Yields(0)));
63+
}
64+
65+
#[bench]
66+
fn futures_block_on_10_yields(b: &mut Bencher) {
67+
b.iter(|| futures::executor::block_on(Yields(10)));
68+
}
69+
70+
#[bench]
71+
fn futures_block_on_50_yields(b: &mut Bencher) {
72+
b.iter(|| futures::executor::block_on(Yields(50)));
73+
}
74+
75+
struct Yields(u32);
76+
77+
impl Future for Yields {
78+
type Output = ();
79+
80+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
81+
if self.0 == 0 {
82+
Poll::Ready(())
83+
} else {
84+
self.0 -= 1;
85+
cx.waker().wake_by_ref();
86+
Poll::Pending
87+
}
88+
}
89+
}

examples/block.rs

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
//! A simple implementation of `block_on`.
2+
3+
use std::cell::RefCell;
4+
use std::future::Future;
5+
use std::task::{Context, Poll, Waker};
6+
use std::thread;
7+
use std::time::Duration;
8+
9+
use crossbeam::sync::Parker;
10+
use futures::channel::oneshot;
11+
12+
/// Runs a future to completion on the current thread.
13+
fn block_on<F: Future>(future: F) -> F::Output {
14+
// Pin the future on the stack.
15+
pin_utils::pin_mut!(future);
16+
17+
thread_local! {
18+
// Parker and waker associated with the current thread.
19+
static CACHE: RefCell<(Parker, Waker)> = {
20+
let parker = Parker::new();
21+
let unparker = parker.unparker().clone();
22+
let waker = async_task::waker_fn(move || unparker.unpark());
23+
RefCell::new((parker, waker))
24+
};
25+
}
26+
27+
CACHE.with(|cache| {
28+
// Panic if `block_on()` is called recursively.
29+
let (parker, waker) = &mut *cache.try_borrow_mut().ok().expect("recursive block_on()");
30+
31+
// Create the task context.
32+
let cx = &mut Context::from_waker(&waker);
33+
34+
// Keep polling the future until completion.
35+
loop {
36+
match future.as_mut().poll(cx) {
37+
Poll::Ready(output) => return output,
38+
Poll::Pending => parker.park(),
39+
}
40+
}
41+
})
42+
}
43+
44+
fn main() {
45+
let (s, r) = oneshot::channel();
46+
47+
// Spawn a thread that will send a message through the channel.
48+
thread::spawn(move || {
49+
thread::sleep(Duration::from_secs(1));
50+
s.send("Hello, world!").unwrap();
51+
});
52+
53+
// Block until the message is received.
54+
let msg = block_on(async {
55+
println!("Awaiting...");
56+
r.await.unwrap()
57+
});
58+
59+
println!("{}", msg);
60+
}

src/lib.rs

+24-6
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
//! Task abstraction for building executors.
22
//!
3+
//! # Spawning
4+
//!
35
//! To spawn a future onto an executor, we first need to allocate it on the heap and keep some
46
//! state alongside it. The state indicates whether the future is ready for polling, waiting to be
57
//! woken up, or completed. Such a future is called a *task*.
68
//!
7-
//! This crate helps with task allocation and polling its future to completion.
8-
//!
9-
//! # Spawning
10-
//!
119
//! All executors have some kind of queue that holds runnable tasks:
1210
//!
1311
//! ```
@@ -89,13 +87,31 @@
8987
//! Task construction incurs a single allocation that holds its state, the schedule function, and
9088
//! the future or the result of the future if completed.
9189
//!
92-
//! The layout of a task is equivalent to 4 words followed by the schedule function, and then by a
93-
//! union of the future and its output.
90+
//! The layout of a task is equivalent to 4 `usize`s followed by the schedule function, and then by
91+
//! a union of the future and its output.
92+
//!
93+
//! # Waking
94+
//!
95+
//! The handy [`waker_fn`] constructor converts any function into a [`Waker`]. Every time it is
96+
//! woken, the function gets called:
97+
//!
98+
//! ```
99+
//! let waker = async_task::waker_fn(|| println!("Wake!"));
100+
//!
101+
//! // Prints "Wake!" twice.
102+
//! waker.wake_by_ref();
103+
//! waker.wake_by_ref();
104+
//! ```
105+
//!
106+
//! This is useful for implementing single-future executors like [`block_on`].
94107
//!
95108
//! [`spawn`]: fn.spawn.html
96109
//! [`spawn_local`]: fn.spawn_local.html
110+
//! [`waker_fn`]: fn.waker_fn.html
97111
//! [`Task`]: struct.Task.html
98112
//! [`JoinHandle`]: struct.JoinHandle.html
113+
//! [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
114+
//! [`block_on`]: https://github.com/async-rs/async-task/blob/master/examples/block.rs
99115
100116
#![no_std]
101117
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
@@ -110,6 +126,8 @@ mod raw;
110126
mod state;
111127
mod task;
112128
mod utils;
129+
mod waker_fn;
113130

114131
pub use crate::join_handle::JoinHandle;
115132
pub use crate::task::{spawn, spawn_local, Task};
133+
pub use crate::waker_fn::waker_fn;

src/raw.rs

+9-15
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@ use crate::Task;
1515

1616
/// The vtable for a task.
1717
pub(crate) struct TaskVTable {
18-
/// The raw waker vtable.
19-
pub(crate) raw_waker_vtable: RawWakerVTable,
20-
2118
/// Schedules the task.
2219
pub(crate) schedule: unsafe fn(*const ()),
2320

@@ -101,6 +98,13 @@ where
10198
F: Future<Output = R> + 'static,
10299
S: Fn(Task<T>) + Send + Sync + 'static,
103100
{
101+
const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
102+
Self::clone_waker,
103+
Self::wake,
104+
Self::wake_by_ref,
105+
Self::drop_waker,
106+
);
107+
104108
/// Allocates a task with the given `future` and `schedule` function.
105109
///
106110
/// It is assumed that initially only the `Task` reference and the `JoinHandle` exist.
@@ -122,12 +126,6 @@ where
122126
state: AtomicUsize::new(SCHEDULED | HANDLE | REFERENCE),
123127
awaiter: UnsafeCell::new(None),
124128
vtable: &TaskVTable {
125-
raw_waker_vtable: RawWakerVTable::new(
126-
Self::clone_waker,
127-
Self::wake,
128-
Self::wake_by_ref,
129-
Self::drop_waker,
130-
),
131129
schedule: Self::schedule,
132130
drop_future: Self::drop_future,
133131
get_output: Self::get_output,
@@ -335,7 +333,6 @@ where
335333
/// Clones a waker.
336334
unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
337335
let raw = Self::from_ptr(ptr);
338-
let raw_waker_vtable = &(*raw.header).vtable.raw_waker_vtable;
339336

340337
// Increment the reference count. With any kind of reference-counted data structure,
341338
// relaxed ordering is appropriate when incrementing the counter.
@@ -346,7 +343,7 @@ where
346343
abort();
347344
}
348345

349-
RawWaker::new(ptr, raw_waker_vtable)
346+
RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)
350347
}
351348

352349
/// Drops a waker.
@@ -464,10 +461,7 @@ where
464461
let raw = Self::from_ptr(ptr);
465462

466463
// Create a context from the raw task pointer and the vtable inside the its header.
467-
let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(
468-
ptr,
469-
&(*raw.header).vtable.raw_waker_vtable,
470-
)));
464+
let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)));
471465
let cx = &mut Context::from_waker(&waker);
472466

473467
let mut state = (*raw.header).state.load(Ordering::Acquire);

src/waker_fn.rs

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use alloc::sync::Arc;
2+
use core::mem::{self, ManuallyDrop};
3+
use core::task::{RawWaker, RawWakerVTable, Waker};
4+
5+
/// Creates a waker from a wake function.
6+
///
7+
/// The function gets called every time the waker is woken.
8+
pub fn waker_fn<F: Fn() + Send + Sync + 'static>(f: F) -> Waker {
9+
let raw = Arc::into_raw(Arc::new(f)) as *const ();
10+
let vtable = &Helper::<F>::VTABLE;
11+
unsafe { Waker::from_raw(RawWaker::new(raw, vtable)) }
12+
}
13+
14+
struct Helper<F>(F);
15+
16+
impl<F: Fn() + Send + Sync + 'static> Helper<F> {
17+
const VTABLE: RawWakerVTable = RawWakerVTable::new(
18+
Self::clone_waker,
19+
Self::wake,
20+
Self::wake_by_ref,
21+
Self::drop_waker,
22+
);
23+
24+
unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
25+
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const F));
26+
mem::forget(arc.clone());
27+
RawWaker::new(ptr, &Self::VTABLE)
28+
}
29+
30+
unsafe fn wake(ptr: *const ()) {
31+
let arc = Arc::from_raw(ptr as *const F);
32+
(arc)();
33+
}
34+
35+
unsafe fn wake_by_ref(ptr: *const ()) {
36+
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const F));
37+
(arc)();
38+
}
39+
40+
unsafe fn drop_waker(ptr: *const ()) {
41+
drop(Arc::from_raw(ptr as *const F));
42+
}
43+
}

0 commit comments

Comments
 (0)