Skip to content

Applications can signal and await subscriber shutdown #5024

@dbolduc

Description

@dbolduc

Related to #4869, #4868

Those are more concerned with the shutdown behavior. This issue is about how the application can signal and await a shutdown.

Background

Messages can get "frozen" for 10 minutes if a program exits before the subscriber can flush pending nacks. And there may be unnecessary redeliveries if the subscriber does not flush pending acks. Letting an application await shutdown can avoid/mitigate these problems.

It is also somewhat difficult for an application to signal a shutdown. They have to drop(stream), which means they need to own the stream. It is more convenient if applications can spawn a background task, and signal shutdown from it.

Design

We should add a ShutdownToken, with an accessor for it on the MessageStream:

#[derive(Clone)]
pub struct ShutdownToken {
  inner: CancellationToken,
  fut: Shared<BoxFuture<'static, ()>>,
}

impl ShutdownToken {
  // I think it is convenient to shutdown and await.
  pub async fn shutdown(&self) {
    self.inner.cancel();
    self.fut.await
  }

  // We could also add this to just signal. But initially, I would not add this.
  pub fn signal_shutdown(&self) {
    self.inner.cancel();
  }
}

impl MessageStream {
  pub fn shutdown_token(&self) -> ShutdownToken {
    self.shutdown.clone()
  }
}

Work

  • Hold lease loop as shared future
  • Add CancellationToken to MessageStream
  • Introduce ShutdownToken
  • Simplify any examples, integration tests.

Metadata

Metadata

Assignees

Labels

api: pubsubIssues related to the Pub/Sub API.type: cleanupAn internal cleanup or hygiene concern.

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions