Skip to content

Commit 9f2e461

Browse files
authored
[fix][client] PIP-475: make async producer survive regular-to-scalable migration (apache#25882)
1 parent ef27cd2 commit 9f2e461

2 files changed

Lines changed: 112 additions & 37 deletions

File tree

pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MigrationEndToEndTest.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,62 @@ public void testV5ProducerSurvivesMigrationAndAllDataIsConsumable() throws Excep
9696
assertEquals(received, sent, "every pre- and post-migration message must be consumable");
9797
}
9898

99+
@Test
100+
public void testV5AsyncProducerSurvivesMigration() throws Exception {
101+
// Same as the producer-survives-migration case, but driving the *async* producer API
102+
// (producer.async()...send()). Async sends issued right after migration must ride
103+
// through the synthetic→real-DAG transition — the per-segment producer for a
104+
// just-terminated partition fails, and the send must retry onto an active child
105+
// rather than fail the user's future.
106+
String topic = baseName("e2e-async");
107+
admin.topics().createPartitionedTopic(topic, 2);
108+
109+
@Cleanup
110+
Producer<String> producer = v5Client.newProducer(Schema.string())
111+
.topic("persistent://" + topic)
112+
.create();
113+
114+
java.util.List<java.util.concurrent.CompletableFuture<MessageId>> sends =
115+
new java.util.ArrayList<>();
116+
Set<String> sent = new HashSet<>();
117+
for (int i = 0; i < 20; i++) {
118+
String v = "pre-" + i;
119+
sends.add(producer.async().newMessage().key("k-" + i).value(v).send());
120+
sent.add(v);
121+
}
122+
123+
admin.scalableTopics().migrateToScalable(topic, false);
124+
125+
// Issue the post-migration batch via async sends without awaiting in between, so the
126+
// retry-across-transition path is exercised.
127+
for (int i = 0; i < 20; i++) {
128+
String v = "post-" + i;
129+
sends.add(producer.async().newMessage().key("k-" + i).value(v).send());
130+
sent.add(v);
131+
}
132+
// Every async send must eventually complete (none fail across the migration boundary).
133+
java.util.concurrent.CompletableFuture
134+
.allOf(sends.toArray(new java.util.concurrent.CompletableFuture[0]))
135+
.get(60, java.util.concurrent.TimeUnit.SECONDS);
136+
137+
@Cleanup
138+
QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
139+
.topic("persistent://" + topic)
140+
.subscriptionName("e2e-async-sub")
141+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
142+
.subscribe();
143+
144+
Set<String> received = new HashSet<>();
145+
for (int i = 0; i < 40; i++) {
146+
org.apache.pulsar.client.api.v5.Message<String> m = consumer.receive(Duration.ofSeconds(10));
147+
assertNotNull(m, "expected 40 messages, missing after " + received.size());
148+
received.add(m.value());
149+
consumer.acknowledge(m.id());
150+
}
151+
assertEquals(received, sent,
152+
"every async pre- and post-migration message must be consumable");
153+
}
154+
99155
@Test
100156
public void testV4ProducerLockedOutAfterMigration() throws Exception {
101157
// After migration the old topic is terminated, so a legacy v4 producer can no longer

pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java

Lines changed: 56 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -293,61 +293,80 @@ private void dispatchSendAttempt(
293293
}
294294
final long routedSegmentId = segmentId;
295295

296-
appendToDispatchChain(routedSegmentId, producer -> {
297-
var ackFuture = buildV4Message(producer, key, value, properties,
298-
eventTime, sequenceId, deliverAfter, deliverAt, replicationClusters, txn)
299-
.sendAsync();
300-
ackFuture.whenComplete((v4MsgId, ex) -> {
301-
if (ex == null) {
302-
userFuture.complete(new MessageIdV5(v4MsgId, routedSegmentId));
303-
return;
304-
}
305-
Throwable cause = ex instanceof java.util.concurrent.CompletionException
306-
? ex.getCause() : ex;
307-
boolean segmentSealed = cause
308-
instanceof org.apache.pulsar.client.api.PulsarClientException
309-
.TopicTerminatedException
310-
|| cause instanceof org.apache.pulsar.client.api.PulsarClientException
311-
.AlreadyClosedException;
312-
if (segmentSealed && attempt < 3) {
313-
log.info().attr("segmentId", routedSegmentId)
314-
.attr("attempt", attempt + 1).log("Segment sealed, retrying");
315-
segmentProducers.remove(routedSegmentId);
316-
dispatchChains.remove(routedSegmentId);
317-
CompletableFuture.delayedExecutor(
318-
100L * (attempt + 1),
319-
java.util.concurrent.TimeUnit.MILLISECONDS)
320-
.execute(() -> dispatchSendAttempt(userFuture, key, value, properties,
321-
eventTime, sequenceId, deliverAfter, deliverAt,
322-
replicationClusters, txn, attempt + 1));
323-
} else {
324-
userFuture.completeExceptionally(ex);
325-
}
326-
});
327-
}, userFuture);
296+
// Re-dispatch this message on the next attempt. Used when the target segment is gone
297+
// — sealed by a split/merge or terminated by a regular-to-scalable migration — and
298+
// the DAG watch is expected to refresh the layout shortly so routeMessage lands on an
299+
// active child.
300+
Runnable retry = () -> {
301+
segmentProducers.remove(routedSegmentId);
302+
dispatchChains.remove(routedSegmentId);
303+
CompletableFuture.delayedExecutor(
304+
Math.min(100L * (attempt + 1), SEND_RETRY_MAX_BACKOFF_MS),
305+
java.util.concurrent.TimeUnit.MILLISECONDS)
306+
.execute(() -> dispatchSendAttempt(userFuture, key, value, properties,
307+
eventTime, sequenceId, deliverAfter, deliverAt,
308+
replicationClusters, txn, attempt + 1));
309+
};
310+
311+
appendToDispatchChain(routedSegmentId,
312+
producer -> {
313+
var ackFuture = buildV4Message(producer, key, value, properties,
314+
eventTime, sequenceId, deliverAfter, deliverAt, replicationClusters, txn)
315+
.sendAsync();
316+
ackFuture.whenComplete((v4MsgId, ex) -> {
317+
if (ex == null) {
318+
userFuture.complete(new MessageIdV5(v4MsgId, routedSegmentId));
319+
} else {
320+
// Failure from the v4 send (e.g. the segment sealed mid-flight).
321+
handleAsyncSegmentFailure(userFuture, routedSegmentId, attempt, ex, retry);
322+
}
323+
});
324+
},
325+
// Failure while (re)creating the per-segment producer — e.g. the partition was
326+
// terminated by a migration between routing and creation.
327+
createEx -> handleAsyncSegmentFailure(userFuture, routedSegmentId, attempt, createEx, retry));
328+
}
329+
330+
/**
331+
* Decide whether an async send failure should be retried. If the target segment is gone
332+
* (a split/merge seal or a migration termination) and the retry budget isn't exhausted,
333+
* run {@code retry}; otherwise fail the user-visible future. Covers both the v4 send
334+
* failure and the per-segment producer-creation failure.
335+
*/
336+
private void handleAsyncSegmentFailure(CompletableFuture<MessageIdV5> userFuture, long segmentId,
337+
int attempt, Throwable ex, Runnable retry) {
338+
Throwable cause = ex instanceof java.util.concurrent.CompletionException ? ex.getCause() : ex;
339+
if (isSegmentGoneError(cause) && attempt < SEND_RETRY_MAX_ATTEMPTS) {
340+
log.info().attr("segmentId", segmentId).attr("attempt", attempt + 1)
341+
.log("Target segment gone, retrying async send after layout update");
342+
retry.run();
343+
} else {
344+
userFuture.completeExceptionally(ex);
345+
}
328346
}
329347

330348
/**
331349
* Append a dispatch step to the per-segment chain. The chain head is the
332350
* segment-producer-creation future; subsequent links complete as soon as
333351
* their {@code dispatchOp} returns (which calls v4 {@code sendAsync} — a
334352
* fast queue insert), so dispatch order strictly mirrors call order.
335-
* If the chain itself fails (e.g., segment producer creation failed), the
336-
* user-visible future is failed too.
353+
* If the chain itself fails (e.g., segment producer creation failed),
354+
* {@code onCreateFailure} is invoked so the caller can retry (when the segment
355+
* is merely gone) or fail the user-visible future.
337356
*/
338357
private void appendToDispatchChain(long segmentId,
339358
Consumer<org.apache.pulsar.client.api.Producer<T>> dispatchOp,
340-
CompletableFuture<MessageIdV5> userFuture) {
359+
Consumer<Throwable> onCreateFailure) {
341360
synchronized (dispatchLock) {
342361
var prev = dispatchChains.computeIfAbsent(segmentId,
343362
id -> getOrCreateSegmentProducerAsync(id));
344363
var next = prev.thenApply(producer -> {
345364
dispatchOp.accept(producer);
346365
return producer;
347366
});
348-
// If the chain link itself faults (creation failure), surface it.
367+
// If the chain link itself faults (creation failure), hand it to the caller.
349368
next.exceptionally(ex -> {
350-
userFuture.completeExceptionally(ex);
369+
onCreateFailure.accept(ex);
351370
return null;
352371
});
353372
dispatchChains.put(segmentId, next);

0 commit comments

Comments
 (0)