diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java index 522d63b22a0da..f947ca1ccbb85 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java @@ -364,7 +364,6 @@ public void testAwarenessZonesIncrementalNodes() { assertThat(counts.get(noZoneNode), equalTo(2)); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5908") public void testThreeZoneOneReplicaWithForceZoneValueAndLoadAwareness() throws Exception { int nodeCountPerAZ = 5; int numOfShards = 30; @@ -504,4 +503,32 @@ public void testThreeZoneOneReplicaWithForceZoneValueAndLoadAwareness() throws E assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2 * numOfShards * (numOfReplica + 1))); assertThat(health.isTimedOut(), equalTo(false)); } + + public void testAwarenessBalanceWithForcedAwarenessCreateAndUpdateIndex() { + Settings settings = Settings.builder() + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.balance", "true") + .build(); + + logger.info("--> starting 3 nodes on zones a,b,c"); + internalCluster().startNodes( + Settings.builder().put(settings).put("node.attr.zone", "a").build(), + Settings.builder().put(settings).put("node.attr.zone", "b").build(), + Settings.builder().put(settings).put("node.attr.zone", "c").build() + ); + + // Create index with 2 replicas ie total 3 shards + createIndex( + "test-idx", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build() + ); + + // Update the number of replicas to 4 + final Settings 
newsettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 4).build(); + + assertThrows(IllegalArgumentException.class, () -> { + assertAcked(client().admin().indices().prepareUpdateSettings("test-idx").setSettings(newsettings)); + }); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAwarenessAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAwarenessAllocationIT.java new file mode 100644 index 0000000000000..3e610df1887ed --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAwarenessAllocationIT.java @@ -0,0 +1,326 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.allocation; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; +import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static 
org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.test.NodeRoles.searchOnlyNode; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; + +@ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SearchReplicaAwarenessAllocationIT extends RemoteStoreBaseIntegTestCase { + + private final Logger logger = LogManager.getLogger(SearchReplicaAwarenessAllocationIT.class); + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build(); + } + + public void testAllocationAwarenessZones() { + Settings commonSettings = Settings.builder() + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") + .build(); + + logger.info("--> starting 8 nodes on different zones"); + List nodes = internalCluster().startNodes( + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "a").put(searchOnlyNode()).build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").put(searchOnlyNode()).build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").put(searchOnlyNode()).build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "a").put(searchOnlyNode()).build() + ); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("6").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + 
logger.info("--> create index"); + createIndex( + "test", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2) + .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build() + ); + + logger.info("--> waiting for shards to be allocated"); + ensureGreen("test"); + + ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + final Map counts = new HashMap<>(); + + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum); + } + } + } + + /* + * Ensures that shards are distributed across different zones in the cluster. + * Given two zones (a and b) with one data node in each, the shards are evenly distributed, + * resulting in each data node being assigned three shards. + */ + for (int i = 0; i < 2; i++) { + assertThat(counts.get(nodes.get(i)), equalTo(3)); + } + + /* + * There are two search nodes in each zone, totaling four search nodes. + * With six search shards to allocate, they are assigned using a best-effort spread, + * ensuring each search node receives either one or two shards. 
+ */ + for (int i = 2; i < 6; i++) { + assertThat(counts.get(nodes.get(i)), anyOf(equalTo(1), equalTo(2))); + } + } + + public void testAwarenessZonesIncrementalNodes() { + Settings commonSettings = Settings.builder() + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") + .build(); + + logger.info("--> starting 2 nodes on zones 'a' & 'b'"); + List nodes = internalCluster().startNodes( + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "a").put(searchOnlyNode()).build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").put(searchOnlyNode()).build() + ); + + createIndex( + "test", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2) + .build() + ); + + ensureGreen("test"); + + ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + Map counts = new HashMap<>(); + + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum); + } + } + } + + /* + * The cluster consists of two zones, each containing one data node and one search node. + * Replicas and search replicas are evenly distributed across these zones. 
+ */ + for (int i = 0; i < 4; i++) { + assertThat(counts.get(nodes.get(i)), equalTo(3)); + } + + logger.info("--> starting another data and search node in zone 'b'"); + + String B_2 = internalCluster().startNode(Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()); + String B_3 = internalCluster().startNode( + Settings.builder().put(commonSettings).put("node.attr.zone", "b").put(searchOnlyNode()).build() + ); + + ensureGreen("test"); + + client().admin().cluster().prepareReroute().get(); + + ensureGreen("test"); + + clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + + counts = new HashMap<>(); + + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum); + } + } + } + + /* + * Adding a new data node and a new search node in zone B results in: + * - Zone A: 1 data node, 1 search node + * - Zone B: 2 data nodes, 2 search nodes + * + * As a result, shards are rerouted to maintain a best-effort balanced allocation. 
+ */ + assertThat(counts.get(nodes.get(0)), equalTo(3)); + assertThat(counts.get(nodes.get(1)), equalTo(2)); + assertThat(counts.get(nodes.get(2)), equalTo(3)); + assertThat(counts.get(nodes.get(3)), equalTo(2)); + assertThat(counts.get(B_2), equalTo(1)); + assertThat(counts.get(B_3), equalTo(1)); + + logger.info("--> starting another data node without any zone"); + + String noZoneNode = internalCluster().startNode(); + ensureGreen("test"); + client().admin().cluster().prepareReroute().get(); + ensureGreen("test"); + clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + + counts = new HashMap<>(); + + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum); + } + } + } + + logger.info("--> Ensure there was not rerouting"); + + /* + * Adding another node to the cluster without a zone attribute + * does not trigger shard reallocation; existing shard assignments remain unchanged. 
+ */ + assertThat(counts.get(nodes.get(0)), equalTo(3)); + assertThat(counts.get(nodes.get(1)), equalTo(2)); + assertThat(counts.get(nodes.get(2)), equalTo(3)); + assertThat(counts.get(nodes.get(3)), equalTo(2)); + assertThat(counts.get(B_2), equalTo(1)); + assertThat(counts.get(B_3), equalTo(1)); + assertThat(counts.containsKey(noZoneNode), equalTo(false)); + + logger.info("--> Remove the awareness attribute setting"); + + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "") + .build() + ) + .get(); + + ensureGreen("test"); + clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + + counts = new HashMap<>(); + + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum); + } + } + } + + /* + * Removing allocation awareness attributes from the cluster disables zone-based distribution. + * Shards are then assigned based solely on the other deciders in the cluster manager. 
+ */ + assertThat(counts.get(nodes.get(0)), equalTo(2)); + assertThat(counts.get(nodes.get(1)), equalTo(2)); + assertThat(counts.get(nodes.get(2)), equalTo(2)); + assertThat(counts.get(nodes.get(3)), equalTo(2)); + assertThat(counts.get(B_2), equalTo(1)); + assertThat(counts.get(B_3), equalTo(2)); + assertThat(counts.get(noZoneNode), equalTo(1)); + } + + public void testAwarenessBalanceWithForcedAwarenessCreateIndex() { + Settings settings = Settings.builder() + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b,c") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") + .put(AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.getKey(), "true") + .build(); + + logger.info("--> starting 3 nodes on zones a,b,c"); + internalCluster().startNodes( + Settings.builder().put(settings).put("node.attr.zone", "a").build(), + Settings.builder().put(settings).put("node.attr.zone", "b").build(), + Settings.builder().put(settings).put("node.attr.zone", "c").build() + ); + + // Create index with 2 replicas and 2 search replicas + assertThrows(IllegalArgumentException.class, () -> { + createIndex( + "test-idx", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2) + .build() + ); + }); + } + + public void testAwarenessBalanceWithForcedAwarenessUpdateIndex() { + Settings settings = Settings.builder() + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b,c") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") + .put(AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.getKey(), "true") + .build(); + + logger.info("--> starting 3 nodes on zones a,b,c"); + 
internalCluster().startNodes( + Settings.builder().put(settings).put("node.attr.zone", "a").build(), + Settings.builder().put(settings).put("node.attr.zone", "b").build(), + Settings.builder().put(settings).put("node.attr.zone", "c").build() + ); + + // Create index with 2 replicas and 3 search replicas + createIndex( + "test-idx", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 3) + .build() + ); + + // Update the number of search replicas to 4 + assertThrows(IllegalArgumentException.class, () -> { + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings("test-idx") + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 4).build()) + ); + }); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index 2bdd31b23aee3..3483c14df6272 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -1500,11 +1500,13 @@ List getIndexSettingsValidationErrors( IndexMetadata.SETTING_NUMBER_OF_REPLICAS, DEFAULT_REPLICA_COUNT_SETTING.get(this.clusterService.state().metadata().settings()) ); + int searchReplicaCount = settings.getAsInt(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0); AutoExpandReplicas autoExpandReplica = AutoExpandReplicas.SETTING.get(settings); - Optional error = awarenessReplicaBalance.validate(replicaCount, autoExpandReplica); - if (error.isPresent()) { - validationErrors.add(error.get()); - } + + Optional replicaValidationError = awarenessReplicaBalance.validate(replicaCount, autoExpandReplica); + replicaValidationError.ifPresent(validationErrors::add); + Optional searchReplicaValidationError = 
awarenessReplicaBalance.validate(searchReplicaCount); + searchReplicaValidationError.ifPresent(validationErrors::add); } return validationErrors; } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java index 469bec7220721..fff704210ca7a 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -301,7 +301,18 @@ public ClusterState execute(ClusterState currentState) { } final int updatedNumberOfSearchReplicas = IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(openSettings); if (preserveExisting == false) { - // TODO: Honor awareness validation to search replicas. + for (Index index : request.indices()) { + if (index.getName().charAt(0) != '.') { + // No replica count validation for system indices + Optional error = awarenessReplicaBalance.validate(updatedNumberOfSearchReplicas); + + if (error.isPresent()) { + ValidationException ex = new ValidationException(); + ex.addValidationError(error.get()); + throw ex; + } + } + } // Verify that this won't take us over the cluster shard limit. 
int totalNewShards = Arrays.stream(request.indices()) diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 76111f623e0a5..6db70cc5f4fc5 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -59,7 +59,6 @@ import java.util.ListIterator; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.function.Function; @@ -103,6 +102,7 @@ public class RoutingNodes implements Iterable { private int relocatingShards = 0; private final Map> nodesPerAttributeNames; + private final Map> searchNodesPerAttributeNames; private final Map recoveriesPerNode = new HashMap<>(); private final Map initialReplicaRecoveries = new HashMap<>(); private final Map initialPrimaryRecoveries = new HashMap<>(); @@ -116,6 +116,7 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) { this.readOnly = readOnly; final RoutingTable routingTable = clusterState.routingTable(); this.nodesPerAttributeNames = Collections.synchronizedMap(new HashMap<>()); + this.searchNodesPerAttributeNames = Collections.synchronizedMap(new HashMap<>()); // fill in the nodeToShards with the "live" nodes for (final DiscoveryNode cursor : clusterState.nodes().getDataNodes().values()) { @@ -297,10 +298,28 @@ public Stream stream() { return nodesToShards.values().stream(); } + /** + * Retrieves all unique values for a specific awareness attribute across all nodes + * Eg: "zone" : ["zone1", "zone2", "zone3"] + * @param attributeName The name of the awareness attribute to collect values for + * @return A set of unique attribute values for the specified attribute + */ public Set nodesPerAttributesCounts(String attributeName) { + return nodesPerAttributesCounts(attributeName, routingNode -> true); + } + + /** + * 
Retrieves all unique values for a specific awareness attribute across filtered nodes + * Eg: "zone" : ["zone1", "zone2", "zone3"] + * @param attributeName The name of the awareness attribute to collect values for + * @param routingNodeFilter filters the routing nodes based on given condition + * @return A set of unique attribute values for the specified attribute + */ + public Set nodesPerAttributesCounts(String attributeName, Predicate routingNodeFilter) { + return nodesPerAttributeNames.computeIfAbsent( attributeName, - ignored -> stream().map(r -> r.node().getAttributes().get(attributeName)).filter(Objects::nonNull).collect(Collectors.toSet()) + ignored -> stream().filter(routingNodeFilter).map(r -> r.node().getAttributes().get(attributeName)).collect(Collectors.toSet()) ); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalance.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalance.java index 6fc0e535ef4dc..d2cf30bd31983 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalance.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalance.java @@ -122,4 +122,14 @@ public Optional validate(int replicaCount, AutoExpandReplicas autoExpand return Optional.empty(); } + public Optional validate(int searchReplicaCount) { + // TODO: For now Search replicas do not support auto expand, when we add support update this validation + if (searchReplicaCount > 0 && searchReplicaCount % maxAwarenessAttributes() != 0) { + String errorMessage = "total search replicas needs to be a multiple of total awareness attributes [" + + maxAwarenessAttributes() + + "]"; + return Optional.of(errorMessage); + } + return Optional.empty(); + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java 
b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index 16c94acfbb553..17b8aa1d3cbb5 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -48,6 +48,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Collections.emptyList; @@ -160,7 +161,10 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout } IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index()); - int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary + int shardCount = shardRouting.isSearchOnly() + ? indexMetadata.getNumberOfSearchOnlyReplicas() + : indexMetadata.getNumberOfReplicas() + 1; // 1 for primary + for (String awarenessAttribute : awarenessAttributes) { // the node the shard exists on must be associated with an awareness attribute. 
if (isAwarenessAttributeAssociatedWithNode(node, awarenessAttribute) == false) { @@ -175,18 +179,14 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout } int currentNodeCount = getCurrentNodeCountForAttribute(shardRouting, node, allocation, moveToNode, awarenessAttribute); + Set attributeValues = getAttributeValues(shardRouting, allocation, awarenessAttribute); + int numberOfAttributes = attributeValues.size(); - // build attr_value -> nodes map - Set nodesPerAttribute = allocation.routingNodes().nodesPerAttributesCounts(awarenessAttribute); - int numberOfAttributes = nodesPerAttribute.size(); List fullValues = forcedAwarenessAttributes.get(awarenessAttribute); - if (fullValues != null) { // If forced awareness is enabled, numberOfAttributes = count(distinct((union(discovered_attributes, forced_attributes))) Set attributesSet = new HashSet<>(fullValues); - for (String stringObjectCursor : nodesPerAttribute) { - attributesSet.add(stringObjectCursor); - } + attributesSet.addAll(attributeValues); numberOfAttributes = attributesSet.size(); } @@ -211,6 +211,11 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout return allocation.decision(Decision.YES, NAME, "node meets all awareness attribute requirements"); } + private Set getAttributeValues(ShardRouting shardRouting, RoutingAllocation allocation, String awarenessAttribute) { + return allocation.routingNodes() + .nodesPerAttributesCounts(awarenessAttribute, routingNode -> routingNode.node().isSearchNode() == shardRouting.isSearchOnly()); + } + private int getCurrentNodeCountForAttribute( ShardRouting shardRouting, RoutingNode node, @@ -220,9 +225,9 @@ private int getCurrentNodeCountForAttribute( ) { // build the count of shards per attribute value final String shardAttributeForNode = getAttributeValueForNode(node, awarenessAttribute); + // Get all assigned shards of the same type + List assignedShards = getAssignedShards(allocation, shardRouting); int 
currentNodeCount = 0; - final List assignedShards = allocation.routingNodes().assignedShards(shardRouting.shardId()); - for (ShardRouting assignedShard : assignedShards) { if (assignedShard.started() || assignedShard.initializing()) { // Note: this also counts relocation targets as that will be the new location of the shard. @@ -255,6 +260,14 @@ private int getCurrentNodeCountForAttribute( return currentNodeCount; } + private List getAssignedShards(RoutingAllocation allocation, ShardRouting shardRouting) { + return allocation.routingNodes() + .assignedShards(shardRouting.shardId()) + .stream() + .filter(s -> s.isSearchOnly() == shardRouting.isSearchOnly()) + .collect(Collectors.toList()); + } + private boolean isAwarenessAttributeAssociatedWithNode(RoutingNode node, String awarenessAttribute) { return node.node().getAttributes().containsKey(awarenessAttribute); } @@ -262,5 +275,4 @@ private boolean isAwarenessAttributeAssociatedWithNode(RoutingNode node, String private String getAttributeValueForNode(final RoutingNode node, final String awarenessAttribute) { return node.node().getAttributes().get(awarenessAttribute); } - } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java index 02966b835fae0..d954e4675aa9a 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java @@ -67,7 +67,6 @@ import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED; -import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static 
org.hamcrest.Matchers.sameInstance; @@ -1063,4 +1062,61 @@ public void testAllocationExplainForUnassignedShardsWithUnbalancedZones() { } } + + public void testAllocationAwarenessWhenNotEnabled() { + AllocationService strategy = createAllocationService(Settings.builder().build()); + + logger.info("--> Building initial routing table"); + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding two nodes on same zone and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder().add(newNode("node1", singletonMap("zone", "a"))).add(newNode("node2", singletonMap("zone", "a"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> start the shards (replicas)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + + logger.info("--> add a a nodes without zone and reroute"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3"))).build(); + + logger.info("--> try to move the replica to new node"); + AllocationService.CommandsResult commandsResult = strategy.reroute( + clusterState, + new AllocationCommands(new MoveAllocationCommand("test", 0, 
"node2", "node3")), + true, + false + ); + + assertEquals(commandsResult.explanations().explanations().size(), 1); + assertEquals(commandsResult.explanations().explanations().get(0).decisions().type(), Decision.Type.YES); + List decisions = commandsResult.explanations() + .explanations() + .get(0) + .decisions() + .getDecisions() + .stream() + .filter(item -> item.getExplanation().startsWith("allocation awareness is not enabled")) + .toList(); + assertEquals( + "allocation awareness is not enabled, set cluster setting " + "[cluster.routing.allocation.awareness.attributes] to enable it", + decisions.get(0).getExplanation() + ); + } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalanceTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalanceTests.java index 019db47e74cc3..c6134330727aa 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalanceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalanceTests.java @@ -37,6 +37,9 @@ public void testNoForcedAwarenessAttribute() { assertEquals(awarenessReplicaBalance.validate(0, autoExpandReplica), Optional.empty()); assertEquals(awarenessReplicaBalance.validate(1, autoExpandReplica), Optional.empty()); + + assertEquals(awarenessReplicaBalance.validate(0), Optional.empty()); + assertEquals(awarenessReplicaBalance.validate(1), Optional.empty()); } public void testForcedAwarenessAttribute() { @@ -68,6 +71,7 @@ public void testForcedAwarenessAttribute() { awarenessReplicaBalance = new AwarenessReplicaBalance(settings, EMPTY_CLUSTER_SETTINGS); autoExpandReplica = AutoExpandReplicas.SETTING.get(settings); + assertThat(awarenessReplicaBalance.maxAwarenessAttributes(), equalTo(3)); assertEquals(awarenessReplicaBalance.validate(2, autoExpandReplica), Optional.empty()); assertEquals(awarenessReplicaBalance.validate(1, autoExpandReplica), Optional.empty()); 
assertEquals(awarenessReplicaBalance.validate(0, autoExpandReplica), Optional.empty()); @@ -114,6 +118,16 @@ public void testForcedAwarenessAttribute() { Optional.of("expected total copies needs to be a multiple of total awareness attributes [3]") ); + assertEquals(awarenessReplicaBalance.validate(3), Optional.empty()); + assertEquals(awarenessReplicaBalance.validate(0), Optional.empty()); + assertEquals( + awarenessReplicaBalance.validate(2), + Optional.of("total search replicas needs to be a multiple of total awareness attributes [3]") + ); + assertEquals( + awarenessReplicaBalance.validate(1), + Optional.of("total search replicas needs to be a multiple of total awareness attributes [3]") + ); } public void testForcedAwarenessAttributeDisabled() { diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAwarenessAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAwarenessAllocationTests.java new file mode 100644 index 0000000000000..b757d5911d204 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAwarenessAllocationTests.java @@ -0,0 +1,267 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.command.AllocationCommands; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.singletonMap; +import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; + +public class SearchReplicaAwarenessAllocationTests extends OpenSearchAllocationTestCase { + + private final Logger logger = LogManager.getLogger(SearchReplicaAwarenessAllocationTests.class); + + public void testAllocationAwarenessForIndexWithSearchReplica() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.getKey(), true) + .put("cluster.routing.allocation.awareness.attributes", "zone") + .build() + ); + + logger.info("--> Building initial routing table"); + + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(Version.CURRENT)) + 
.numberOfShards(1) + .numberOfReplicas(1) + .numberOfSearchReplicas(2) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding four nodes on same zone and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("node1", Map.of("zone", "a"))) + .add(newNode("node2", Map.of("zone", "a"))) + .add(newSearchNode("node3", Map.of("zone", "a"))) + .add(newSearchNode("node4", Map.of("zone", "a"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> start the shards (replicas)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4)); + + logger.info("--> add a two nodes with a new zone and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder(clusterState.nodes()) + .add(newNode("node5", Map.of("zone", "b"))) + .add(newSearchNode("node6", Map.of("zone", "b"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(2)); + + List shardRoutings = clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING); + 
shardRoutings.sort(Comparator.comparing(ShardRouting::currentNodeId)); + assertThat(shardRoutings.get(0).relocatingNodeId(), equalTo("node5")); + assertThat(shardRoutings.get(1).relocatingNodeId(), equalTo("node6")); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add a new node with a new zone and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node7", Map.of("zone", "c")))) + .build(); + + ClusterState newState = strategy.reroute(clusterState, "reroute"); + assertThat(newState, equalTo(clusterState)); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4)); + } + + public void testMoveShardOnceNewNodeWithOutAwarenessAttributeAdded() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.getKey(), true) + .put("cluster.routing.allocation.awareness.attributes", "zone") + .build() + ); + + logger.info("--> Building initial routing table'"); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .numberOfSearchReplicas(1) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding four nodes on same zone and do 
rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1", Map.of("zone", "a"))).add(newSearchNode("node2", Map.of("zone", "a")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> start the shards (replicas)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + + logger.info("--> add a search node without zone and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newSearchNode("node3", Map.of("searchonly", "true")))) + .build(); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> try to move the replica to node without zone attribute"); + AllocationService.CommandsResult commandsResult = strategy.reroute( + clusterState, + new AllocationCommands(new MoveAllocationCommand("test", 0, "node2", "node3")), + true, + false + ); + + assertEquals(commandsResult.explanations().explanations().size(), 1); + assertEquals(commandsResult.explanations().explanations().get(0).decisions().type(), Decision.Type.NO); + List decisions = commandsResult.explanations() + .explanations() + .get(0) + .decisions() + .getDecisions() + .stream() + .filter(item -> item.type() == Decision.Type.NO) + .toList(); + assertEquals( + "node does not contain the awareness attribute [zone]; " + + "required attributes cluster setting [cluster.routing.allocation.awareness.attributes=zone]", + decisions.get(0).getExplanation() + ); + } + + public 
void testFullAwarenessWithSearchReplica() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.getKey(), true) + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b") + .build() + ); + + logger.info("Building initial routing table"); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .numberOfSearchReplicas(2) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding three nodes on same rack and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1", Map.of("zone", "a"))).add(newSearchNode("node2", Map.of("zone", "a")))) + .build(); + + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> start the shards (replicas)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> one search replica will not start because we have only one zone value"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(1)); + + logger.info("--> add a new node with 
a new zome and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newSearchNode("node3", Map.of("zone", "2")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo("node3")); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(3)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new zone, make sure nothing moves"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node4", singletonMap("zone", "c")))) + .build(); + ClusterState newState = strategy.reroute(clusterState, "reroute"); + assertThat(newState, equalTo(clusterState)); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(3)); + } +} diff --git a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java index 102a0e5aa2e6d..f0e4502787b28 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java @@ -170,6 +170,10 @@ protected static DiscoveryNode newSearchNode(String nodeId, Version version) 
{
         return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), SEARCH_ROLE, version);
     }
 
+    /**
+     * Creates a test node that carries {@code SEARCH_ROLE} and the given node attributes,
+     * using a fresh fake transport address and {@link Version#CURRENT}.
+     *
+     * @param nodeId     id assigned to the new node
+     * @param attributes node attributes (for example an awareness attribute such as {@code zone})
+     * @return the newly built search node
+     */
+    protected static DiscoveryNode newSearchNode(String nodeId, Map<String, String> attributes) {
+        return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), attributes, SEARCH_ROLE, Version.CURRENT);
+    }
+
     protected static DiscoveryNode newNode(String nodeName, String nodeId, Map attributes) {
         return new DiscoveryNode(nodeName, nodeId, buildNewFakeTransportAddress(), attributes, CLUSTER_MANAGER_DATA_ROLES, Version.CURRENT);
     }