Skip to content

Commit fcfa4ab

Browse files
author
Stjepan Glavina
committed
Add spawn_local function
1 parent 5c398cf commit fcfa4ab

11 files changed

+562
-29
lines changed

examples/panic-propagation.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use futures::executor;
1111
use futures::future::FutureExt;
1212
use lazy_static::lazy_static;
1313

14+
type Task = async_task::Task<()>;
15+
1416
/// Spawns a future on the executor.
1517
fn spawn<F, R>(future: F) -> JoinHandle<R>
1618
where
@@ -19,8 +21,8 @@ where
1921
{
2022
lazy_static! {
2123
// A channel that holds scheduled tasks.
22-
static ref QUEUE: Sender<async_task::Task<()>> = {
23-
let (sender, receiver) = unbounded::<async_task::Task<()>>();
24+
static ref QUEUE: Sender<Task> = {
25+
let (sender, receiver) = unbounded::<Task>();
2426

2527
// Start the executor thread.
2628
thread::spawn(|| {

examples/panic-result.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,19 @@ use futures::executor;
99
use futures::future::FutureExt;
1010
use lazy_static::lazy_static;
1111

12+
type Task = async_task::Task<()>;
13+
type JoinHandle<T> = async_task::JoinHandle<T, ()>;
14+
1215
/// Spawns a future on the executor.
13-
fn spawn<F, R>(future: F) -> async_task::JoinHandle<thread::Result<R>, ()>
16+
fn spawn<F, R>(future: F) -> JoinHandle<thread::Result<R>>
1417
where
1518
F: Future<Output = R> + Send + 'static,
1619
R: Send + 'static,
1720
{
1821
lazy_static! {
1922
// A channel that holds scheduled tasks.
20-
static ref QUEUE: Sender<async_task::Task<()>> = {
21-
let (sender, receiver) = unbounded::<async_task::Task<()>>();
23+
static ref QUEUE: Sender<Task> = {
24+
let (sender, receiver) = unbounded::<Task>();
2225

2326
// Start the executor thread.
2427
thread::spawn(|| {

examples/spawn-local.rs

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
//! A simple single-threaded executor that can spawn non-`Send` futures.
2+
3+
use std::cell::Cell;
4+
use std::future::Future;
5+
use std::rc::Rc;
6+
7+
use crossbeam::channel::{unbounded, Receiver, Sender};
8+
9+
type Task = async_task::Task<()>;
10+
type JoinHandle<T> = async_task::JoinHandle<T, ()>;
11+
12+
thread_local! {
13+
// A channel that holds scheduled tasks.
14+
static QUEUE: (Sender<Task>, Receiver<Task>) = unbounded();
15+
}
16+
17+
/// Spawns a future on the executor.
18+
fn spawn<F, R>(future: F) -> JoinHandle<R>
19+
where
20+
F: Future<Output = R> + 'static,
21+
R: 'static,
22+
{
23+
// Create a task that is scheduled by sending itself into the channel.
24+
let schedule = |t| QUEUE.with(|(s, _)| s.send(t).unwrap());
25+
let (task, handle) = async_task::spawn_local(future, schedule, ());
26+
27+
// Schedule the task by sending it into the queue.
28+
task.schedule();
29+
30+
handle
31+
}
32+
33+
/// Runs a future to completion.
34+
fn run<F, R>(future: F) -> R
35+
where
36+
F: Future<Output = R> + 'static,
37+
R: 'static,
38+
{
39+
// Spawn a task that sends its result through a channel.
40+
let (s, r) = unbounded();
41+
spawn(async move { s.send(future.await).unwrap() });
42+
43+
loop {
44+
// If the original task has completed, return its result.
45+
if let Ok(val) = r.try_recv() {
46+
return val;
47+
}
48+
49+
// Otherwise, take a task from the queue and run it.
50+
QUEUE.with(|(_, r)| r.recv().unwrap().run());
51+
}
52+
}
53+
54+
fn main() {
55+
let val = Rc::new(Cell::new(0));
56+
57+
// Run a future that increments a non-`Send` value.
58+
run({
59+
let val = val.clone();
60+
async move {
61+
// Spawn a future that increments the value.
62+
let handle = spawn({
63+
let val = val.clone();
64+
async move {
65+
val.set(dbg!(val.get()) + 1);
66+
}
67+
});
68+
69+
val.set(dbg!(val.get()) + 1);
70+
handle.await;
71+
}
72+
});
73+
74+
// The value should be 2 at the end of the program.
75+
dbg!(val.get());
76+
}

examples/spawn-on-thread.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ use std::thread;
77
use crossbeam::channel;
88
use futures::executor;
99

10+
type JoinHandle<T> = async_task::JoinHandle<T, ()>;
11+
1012
/// Spawns a future on a new dedicated thread.
1113
///
1214
/// The returned handle can be used to await the output of the future.
13-
fn spawn_on_thread<F, R>(future: F) -> async_task::JoinHandle<R, ()>
15+
fn spawn_on_thread<F, R>(future: F) -> JoinHandle<R>
1416
where
1517
F: Future<Output = R> + Send + 'static,
1618
R: Send + 'static,

examples/spawn.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,19 @@ use crossbeam::channel::{unbounded, Sender};
88
use futures::executor;
99
use lazy_static::lazy_static;
1010

11+
type Task = async_task::Task<()>;
12+
type JoinHandle<T> = async_task::JoinHandle<T, ()>;
13+
1114
/// Spawns a future on the executor.
12-
fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, ()>
15+
fn spawn<F, R>(future: F) -> JoinHandle<R>
1316
where
1417
F: Future<Output = R> + Send + 'static,
1518
R: Send + 'static,
1619
{
1720
lazy_static! {
1821
// A channel that holds scheduled tasks.
19-
static ref QUEUE: Sender<async_task::Task<()>> = {
20-
let (sender, receiver) = unbounded::<async_task::Task<()>>();
22+
static ref QUEUE: Sender<Task> = {
23+
let (sender, receiver) = unbounded::<Task>();
2124

2225
// Start the executor thread.
2326
thread::spawn(|| {

examples/task-id.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ use lazy_static::lazy_static;
1313
#[derive(Clone, Copy, Debug)]
1414
struct TaskId(usize);
1515

16+
type Task = async_task::Task<TaskId>;
17+
type JoinHandle<T> = async_task::JoinHandle<T, TaskId>;
18+
1619
thread_local! {
1720
/// The ID of the current task.
1821
static TASK_ID: Cell<Option<TaskId>> = Cell::new(None);
@@ -26,15 +29,15 @@ fn task_id() -> Option<TaskId> {
2629
}
2730

2831
/// Spawns a future on the executor.
29-
fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, TaskId>
32+
fn spawn<F, R>(future: F) -> JoinHandle<R>
3033
where
3134
F: Future<Output = R> + Send + 'static,
3235
R: Send + 'static,
3336
{
3437
lazy_static! {
3538
// A channel that holds scheduled tasks.
36-
static ref QUEUE: Sender<async_task::Task<TaskId>> = {
37-
let (sender, receiver) = unbounded::<async_task::Task<TaskId>>();
39+
static ref QUEUE: Sender<Task> = {
40+
let (sender, receiver) = unbounded::<Task>();
3841

3942
// Start the executor thread.
4043
thread::spawn(|| {

src/join_handle.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ pub struct JoinHandle<R, T> {
2424
pub(crate) _marker: PhantomData<(R, T)>,
2525
}
2626

27-
unsafe impl<R, T> Send for JoinHandle<R, T> {}
28-
unsafe impl<R, T> Sync for JoinHandle<R, T> {}
27+
unsafe impl<R: Send, T> Send for JoinHandle<R, T> {}
28+
unsafe impl<R: Send, T> Sync for JoinHandle<R, T> {}
2929

3030
impl<R, T> Unpin for JoinHandle<R, T> {}
3131

src/lib.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
//! # let (task, handle) = async_task::spawn(future, schedule, ());
2424
//! ```
2525
//!
26-
//! A task is constructed using the [`spawn`] function:
26+
//! A task is constructed using either [`spawn`] or [`spawn_local`]:
2727
//!
2828
//! ```
2929
//! # let (sender, receiver) = crossbeam::channel::unbounded();
@@ -93,6 +93,7 @@
9393
//! union of the future and its output.
9494
//!
9595
//! [`spawn`]: fn.spawn.html
96+
//! [`spawn_local`]: fn.spawn_local.html
9697
//! [`Task`]: struct.Task.html
9798
//! [`JoinHandle`]: struct.JoinHandle.html
9899
@@ -108,4 +109,4 @@ mod task;
108109
mod utils;
109110

110111
pub use crate::join_handle::JoinHandle;
111-
pub use crate::task::{spawn, Task};
112+
pub use crate::task::{spawn, spawn_local, Task};

src/raw.rs

+5-11
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,13 @@ impl<F, R, S, T> Clone for RawTask<F, R, S, T> {
9595

9696
impl<F, R, S, T> RawTask<F, R, S, T>
9797
where
98-
F: Future<Output = R> + Send + 'static,
99-
R: Send + 'static,
98+
F: Future<Output = R> + 'static,
10099
S: Fn(Task<T>) + Send + Sync + 'static,
101-
T: Send + 'static,
102100
{
103101
/// Allocates a task with the given `future` and `schedule` function.
104102
///
105103
/// It is assumed that initially only the `Task` reference and the `JoinHandle` exist.
106-
pub(crate) fn allocate(tag: T, future: F, schedule: S) -> NonNull<()> {
104+
pub(crate) fn allocate(future: F, schedule: S, tag: T) -> NonNull<()> {
107105
// Compute the layout of the task for allocation. Abort if the computation fails.
108106
let task_layout = abort_on_panic(|| Self::task_layout());
109107

@@ -592,17 +590,13 @@ where
592590
/// A guard that closes the task if polling its future panics.
593591
struct Guard<F, R, S, T>(RawTask<F, R, S, T>)
594592
where
595-
F: Future<Output = R> + Send + 'static,
596-
R: Send + 'static,
597-
S: Fn(Task<T>) + Send + Sync + 'static,
598-
T: Send + 'static;
593+
F: Future<Output = R> + 'static,
594+
S: Fn(Task<T>) + Send + Sync + 'static;
599595

600596
impl<F, R, S, T> Drop for Guard<F, R, S, T>
601597
where
602-
F: Future<Output = R> + Send + 'static,
603-
R: Send + 'static,
598+
F: Future<Output = R> + 'static,
604599
S: Fn(Task<T>) + Send + Sync + 'static,
605-
T: Send + 'static,
606600
{
607601
fn drop(&mut self) {
608602
let raw = self.0;

src/task.rs

+98-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
use std::fmt;
22
use std::future::Future;
33
use std::marker::PhantomData;
4-
use std::mem;
4+
use std::mem::{self, ManuallyDrop};
5+
use std::pin::Pin;
56
use std::ptr::NonNull;
7+
use std::task::{Context, Poll};
8+
use std::thread::{self, ThreadId};
69

710
use crate::header::Header;
811
use crate::raw::RawTask;
@@ -16,8 +19,13 @@ use crate::JoinHandle;
1619
/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
1720
/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
1821
///
22+
/// If you need to spawn a future that does not implement [`Send`], consider using the
23+
/// [`spawn_local`] function instead.
24+
///
1925
/// [`Task`]: struct.Task.html
2026
/// [`JoinHandle`]: struct.JoinHandle.html
27+
/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
28+
/// [`spawn_local`]: fn.spawn_local.html
2129
///
2230
/// # Examples
2331
///
@@ -43,7 +51,95 @@ where
4351
S: Fn(Task<T>) + Send + Sync + 'static,
4452
T: Send + Sync + 'static,
4553
{
46-
let raw_task = RawTask::<F, R, S, T>::allocate(tag, future, schedule);
54+
let raw_task = RawTask::<F, R, S, T>::allocate(future, schedule, tag);
55+
let task = Task {
56+
raw_task,
57+
_marker: PhantomData,
58+
};
59+
let handle = JoinHandle {
60+
raw_task,
61+
_marker: PhantomData,
62+
};
63+
(task, handle)
64+
}
65+
66+
/// Creates a new local task.
67+
///
68+
/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
69+
/// awaits its result.
70+
///
71+
/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
72+
/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
73+
///
74+
/// Unlike [`spawn`], this function does not require the future to implement [`Send`]. If the
75+
/// [`Task`] reference is run or dropped on a thread it was not created on, a panic will occur.
76+
///
77+
/// [`Task`]: struct.Task.html
78+
/// [`JoinHandle`]: struct.JoinHandle.html
79+
/// [`spawn`]: fn.spawn.html
80+
/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
81+
///
82+
/// # Examples
83+
///
84+
/// ```
85+
/// use crossbeam::channel;
86+
///
87+
/// // The future inside the task.
88+
/// let future = async {
89+
/// println!("Hello, world!");
90+
/// };
91+
///
92+
/// // If the task gets woken up, it will be sent into this channel.
93+
/// let (s, r) = channel::unbounded();
94+
/// let schedule = move |task| s.send(task).unwrap();
95+
///
96+
/// // Create a task with the future and the schedule function.
97+
/// let (task, handle) = async_task::spawn_local(future, schedule, ());
98+
/// ```
99+
pub fn spawn_local<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
100+
where
101+
F: Future<Output = R> + 'static,
102+
R: 'static,
103+
S: Fn(Task<T>) + Send + Sync + 'static,
104+
T: Send + Sync + 'static,
105+
{
106+
thread_local! {
107+
static ID: ThreadId = thread::current().id();
108+
}
109+
110+
struct Checked<F> {
111+
id: ThreadId,
112+
inner: ManuallyDrop<F>,
113+
}
114+
115+
impl<F> Drop for Checked<F> {
116+
fn drop(&mut self) {
117+
if ID.with(|id| *id) != self.id {
118+
panic!("local task dropped by a thread that didn't spawn it");
119+
}
120+
unsafe {
121+
ManuallyDrop::drop(&mut self.inner);
122+
}
123+
}
124+
}
125+
126+
impl<F: Future> Future for Checked<F> {
127+
type Output = F::Output;
128+
129+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
130+
if ID.with(|id| *id) != self.id {
131+
panic!("local task polled by a thread that didn't spawn it");
132+
}
133+
unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
134+
}
135+
}
136+
137+
let future = Checked {
138+
id: ID.with(|id| *id),
139+
inner: ManuallyDrop::new(future),
140+
};
141+
142+
let raw_task = RawTask::<_, R, S, T>::allocate(future, schedule, tag);
47143
let task = Task {
48144
raw_task,
49145
_marker: PhantomData,

0 commit comments

Comments
 (0)