Pubsub: Exactly once delivery duplicates messages when there is a backlog #2555

@IkueWatanabe-Aidea

Environment details

  1. API: pubsub
  2. OS type and version: macOS Sequoia 15.5
  3. Java version: OpenJDK 21.0.6 (JBR-21.0.6+9-895.109-nomod)
  4. version(s): google-cloud-pubsub-1.141.2

Steps to reproduce

  1. Create a subscriber with the settings below:
    1. maxAckExtensionPeriod: 60 mins
    2. maxDurationPerAckExtension: 5 mins
    3. minDurationPerAckExtension: 1 min
    4. parallelPullCount: 2
    5. maxOutstandingElementCount: 10
  2. Start the subscriber. The receiver sleeps for 1 to 15 minutes for each message it receives. Because of the maxOutstandingElementCount setting, at most 10 messages are processed in parallel.
  3. Publish 15 messages (more than maxOutstandingElementCount).
  4. Error "INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed." is logged multiple times.
  5. Duplicate messages with the same message ID are delivered.

Code example

    @PostConstruct
    public void initialize() {
        ProjectSubscriptionName subscription = ProjectSubscriptionName.of(projectId, subscriptionName);

        // Create message receiver with exactly-once delivery support
        MessageReceiverWithAckResponse receiver = new MessageReceiverWithAckResponse() {
            @Override
            public void receiveMessage(PubsubMessage message, AckReplyConsumerWithResponse consumer) {
                sleepForRandomTime(message, consumer);
            }
        };

        // Create flow control settings to limit concurrent message processing
        FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder()
                .setMaxOutstandingElementCount(10L) // Maximum 10 messages being processed at once
                .setMaxOutstandingRequestBytes(1000000L) // 1MB max outstanding bytes
                .build();

        // Create subscriber WITH native ack extension and enhanced monitoring
        this.subscriber = Subscriber.newBuilder(subscription, receiver)
                .setMaxAckExtensionPeriod(org.threeten.bp.Duration.ofMinutes(60)) // 60 minutes max extension
                .setMaxDurationPerAckExtension(org.threeten.bp.Duration.ofMinutes(5)) // Extend by 5 min each time
                .setMinDurationPerAckExtension(org.threeten.bp.Duration.ofMinutes(1)) // Min 1 min extension
                .setParallelPullCount(2) // Conservative parallel pulls
                .setFlowControlSettings(flowControlSettings) // Limit to 10 concurrent messages
                .build();

        // Start the subscriber
        subscriber.startAsync().awaitRunning();
    }

    private void sleepForRandomTime(PubsubMessage pubsubMessage, AckReplyConsumerWithResponse consumer) {
        String messageId = pubsubMessage.getMessageId();
        long receiveTime = System.currentTimeMillis();

        // Check for duplicate message processing
        Long previousProcessTime = processedMessages.putIfAbsent(messageId, receiveTime);
        if (previousProcessTime != null) {
            logger.warn("DUPLICATE MESSAGE DETECTED! Pub/Sub message ID: {} was already processed at {}. " +
                    "Current receive time: {}. Acknowledging duplicate to prevent redelivery.",
                    messageId, new java.util.Date(previousProcessTime), new java.util.Date(receiveTime));

            ApiFuture<AckResponse> ackFuture = consumer.ack();
            return;
        }

        int processingTimeSeconds = random.nextInt(840) + 60; // 60 to 899 seconds (about 1-15 minutes)
        logger.info("Received native Pub/Sub message ID: {}. Sleep for {} seconds",
                messageId, processingTimeSeconds);

        try {
            Thread.sleep(processingTimeSeconds * 1000L);

            // Use ack() for exactly-once delivery - returns ApiFuture<AckResponse>
            ApiFuture<AckResponse> ackFuture = consumer.ack();
            logger.info("Successfully processed and acknowledged native Pub/Sub message ID: {}",
                    messageId);
        } catch (Exception e) {
            logger.error("Exception occurred while processing native Pub/Sub message ID: {}", messageId, e);
            processedMessages.remove(messageId); // Remove from processed messages on exception

            // Use nack() for exactly-once delivery - returns ApiFuture<AckResponse>
            ApiFuture<AckResponse> nackFuture = consumer.nack();
            logger.warn("Exception handling: nacked native Pub/Sub message ID: {}.", messageId);
        }
    }
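
Note that the sample above logs "Successfully processed and acknowledged" right after calling `consumer.ack()`, without looking at the returned future. With exactly-once delivery the outcome of the ack only becomes known when that future resolves. The following is a minimal, stdlib-only sketch of checking the outcome before logging. It does not use the Pub/Sub client: `AckOutcome` is a hypothetical stand-in assumed to mirror the constants of the library's `AckResponse` enum, `CompletableFuture` stands in for `ApiFuture`, and the class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class AckOutcomeSketch {
    // Hypothetical stand-in for the client library's AckResponse enum
    // (constant names are assumed to match it).
    public enum AckOutcome { SUCCESSFUL, INVALID, FAILED_PRECONDITION, PERMISSION_DENIED, OTHER }

    // Builds a log line that reflects the actual ack result.
    static String describe(String messageId, AckOutcome outcome) {
        switch (outcome) {
            case SUCCESSFUL:
                return "acked " + messageId;
            case INVALID:
                return "ack ID invalid or expired for " + messageId;
            default:
                return "ack failed for " + messageId + ": " + outcome;
        }
    }

    // Attaches a continuation to the ack future instead of discarding it.
    static CompletableFuture<String> logWhenResolved(String messageId,
                                                     CompletableFuture<AckOutcome> ackFuture) {
        return ackFuture.thenApply(outcome -> describe(messageId, outcome));
    }

    public static void main(String[] args) {
        // Simulated result; in the real subscriber this would come from consumer.ack().
        CompletableFuture<AckOutcome> simulated =
                CompletableFuture.completedFuture(AckOutcome.INVALID);
        System.out.println(logWhenResolved("16307219734642660", simulated).join());
    }
}
```

In the real subscriber the same idea would presumably be expressed by attaching a callback to the `ApiFuture<AckResponse>`, so that a message is only reported as acknowledged once the service has confirmed it.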

Output log

The INVALID_ARGUMENT stack trace is omitted except for the first occurrence.

2025-09-16 17:13:08 [Gax-3] INFO  c.e.p.s.NativeMessageSubscriber - Received native Pub/Sub message ID: 16313525043018429. Sleep for 824 seconds
2025-09-16 17:13:08 [Gax-4] INFO  c.e.p.s.NativeMessageSubscriber - Received native Pub/Sub message ID: 15389502673124437. Sleep for 827 seconds
2025-09-16 17:13:08 [Gax-5] INFO  c.e.p.s.NativeMessageSubscriber - Received native Pub/Sub message ID: 16315255480206003. Sleep for 596 seconds
2025-09-16 17:13:08 [Gax-6] INFO  c.e.p.s.NativeMessageSubscriber - Received native Pub/Sub message ID: 16309022234542561. Sleep for 610 seconds
2025-09-16 17:13:08 [Gax-7] INFO  c.e.p.s.NativeMessageSubscriber - Received native Pub/Sub message ID: 16311609492452975. Sleep for 446 seconds
2025-09-16 17:13:10 [Gax-8] INFO  c.e.p.s.NativeMessageSubscriber - Received native Pub/Sub message ID: 16308533929060216. Sleep for 744 seconds
2025-09-16 17:13:10 [Gax-9] INFO  c.e.p.s.NativeMessageSubscriber - Received native Pub/Sub message ID: 16315272577144587. Sleep for 793 seconds
2025-09-16 17:13:10 [Gax-10] INFO  c.e.p.s.NativeMessageSubscriber - Received native Pub/Sub message ID: 15393608130839305. Sleep for 526 seconds
2025-09-16 17:13:10 [Gax-11] INFO  c.e.p.s.NativeMessageSubscriber - Received native Pub/Sub message ID: 16312247916552181. Sleep for 547 seconds
2025-09-16 17:13:10 [Gax-12] INFO  c.e.p.s.NativeMessageSubscriber - Received native Pub/Sub message ID: 16307219734642660. Sleep for 254 seconds
2025-09-16 17:17:24 [Gax-12] INFO  c.e.p.s.NativeMessageSubscriber - Successfully processed and acknowledged native Pub/Sub message ID: 16307219734642660
2025-09-16 17:17:33 [Subscriber-EOD-CallbackExecutor-14] WARN  c.g.c.p.v.StreamingSubscriberConnection - failed to send operations
com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed.
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1307)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1070)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:819)
	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:651)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:621)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:569)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at com.google.api.gax.grpc.GrpcLoggingInterceptor$1$1.onClose(GrpcLoggingInterceptor.java:98)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:565)
	at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:733)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:714)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:545)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:328)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:309)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1095)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:619)
	at java.base/java.lang.Thread.run(Thread.java:1447)
Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed.
	at io.grpc.Status.asRuntimeException(Status.java:532)
	... 21 common frames omitted
2025-09-16 17:20:34 [Gax-7] INFO  c.e.p.s.NativeMessageSubscriber - Successfully processed and acknowledged native Pub/Sub message ID: 16311609492452975
2025-09-16 17:20:34 [Gax-12] INFO  c.e.p.s.NativeMessageSubscriber - Received native Pub/Sub message ID: 16308461489469056. Sleep for 815 seconds
2025-09-16 17:20:34 [Gax-7] INFO  c.e.p.s.NativeMessageSubscriber - Received native Pub/Sub message ID: 16307308491278046. Sleep for 430 seconds
2025-09-16 17:21:56 [Gax-10] INFO  c.e.p.s.NativeMessageSubscriber - Successfully processed and acknowledged native Pub/Sub message ID: 15393608130839305
2025-09-16 17:21:56 [Subscriber-EOD-CallbackExecutor-15] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
2025-09-16 17:22:17 [Gax-11] INFO  c.e.p.s.NativeMessageSubscriber - Successfully processed and acknowledged native Pub/Sub message ID: 16312247916552181
2025-09-16 17:22:17 [Gax-10] INFO  c.e.p.s.NativeMessageSubscriber - Received native Pub/Sub message ID: 16309022239040559. Sleep for 734 seconds
2025-09-16 17:22:17 [Gax-11] INFO  c.e.p.s.NativeMessageSubscriber - Received native Pub/Sub message ID: 15389052601735206. Sleep for 688 seconds
2025-09-16 17:23:04 [Gax-5] INFO  c.e.p.s.NativeMessageSubscriber - Successfully processed and acknowledged native Pub/Sub message ID: 16315255480206003
2025-09-16 17:23:04 [Subscriber-EOD-CallbackExecutor-16] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
2025-09-16 17:23:04 [Subscriber-EOD-CallbackExecutor-14] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
2025-09-16 17:23:04 [Gax-5] INFO  c.e.p.s.NativeMessageSubscriber - Received native Pub/Sub message ID: 16312609047030287. Sleep for 350 seconds
2025-09-16 17:23:05 [Subscriber-EOD-CallbackExecutor-14] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
2025-09-16 17:23:05 [Subscriber-EOD-CallbackExecutor-14] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
2025-09-16 17:23:05 [Subscriber-EOD-CallbackExecutor-14] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
2025-09-16 17:23:05 [Subscriber-EOD-CallbackExecutor-14] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
2025-09-16 17:23:05 [Subscriber-EOD-CallbackExecutor-14] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
2025-09-16 17:23:05 [Subscriber-EOD-CallbackExecutor-14] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
2025-09-16 17:23:05 [Subscriber-EOD-CallbackExecutor-14] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
2025-09-16 17:23:05 [Subscriber-EOD-CallbackExecutor-14] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
2025-09-16 17:23:05 [Subscriber-EOD-CallbackExecutor-16] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
2025-09-16 17:23:05 [Subscriber-EOD-CallbackExecutor-14] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
2025-09-16 17:23:05 [Subscriber-EOD-CallbackExecutor-16] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
2025-09-16 17:23:05 [Subscriber-EOD-CallbackExecutor-16] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
2025-09-16 17:23:05 [Subscriber-EOD-CallbackExecutor-16] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
2025-09-16 17:23:18 [Gax-6] INFO  c.e.p.s.NativeMessageSubscriber - Successfully processed and acknowledged native Pub/Sub message ID: 16309022234542561
2025-09-16 17:23:18 [Gax-6] WARN  c.e.p.s.NativeMessageSubscriber - DUPLICATE MESSAGE DETECTED! Pub/Sub message ID: 16308533929060216 was already processed at Tue Sep 16 17:13:10 JST 2025. Current receive time: Tue Sep 16 17:23:18 JST 2025. Acknowledging duplicate to prevent redelivery.
2025-09-16 17:23:18 [Gax-6] WARN  c.e.p.s.NativeMessageSubscriber - DUPLICATE MESSAGE DETECTED! Pub/Sub message ID: 16315255480206003 was already processed at Tue Sep 16 17:13:08 JST 2025. Current receive time: Tue Sep 16 17:23:18 JST 2025. Acknowledging duplicate to prevent redelivery.
2025-09-16 17:23:19 [Gax-6] WARN  c.e.p.s.NativeMessageSubscriber - DUPLICATE MESSAGE DETECTED! Pub/Sub message ID: 15393608130839305 was already processed at Tue Sep 16 17:13:10 JST 2025. Current receive time: Tue Sep 16 17:23:19 JST 2025. Acknowledging duplicate to prevent redelivery.
2025-09-16 17:23:19 [Gax-6] WARN  c.e.p.s.NativeMessageSubscriber - DUPLICATE MESSAGE DETECTED! Pub/Sub message ID: 16309022234542561 was already processed at Tue Sep 16 17:13:08 JST 2025. Current receive time: Tue Sep 16 17:23:19 JST 2025. Acknowledging duplicate to prevent redelivery.
2025-09-16 17:23:19 [Gax-6] WARN  c.e.p.s.NativeMessageSubscriber - DUPLICATE MESSAGE DETECTED! Pub/Sub message ID: 16312609047030287 was already processed at Tue Sep 16 17:23:04 JST 2025. Current receive time: Tue Sep 16 17:23:19 JST 2025. Acknowledging duplicate to prevent redelivery.
2025-09-16 17:23:19 [Gax-6] WARN  c.e.p.s.NativeMessageSubscriber - DUPLICATE MESSAGE DETECTED! Pub/Sub message ID: 16309022239040559 was already processed at Tue Sep 16 17:22:17 JST 2025. Current receive time: Tue Sep 16 17:23:19 JST 2025. Acknowledging duplicate to prevent redelivery.
2025-09-16 17:23:19 [Gax-6] WARN  c.e.p.s.NativeMessageSubscriber - DUPLICATE MESSAGE DETECTED! Pub/Sub message ID: 16307219734642660 was already processed at Tue Sep 16 17:13:10 JST 2025. Current receive time: Tue Sep 16 17:23:19 JST 2025. Acknowledging duplicate to prevent redelivery.
2025-09-16 17:25:34 [Gax-8] INFO  c.e.p.s.NativeMessageSubscriber - Successfully processed and acknowledged native Pub/Sub message ID: 16308533929060216
2025-09-16 17:25:34 [Gax-8] WARN  c.e.p.s.NativeMessageSubscriber - DUPLICATE MESSAGE DETECTED! Pub/Sub message ID: 15389052601735206 was already processed at Tue Sep 16 17:22:17 JST 2025. Current receive time: Tue Sep 16 17:25:34 JST 2025. Acknowledging duplicate to prevent redelivery.
2025-09-16 17:25:34 [Gax-8] WARN  c.e.p.s.NativeMessageSubscriber - DUPLICATE MESSAGE DETECTED! Pub/Sub message ID: 16315272577144587 was already processed at Tue Sep 16 17:13:10 JST 2025. Current receive time: Tue Sep 16 17:25:34 JST 2025. Acknowledging duplicate to prevent redelivery.
2025-09-16 17:25:34 [Gax-8] WARN  c.e.p.s.NativeMessageSubscriber - DUPLICATE MESSAGE DETECTED! Pub/Sub message ID: 15389502673124437 was already processed at Tue Sep 16 17:13:08 JST 2025. Current receive time: Tue Sep 16 17:25:34 JST 2025. Acknowledging duplicate to prevent redelivery.
2025-09-16 17:25:34 [Gax-8] WARN  c.e.p.s.NativeMessageSubscriber - DUPLICATE MESSAGE DETECTED! Pub/Sub message ID: 16313525043018429 was already processed at Tue Sep 16 17:13:08 JST 2025. Current receive time: Tue Sep 16 17:25:34 JST 2025. Acknowledging duplicate to prevent redelivery.
2025-09-16 17:25:34 [Gax-8] WARN  c.e.p.s.NativeMessageSubscriber - DUPLICATE MESSAGE DETECTED! Pub/Sub message ID: 16311609492452975 was already processed at Tue Sep 16 17:13:08 JST 2025. Current receive time: Tue Sep 16 17:25:34 JST 2025. Acknowledging duplicate to prevent redelivery.
2025-09-16 17:25:34 [Gax-8] WARN  c.e.p.s.NativeMessageSubscriber - DUPLICATE MESSAGE DETECTED! Pub/Sub message ID: 16307308491278046 was already processed at Tue Sep 16 17:20:34 JST 2025. Current receive time: Tue Sep 16 17:25:34 JST 2025. Acknowledging duplicate to prevent redelivery.
2025-09-16 17:25:34 [Gax-8] WARN  c.e.p.s.NativeMessageSubscriber - DUPLICATE MESSAGE DETECTED! Pub/Sub message ID: 16308461489469056 was already processed at Tue Sep 16 17:20:34 JST 2025. Current receive time: Tue Sep 16 17:25:34 JST 2025. Acknowledging duplicate to prevent redelivery.
2025-09-16 17:25:34 [Gax-8] WARN  c.e.p.s.NativeMessageSubscriber - DUPLICATE MESSAGE DETECTED! Pub/Sub message ID: 15389052601735206 was already processed at Tue Sep 16 17:22:17 JST 2025. Current receive time: Tue Sep 16 17:25:34 JST 2025. Acknowledging duplicate to prevent redelivery.
2025-09-16 17:25:34 [Gax-8] WARN  c.e.p.s.NativeMessageSubscriber - DUPLICATE MESSAGE DETECTED! Pub/Sub message ID: 16312247916552181 was already processed at Tue Sep 16 17:13:10 JST 2025. Current receive time: Tue Sep 16 17:25:34 JST 2025. Acknowledging duplicate to prevent redelivery.
2025-09-16 17:26:23 [Gax-9] INFO  c.e.p.s.NativeMessageSubscriber - Successfully processed and acknowledged native Pub/Sub message ID: 16315272577144587
2025-09-16 17:26:39 [Subscriber-EOD-CallbackExecutor-14] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
2025-09-16 17:26:52 [Gax-3] INFO  c.e.p.s.NativeMessageSubscriber - Successfully processed and acknowledged native Pub/Sub message ID: 16313525043018429
2025-09-16 17:26:55 [Gax-4] INFO  c.e.p.s.NativeMessageSubscriber - Successfully processed and acknowledged native Pub/Sub message ID: 15389502673124437
2025-09-16 17:27:44 [Gax-7] INFO  c.e.p.s.NativeMessageSubscriber - Successfully processed and acknowledged native Pub/Sub message ID: 16307308491278046
2025-09-16 17:28:54 [Gax-5] INFO  c.e.p.s.NativeMessageSubscriber - Successfully processed and acknowledged native Pub/Sub message ID: 16312609047030287
2025-09-16 17:33:45 [Gax-11] INFO  c.e.p.s.NativeMessageSubscriber - Successfully processed and acknowledged native Pub/Sub message ID: 15389052601735206
2025-09-16 17:33:46 [Subscriber-EOD-CallbackExecutor-19] WARN  c.g.c.p.v.StreamingSubscriberConnection - failed to send operations
2025-09-16 17:34:09 [Gax-12] INFO  c.e.p.s.NativeMessageSubscriber - Successfully processed and acknowledged native Pub/Sub message ID: 16308461489469056
2025-09-16 17:34:31 [Gax-10] INFO  c.e.p.s.NativeMessageSubscriber - Successfully processed and acknowledged native Pub/Sub message ID: 16309022239040559
2025-09-16 17:34:32 [Subscriber-EOD-CallbackExecutor-19] INFO  c.g.c.p.v.StreamingSubscriberConnection - Permanent error invalid ack id message, will not resend
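
To put numbers on the redeliveries in the log above, the gap between a message's first receipt and its duplicate can be computed from the timestamps. This is a small helper sketch using only `java.time`. The class and method names are hypothetical, and the timestamps are copied from the log lines for message IDs 16308533929060216 and 16312609047030287.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class RedeliveryGap {
    // Matches the timestamp prefix of the log lines, e.g. "2025-09-16 17:13:10".
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

    // Seconds elapsed between the first receipt and the duplicate receipt.
    static long gapSeconds(String firstReceived, String duplicateReceived) {
        LocalDateTime first = LocalDateTime.parse(firstReceived, LOG_TIME);
        LocalDateTime duplicate = LocalDateTime.parse(duplicateReceived, LOG_TIME);
        return Duration.between(first, duplicate).getSeconds();
    }

    public static void main(String[] args) {
        // Message 16308533929060216: received 17:13:10, duplicate at 17:23:18.
        System.out.println(gapSeconds("2025-09-16 17:13:10", "2025-09-16 17:23:18")); // prints 608
        // Message 16312609047030287: received 17:23:04, duplicate at 17:23:19.
        System.out.println(gapSeconds("2025-09-16 17:23:04", "2025-09-16 17:23:19")); // prints 15
    }
}
```

For these two messages the gaps are 608 seconds and 15 seconds. In both cases the duplicate arrived while the original delivery was still sleeping in the receiver.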

Any additional information below

The duplicate message issue occurs only when there is a backlog. When I publish fewer messages than maxOutstandingElementCount, the issue does not seem to happen.

Thanks!

Labels: api: pubsub (Issues related to the googleapis/java-pubsub API.)