Skip to content

Commit 9f7d3b6

Browse files
authored
using the routing allocation to cancel existing recoveries (#16468)
Signed-off-by: Rajiv Kumar Vaidyanathan <[email protected]>
1 parent 0fcb3ab commit 9f7d3b6

File tree

3 files changed

+212
-10
lines changed

3 files changed

+212
-10
lines changed

Diff for: server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java

+179
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.action.support.master.AcknowledgedResponse;
4141
import org.opensearch.cluster.ClusterState;
4242
import org.opensearch.cluster.ClusterStateUpdateTask;
43+
import org.opensearch.cluster.action.shard.ShardStateAction;
4344
import org.opensearch.cluster.block.ClusterBlocks;
4445
import org.opensearch.cluster.metadata.IndexMetadata;
4546
import org.opensearch.cluster.metadata.MappingMetadata;
@@ -48,29 +49,39 @@
4849
import org.opensearch.cluster.node.DiscoveryNodes;
4950
import org.opensearch.cluster.routing.RoutingTable;
5051
import org.opensearch.cluster.routing.ShardRouting;
52+
import org.opensearch.cluster.routing.ShardRoutingState;
5153
import org.opensearch.cluster.routing.allocation.AllocationService;
54+
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
5255
import org.opensearch.cluster.service.ClusterService;
5356
import org.opensearch.common.action.ActionFuture;
5457
import org.opensearch.common.settings.Settings;
5558
import org.opensearch.common.unit.TimeValue;
5659
import org.opensearch.core.action.ActionResponse;
5760
import org.opensearch.core.index.Index;
61+
import org.opensearch.core.transport.TransportResponse;
5862
import org.opensearch.discovery.Discovery;
5963
import org.opensearch.index.IndexService;
6064
import org.opensearch.index.mapper.DocumentMapper;
6165
import org.opensearch.index.mapper.MapperService;
6266
import org.opensearch.indices.IndicesService;
67+
import org.opensearch.plugins.Plugin;
6368
import org.opensearch.test.OpenSearchIntegTestCase;
6469
import org.opensearch.test.disruption.BlockClusterStateProcessing;
70+
import org.opensearch.test.transport.MockTransportService;
71+
import org.opensearch.transport.TransportService;
6572
import org.opensearch.transport.TransportSettings;
6673

74+
import java.util.Collection;
6775
import java.util.List;
6876
import java.util.Map;
77+
import java.util.Optional;
6978
import java.util.concurrent.TimeUnit;
79+
import java.util.concurrent.atomic.AtomicBoolean;
7080

7181
import static java.util.Collections.emptyMap;
7282
import static java.util.Collections.emptySet;
7383
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
84+
import static org.opensearch.cluster.action.shard.ShardStateAction.SHARD_STARTED_ACTION_NAME;
7485
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
7586
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
7687
import static org.hamcrest.Matchers.equalTo;
@@ -409,4 +420,172 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
409420
assertThat(dynamicMappingsFut.get(10, TimeUnit.SECONDS).getResult(), equalTo(CREATED));
410421
}
411422

423+
/**
 * Exercises removal of a node from the cluster state while replica shards are held in the
 * INITIALIZING state, with the batch mode of the existing-shards allocator enabled on the
 * cluster-manager node.
 *
 * Flow, as written below:
 * 1. start one cluster-manager-only node and three data-only nodes;
 * 2. create an index with 3 primaries and 1 replica each, wait for green, then close it;
 * 3. set {@code blockReplicaStart} and reopen the index through a hand-built cluster state update
 *    followed by a reroute, then wait for exactly three INITIALIZING shards;
 * 4. run a second reroute, then remove the node holding the active primary of one initializing
 *    shard and call {@code disassociateDeadNodes};
 * 5. clear {@code blockReplicaStart}, reopen/reroute once more and wait for green.
 *
 * NOTE(review): presumably the regression test for the change that makes the replica batch
 * allocator consult the routing allocation when cancelling existing recoveries - confirm against
 * the commit description.
 *
 * @throws InterruptedException declared by the method; presumably propagated from waitUntil - confirm
 */
public void testDisassociateNodesWhileShardInit() throws InterruptedException {
424+
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(
425+
Settings.builder()
426+
.put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s")
427+
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
428+
.build()
429+
);
430+
internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build());
431+
internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build());
432+
// NOTE(review): node2 is assigned here but never read again in this method.
String node2 = internalCluster().startDataOnlyNode(
433+
Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build()
434+
);
435+
436+
final ClusterService clusterService = internalCluster().clusterService(clusterManagerName);
437+
// install the shard-started interceptor on the cluster-manager node; it is inactive until blockReplicaStart is set
blockShardStartedResponse(clusterManagerName, clusterService);
438+
439+
final String index = "index";
440+
441+
// create an index with 3 primaries and 1 replica each
442+
prepareCreate(index).setSettings(
443+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
444+
// .put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")
445+
).get();
446+
ensureGreen(index);
447+
448+
// close the index to have some unassigned, previously started shards
449+
client().admin().indices().prepareClose(index).get();
450+
451+
// block shard-started handling so that replicas stay INITIALIZING and are never marked as started
452+
blockReplicaStart.set(true);
453+
final AllocationService allocationService = internalCluster().getInstance(AllocationService.class, clusterManagerName);
454+
// reopen the index by editing the cluster state directly instead of going through the open-index API
clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() {
455+
@Override
456+
public ClusterState execute(ClusterState currentState) {
457+
ClusterState.Builder builder = ClusterState.builder(currentState);
458+
// mark the index as OPEN again
459+
final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index))
460+
.state(IndexMetadata.State.OPEN)
461+
.build();
462+
463+
builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true));
464+
builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index));
465+
ClusterState updatedState = builder.build();
466+
RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
467+
routingTable.addAsRecovery(updatedState.metadata().index(index));
468+
updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
469+
ClusterState state = allocationService.reroute(updatedState, "reroute");
470+
return state;
471+
}
472+
473+
@Override
474+
public void onFailure(String source, Exception e) {
475+
logger.error(e.getMessage(), e);
476+
}
477+
});
478+
479+
ensureYellow(index);
480+
// wait until exactly three shards are INITIALIZING (one replica per primary is configured above)
assertTrue(waitUntil(() -> {
481+
ClusterState state = clusterService.state();
482+
return state.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3;
483+
484+
}));
485+
486+
logger.info("Initializing shards");
487+
logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING));
488+
489+
// trigger a 2nd reroute while the replica shards are still initializing
490+
// NOTE(review): this task reuses the source string "test-delete-node-and-reroute" of the first task.
clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() {
491+
@Override
492+
public ClusterState execute(ClusterState currentState) {
493+
return allocationService.reroute(currentState, "reroute");
494+
}
495+
496+
@Override
497+
// NOTE(review): a failure of this task is silently ignored; only the waitUntil below would notice.
public void onFailure(String source, Exception e) {}
498+
});
499+
500+
ensureYellow(index);
501+
assertTrue(waitUntil(() -> clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3));
502+
clusterService.submitStateUpdateTask("test-remove-injected-node", new ClusterStateUpdateTask() {
503+
@Override
504+
public ClusterState execute(ClusterState currentState) throws Exception {
505+
// remove the node that holds the primary of a replica shard which is still initializing
506+
ShardRouting next = currentState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0);
507+
ShardRouting primaryShard = currentState.getRoutingNodes().activePrimary(next.shardId());
508+
509+
ClusterState.Builder builder = ClusterState.builder(currentState);
510+
builder.nodes(DiscoveryNodes.builder(currentState.nodes()).remove(primaryShard.currentNodeId()));
511+
currentState = builder.build();
512+
logger.info("removed the node {}", primaryShard.currentNodeId());
513+
logger.info("shard {}", next);
514+
ClusterState state = allocationService.disassociateDeadNodes(currentState, true, "reroute");
515+
return state;
516+
}
517+
518+
@Override
519+
// NOTE(review): a failure of this task is silently ignored.
public void onFailure(String source, Exception e) {}
520+
});
521+
// four nodes were started above (1 cluster-manager + 3 data), so three remain once one is removed
assertTrue(waitUntil(() -> {
522+
ClusterState state = clusterService.state();
523+
logger.info("current state {} ", state);
524+
return clusterService.state().nodes().getSize() == 3;
525+
526+
}));
527+
528+
logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING));
529+
// stop intercepting shard-started requests so that replicas can be marked as started again
blockReplicaStart.set(false);
530+
531+
// same reopen-and-reroute state update as the first task, now with shard-started handling unblocked
clusterService.submitStateUpdateTask("test-inject-node-and-reroute", new ClusterStateUpdateTask() {
532+
@Override
533+
public ClusterState execute(ClusterState currentState) {
534+
ClusterState.Builder builder = ClusterState.builder(currentState);
535+
final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index))
536+
.state(IndexMetadata.State.OPEN)
537+
.build();
538+
builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true));
539+
builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index));
540+
ClusterState updatedState = builder.build();
541+
RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
542+
routingTable.addAsRecovery(updatedState.metadata().index(index));
543+
updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
544+
545+
return allocationService.reroute(updatedState, "reroute");
546+
}
547+
548+
@Override
549+
// NOTE(review): a failure of this task is silently ignored; ensureGreen below is the only check.
public void onFailure(String source, Exception e) {}
550+
});
551+
552+
ensureGreen(index);
553+
}
554+
555+
// Switch read by the request handler installed in blockShardStartedResponse: while true, shard-started
// requests that match a replica's allocation id are answered with an empty response instead of being
// passed to the real handler. Starts as false; testDisassociateNodesWhileShardInit toggles it.
AtomicBoolean blockReplicaStart = new AtomicBoolean(false);
556+
557+
/**
 * Installs a request-handling behavior for {@code SHARD_STARTED_ACTION_NAME} on the transport
 * service of the given node.
 *
 * While {@code blockReplicaStart} is true, an incoming request whose string form contains the
 * allocation id of any non-primary shard routing in {@code service.state()} is answered with
 * {@code TransportResponse.Empty.INSTANCE} and the real handler is not invoked. Every other request
 * is delegated to the original handler unchanged.
 *
 * @param master  name of the node whose TransportService is instrumented; the instance is cast to
 *                MockTransportService
 * @param service cluster service whose current state supplies the replica shard routings to match against
 */
private void blockShardStartedResponse(String master, ClusterService service) {
558+
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master);
559+
primaryService.addRequestHandlingBehavior(SHARD_STARTED_ACTION_NAME, (handler, request, channel, task) -> {
560+
561+
if (blockReplicaStart.get()) {
562+
ShardStateAction.StartedShardEntry req = (ShardStateAction.StartedShardEntry) request;
563+
String stringRep = req.toString();
564+
logger.info("ShardStateAction.StartedShardEntry {}", stringRep);
565+
566+
// NOTE(review): same value as stringRep above; req.toString() is evaluated twice.
String incomingRequest = req.toString();
567+
// look for a replica routing whose allocation id appears in the request's string form
// NOTE(review): matching relies on the toString() output containing the allocation id - confirm
Optional<ShardRouting> matchReplica = service.state()
568+
.routingTable()
569+
.allShardsSatisfyingPredicate(r -> !r.primary())
570+
.getShardRoutings()
571+
.stream()
572+
.filter(r -> r.allocationId() != null)
573+
.filter(r -> incomingRequest.contains(r.allocationId().getId()))
574+
.findAny();
575+
576+
if (matchReplica.isPresent()) {
577+
// acknowledge the sender without running the real shard-started handler
channel.sendResponse(TransportResponse.Empty.INSTANCE);
578+
} else {
579+
handler.messageReceived(request, channel, task);
580+
}
581+
} else {
582+
handler.messageReceived(request, channel, task);
583+
}
584+
});
585+
}
586+
587+
/**
 * Returns {@code MockTransportService.TestPlugin} as the only node plugin for this test class.
 *
 * NOTE(review): presumably required so that the TransportService instances are
 * MockTransportService, which blockShardStartedResponse casts to - confirm.
 */
@Override
588+
protected Collection<Class<? extends Plugin>> nodePlugins() {
589+
return List.of(MockTransportService.TestPlugin.class);
590+
}
412591
}

Diff for: server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -241,11 +241,13 @@ private void fillShardData(Map<ShardId, V> shardDataFromNode, Map<ShardId, Integ
241241
for (Map.Entry<ShardId, V> shardData : shardDataFromNode.entrySet()) {
242242
if (shardData.getValue() != null) {
243243
ShardId shardId = shardData.getKey();
244-
if (emptyShardResponsePredicate.test(shardData.getValue())) {
245-
this.emptyShardResponse[shardIdKey.get(shardId)] = true;
246-
this.shardData[shardIdKey.get(shardId)] = null;
247-
} else {
248-
this.shardData[shardIdKey.get(shardId)] = shardData.getValue();
244+
if (shardIdKey.get(shardId) != null) {// the response might be for shard which is no longer present in cache
245+
if (emptyShardResponsePredicate.test(shardData.getValue())) {
246+
this.emptyShardResponse[shardIdKey.get(shardId)] = true;
247+
this.shardData[shardIdKey.get(shardId)] = null;
248+
} else {
249+
this.shardData[shardIdKey.get(shardId)] = shardData.getValue();
250+
}
249251
}
250252
}
251253
}

Diff for: server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java

+26-5
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.opensearch.cluster.node.DiscoveryNode;
1313
import org.opensearch.cluster.routing.RoutingNodes;
1414
import org.opensearch.cluster.routing.ShardRouting;
15+
import org.opensearch.cluster.routing.ShardRoutingState;
1516
import org.opensearch.cluster.routing.UnassignedInfo;
1617
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
1718
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
@@ -51,13 +52,25 @@ public abstract class ReplicaShardBatchAllocator extends ReplicaShardAllocator {
5152
*/
5253
public void processExistingRecoveries(RoutingAllocation allocation, List<List<ShardRouting>> shardBatches) {
5354
List<Runnable> shardCancellationActions = new ArrayList<>();
55+
Map<ShardId, List<ShardRouting>> initReplicasFromRouting = new HashMap<>();
56+
allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).stream().filter(r -> !r.primary()).forEach(r -> {
57+
initReplicasFromRouting.putIfAbsent(r.shardId(), new ArrayList<>());
58+
initReplicasFromRouting.get(r.shardId()).add(r);
59+
});
60+
5461
// iterate through the batches, each batch needs to be processed together as fetch call should be made for shards from same batch
5562
for (List<ShardRouting> shardBatch : shardBatches) {
5663
List<ShardRouting> eligibleShards = new ArrayList<>();
5764
List<ShardRouting> ineligibleShards = new ArrayList<>();
5865
// iterate over shards to check for match for each of those
5966
for (ShardRouting shard : shardBatch) {
6067
if (shard != null && !shard.primary()) {
68+
// check if the shard is in Initializing state in RoutingTable
69+
// as the batch is not refreshed yet
70+
if (!initReplicasFromRouting.containsKey(shard.shardId())) {
71+
logger.trace("skipping the shardRouting {} as the state is updated in routing table", shard);
72+
continue;
73+
}
6174
// need to iterate over all the nodes to find matching shard
6275
if (shouldSkipFetchForRecovery(shard)) {
6376
// shard should just be skipped for fetchData, no need to remove from batch
@@ -72,11 +85,19 @@ public void processExistingRecoveries(RoutingAllocation allocation, List<List<Sh
7285
continue; // still fetching
7386
}
7487
for (ShardRouting shard : eligibleShards) {
75-
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = convertToNodeStoreFilesMetadataMap(shard, shardState);
76-
77-
Runnable cancellationAction = cancelExistingRecoveryForBetterMatch(shard, allocation, nodeShardStores);
78-
if (cancellationAction != null) {
79-
shardCancellationActions.add(cancellationAction);
88+
for (ShardRouting initShardsFromAllocation : initReplicasFromRouting.get(shard.shardId())) {
89+
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = convertToNodeStoreFilesMetadataMap(
90+
initShardsFromAllocation,
91+
shardState
92+
);
93+
Runnable cancellationAction = cancelExistingRecoveryForBetterMatch(
94+
initShardsFromAllocation,
95+
allocation,
96+
nodeShardStores
97+
);
98+
if (cancellationAction != null) {
99+
shardCancellationActions.add(cancellationAction);
100+
}
80101
}
81102
}
82103
}

0 commit comments

Comments
 (0)