1
1
use hyper:: body:: Buf ;
2
- use std:: sync:: atomic:: { AtomicBool , Ordering } ;
3
2
use std:: sync:: Arc ;
4
3
use tokio:: sync:: Notify ;
5
4
@@ -24,22 +23,15 @@ impl EosSignaler {
24
23
/// enough times to advance to the end of the buffer (so that [`Buf::has_remaining`] afterwards returns `0`).
25
24
pub struct NotifyOnEos < B > {
26
25
inner : B ,
27
- notifier : Arc < Notify > ,
28
- // It'd be better if we consumed the signaler, making it inaccessible after notification.
29
- // Unfortunately, that would require something like AtomicOption.
30
- // arc_swap::ArcSwapOption was tried, but it can only return an Arc, and the pointed-to value cannot be consumed.
31
- // One could write an AtomicOption type (like this https://docs.rs/atomic-option/0.1.2/atomic_option/),
32
- // but it requires both unsafe and heap allocation, which is not worth it.
33
- has_already_signaled : AtomicBool ,
26
+ notifier : Option < Arc < Notify > > ,
34
27
}
35
28
36
29
impl < B > NotifyOnEos < B > {
37
30
pub fn new ( inner : B ) -> ( Self , EosSignaler ) {
38
31
let notifier = Arc :: new ( Notify :: new ( ) ) ;
39
32
let this = Self {
40
33
inner,
41
- notifier : notifier. clone ( ) ,
42
- has_already_signaled : AtomicBool :: new ( false ) ,
34
+ notifier : Some ( notifier. clone ( ) ) ,
43
35
} ;
44
36
let signal = EosSignaler { notifier } ;
45
37
( this, signal)
@@ -57,11 +49,14 @@ impl<B: Buf> Buf for NotifyOnEos<B> {
57
49
58
50
fn advance ( & mut self , cnt : usize ) {
59
51
self . inner . advance ( cnt) ;
60
- if !self . inner . has_remaining ( ) && !self . has_already_signaled . swap ( true , Ordering :: AcqRel ) {
61
- // tokio::sync::Notify has private method `notify_all` that, once stabilized,
62
- // would allow us to make `EosSignaler` Cloneable with better ergonomics
63
- // to await EOS from multiple places.
64
- self . notifier . notify_one ( ) ;
52
+ if !self . inner . has_remaining ( ) {
53
+ // consume the notifier to ensure we only notify once
54
+ if let Some ( notifier) = self . notifier . take ( ) {
55
+ // tokio::sync::Notify has private method `notify_all` that, once stabilized,
56
+ // would allow us to make `EosSignaler` Cloneable with better ergonomics
57
+ // to await EOS from multiple places.
58
+ notifier. notify_one ( ) ;
59
+ }
65
60
}
66
61
}
67
62
}
0 commit comments