Description
This is the reproduction:
use std::{
    io::Write,
    pin::Pin,
    task::{Context, Poll},
};

use futures::prelude::*;

pub async fn yield_now() {
    /// Yield implementation
    struct YieldNow {
        yielded: bool,
    }

    impl Future for YieldNow {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            if self.yielded {
                return Poll::Ready(());
            }

            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    YieldNow { yielded: false }.await
}

#[tokio::main]
async fn main() {
    for _ in 0..200 {
        print!(".");
        std::io::stdout().flush().unwrap();
        for _ in 0..10000 {
            test().await;
        }
    }
    println!();
}

async fn test() {
    let f1 = yield_now().shared();
    let f2 = f1.clone();
    let x1 = tokio::spawn(async move {
        f1.now_or_never();
    });
    let x2 = tokio::spawn(async move {
        f2.await;
    });
    x1.await.ok();
    x2.await.ok();
}
It eventually hangs on most runs, in both debug and release mode.
The example is crafted to expose a weakness spotted during code review of `.shared()`. The problematic part is here:
futures-rs/futures-util/src/future/future/shared.rs, lines 288 to 293 in aa1f5c7
This code is simply insufficient to ensure that the current context's waker is ever actually woken up. The current context's waker was previously stored into the inner slab. However, a concurrently running `poll` could already have been inside the call to the inner `poll` before the new waker was registered.
As soon as the call to `poll` on the inner future starts, it may already call the waker. The wake-up can therefore happen before the new waker that we care about has been registered and put into the slab, because it can happen before the state is put back into `IDLE`.
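The interleaving described above can be replayed deterministically on a single thread. The following is only a sketch under stated assumptions: `Model`, `State`, `Flag` and `replay_race` are hypothetical names, and the model is a heavy simplification (a plain `Vec` of wakers and a two-value state), not the actual `Shared` implementation.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Records whether a task's waker has been invoked.
struct Flag(AtomicBool);

impl Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

#[derive(Clone, Copy, PartialEq, Debug)]
enum State {
    Idle,
    Polling,
}

/// Hypothetical, simplified stand-in for the shared inner state:
/// a state flag plus the wakers registered so far.
struct Model {
    state: State,
    wakers: Vec<Waker>,
}

impl Model {
    /// What the inner future's waker is assumed to do:
    /// wake every waker registered up to this point.
    fn wake_all(&mut self) {
        for w in self.wakers.drain(..) {
            w.wake();
        }
    }
}

/// Replays the problematic ordering.
/// Returns (A was woken, B observed POLLING, B was woken).
fn replay_race() -> (bool, bool, bool) {
    let a = Arc::new(Flag(AtomicBool::new(false)));
    let b = Arc::new(Flag(AtomicBool::new(false)));
    let mut m = Model { state: State::Idle, wakers: Vec::new() };

    // Task A: registers its waker, takes the state from IDLE to POLLING,
    // and enters the inner poll.
    m.wakers.push(Waker::from(a.clone()));
    m.state = State::Polling;

    // Inside the inner poll: the future wakes immediately, before returning.
    m.wake_all();

    // Task B: registers its waker, observes POLLING, and returns Pending.
    m.wakers.push(Waker::from(b.clone()));
    let b_saw_polling = m.state == State::Polling;

    // Task A: the inner poll returns Pending and the state goes back to IDLE.
    m.state = State::Idle;

    (
        a.0.load(Ordering::SeqCst),
        b_saw_polling,
        b.0.load(Ordering::SeqCst),
    )
}
```

In this replay, B's waker is registered only after the single wake-up has fired, so nothing is left to wake B once the state returns to `IDLE`.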
The wake-up can happen before the state is put back into `IDLE` in particular because it is okay for a `.poll` call to do the waking immediately, before it returns, i.e. the way the `yield_now` function in the example always does (copied from the old `yield_now` implementation of `tokio`).
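That a `poll` call may wake before it returns can be checked with the standard library alone. The sketch below reuses the `YieldNow` future from the reproduction; `CountingWaker` and `wakes_during_first_poll` are hypothetical helpers introduced only for this illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Counts how many times the waker has been invoked.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Same shape as the `YieldNow` future in the reproduction.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        // The wake-up happens here, while `poll` is still running.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Polls `YieldNow` once and returns the wake count observed right after
/// `poll` returned, together with whether the result was `Pending`.
fn wakes_during_first_poll() -> (usize, bool) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut fut = YieldNow { yielded: false };
    let first = Pin::new(&mut fut).poll(&mut cx);

    (counter.0.load(Ordering::SeqCst), first.is_pending())
}
```

Calling `wakes_during_first_poll()` shows that the waker has already been invoked by the time the first `poll` returns `Pending`, which is exactly the window in which a waker registered later would be missed.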