From ff267f1ef54120c6a074d63028c748a89769291f Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Sun, 16 Feb 2025 16:43:24 +0100 Subject: [PATCH] refactor: reduce dependency on `futures-util` --- src/body/incoming.rs | 12 ++++++------ src/client/conn/http1.rs | 7 +++---- src/client/conn/http2.rs | 5 ++--- src/proto/h1/conn.rs | 5 ++--- src/proto/h1/decode.rs | 9 ++++----- src/proto/h1/dispatch.rs | 3 +-- src/proto/h1/io.rs | 9 ++++----- src/proto/h2/client.rs | 3 +-- src/proto/h2/mod.rs | 3 +-- src/proto/h2/server.rs | 3 +-- src/server/conn/http1.rs | 5 ++--- src/server/conn/http2.rs | 3 +-- tests/client.rs | 3 ++- 13 files changed, 30 insertions(+), 40 deletions(-) diff --git a/src/body/incoming.rs b/src/body/incoming.rs index dcfb71d53a..4c600f798b 100644 --- a/src/body/incoming.rs +++ b/src/body/incoming.rs @@ -2,16 +2,16 @@ use std::fmt; #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] use std::future::Future; use std::pin::Pin; +#[cfg(all( + any(feature = "http1", feature = "http2"), + any(feature = "client", feature = "server") +))] +use std::task::ready; use std::task::{Context, Poll}; use bytes::Bytes; #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] use futures_channel::{mpsc, oneshot}; -#[cfg(all( - any(feature = "http1", feature = "http2"), - any(feature = "client", feature = "server") -))] -use futures_util::ready; #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] use futures_util::{stream::FusedStream, Stream}; // for mpsc::Receiver #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] @@ -368,7 +368,7 @@ impl Sender { #[cfg(test)] async fn ready(&mut self) -> crate::Result<()> { - futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await + std::future::poll_fn(|cx| self.poll_ready(cx)).await } /// Send data on data channel when it is ready. diff --git a/src/client/conn/http1.rs b/src/client/conn/http1.rs index ecfe6eb8fb..2ad60706e5 100644 --- a/src/client/conn/http1.rs +++ b/src/client/conn/http1.rs @@ -4,11 +4,10 @@ use std::error::Error as StdError; use std::fmt; use std::future::Future; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use crate::rt::{Read, Write}; use bytes::Bytes; -use futures_util::ready; use http::{Request, Response}; use httparse::ParserConfig; @@ -92,7 +91,7 @@ where /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`. pub async fn without_shutdown(self) -> crate::Result> { let mut conn = Some(self); - futures_util::future::poll_fn(move |cx| -> Poll>> { + std::future::poll_fn(move |cx| -> Poll>> { ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?; Poll::Ready(Ok(conn.take().unwrap().into_parts())) }) @@ -148,7 +147,7 @@ impl SendRequest { /// /// If the associated connection is closed, this returns an Error. pub async fn ready(&mut self) -> crate::Result<()> { - futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await + std::future::poll_fn(|cx| self.poll_ready(cx)).await } /// Checks if the connection is currently ready to send a request. diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index 3db28957b6..1f8e51309a 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -6,11 +6,10 @@ use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use std::time::Duration; use crate::rt::{Read, Write}; -use futures_util::ready; use http::{Request, Response}; use super::super::dispatch::{self, TrySendError}; @@ -99,7 +98,7 @@ impl SendRequest { /// /// If the associated connection is closed, this returns an Error. pub async fn ready(&mut self) -> crate::Result<()> { - futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await + std::future::poll_fn(|cx| self.poll_ready(cx)).await } /// Checks if the connection is currently ready to send a request. diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index bea8faa221..fdee36ee05 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -4,13 +4,12 @@ use std::future::Future; use std::io; use std::marker::{PhantomData, Unpin}; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; #[cfg(feature = "server")] use std::time::{Duration, Instant}; use crate::rt::{Read, Write}; use bytes::{Buf, Bytes}; -use futures_util::ready; use http::header::{HeaderValue, CONNECTION, TE}; use http::{HeaderMap, Method, Version}; use http_body::Frame; @@ -1174,7 +1173,7 @@ mod tests { .unwrap(); b.iter(|| { - rt.block_on(futures_util::future::poll_fn(|cx| { + rt.block_on(std::future::poll_fn(|cx| { match conn.poll_read_head(cx) { Poll::Ready(Some(Ok(x))) => { ::test::black_box(&x); diff --git a/src/proto/h1/decode.rs b/src/proto/h1/decode.rs index dd293e1228..f01d1cfd65 100644 --- a/src/proto/h1/decode.rs +++ b/src/proto/h1/decode.rs @@ -1,10 +1,9 @@ use std::error::Error as StdError; use std::fmt; use std::io; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use bytes::{BufMut, Bytes, BytesMut}; -use futures_util::ready; use http::{HeaderMap, HeaderName, HeaderValue}; use http_body::Frame; @@ -244,7 +243,7 @@ impl Decoder { #[cfg(test)] async fn decode_fut(&mut self, body: &mut R) -> Result, io::Error> { - futures_util::future::poll_fn(move |cx| self.decode(cx, body)).await + std::future::poll_fn(move |cx| self.decode(cx, body)).await } } @@ -746,7 +745,7 @@ mod tests { let mut ext_cnt = 0; let mut trailers_cnt = 0; loop { - let result = futures_util::future::poll_fn(|cx| { + let result = std::future::poll_fn(|cx| { state.step( cx, rdr, @@ -776,7 +775,7 @@ mod tests { let mut ext_cnt = 0; let mut trailers_cnt = 0; loop { - let result = futures_util::future::poll_fn(|cx| { + let result = std::future::poll_fn(|cx| { state.step( cx, rdr, diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 4d921a3b83..8b4b4fdfaa 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -3,12 +3,11 @@ use std::{ future::Future, marker::Unpin, pin::Pin, - task::{Context, Poll}, + task::{ready, Context, Poll}, }; use crate::rt::{Read, Write}; use bytes::{Buf, Bytes}; -use futures_util::ready; use http::Request; use super::{Http1Transaction, Wants}; diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index d5afba683a..e01d113263 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -2,11 +2,10 @@ use std::cmp; use std::fmt; use std::io::{self, IoSlice}; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use crate::rt::{Read, ReadBuf, Write}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use futures_util::ready; use super::{Http1Transaction, ParseContext, ParsedMessage}; use crate::common::buf::BufList; @@ -325,7 +324,7 @@ where #[cfg(test)] fn flush(&mut self) -> impl std::future::Future> + '_ { - futures_util::future::poll_fn(move |cx| self.poll_flush(cx)) + std::future::poll_fn(move |cx| self.poll_flush(cx)) } } @@ -668,7 +667,7 @@ mod tests { // // First, let's just check that the Mock would normally return an // // error on an unexpected write, even if the buffer is empty... // let mut mock = Mock::new().build(); - // futures_util::future::poll_fn(|cx| { + // std::future::poll_fn(|cx| { // Pin::new(&mut mock).poll_write_buf(cx, &mut Cursor::new(&[])) // }) // .await @@ -700,7 +699,7 @@ mod tests { // We expect a `parse` to be not ready, and so can't await it directly. // Rather, this `poll_fn` will wrap the `Poll` result. - futures_util::future::poll_fn(|cx| { + std::future::poll_fn(|cx| { let parse_ctx = ParseContext { cached_headers: &mut None, req_method: &mut None, diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 5e9641e408..114a1c9e95 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -3,7 +3,7 @@ use std::{ future::Future, marker::PhantomData, pin::Pin, - task::{Context, Poll}, + task::{ready, Context, Poll}, time::Duration, }; @@ -12,7 +12,6 @@ use bytes::Bytes; use futures_channel::mpsc::{Receiver, Sender}; use futures_channel::{mpsc, oneshot}; use futures_util::future::{Either, FusedFuture, FutureExt as _}; -use futures_util::ready; use futures_util::stream::{StreamExt as _, StreamFuture}; use h2::client::{Builder, Connection, SendRequest}; use h2::SendStream; diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index adb6de87f9..6e6697bb5d 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -3,10 +3,9 @@ use std::future::Future; use std::io::{Cursor, IoSlice}; use std::mem; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use bytes::{Buf, Bytes}; -use futures_util::ready; use h2::{Reason, RecvStream, SendStream}; use http::header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE}; use http::HeaderMap; diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index a8a20dd68e..579417d3f7 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -1,11 +1,10 @@ use std::error::Error as StdError; use std::future::Future; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use std::time::Duration; use bytes::Bytes; -use futures_util::ready; use h2::server::{Connection, Handshake, SendResponse}; use h2::{Reason, RecvStream}; use http::{Method, Request}; diff --git a/src/server/conn/http1.rs b/src/server/conn/http1.rs index af703018c5..64d39b4f52 100644 --- a/src/server/conn/http1.rs +++ b/src/server/conn/http1.rs @@ -5,13 +5,12 @@ use std::fmt; use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use std::time::Duration; use crate::rt::{Read, Write}; use crate::upgrade::Upgraded; use bytes::Bytes; -use futures_util::ready; use crate::body::{Body, Incoming as IncomingBody}; use crate::proto; @@ -179,7 +178,7 @@ where /// This errors if the underlying connection protocol is not HTTP/1. pub fn without_shutdown(self) -> impl Future>> { let mut zelf = Some(self); - futures_util::future::poll_fn(move |cx| { + std::future::poll_fn(move |cx| { ready!(zelf.as_mut().unwrap().conn.poll_without_shutdown(cx))?; Poll::Ready(Ok(zelf.take().unwrap().into_parts())) }) diff --git a/src/server/conn/http2.rs b/src/server/conn/http2.rs index e0d61c13a6..88511b022a 100644 --- a/src/server/conn/http2.rs +++ b/src/server/conn/http2.rs @@ -5,11 +5,10 @@ use std::fmt; use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use std::time::Duration; use crate::rt::{Read, Write}; -use futures_util::ready; use pin_project_lite::pin_project; use crate::body::{Body, Incoming as IncomingBody}; diff --git a/tests/client.rs b/tests/client.rs index 1f1a456f95..a89b4ddda7 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1486,6 +1486,7 @@ test! { mod conn { use std::error::Error; + use std::future::poll_fn; use std::io::{self, Read, Write}; use std::net::{SocketAddr, TcpListener}; use std::pin::Pin; @@ -1495,7 +1496,7 @@ mod conn { use bytes::{Buf, Bytes}; use futures_channel::{mpsc, oneshot}; - use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt}; + use futures_util::future::{self, FutureExt, TryFutureExt}; use http_body_util::{BodyExt, Empty, Full, StreamBody}; use hyper::rt::Timer; use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};