Skip to content

Commit

Permalink
fix(volo-thrift): clean waiters while failed waiting for idle connect…
Browse files Browse the repository at this point in the history
…ion (#547)
  • Loading branch information
Millione authored Feb 6, 2025
1 parent 7d9814d commit 5a6018b
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 4 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion volo-thrift/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "volo-thrift"
version = "0.10.5"
version = "0.10.6"
edition.workspace = true
homepage.workspace = true
repository.workspace = true
Expand Down
53 changes: 51 additions & 2 deletions volo-thrift/src/transport/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,17 @@ impl<K: Key, T: Poolable + Send + 'static> Pool<K, T> {
})
}

/// Returns a `Checkout` which is a future that resolves if an idle
/// connection becomes available.
pub fn checkout(&self, key: K, waiter: (oneshot::Receiver<T>, usize)) -> Checkout<K, T> {
Checkout {
key,
pool: self.clone(),
waiter,
clean: true,
}
}

pub async fn get<MT>(
&self,
key: K,
Expand All @@ -219,7 +230,7 @@ impl<K: Key, T: Poolable + Send + 'static> Pool<K, T> {
MT: UnaryService<K, Response = T> + Send + 'static + Sync,
MT::Error: Into<crate::ClientError> + Send,
{
let (rx, _waiter_token) = {
let (rx, waiter_token) = {
let entry = 'outer: loop {
let entry = 'inner: {
let mut inner = self.inner.lock().volo_unwrap();
Expand Down Expand Up @@ -286,6 +297,7 @@ impl<K: Key, T: Poolable + Send + 'static> Pool<K, T> {
};

// 3. select waiter and mc return future
let checkout = self.checkout(key.clone(), (rx, waiter_token));
let connector = {
let key = key.clone();
let this = self.clone();
Expand All @@ -309,7 +321,7 @@ impl<K: Key, T: Poolable + Send + 'static> Pool<K, T> {
};

// waiter or make transport finished
match future::select(rx, started::lazy(connector)).await {
match future::select(checkout, started::lazy(connector)).await {
Either::Left((Ok(v), fut)) => {
// check the make transport future has started
if fut.started() {
Expand Down Expand Up @@ -407,6 +419,43 @@ impl<K: Key, T: Poolable> Drop for Connecting<K, T> {
}
}

pub struct Checkout<K: Key, T: Poolable> {
key: K,
pool: Pool<K, T>,
waiter: (oneshot::Receiver<T>, usize),
clean: bool,
}

impl<K: Key, T: Poolable> Future for Checkout<K, T> {
type Output = Result<T, oneshot::error::RecvError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.waiter.0).poll(cx) {
Poll::Ready(v) => {
// Successfully received an idle connection, it means that the corresponding tx is
// already popped from waiters, so no need to remove it again.
self.clean = false;
Poll::Ready(v)
}
Poll::Pending => Poll::Pending,
}
}
}

impl<K: Key, T: Poolable> Drop for Checkout<K, T> {
fn drop(&mut self) {
// if clean needed, remove the corresponding tx from waiters
if self.clean {
tracing::trace!("checkout dropped for {:?}", self.key);
if let Ok(mut pool) = self.pool.inner.lock() {
if let Some(waiters) = pool.waiters.get_mut(&self.key) {
waiters.remove(self.waiter.1);
}
}
}
}
}

struct Idle<T> {
inner: T,
idle_at: Instant,
Expand Down

0 comments on commit 5a6018b

Please sign in to comment.