Skip to content

Commit 7d512f9

Browse files
committed
WIP: feat: SetHost, Http1RequestTarget and DelayedResposne middlewares
1 parent 85aade4 commit 7d512f9

File tree

7 files changed

+221
-5
lines changed

7 files changed

+221
-5
lines changed

Cargo.toml

+7-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@ repository = "https://github.com/hyperium/hyper-util"
99
license = "MIT"
1010
authors = ["Sean McArthur <[email protected]>"]
1111
keywords = ["http", "hyper", "hyperium"]
12-
categories = ["network-programming", "web-programming::http-client", "web-programming::http-server"]
12+
categories = [
13+
"network-programming",
14+
"web-programming::http-client",
15+
"web-programming::http-server",
16+
]
1317
edition = "2018"
1418

1519
publish = false # no accidents while in dev
@@ -35,8 +39,8 @@ pnet_datalink = "0.27.2"
3539
[features]
3640
runtime = []
3741
tcp = []
38-
http1 = []
39-
http2 = []
42+
http1 = ["hyper/http1"]
43+
http2 = ["hyper/http2"]
4044

4145
# internal features used in CI
4246
__internal_happy_eyeballs_tests = []

src/client/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
pub mod client;
55
pub mod connect;
66
pub mod pool;
7+
pub mod services;
+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
use std::{
2+
future,
3+
pin::Pin,
4+
task::{Context, Poll},
5+
};
6+
7+
use futures_channel::oneshot;
8+
use futures_util::Future;
9+
use http::Response;
10+
use hyper::Body;
11+
use pin_project_lite::pin_project;
12+
use tower_service::Service;
13+
14+
pub struct DelayedResponse<S> {
15+
inner: S,
16+
}
17+
18+
impl<S, Req> Service<Req> for DelayedResponse<S>
19+
where
20+
S: Service<Req, Response = Response<Body>>,
21+
{
22+
type Response = S::Response;
23+
type Error = S::Error;
24+
type Future = DelayedResponseFuture<S::Future>;
25+
26+
fn poll_ready(
27+
&mut self,
28+
cx: &mut std::task::Context<'_>,
29+
) -> std::task::Poll<Result<(), Self::Error>> {
30+
self.inner.poll_ready(cx)
31+
}
32+
33+
fn call(&mut self, req: Req) -> Self::Future {
34+
DelayedResponseFuture {
35+
inner: self.inner.call(req),
36+
}
37+
}
38+
}
39+
40+
pin_project! {
41+
struct DelayedResponseFuture<F> {
42+
#[pin]
43+
inner: F,
44+
}
45+
}
46+
47+
impl<F, E> Future for DelayedResponseFuture<F>
48+
where
49+
F: Future<Output = Result<Response<Body>, E>>,
50+
{
51+
type Output = F::Output;
52+
53+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
54+
match self.project().inner.poll(cx) {
55+
Poll::Ready(res) => {
56+
let res = res?;
57+
let (delayed_tx, delayed_rx) = oneshot::channel();
58+
res.body_mut().delayed_eof(delayed_rx);
59+
let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| {
60+
// At this point, `pooled` is dropped, and had a chance
61+
// to insert into the pool (if conn was idle)
62+
drop(delayed_tx);
63+
});
64+
65+
self.executor.execute(on_idle);
66+
Poll::Ready(Ok(res))
67+
}
68+
Poll::Pending => Poll::Pending,
69+
}
70+
}
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
use http::{uri::Scheme, Method, Request, Uri};
2+
use tower_service::Service;
3+
use tracing::warn;
4+
5+
pub struct Http1RequestTarget<S> {
6+
inner: S,
7+
}
8+
9+
impl<S, B> Service<Request<B>> for Http1RequestTarget<S>
10+
where
11+
S: Service<Request<B>>,
12+
{
13+
type Response = S::Response;
14+
type Error = S::Error;
15+
type Future = S::Future;
16+
17+
fn poll_ready(
18+
&mut self,
19+
cx: &mut std::task::Context<'_>,
20+
) -> std::task::Poll<Result<(), Self::Error>> {
21+
self.inner.poll_ready(cx)
22+
}
23+
24+
fn call(&mut self, mut req: Request<B>) -> Self::Future {
25+
// CONNECT always sends authority-form, so check it first...
26+
if req.method() == Method::CONNECT {
27+
authority_form(req.uri_mut());
28+
// TODO: this middleware must be connection pool aware
29+
// } else if pooled.conn_info.is_proxied {
30+
// absolute_form(req.uri_mut());
31+
} else {
32+
origin_form(req.uri_mut());
33+
}
34+
self.inner.call(req)
35+
}
36+
}
37+
38+
fn origin_form(uri: &mut Uri) {
39+
let path = match uri.path_and_query() {
40+
Some(path) if path.as_str() != "/" => {
41+
let mut parts = ::http::uri::Parts::default();
42+
parts.path_and_query = Some(path.clone());
43+
Uri::from_parts(parts).expect("path is valid uri")
44+
}
45+
_none_or_just_slash => {
46+
debug_assert!(Uri::default() == "/");
47+
Uri::default()
48+
}
49+
};
50+
*uri = path
51+
}
52+
53+
fn absolute_form(uri: &mut Uri) {
54+
debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme");
55+
debug_assert!(
56+
uri.authority().is_some(),
57+
"absolute_form needs an authority"
58+
);
59+
// If the URI is to HTTPS, and the connector claimed to be a proxy,
60+
// then it *should* have tunneled, and so we don't want to send
61+
// absolute-form in that case.
62+
if uri.scheme() == Some(&Scheme::HTTPS) {
63+
origin_form(uri);
64+
}
65+
}
66+
67+
fn authority_form(uri: &mut Uri) {
68+
if let Some(path) = uri.path_and_query() {
69+
// `https://hyper.rs` would parse with `/` path, don't
70+
// annoy people about that...
71+
if path != "/" {
72+
warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path);
73+
}
74+
}
75+
*uri = match uri.authority() {
76+
Some(auth) => {
77+
let mut parts = ::http::uri::Parts::default();
78+
parts.authority = Some(auth.clone());
79+
Uri::from_parts(parts).expect("authority is valid")
80+
}
81+
None => {
82+
unreachable!("authority_form with relative uri");
83+
}
84+
};
85+
}

src/client/services/mod.rs

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
mod delayed_response;
2+
mod http1_request_target;
3+
mod set_host;
4+
5+
pub use delayed_response::DelayedResponse;
6+
pub use http1_request_target::Http1RequestTarget;
7+
pub use set_host::SetHost;

src/client/services/set_host.rs

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
use std::task::{Context, Poll};
2+
3+
use http::{header::HOST, uri::Port, HeaderValue, Request, Uri};
4+
use hyper::service::Service;
5+
6+
pub struct SetHost<S> {
7+
inner: S,
8+
}
9+
10+
impl<S, B> Service<Request<B>> for SetHost<S>
11+
where
12+
S: Service<Request<B>>,
13+
{
14+
type Response = S::Response;
15+
type Error = S::Error;
16+
type Future = S::Future;
17+
18+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
19+
self.inner.poll_ready(cx)
20+
}
21+
22+
fn call(&mut self, mut req: Request<B>) -> Self::Future {
23+
let uri = req.uri().clone();
24+
req.headers_mut().entry(HOST).or_insert_with(|| {
25+
let hostname = uri.host().expect("authority implies host");
26+
if let Some(port) = get_non_default_port(&uri) {
27+
let s = format!("{}:{}", hostname, port);
28+
HeaderValue::from_str(&s)
29+
} else {
30+
HeaderValue::from_str(hostname)
31+
}
32+
.expect("uri host is valid header value")
33+
});
34+
self.inner.call(req)
35+
}
36+
}
37+
38+
fn get_non_default_port(uri: &Uri) -> Option<Port<&str>> {
39+
match (uri.port().map(|p| p.as_u16()), is_schema_secure(uri)) {
40+
(Some(443), true) => None,
41+
(Some(80), false) => None,
42+
_ => uri.port(),
43+
}
44+
}
45+
46+
fn is_schema_secure(uri: &Uri) -> bool {
47+
uri.scheme_str()
48+
.map(|scheme_str| matches!(scheme_str, "wss" | "https"))
49+
.unwrap_or_default()
50+
}

src/common/mod.rs

-2
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,3 @@ macro_rules! ready {
1212
pub(crate) use ready;
1313
pub(crate) mod exec;
1414
pub(crate) mod never;
15-
16-
pub(crate) use never::Never;

0 commit comments

Comments
 (0)