From b99624ff6486687c127c354b8c13d7cbb4d1c27c Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Fri, 19 Aug 2022 21:03:28 +0200 Subject: [PATCH 1/6] feat(common): bytes::Buf wrapper that notifies subscribers on EOS --- src/common/buf.rs | 66 +++++++++++++++++++++++++++++++++++++++++++++++ src/common/mod.rs | 1 + 2 files changed, 67 insertions(+) create mode 100644 src/common/buf.rs diff --git a/src/common/buf.rs b/src/common/buf.rs new file mode 100644 index 0000000..3afd4cd --- /dev/null +++ b/src/common/buf.rs @@ -0,0 +1,66 @@ +use hyper::body::Buf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::sync::Notify; + +#[derive(Clone)] +pub struct EosSignaler { + notifier: Arc, +} + +impl EosSignaler { + fn notify_eos(&self) { + self.notifier.notify_waiters(); + } + + pub async fn wait_till_eos(self) { + self.notifier.notified().await; + } +} + +pub struct AlertOnEos { + inner: B, + signaler: EosSignaler, + // It'd be better if we consumed the signaler, making it inaccessible after notification. + // Unfortunately, that would require something like AtomicOption. + // arc_swap::ArcSwapOption was tried, but it can only return an Arc, and the pointed-to value cannot be consumed. + // One could write an AtomicOption type (like this https://docs.rs/atomic-option/0.1.2/atomic_option/), + // but it requires both unsafe and heap allocation, which is not worth it. + has_already_signaled: AtomicBool, +} + +impl AlertOnEos { + pub fn new(inner: B) -> (Self, EosSignaler) { + let signal = EosSignaler { + notifier: Arc::new(Notify::new()), + }; + let this = Self { + inner, + signaler: signal.clone(), + has_already_signaled: AtomicBool::new(false), + }; + (this, signal) + } +} + +impl Buf for AlertOnEos { + fn remaining(&self) -> usize { + self.inner.remaining() + } + + fn chunk(&self) -> &[u8] { + self.inner.chunk() + } + + fn advance(&mut self, cnt: usize) { + self.inner.advance(cnt); + if !self.inner.has_remaining() && !self.has_already_signaled.swap(true, Ordering::AcqRel) { + self.signaler.notify_eos(); + } + } +} + +#[cfg(test)] +mod tests { + +} \ No newline at end of file diff --git a/src/common/mod.rs b/src/common/mod.rs index 52b9917..3b7fbe8 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -10,6 +10,7 @@ macro_rules! ready { } pub(crate) use ready; +pub mod buf; pub(crate) mod exec; pub(crate) mod never; From 03f7ca1aee058b799d14b57709bfe91f03cb1443 Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Fri, 19 Aug 2022 21:42:08 +0200 Subject: [PATCH 2/6] test(common): add basic test for AlertOnEos --- src/common/buf.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/common/buf.rs b/src/common/buf.rs index 3afd4cd..4819b34 100644 --- a/src/common/buf.rs +++ b/src/common/buf.rs @@ -62,5 +62,15 @@ impl Buf for AlertOnEos { #[cfg(test)] mod tests { + use crate::common::buf::AlertOnEos; + use hyper::body::Bytes; + use std::time::Duration; -} \ No newline at end of file + #[tokio::test] + async fn test_get_notified() { + let buf = Bytes::from_static(b""); + let (_buf, signaler) = AlertOnEos::new(buf); + let result = tokio::time::timeout(Duration::from_secs(1), signaler.wait_till_eos()).await; + assert_eq!(result, Ok(())); + } +} From e00f6f148e53ce383459753e82839a44bf306eb4 Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Sat, 20 Aug 2022 19:19:06 +0200 Subject: [PATCH 3/6] fix(common): replace usage of Notify::notify_waiters with Notify::notify_one --- src/common/buf.rs | 59 ++++++++++++++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 21 deletions(-) diff --git a/src/common/buf.rs b/src/common/buf.rs index 4819b34..2c43ead 100644 --- a/src/common/buf.rs +++ b/src/common/buf.rs @@ -3,24 +3,28 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tokio::sync::Notify; -#[derive(Clone)] +/// Signaler returned as part of `NotifyOnEos::new` that can be polled to receive information, +/// when the buffer gets advanced to the end. +// Cannot be Clone due to usage of `Notify::notify_one` in `NotifyOnEos::advance`, +// revisit once `Notify::notify_all` stabilizes. pub struct EosSignaler { notifier: Arc, } impl EosSignaler { - fn notify_eos(&self) { - self.notifier.notify_waiters(); - } - pub async fn wait_till_eos(self) { self.notifier.notified().await; } } -pub struct AlertOnEos { +/// Wrapper for `bytes::Buf` that returns a `EosSignaler` that can be polled to receive information, +/// when the buffer gets advanced to the end. +/// +/// NOTE: For the notification to work, caller must ensure that `Buf::advance` gets called +/// enough times to advance to the end of the buffer (so that `Buf::has_remaining` afterwards returns `0`). +pub struct NotifyOnEos { inner: B, - signaler: EosSignaler, + notifier: Arc, // It'd be better if we consumed the signaler, making it inaccessible after notification. // Unfortunately, that would require something like AtomicOption. // arc_swap::ArcSwapOption was tried, but it can only return an Arc, and the pointed-to value cannot be consumed. @@ -29,21 +33,20 @@ pub struct AlertOnEos { has_already_signaled: AtomicBool, } -impl AlertOnEos { +impl NotifyOnEos { pub fn new(inner: B) -> (Self, EosSignaler) { - let signal = EosSignaler { - notifier: Arc::new(Notify::new()), - }; + let notifier = Arc::new(Notify::new()); let this = Self { inner, - signaler: signal.clone(), + notifier: notifier.clone(), has_already_signaled: AtomicBool::new(false), }; + let signal = EosSignaler { notifier }; (this, signal) } } -impl Buf for AlertOnEos { +impl Buf for NotifyOnEos { fn remaining(&self) -> usize { self.inner.remaining() } @@ -55,22 +58,36 @@ impl Buf for AlertOnEos { fn advance(&mut self, cnt: usize) { self.inner.advance(cnt); if !self.inner.has_remaining() && !self.has_already_signaled.swap(true, Ordering::AcqRel) { - self.signaler.notify_eos(); + // tokio::sync::Notify has private method `notify_all` that, once stabilized, + // would allow us to make `EosSignaler` Cloneable with better ergonomics + // to await EOS from multiple places. + self.notifier.notify_one(); } } } #[cfg(test)] mod tests { - use crate::common::buf::AlertOnEos; - use hyper::body::Bytes; + use crate::common::buf::NotifyOnEos; + use hyper::body::{Buf, Bytes}; use std::time::Duration; #[tokio::test] - async fn test_get_notified() { - let buf = Bytes::from_static(b""); - let (_buf, signaler) = AlertOnEos::new(buf); - let result = tokio::time::timeout(Duration::from_secs(1), signaler.wait_till_eos()).await; - assert_eq!(result, Ok(())); + async fn test_get_notified_immediately() { + let buf = Bytes::from_static(b"abc"); + let (mut buf, signaler) = NotifyOnEos::new(buf); + buf.advance(3); + signaler.wait_till_eos().await; + } + + #[tokio::test] + async fn test_get_notified_after_1ms() { + let buf = Bytes::from_static(b"abc"); + let (mut buf, signaler) = NotifyOnEos::new(buf); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(1)).await; + buf.advance(3); + }); + signaler.wait_till_eos().await; } } From 5cd09b5dce88d5f48417f99fc95a75bd653a0e74 Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Sat, 20 Aug 2022 19:25:09 +0200 Subject: [PATCH 4/6] chore: disable miri for tests --- src/common/buf.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/common/buf.rs b/src/common/buf.rs index 2c43ead..673059e 100644 --- a/src/common/buf.rs +++ b/src/common/buf.rs @@ -72,6 +72,7 @@ mod tests { use hyper::body::{Buf, Bytes}; use std::time::Duration; + #[cfg(not(miri))] #[tokio::test] async fn test_get_notified_immediately() { let buf = Bytes::from_static(b"abc"); @@ -80,6 +81,7 @@ mod tests { signaler.wait_till_eos().await; } + #[cfg(not(miri))] #[tokio::test] async fn test_get_notified_after_1ms() { let buf = Bytes::from_static(b"abc"); From 129d90115da3d451fabae913e72d7d29177cb749 Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Sat, 20 Aug 2022 19:33:27 +0200 Subject: [PATCH 5/6] style: make structs/methods linkable in doc-strings --- src/common/buf.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/common/buf.rs b/src/common/buf.rs index 673059e..d7565e4 100644 --- a/src/common/buf.rs +++ b/src/common/buf.rs @@ -3,7 +3,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tokio::sync::Notify; -/// Signaler returned as part of `NotifyOnEos::new` that can be polled to receive information, +/// Signaler returned as part of [`NotifyOnEos::new`] that can be polled to receive information, /// when the buffer gets advanced to the end. // Cannot be Clone due to usage of `Notify::notify_one` in `NotifyOnEos::advance`, // revisit once `Notify::notify_all` stabilizes. @@ -17,11 +17,11 @@ impl EosSignaler { } } -/// Wrapper for `bytes::Buf` that returns a `EosSignaler` that can be polled to receive information, +/// Wrapper for [`Buf`] that returns an [`EosSignaler`] that can be polled to receive information, /// when the buffer gets advanced to the end. /// -/// NOTE: For the notification to work, caller must ensure that `Buf::advance` gets called -/// enough times to advance to the end of the buffer (so that `Buf::has_remaining` afterwards returns `0`). +/// NOTE: For the notification to work, caller must ensure that [`Buf::advance`] gets called +/// enough times to advance to the end of the buffer (so that [`Buf::has_remaining`] afterwards returns `0`). pub struct NotifyOnEos { inner: B, notifier: Arc, From 9150c0f6a28f18e179ac75fc0ab0ea2049770b65 Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Tue, 23 Aug 2022 20:31:47 +0200 Subject: [PATCH 6/6] refactor(common): make notifier an Option instead of struct + boolean We want to ensure that the notifier is only called once, when writing that earier I tried to force the notifier into an Arc>, completely forgetting about the ownership-safe and easier Option>. --- src/common/buf.rs | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/src/common/buf.rs b/src/common/buf.rs index d7565e4..59aaa5c 100644 --- a/src/common/buf.rs +++ b/src/common/buf.rs @@ -1,5 +1,4 @@ use hyper::body::Buf; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tokio::sync::Notify; @@ -24,13 +23,7 @@ impl EosSignaler { /// enough times to advance to the end of the buffer (so that [`Buf::has_remaining`] afterwards returns `0`). pub struct NotifyOnEos { inner: B, - notifier: Arc, - // It'd be better if we consumed the signaler, making it inaccessible after notification. - // Unfortunately, that would require something like AtomicOption. - // arc_swap::ArcSwapOption was tried, but it can only return an Arc, and the pointed-to value cannot be consumed. - // One could write an AtomicOption type (like this https://docs.rs/atomic-option/0.1.2/atomic_option/), - // but it requires both unsafe and heap allocation, which is not worth it. - has_already_signaled: AtomicBool, + notifier: Option>, } impl NotifyOnEos { @@ -38,8 +31,7 @@ impl NotifyOnEos { let notifier = Arc::new(Notify::new()); let this = Self { inner, - notifier: notifier.clone(), - has_already_signaled: AtomicBool::new(false), + notifier: Some(notifier.clone()), }; let signal = EosSignaler { notifier }; (this, signal) @@ -57,11 +49,14 @@ impl Buf for NotifyOnEos { fn advance(&mut self, cnt: usize) { self.inner.advance(cnt); - if !self.inner.has_remaining() && !self.has_already_signaled.swap(true, Ordering::AcqRel) { - // tokio::sync::Notify has private method `notify_all` that, once stabilized, - // would allow us to make `EosSignaler` Cloneable with better ergonomics - // to await EOS from multiple places. - self.notifier.notify_one(); + if !self.inner.has_remaining() { + // consume the notifier to ensure we only notify once + if let Some(notifier) = self.notifier.take() { + // tokio::sync::Notify has private method `notify_all` that, once stabilized, + // would allow us to make `EosSignaler` Cloneable with better ergonomics + // to await EOS from multiple places. + notifier.notify_one(); + } } } } @@ -83,6 +78,7 @@ mod tests { #[cfg(not(miri))] #[tokio::test] + /// Test against the foot-gun of using [`tokio::sync::Notify::notify_waiters`] instead of `notify_one`. async fn test_get_notified_after_1ms() { let buf = Bytes::from_static(b"abc"); let (mut buf, signaler) = NotifyOnEos::new(buf);