Skip to content

Commit 54945b2

Browse files
committed
modify test
Signed-off-by: guojialiang <[email protected]>
1 parent e067c6b commit 54945b2

File tree

4 files changed

+40
-6
lines changed

4 files changed

+40
-6
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java

+37-2
Original file line numberDiff line numberDiff line change
@@ -185,16 +185,51 @@ public void testPublishCheckPointFail() throws Exception {
185185
}
186186

187187
public void testPrimaryStopped_ReplicaPromoted() throws Exception {
188-
final String primary = internalCluster().startDataOnlyNode();
188+
Settings mockNodeSetting = Settings.builder()
189+
.put(TransportReplicationAction.REPLICATION_RETRY_TIMEOUT.getKey(), TimeValue.timeValueSeconds(0))
190+
.build();
191+
192+
final String primary = internalCluster().startDataOnlyNode(mockNodeSetting);
189193
createIndex(INDEX_NAME);
190194
ensureYellowAndNoInitializingShards(INDEX_NAME);
191-
final String replica = internalCluster().startDataOnlyNode();
195+
final String replica = internalCluster().startDataOnlyNode(mockNodeSetting);
192196
ensureGreen(INDEX_NAME);
193197

198+
MockTransportService replicaTransportService = ((MockTransportService) internalCluster().getInstance(
199+
TransportService.class,
200+
replica
201+
));
202+
AtomicBoolean mockReplicaReceivePublishCheckpointException = new AtomicBoolean(true);
203+
replicaTransportService.addRequestHandlingBehavior(
204+
PublishCheckpointAction.ACTION_NAME + TransportReplicationAction.REPLICA_ACTION_SUFFIX,
205+
(handler, request, channel, task) -> {
206+
if (mockReplicaReceivePublishCheckpointException.get()) {
207+
logger.info("mock remote transport exception");
208+
throw new RemoteTransportException("mock remote transport exception", new OpenSearchRejectedExecutionException());
209+
}
210+
logger.info("replica receive publish checkpoint request");
211+
handler.messageReceived(request, channel, task);
212+
}
213+
);
214+
194215
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
195216
refresh(INDEX_NAME);
217+
logger.info("ensure publish checkpoint request can be process");
218+
mockReplicaReceivePublishCheckpointException.set(false);
196219

197220
waitForSearchableDocs(1, primary, replica);
221+
replicaTransportService.clearAllRules();
222+
223+
assertAcked(
224+
client().admin()
225+
.indices()
226+
.prepareUpdateSettings(INDEX_NAME)
227+
.setSettings(
228+
Settings.builder()
229+
.put(IndexSettings.INDEX_PUBLISH_CHECKPOINT_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(3))
230+
.put(IndexSettings.INDEX_LAG_TIME_BEFORE_RESEND_CHECKPOINT_SETTING.getKey(), TimeValue.timeValueSeconds(3))
231+
)
232+
);
198233

199234
// index another doc but don't refresh, we will ensure this is searchable once replica is promoted.
200235
client().prepareIndex(INDEX_NAME).setId("2").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

server/src/main/java/org/opensearch/index/IndexSettings.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -792,8 +792,8 @@ public static IndexMergePolicy fromString(String text) {
792792

793793
public static final Setting<TimeValue> INDEX_LAG_TIME_BEFORE_RESEND_CHECKPOINT_SETTING = Setting.timeSetting(
794794
"index.lag_time_before_resend_checkpoint",
795-
TimeValue.timeValueSeconds(1),
796-
TimeValue.timeValueSeconds(1),
795+
TimeValue.timeValueSeconds(0),
796+
TimeValue.timeValueSeconds(0),
797797
Property.Dynamic,
798798
Property.IndexScope
799799
);

server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1305,7 +1305,7 @@ public synchronized boolean hasLaggingReplicas() {
13051305
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
13061306
&& shouldSkipReplicationTimer(allocationId) == false
13071307
&& cps.checkpointTimers.containsKey(latestReplicationCheckpoint)
1308-
&& cps.checkpointTimers.get(latestReplicationCheckpoint).time() > indexSettings.getLagTimeBeforeResendCheckpoint()
1308+
&& cps.checkpointTimers.get(latestReplicationCheckpoint).time() >= indexSettings.getLagTimeBeforeResendCheckpoint()
13091309
.millis()) {
13101310
hasLaggingReplicas = true;
13111311
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

-1
Original file line numberDiff line numberDiff line change
@@ -4720,7 +4720,6 @@ public void scheduledPublishCheckpoint() {
47204720
assert indexSettings.isSegRepEnabledOrRemoteNode();
47214721
assert isPrimaryMode();
47224722
if (replicationTracker.hasLaggingReplicas()) {
4723-
logger.trace("publish checkpoint {} to replicas", getLatestReplicationCheckpoint());
47244723
checkpointPublisher.publish(this, getLatestReplicationCheckpoint());
47254724
}
47264725
}

0 commit comments

Comments
 (0)