Skip to content

Commit 83a5525

Browse files
committed
rare reroute with npe
1 parent 0bded88 commit 83a5525

File tree

5 files changed

+211
-3
lines changed

5 files changed

+211
-3
lines changed

Diff for: server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java

+179
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.action.support.master.AcknowledgedResponse;
4141
import org.opensearch.cluster.ClusterState;
4242
import org.opensearch.cluster.ClusterStateUpdateTask;
43+
import org.opensearch.cluster.action.shard.ShardStateAction;
4344
import org.opensearch.cluster.block.ClusterBlocks;
4445
import org.opensearch.cluster.metadata.IndexMetadata;
4546
import org.opensearch.cluster.metadata.MappingMetadata;
@@ -48,29 +49,39 @@
4849
import org.opensearch.cluster.node.DiscoveryNodes;
4950
import org.opensearch.cluster.routing.RoutingTable;
5051
import org.opensearch.cluster.routing.ShardRouting;
52+
import org.opensearch.cluster.routing.ShardRoutingState;
5153
import org.opensearch.cluster.routing.allocation.AllocationService;
54+
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
5255
import org.opensearch.cluster.service.ClusterService;
5356
import org.opensearch.common.action.ActionFuture;
5457
import org.opensearch.common.settings.Settings;
5558
import org.opensearch.common.unit.TimeValue;
5659
import org.opensearch.core.action.ActionResponse;
5760
import org.opensearch.core.index.Index;
61+
import org.opensearch.core.transport.TransportResponse;
5862
import org.opensearch.discovery.Discovery;
5963
import org.opensearch.index.IndexService;
6064
import org.opensearch.index.mapper.DocumentMapper;
6165
import org.opensearch.index.mapper.MapperService;
6266
import org.opensearch.indices.IndicesService;
67+
import org.opensearch.plugins.Plugin;
6368
import org.opensearch.test.OpenSearchIntegTestCase;
6469
import org.opensearch.test.disruption.BlockClusterStateProcessing;
70+
import org.opensearch.test.transport.MockTransportService;
71+
import org.opensearch.transport.TransportService;
6572
import org.opensearch.transport.TransportSettings;
6673

74+
import java.util.Collection;
6775
import java.util.List;
6876
import java.util.Map;
77+
import java.util.Optional;
6978
import java.util.concurrent.TimeUnit;
79+
import java.util.concurrent.atomic.AtomicBoolean;
7080

7181
import static java.util.Collections.emptyMap;
7282
import static java.util.Collections.emptySet;
7383
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
84+
import static org.opensearch.cluster.action.shard.ShardStateAction.SHARD_STARTED_ACTION_NAME;
7485
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
7586
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
7687
import static org.hamcrest.Matchers.equalTo;
@@ -409,4 +420,172 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
409420
assertThat(dynamicMappingsFut.get(10, TimeUnit.SECONDS).getResult(), equalTo(CREATED));
410421
}
411422

423+
public void testDisassociateNodesWhileShardInit() throws InterruptedException {
424+
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(
425+
Settings.builder()
426+
.put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s")
427+
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
428+
.build()
429+
);
430+
internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build());
431+
internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build());
432+
String node2 = internalCluster().startDataOnlyNode(
433+
Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build()
434+
);
435+
436+
final ClusterService clusterService = internalCluster().clusterService(clusterManagerName);
437+
blockShardStartedResponse(clusterManagerName, clusterService);
438+
439+
final String index = "index";
440+
441+
// create index with 3 primary and 1 replica each
442+
prepareCreate(index).setSettings(
443+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
444+
// .put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")
445+
).get();
446+
ensureGreen(index);
447+
448+
// close to have some unassigned started shards shards..
449+
client().admin().indices().prepareClose(index).get();
450+
451+
// block so that replicas are always in init and not started
452+
blockReplicaStart.set(true);
453+
final AllocationService allocationService = internalCluster().getInstance(AllocationService.class, clusterManagerName);
454+
clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() {
455+
@Override
456+
public ClusterState execute(ClusterState currentState) {
457+
ClusterState.Builder builder = ClusterState.builder(currentState);
458+
// open index
459+
final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index))
460+
.state(IndexMetadata.State.OPEN)
461+
.build();
462+
463+
builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true));
464+
builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index));
465+
ClusterState updatedState = builder.build();
466+
RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
467+
routingTable.addAsRecovery(updatedState.metadata().index(index));
468+
updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
469+
ClusterState state = allocationService.reroute(updatedState, "reroute");
470+
return state;
471+
}
472+
473+
@Override
474+
public void onFailure(String source, Exception e) {
475+
logger.error(e.getMessage(), e);
476+
}
477+
});
478+
479+
ensureYellow(index);
480+
assertTrue(waitUntil(() -> {
481+
ClusterState state = clusterService.state();
482+
return state.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3;
483+
484+
}));
485+
486+
logger.info("Initializing shards");
487+
logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING));
488+
489+
// trigger 2nd reroute after shard in initialized
490+
clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() {
491+
@Override
492+
public ClusterState execute(ClusterState currentState) {
493+
return allocationService.reroute(currentState, "reroute");
494+
}
495+
496+
@Override
497+
public void onFailure(String source, Exception e) {}
498+
});
499+
500+
ensureYellow(index);
501+
assertTrue(waitUntil(() -> clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3));
502+
clusterService.submitStateUpdateTask("test-remove-injected-node", new ClusterStateUpdateTask() {
503+
@Override
504+
public ClusterState execute(ClusterState currentState) throws Exception {
505+
// remove the primary node of replica shard which is in init
506+
ShardRouting next = currentState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0);
507+
ShardRouting primaryShard = currentState.getRoutingNodes().activePrimary(next.shardId());
508+
509+
ClusterState.Builder builder = ClusterState.builder(currentState);
510+
builder.nodes(DiscoveryNodes.builder(currentState.nodes()).remove(primaryShard.currentNodeId()));
511+
currentState = builder.build();
512+
logger.info("removed the node {}", primaryShard.currentNodeId());
513+
logger.info("shard {}", next);
514+
ClusterState state = allocationService.disassociateDeadNodes(currentState, true, "reroute");
515+
return state;
516+
}
517+
518+
@Override
519+
public void onFailure(String source, Exception e) {}
520+
});
521+
assertTrue(waitUntil(() -> {
522+
ClusterState state = clusterService.state();
523+
logger.info("current state {} ", state);
524+
return clusterService.state().nodes().getSize() == 3;
525+
526+
}));
527+
528+
logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING));
529+
blockReplicaStart.set(false);
530+
531+
clusterService.submitStateUpdateTask("test-inject-node-and-reroute", new ClusterStateUpdateTask() {
532+
@Override
533+
public ClusterState execute(ClusterState currentState) {
534+
ClusterState.Builder builder = ClusterState.builder(currentState);
535+
final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index))
536+
.state(IndexMetadata.State.OPEN)
537+
.build();
538+
builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true));
539+
builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index));
540+
ClusterState updatedState = builder.build();
541+
RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
542+
routingTable.addAsRecovery(updatedState.metadata().index(index));
543+
updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
544+
545+
return allocationService.reroute(updatedState, "reroute");
546+
}
547+
548+
@Override
549+
public void onFailure(String source, Exception e) {}
550+
});
551+
552+
ensureGreen(index);
553+
}
554+
555+
AtomicBoolean blockReplicaStart = new AtomicBoolean(false);
556+
557+
private void blockShardStartedResponse(String master, ClusterService service) {
558+
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master);
559+
primaryService.addRequestHandlingBehavior(SHARD_STARTED_ACTION_NAME, (handler, request, channel, task) -> {
560+
561+
if (blockReplicaStart.get()) {
562+
ShardStateAction.StartedShardEntry req = (ShardStateAction.StartedShardEntry) request;
563+
String stringRep = req.toString();
564+
logger.info("ShardStateAction.StartedShardEntry {}", stringRep);
565+
566+
String incomingRequest = req.toString();
567+
Optional<ShardRouting> matchReplica = service.state()
568+
.routingTable()
569+
.allShardsSatisfyingPredicate(r -> !r.primary())
570+
.getShardRoutings()
571+
.stream()
572+
.filter(r -> r.allocationId() != null)
573+
.filter(r -> incomingRequest.contains(r.allocationId().getId()))
574+
.findAny();
575+
576+
if (matchReplica.isPresent()) {
577+
channel.sendResponse(TransportResponse.Empty.INSTANCE);
578+
} else {
579+
handler.messageReceived(request, channel, task);
580+
}
581+
} else {
582+
handler.messageReceived(request, channel, task);
583+
}
584+
});
585+
}
586+
587+
@Override
588+
protected Collection<Class<? extends Plugin>> nodePlugins() {
589+
return List.of(MockTransportService.TestPlugin.class);
590+
}
412591
}

Diff for: server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java

+1
Original file line numberDiff line numberDiff line change
@@ -843,6 +843,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
843843
* @opensearch.internal
844844
*/
845845
public static class StartedShardEntry extends TransportRequest {
846+
846847
final ShardId shardId;
847848
final String allocationId;
848849
final long primaryTerm;

Diff for: server/src/main/java/org/opensearch/cluster/service/MasterService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -917,7 +917,7 @@ private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterSt
917917
throw new AssertionError("update task submitted to ClusterManagerService cannot remove cluster-manager");
918918
}
919919
} catch (Exception e) {
920-
logger.trace(
920+
logger.info(
921921
() -> new ParameterizedMessage(
922922
"failed to execute cluster state update (on version: [{}], uuid: [{}]) for [{}]\n{}{}{}",
923923
previousClusterState.version(),

Diff for: server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ protected Runnable cancelExistingRecoveryForBetterMatch(
101101
RoutingNodes routingNodes = allocation.routingNodes();
102102
ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId());
103103
if (primaryShard == null) {
104-
logger.trace("{}: no active primary shard found or allocated, letting actual allocation figure it out", shard);
104+
logger.info("{}: no active primary shard found or allocated, letting actual allocation figure it out", shard);
105105
return null;
106106
}
107107
assert primaryShard.currentNodeId() != null;
@@ -111,7 +111,7 @@ protected Runnable cancelExistingRecoveryForBetterMatch(
111111
if (primaryStore == null) {
112112
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
113113
// just let the recovery find it out, no need to do anything about it for the initializing shard
114-
logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard);
114+
logger.info("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard);
115115
return null;
116116
}
117117

Diff for: server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java

+28
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010

1111
import org.apache.logging.log4j.Logger;
1212
import org.opensearch.cluster.node.DiscoveryNode;
13+
import org.opensearch.cluster.routing.IndexRoutingTable;
1314
import org.opensearch.cluster.routing.RoutingNodes;
1415
import org.opensearch.cluster.routing.ShardRouting;
16+
import org.opensearch.cluster.routing.ShardRoutingState;
1517
import org.opensearch.cluster.routing.UnassignedInfo;
1618
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
1719
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
@@ -33,6 +35,7 @@
3335
import java.util.Map;
3436
import java.util.Set;
3537
import java.util.function.Supplier;
38+
import java.util.stream.Collectors;
3639

3740
/**
3841
* Allocates replica shards in a batch mode
@@ -51,13 +54,38 @@ public abstract class ReplicaShardBatchAllocator extends ReplicaShardAllocator {
5154
*/
5255
public void processExistingRecoveries(RoutingAllocation allocation, List<List<ShardRouting>> shardBatches) {
5356
List<Runnable> shardCancellationActions = new ArrayList<>();
57+
58+
Map<String, IndexRoutingTable> initShardsFromRouting = allocation.routingTable().indicesRouting();
59+
Map<String, Set<ShardId>> initShardsForIndex = new HashMap<>();
60+
5461
// iterate through the batches, each batch needs to be processed together as fetch call should be made for shards from same batch
5562
for (List<ShardRouting> shardBatch : shardBatches) {
5663
List<ShardRouting> eligibleShards = new ArrayList<>();
5764
List<ShardRouting> ineligibleShards = new ArrayList<>();
5865
// iterate over shards to check for match for each of those
5966
for (ShardRouting shard : shardBatch) {
6067
if (shard != null && !shard.primary()) {
68+
// check if the shard is in Initializing state in RoutingTable
69+
// as the batch is not refreshed yet
70+
String indexName = shard.shardId().getIndexName();
71+
if (!initShardsFromRouting.containsKey(indexName)) {
72+
logger.info("skipping the shardRouting {} as the index is not present in routing table", shard);
73+
continue;
74+
}
75+
Set<ShardId> initShards = initShardsForIndex.putIfAbsent(
76+
indexName,
77+
initShardsFromRouting.get(shard.shardId().getIndexName())
78+
.shardsWithState(ShardRoutingState.INITIALIZING)
79+
.stream()
80+
.map(ShardRouting::shardId)
81+
.collect(Collectors.toSet())
82+
);
83+
84+
if (initShards == null || !initShards.contains(shard.shardId())) {
85+
logger.info("skipping the shardRouting {} as the state is updated in routing table", shard);
86+
continue;
87+
}
88+
6189
// need to iterate over all the nodes to find matching shard
6290
if (shouldSkipFetchForRecovery(shard)) {
6391
// shard should just be skipped for fetchData, no need to remove from batch

0 commit comments

Comments
 (0)