Skip to content

Conversation

@rdhabalia
Copy link
Contributor

@rdhabalia rdhabalia commented Dec 26, 2023

Fixes: apache/pulsar#21451

Motivation

Apache Pulsar provides a messaging queue using a Shared subscription to process unordered messages in parallel using multiple connected consumers. Shared subscription is also commonly used in data processing pipelines where they need to forcefully unsubscribe from the subscription after processing messages on the topic. One example is Pulsar-Storm adapter where Pulsar spout creates Pulsar consumers on a shared subscription for distributed processing and then unsubscribe on the topic. However, PulsarSpout always fails to unsubscribe shared subscriptions and it also doesn't close the pulsar consumers if there is more than one consumer connected to the subscription which causes a leaked subscription and consumer for that application.

Modifications

PR:21687 has introduced force-unsubscribe to support unsubscribe for shared subscription which can be used by Pulsar-Spout to unsubscribe on spout at the end of the cycle.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)
  • The rest endpoints: (yes / no)
  • The admin cli options: (yes / no)
  • Anything that affects deployment: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation


@Override
public void unsubscribe() throws PulsarClientException {
consumer.unsubscribe();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to split the actual fix from the version change. eg: first the version update and then using the new API

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Pulsar spout fails to unsubscribe and close consumers on a shared subscription

2 participants