Skip to content

Commit 8a64664

Browse files
committed
rare reroute with npe
1 parent 0bded88 commit 8a64664

File tree

4 files changed

+168
-3
lines changed

4 files changed

+168
-3
lines changed

Diff for: server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java

+155
Original file line numberDiff line numberDiff line change
@@ -32,36 +32,49 @@
3232

3333
package org.opensearch.cluster.coordination;
3434

35+
import java.util.Collection;
36+
import java.util.concurrent.atomic.AtomicBoolean;
3537
import org.opensearch.OpenSearchParseException;
3638
import org.opensearch.Version;
3739
import org.opensearch.action.ActionRequest;
3840
import org.opensearch.action.ActionRequestBuilder;
3941
import org.opensearch.action.index.IndexResponse;
4042
import org.opensearch.action.support.master.AcknowledgedResponse;
43+
import org.opensearch.cluster.ClusterInfoService;
4144
import org.opensearch.cluster.ClusterState;
4245
import org.opensearch.cluster.ClusterStateUpdateTask;
46+
import org.opensearch.cluster.action.shard.ShardStateAction;
4347
import org.opensearch.cluster.block.ClusterBlocks;
4448
import org.opensearch.cluster.metadata.IndexMetadata;
4549
import org.opensearch.cluster.metadata.MappingMetadata;
4650
import org.opensearch.cluster.metadata.Metadata;
4751
import org.opensearch.cluster.node.DiscoveryNode;
4852
import org.opensearch.cluster.node.DiscoveryNodes;
53+
import org.opensearch.cluster.routing.RoutingChangesObserver;
54+
import org.opensearch.cluster.routing.RoutingNodes;
4955
import org.opensearch.cluster.routing.RoutingTable;
5056
import org.opensearch.cluster.routing.ShardRouting;
57+
import org.opensearch.cluster.routing.ShardRoutingState;
5158
import org.opensearch.cluster.routing.allocation.AllocationService;
59+
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
5260
import org.opensearch.cluster.service.ClusterService;
5361
import org.opensearch.common.action.ActionFuture;
5462
import org.opensearch.common.settings.Settings;
5563
import org.opensearch.common.unit.TimeValue;
5664
import org.opensearch.core.action.ActionResponse;
5765
import org.opensearch.core.index.Index;
66+
import org.opensearch.core.transport.TransportResponse;
5867
import org.opensearch.discovery.Discovery;
5968
import org.opensearch.index.IndexService;
6069
import org.opensearch.index.mapper.DocumentMapper;
6170
import org.opensearch.index.mapper.MapperService;
6271
import org.opensearch.indices.IndicesService;
72+
import org.opensearch.plugins.Plugin;
73+
import org.opensearch.snapshots.SnapshotsInfoService;
6374
import org.opensearch.test.OpenSearchIntegTestCase;
6475
import org.opensearch.test.disruption.BlockClusterStateProcessing;
76+
import org.opensearch.test.transport.MockTransportService;
77+
import org.opensearch.transport.TransportService;
6578
import org.opensearch.transport.TransportSettings;
6679

6780
import java.util.List;
@@ -71,6 +84,7 @@
7184
import static java.util.Collections.emptyMap;
7285
import static java.util.Collections.emptySet;
7386
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
87+
import static org.opensearch.cluster.action.shard.ShardStateAction.SHARD_STARTED_ACTION_NAME;
7488
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
7589
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
7690
import static org.hamcrest.Matchers.equalTo;
@@ -409,4 +423,145 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
409423
assertThat(dynamicMappingsFut.get(10, TimeUnit.SECONDS).getResult(), equalTo(CREATED));
410424
}
411425

426+
427+
public void testDisassociateNodesWhileShardInit() throws InterruptedException {
428+
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s")
429+
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build());
430+
internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build());
431+
internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build());
432+
String node2 = internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build());
433+
434+
final ClusterService clusterService = internalCluster().clusterService(clusterManagerName);
435+
blockShardStartedResponse(clusterManagerName, clusterService);
436+
437+
final String index = "index";
438+
439+
//create index with 3 primary and 1 replica each
440+
prepareCreate(index).setSettings(
441+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
442+
//.put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")
443+
).get();
444+
ensureGreen(index);
445+
446+
// close to have some unassigned started shards shards..
447+
client().admin().indices().prepareClose(index).get();
448+
449+
//block so that replicas are always in init and not started
450+
blockReplicaStart.set(true);
451+
final AllocationService allocationService = internalCluster().getInstance(AllocationService.class, clusterManagerName);
452+
clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() {
453+
@Override
454+
public ClusterState execute(ClusterState currentState) {
455+
ClusterState.Builder builder = ClusterState.builder(currentState);
456+
// open index
457+
final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index))
458+
.state(IndexMetadata.State.OPEN)
459+
.build();
460+
461+
builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true));
462+
builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index));
463+
ClusterState updatedState = builder.build();
464+
RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
465+
routingTable.addAsRecovery(updatedState.metadata().index(index));
466+
updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
467+
return allocationService.reroute(updatedState, "reroute");
468+
}
469+
470+
@Override
471+
public void onFailure(String source, Exception e) {
472+
logger.error(e.getMessage(), e);
473+
}
474+
});
475+
476+
ensureYellow(index);
477+
waitUntil(() -> clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3);
478+
479+
logger.info("Initializing shards");
480+
logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING));
481+
482+
//trigger 2nd reroute after shard in initialized
483+
clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() {
484+
@Override
485+
public ClusterState execute(ClusterState currentState) {
486+
return allocationService.reroute(currentState, "reroute");
487+
}
488+
489+
@Override
490+
public void onFailure(String source, Exception e) {}
491+
});
492+
ensureYellow(index);
493+
waitUntil(() -> clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3);
494+
clusterService.submitStateUpdateTask("test-remove-injected-node", new ClusterStateUpdateTask() {
495+
@Override
496+
public ClusterState execute(ClusterState currentState) throws Exception {
497+
//remove the primary node of replica shard which is in init
498+
ShardRouting next = currentState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0);
499+
ShardRouting primaryShard = currentState.getRoutingNodes().activePrimary(next.shardId());
500+
501+
ClusterState.Builder builder = ClusterState.builder(currentState);
502+
builder.nodes(DiscoveryNodes.builder(currentState.nodes()).remove(primaryShard.currentNodeId()));
503+
currentState = builder.build();
504+
logger.info("removed the node {}", primaryShard.currentNodeId());
505+
logger.info("shard {}", next);
506+
return allocationService.disassociateDeadNodes(currentState, true, "reroute");
507+
}
508+
509+
@Override
510+
public void onFailure(String source, Exception e) {}
511+
});
512+
//sleep for reroute to get triggeredn
513+
Thread.sleep(300 * 1000);
514+
515+
waitUntil(() -> clusterService.state().nodes().getSize() == 3);
516+
logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING));
517+
blockReplicaStart.set(false);
518+
519+
clusterService.submitStateUpdateTask("test-inject-node-and-reroute", new ClusterStateUpdateTask() {
520+
@Override
521+
public ClusterState execute(ClusterState currentState) {
522+
ClusterState.Builder builder = ClusterState.builder(currentState);
523+
final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index))
524+
.state(IndexMetadata.State.OPEN)
525+
.build();
526+
builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true));
527+
builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index));
528+
ClusterState updatedState = builder.build();
529+
RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
530+
routingTable.addAsRecovery(updatedState.metadata().index(index));
531+
updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
532+
533+
return allocationService.reroute(updatedState, "reroute");
534+
}
535+
536+
@Override
537+
public void onFailure(String source, Exception e) {}
538+
});
539+
540+
ensureGreen(index);
541+
}
542+
543+
AtomicBoolean blockReplicaStart = new AtomicBoolean(false);
544+
private void blockShardStartedResponse(String master, ClusterService service) {
545+
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master);
546+
primaryService.addRequestHandlingBehavior(SHARD_STARTED_ACTION_NAME, (handler, request, channel, task) -> {
547+
548+
if (blockReplicaStart.get()) {
549+
ShardStateAction.StartedShardEntry req = (ShardStateAction.StartedShardEntry) request;
550+
final ShardRouting matched = service.state().getRoutingTable().getByAllocationId(req.getShardId(), req.getAllocationId());
551+
552+
if (matched != null && matched.primary() == false) {
553+
channel.sendResponse(TransportResponse.Empty.INSTANCE);
554+
} else {
555+
handler.messageReceived(request, channel, task);
556+
}
557+
} else {
558+
handler.messageReceived(request, channel, task);
559+
}
560+
});
561+
}
562+
563+
@Override
564+
protected Collection<Class<? extends Plugin>> nodePlugins() {
565+
return List.of(MockTransportService.TestPlugin.class);
566+
}
412567
}

Diff for: server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java

+10
Original file line numberDiff line numberDiff line change
@@ -843,6 +843,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
843843
* @opensearch.internal
844844
*/
845845
public static class StartedShardEntry extends TransportRequest {
846+
846847
final ShardId shardId;
847848
final String allocationId;
848849
final long primaryTerm;
@@ -883,6 +884,15 @@ public String toString() {
883884
message
884885
);
885886
}
887+
888+
public ShardId getShardId() {
889+
return shardId;
890+
}
891+
892+
public String getAllocationId() {
893+
return allocationId;
894+
}
895+
886896
}
887897

888898
/**

Diff for: server/src/main/java/org/opensearch/cluster/service/MasterService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -917,7 +917,7 @@ private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterSt
917917
throw new AssertionError("update task submitted to ClusterManagerService cannot remove cluster-manager");
918918
}
919919
} catch (Exception e) {
920-
logger.trace(
920+
logger.info(
921921
() -> new ParameterizedMessage(
922922
"failed to execute cluster state update (on version: [{}], uuid: [{}]) for [{}]\n{}{}{}",
923923
previousClusterState.version(),

Diff for: server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ protected Runnable cancelExistingRecoveryForBetterMatch(
101101
RoutingNodes routingNodes = allocation.routingNodes();
102102
ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId());
103103
if (primaryShard == null) {
104-
logger.trace("{}: no active primary shard found or allocated, letting actual allocation figure it out", shard);
104+
logger.info("{}: no active primary shard found or allocated, letting actual allocation figure it out", shard);
105105
return null;
106106
}
107107
assert primaryShard.currentNodeId() != null;
@@ -111,7 +111,7 @@ protected Runnable cancelExistingRecoveryForBetterMatch(
111111
if (primaryStore == null) {
112112
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
113113
// just let the recovery find it out, no need to do anything about it for the initializing shard
114-
logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard);
114+
logger.info("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard);
115115
return null;
116116
}
117117

0 commit comments

Comments
 (0)