Description
Summary
When publishing to Pub/Sub with message ordering enabled, the library overrides the configured gRPC retry settings and forces unbounded retry. An application using this library is then unable to detect publish failures. The workaround is to add a timeout when waiting on the returned future, but this scatters the retry/timeout policy and makes it surprising to configure.
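For context, this is the workaround we use today (a minimal sketch; the 30-second bound is arbitrary, and publisher / pubsubMessage are the same objects as in the code example below). It only bounds the caller's wait; the publisher keeps retrying in the background:

ApiFuture<String> future = publisher.publish(pubsubMessage);
try {
  // Bound the wait ourselves, since the publish itself never fails.
  String messageId = future.get(30, TimeUnit.SECONDS);
  System.out.println("Published as " + messageId);
} catch (TimeoutException e) {
  // The only place a stuck publish becomes visible to the application.
  System.out.println("Publish did not complete within 30s");
} catch (ExecutionException e) {
  System.out.println("Publish failed: " + e.getCause());
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
}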
This is the code that enforces infinite retry.
https://github.com/googleapis/java-pubsub/blob/main/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java#L168 (lines 174 to 180)
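Paraphrasing the linked lines rather than quoting them verbatim, the override amounts to replacing whatever RetrySettings were configured with effectively unbounded values, roughly:

// Illustrative paraphrase of the override, not a verbatim copy of Publisher.java.
retrySettings =
    retrySettings
        .toBuilder()
        .setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE)) // effectively never give up
        .setMaxAttempts(Integer.MAX_VALUE)
        .build();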
It is unclear exactly why the infinite retry never eventually succeeds. We have had multiple random occurrences where applications running in a GKE cluster stopped publishing; in some instances we let the application retry for up to a week without success. Unfortunately, we do not have a reliable way to trigger this type of failure, and we do not have low-level gRPC logs from when the failures happened.
My suggestion is to allow the library user to set a sane upper bound on retries, and to let failures be propagated through the onFailure callback (see the sketch below).
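Concretely, Publisher.Builder already exposes setRetrySettings (from com.google.api.gax.retrying.RetrySettings), so something along these lines would be a natural way to express a bounded policy; the ask is that it be honored, with exhaustion delivered to onFailure, instead of being overridden when ordering is enabled. The specific values are illustrative only:

// A bounded retry policy; today these values are replaced when message ordering is enabled.
RetrySettings boundedRetry =
    RetrySettings.newBuilder()
        .setInitialRetryDelay(Duration.ofMillis(100))
        .setRetryDelayMultiplier(2.0)
        .setMaxRetryDelay(Duration.ofSeconds(10))
        .setTotalTimeout(Duration.ofMinutes(5)) // give up after 5 minutes and call onFailure
        .build();

Publisher publisher =
    Publisher.newBuilder(topicName)
        .setEnableMessageOrdering(true)
        .setRetrySettings(boundedRetry)
        .build();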
Environment details
- API: Pubsub
- OS type and version: Ubuntu 22.04
- Java version: temurin-17.0.8
- version(s): google-cloud-pubsub-1.123.12
Steps to reproduce
- Enable message ordering in publisher
- Publish messages
- Simulate failure, such as network disconnect
Code example
Based on the sample publish code in the documentation.
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

TopicName topicName = TopicName.of("my-project-id", "my-topic-id"); // placeholder values

Publisher publisher =
    Publisher.newBuilder(topicName)
        // Sending messages to the same region ensures they are received in order
        // even when multiple publishers are used.
        .setEndpoint("us-east1-pubsub.googleapis.com:443")
        .setEnableMessageOrdering(true)
        .build();
try {
  Map<String, String> messages = new LinkedHashMap<String, String>();
  messages.put("message1", "key1");
  // auto-generate many more messages here ...
  for (Map.Entry<String, String> entry : messages.entrySet()) {
    ByteString data = ByteString.copyFromUtf8(entry.getKey());
    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
    ApiFuture<String> future = publisher.publish(pubsubMessage);
    // Add an asynchronous callback to handle publish success / failure.
    ApiFutures.addCallback(
        future,
        new ApiFutureCallback<String>() {
          // onFailure is never called, because the publisher will retry indefinitely
          @Override
          public void onFailure(Throwable throwable) {
            if (throwable instanceof ApiException) {
              ApiException apiException = ((ApiException) throwable);
              // Details on the API exception.
              System.out.println(apiException.getStatusCode().getCode());
              System.out.println(apiException.isRetryable());
            }
            System.out.println("Error publishing message : " + pubsubMessage.getData());
          }

          @Override
          public void onSuccess(String messageId) {
            // Once published, returns server-assigned message ids (unique within the topic).
            System.out.println(pubsubMessage.getData() + " : " + messageId);
          }
        },
        MoreExecutors.directExecutor());
  }
} finally {
  // When finished with the publisher, shutdown to free up resources.
  publisher.shutdown();
  publisher.awaitTermination(1, TimeUnit.MINUTES);
}
Stack trace
No stack trace because of infinite retry