Skip to content

Commit 042e104

Browse files
poorbarcodesrinath-ctds
authored andcommitted
[fix][broker] Stop to retry to read entries if the replicator has terminated (apache#24880)
(cherry picked from commit 313ae97) (cherry picked from commit 0d35aca)
1 parent 82e4977 commit 042e104

File tree

2 files changed

+52
-2
lines changed

2 files changed

+52
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,9 @@ private AvailablePermits getRateLimiterAvailablePermits(int availablePermits) {
275275
}
276276

277277
protected void readMoreEntries() {
278+
if (state.equals(Terminated) || state.equals(Terminating)) {
279+
return;
280+
}
278281
// Acquire permits and check state of producer.
279282
InFlightTask newInFlightTask = acquirePermitsIfNotFetchingSchema();
280283
if (newInFlightTask == null) {
@@ -957,12 +960,19 @@ public void beforeTerminate() {
957960
protected boolean hasPendingRead() {
958961
synchronized (inFlightTasks) {
959962
for (InFlightTask task : inFlightTasks) {
960-
if (task.readPos != null && task.entries == null) {
963+
// The purpose of calling "getReadPos" instead of calling "readPos" is to make the test
964+
// "testReplicationTaskStoppedAfterTopicClosed" can counter the calling times of "readMoreEntries".
965+
if (task.getReadPos() != null && task.entries == null) {
961966
// Skip the current reading if there is a pending cursor reading.
962967
return true;
963968
}
964969
}
965970
}
966971
return false;
967972
}
973+
974+
@VisibleForTesting
975+
String getReplicatorId() {
976+
return replicatorId;
977+
}
968978
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentReplicatorInflightTaskTest.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,17 @@
1818
*/
1919
package org.apache.pulsar.broker.service.persistent;
2020

21+
import static org.mockito.Mockito.doAnswer;
2122
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.spy;
24+
import static org.testng.Assert.assertEquals;
25+
import static org.testng.Assert.assertTrue;
2226
import java.util.ArrayList;
2327
import java.util.Arrays;
2428
import java.util.Collections;
2529
import java.util.LinkedList;
2630
import java.util.List;
27-
31+
import java.util.concurrent.atomic.AtomicInteger;
2832
import lombok.extern.slf4j.Slf4j;
2933
import org.apache.bookkeeper.mledger.Entry;
3034
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -34,6 +38,8 @@
3438
import org.apache.pulsar.broker.service.OneWayReplicatorTestBase;
3539
import org.apache.pulsar.broker.service.persistent.PersistentReplicator.InFlightTask;
3640
import org.apache.pulsar.client.api.MessageId;
41+
import org.mockito.invocation.InvocationOnMock;
42+
import org.mockito.stubbing.Answer;
3743
import org.testng.Assert;
3844
import org.testng.annotations.AfterClass;
3945
import org.testng.annotations.BeforeClass;
@@ -66,6 +72,40 @@ private void createTopics() throws Exception {
6672
admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
6773
}
6874

75+
@Test
76+
public void testReplicationTaskStoppedAfterTopicClosed() throws Exception {
77+
// Close a topic, which has enabled replication.
78+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
79+
admin1.topics().createNonPartitionedTopic(topicName);
80+
waitReplicatorStarted(topicName);
81+
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false)
82+
.join().get();
83+
PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators().get(cluster2);
84+
admin1.topics().unload(topicName);
85+
86+
// Inject a task into the "inFlightTasks" to calculate how many times the method "replicator.readMoreEntries"
87+
// has been called.
88+
AtomicInteger counter = new AtomicInteger();
89+
InFlightTask injectedTask = new InFlightTask(PositionImpl.get(1, 1), 1, replicator.getReplicatorId());
90+
injectedTask.setEntries(Collections.emptyList());
91+
InFlightTask spyTask = spy(injectedTask);
92+
replicator.inFlightTasks.add(spyTask);
93+
doAnswer(new Answer() {
94+
@Override
95+
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
96+
counter.incrementAndGet();
97+
return invocationOnMock.callRealMethod();
98+
}
99+
}).when(spyTask).getReadPos();
100+
101+
// Verify: there is no scheduled task to retry to read entries to replicate.
102+
// Call "readMoreEntries" to make the issue happen.
103+
replicator.readMoreEntries();
104+
Thread.sleep(PersistentTopic.MESSAGE_RATE_BACKOFF_MS * 10);
105+
assertEquals(replicator.getState(), AbstractReplicator.State.Terminated);
106+
assertTrue(counter.get() <= 1);
107+
}
108+
69109
@Test
70110
public void testCreateOrRecycleInFlightTaskIntoQueue() throws Exception {
71111
log.info("Starting testCreateOrRecycleInFlightTaskIntoQueue");

0 commit comments

Comments
 (0)