Skip to content

Commit 3ce2c8a

Browse files
committed
refactor(body): move channel based incoming body implementation to ChanBody type
1 parent f6e33a7 commit 3ce2c8a

File tree

2 files changed

+209
-183
lines changed

2 files changed

+209
-183
lines changed

src/body/incoming/channel.rs

+183
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
use std::fmt;
2+
use std::future::Future;
3+
use std::pin::Pin;
4+
use std::task::{Context, Poll};
5+
6+
use bytes::Bytes;
7+
use futures_channel::{mpsc, oneshot};
8+
use futures_util::ready;
9+
use futures_util::{stream::FusedStream, Stream}; // for mpsc::Receiver
10+
use http::HeaderMap;
11+
use http_body::{Frame, SizeHint};
12+
13+
use crate::body::DecodedLength;
14+
use crate::common::watch;
15+
16+
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
17+
type TrailersSender = oneshot::Sender<HeaderMap>;
18+
19+
const WANT_PENDING: usize = 1;
20+
const WANT_READY: usize = 2;
21+
22+
/// channel based incoming body
23+
pub(crate) struct ChanBody {
24+
content_length: DecodedLength,
25+
want_tx: watch::Sender,
26+
data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
27+
trailers_rx: oneshot::Receiver<HeaderMap>,
28+
}
29+
30+
impl ChanBody {
31+
pub(crate) fn new(content_length: DecodedLength, wanter: bool) -> (Sender, Self) {
32+
let (data_tx, data_rx) = mpsc::channel(0);
33+
let (trailers_tx, trailers_rx) = oneshot::channel();
34+
35+
// If wanter is true, `Sender::poll_ready()` won't becoming ready
36+
// until the `Body` has been polled for data once.
37+
let want = if wanter { WANT_PENDING } else { WANT_READY };
38+
39+
let (want_tx, want_rx) = watch::channel(want);
40+
41+
let tx = Sender {
42+
want_rx,
43+
data_tx,
44+
trailers_tx: Some(trailers_tx),
45+
};
46+
let rx = Self {
47+
content_length,
48+
want_tx,
49+
data_rx,
50+
trailers_rx,
51+
};
52+
53+
(tx, rx)
54+
}
55+
56+
pub(crate) fn poll_frame(
57+
&mut self,
58+
cx: &mut Context<'_>,
59+
) -> Poll<Option<Result<Frame<Bytes>, crate::Error>>> {
60+
let Self {
61+
content_length: ref mut len,
62+
ref mut data_rx,
63+
ref mut want_tx,
64+
ref mut trailers_rx,
65+
} = self;
66+
67+
want_tx.send(WANT_READY);
68+
69+
if !data_rx.is_terminated() {
70+
if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) {
71+
len.sub_if(chunk.len() as u64);
72+
return Poll::Ready(Some(Ok(Frame::data(chunk))));
73+
}
74+
}
75+
76+
// check trailers after data is terminated
77+
match ready!(Pin::new(trailers_rx).poll(cx)) {
78+
Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))),
79+
Err(_) => Poll::Ready(None),
80+
}
81+
}
82+
83+
pub(crate) fn is_end_stream(&self) -> bool {
84+
self.content_length == DecodedLength::ZERO
85+
}
86+
87+
pub(crate) fn size_hint(&self) -> SizeHint {
88+
super::opt_len(self.content_length)
89+
}
90+
}
91+
92+
/// A sender half created through [`Body::channel()`].
93+
///
94+
/// Useful when wanting to stream chunks from another thread.
95+
///
96+
/// ## Body Closing
97+
///
98+
/// Note that the request body will always be closed normally when the sender is dropped (meaning
99+
/// that the empty terminating chunk will be sent to the remote). If you desire to close the
100+
/// connection with an incomplete response (e.g. in the case of an error during asynchronous
101+
/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
102+
///
103+
/// [`Body::channel()`]: struct.Body.html#method.channel
104+
/// [`Sender::abort()`]: struct.Sender.html#method.abort
105+
#[must_use = "Sender does nothing unless sent on"]
106+
pub(crate) struct Sender {
107+
want_rx: watch::Receiver,
108+
data_tx: BodySender,
109+
trailers_tx: Option<TrailersSender>,
110+
}
111+
112+
impl Sender {
113+
/// Check to see if this `Sender` can send more data.
114+
pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
115+
// Check if the receiver end has tried polling for the body yet
116+
ready!(self.poll_want(cx)?);
117+
self.data_tx
118+
.poll_ready(cx)
119+
.map_err(|_| crate::Error::new_closed())
120+
}
121+
122+
pub(crate) fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
123+
match self.want_rx.load(cx) {
124+
WANT_READY => Poll::Ready(Ok(())),
125+
WANT_PENDING => Poll::Pending,
126+
watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
127+
unexpected => unreachable!("want_rx value: {}", unexpected),
128+
}
129+
}
130+
131+
/// Send trailers on trailers channel.
132+
#[allow(unused)]
133+
pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
134+
let tx = match self.trailers_tx.take() {
135+
Some(tx) => tx,
136+
None => return Err(crate::Error::new_closed()),
137+
};
138+
tx.send(trailers).map_err(|_| crate::Error::new_closed())
139+
}
140+
141+
/// Try to send data on this channel.
142+
///
143+
/// # Errors
144+
///
145+
/// Returns `Err(Bytes)` if the channel could not (currently) accept
146+
/// another `Bytes`.
147+
///
148+
/// # Note
149+
///
150+
/// This is mostly useful for when trying to send from some other thread
151+
/// that doesn't have an async context. If in an async context, prefer
152+
/// `send_data()` instead.
153+
pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
154+
self.data_tx
155+
.try_send(Ok(chunk))
156+
.map_err(|err| err.into_inner().expect("just sent Ok"))
157+
}
158+
159+
pub(crate) fn send_error(&mut self, err: crate::Error) {
160+
let _ = self
161+
.data_tx
162+
// clone so the send works even if buffer is full
163+
.clone()
164+
.try_send(Err(err));
165+
}
166+
}
167+
168+
impl fmt::Debug for Sender {
169+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
170+
#[derive(Debug)]
171+
struct Open;
172+
#[derive(Debug)]
173+
struct Closed;
174+
175+
let mut builder = f.debug_tuple("Sender");
176+
match self.want_rx.peek() {
177+
watch::CLOSED => builder.field(&Closed),
178+
_ => builder.field(&Open),
179+
};
180+
181+
builder.finish()
182+
}
183+
}

0 commit comments

Comments
 (0)