Skip to content

Commit 34ed89d

Browse files
committed
feat(common): bytes::Buf wrapper that notifies subscribers on EOS
1 parent 9214294 commit 34ed89d

File tree

2 files changed

+67
-0
lines changed

2 files changed

+67
-0
lines changed

src/common/buf.rs

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use hyper::body::Buf;
2+
use std::sync::atomic::{AtomicBool, Ordering};
3+
use std::sync::Arc;
4+
use tokio::sync::Notify;
5+
6+
#[derive(Clone)]
7+
pub struct EosSignaler {
8+
notifier: Arc<Notify>,
9+
}
10+
11+
impl EosSignaler {
12+
fn notify_eos(&self) {
13+
self.notifier.notify_waiters();
14+
}
15+
16+
pub async fn wait_till_eos(self) {
17+
self.notifier.notified().await;
18+
}
19+
}
20+
21+
pub struct AlertOnEos<B> {
22+
inner: B,
23+
signaler: EosSignaler,
24+
// It'd be better if we consumed the signaler, making it inaccessible after notification.
25+
// Unfortunately, that would require something like AtomicOption.
26+
// arc_swap::ArcSwapOption was tried, but it can only return an Arc, and the pointed-to value cannot be consumed.
27+
// One could write an AtomicOption type (like this https://docs.rs/atomic-option/0.1.2/atomic_option/),
28+
// but it requires both unsafe and heap allocation, which is not worth it.
29+
has_already_signaled: AtomicBool,
30+
}
31+
32+
impl<B> AlertOnEos<B> {
33+
pub fn new(inner: B) -> (Self, EosSignaler) {
34+
let signal = EosSignaler {
35+
notifier: Arc::new(Notify::new()),
36+
};
37+
let this = Self {
38+
inner,
39+
signaler: signal.clone(),
40+
has_already_signaled: AtomicBool::new(false),
41+
};
42+
(this, signal)
43+
}
44+
}
45+
46+
impl<B: Buf> Buf for AlertOnEos<B> {
47+
fn remaining(&self) -> usize {
48+
self.inner.remaining()
49+
}
50+
51+
fn chunk(&self) -> &[u8] {
52+
self.inner.chunk()
53+
}
54+
55+
fn advance(&mut self, cnt: usize) {
56+
self.inner.advance(cnt);
57+
if !self.inner.has_remaining() && !self.has_already_signaled.swap(true, Ordering::AcqRel) {
58+
self.signaler.notify_eos();
59+
}
60+
}
61+
}
62+
63+
#[cfg(test)]
64+
mod tests {
65+
66+
}

src/common/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ macro_rules! ready {
1010
}
1111

1212
pub(crate) use ready;
13+
pub mod buf;
1314
pub(crate) mod exec;

0 commit comments

Comments
 (0)