Skip to content

Commit 0d35aca

Browse files
poorbarcodelhotari
authored andcommitted
[fix][broker] Stop to retry to read entries if the replicator has terminated (#24880)
(cherry picked from commit 313ae97)
1 parent 37c024c commit 0d35aca

File tree

2 files changed

+52
-2
lines changed

2 files changed

+52
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,9 @@ private AvailablePermits getRateLimiterAvailablePermits(int availablePermits) {
274274
}
275275

276276
protected void readMoreEntries() {
277+
if (state.equals(Terminated) || state.equals(Terminating)) {
278+
return;
279+
}
277280
// Acquire permits and check state of producer.
278281
InFlightTask newInFlightTask = acquirePermitsIfNotFetchingSchema();
279282
if (newInFlightTask == null) {
@@ -953,12 +956,19 @@ public void beforeTerminate() {
953956
protected boolean hasPendingRead() {
954957
synchronized (inFlightTasks) {
955958
for (InFlightTask task : inFlightTasks) {
956-
if (task.readPos != null && task.entries == null) {
959+
// The purpose of calling "getReadPos" instead of calling "readPos" is to make the test
960+
// "testReplicationTaskStoppedAfterTopicClosed" can counter the calling times of "readMoreEntries".
961+
if (task.getReadPos() != null && task.entries == null) {
957962
// Skip the current reading if there is a pending cursor reading.
958963
return true;
959964
}
960965
}
961966
}
962967
return false;
963968
}
969+
970+
@VisibleForTesting
971+
String getReplicatorId() {
972+
return replicatorId;
973+
}
964974
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,17 @@
1818
*/
1919
package org.apache.pulsar.broker.service.persistent;
2020

21+
import static org.mockito.Mockito.doAnswer;
2122
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.spy;
24+
import static org.testng.Assert.assertEquals;
25+
import static org.testng.Assert.assertTrue;
2226
import java.util.ArrayList;
2327
import java.util.Arrays;
2428
import java.util.Collections;
2529
import java.util.LinkedList;
2630
import java.util.List;
27-
31+
import java.util.concurrent.atomic.AtomicInteger;
2832
import lombok.extern.slf4j.Slf4j;
2933
import org.apache.bookkeeper.mledger.Entry;
3034
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -34,6 +38,8 @@
3438
import org.apache.pulsar.broker.service.OneWayReplicatorTestBase;
3539
import org.apache.pulsar.broker.service.persistent.PersistentReplicator.InFlightTask;
3640
import org.apache.pulsar.client.api.MessageId;
41+
import org.mockito.invocation.InvocationOnMock;
42+
import org.mockito.stubbing.Answer;
3743
import org.testng.Assert;
3844
import org.testng.annotations.AfterClass;
3945
import org.testng.annotations.BeforeClass;
@@ -66,6 +72,40 @@ private void createTopics() throws Exception {
6672
admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
6773
}
6874

75+
@Test
76+
public void testReplicationTaskStoppedAfterTopicClosed() throws Exception {
77+
// Close a topic, which has enabled replication.
78+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
79+
admin1.topics().createNonPartitionedTopic(topicName);
80+
waitReplicatorStarted(topicName);
81+
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false)
82+
.join().get();
83+
PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators().get(cluster2);
84+
admin1.topics().unload(topicName);
85+
86+
// Inject a task into the "inFlightTasks" to calculate how many times the method "replicator.readMoreEntries"
87+
// has been called.
88+
AtomicInteger counter = new AtomicInteger();
89+
InFlightTask injectedTask = new InFlightTask(PositionImpl.get(1, 1), 1, replicator.getReplicatorId());
90+
injectedTask.setEntries(Collections.emptyList());
91+
InFlightTask spyTask = spy(injectedTask);
92+
replicator.inFlightTasks.add(spyTask);
93+
doAnswer(new Answer() {
94+
@Override
95+
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
96+
counter.incrementAndGet();
97+
return invocationOnMock.callRealMethod();
98+
}
99+
}).when(spyTask).getReadPos();
100+
101+
// Verify: there is no scheduled task to retry to read entries to replicate.
102+
// Call "readMoreEntries" to make the issue happen.
103+
replicator.readMoreEntries();
104+
Thread.sleep(PersistentTopic.MESSAGE_RATE_BACKOFF_MS * 10);
105+
assertEquals(replicator.getState(), AbstractReplicator.State.Terminated);
106+
assertTrue(counter.get() <= 1);
107+
}
108+
69109
@Test
70110
public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception {
71111
log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");

0 commit comments

Comments
 (0)