From d473d25a2a53c59fe27a1dae91c169cc62b0ee19 Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Tue, 11 Mar 2025 17:27:39 -0700 Subject: [PATCH 1/8] Added allocation awareness attribute support for search replica Signed-off-by: Vinay Krishna Pudyodu --- .../SearchReplicaAwarenessAllocationIT.java | 257 ++++++++++++++++ .../cluster/routing/RoutingNodes.java | 30 +- .../decider/AwarenessAllocationDecider.java | 102 ++++-- .../allocation/AwarenessAllocationTests.java | 58 +++- ...SearchReplicaAwarenessAllocationTests.java | 291 ++++++++++++++++++ .../cluster/OpenSearchAllocationTestCase.java | 4 + 6 files changed, 713 insertions(+), 29 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAwarenessAllocationIT.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAwarenessAllocationTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAwarenessAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAwarenessAllocationIT.java new file mode 100644 index 0000000000000..4f42d76362d65 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAwarenessAllocationIT.java @@ -0,0 +1,257 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.cluster.allocation; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.test.NodeRoles.searchOnlyNode; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; + +@ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SearchReplicaAwarenessAllocationIT extends RemoteStoreBaseIntegTestCase { + + private final Logger logger = LogManager.getLogger(SearchReplicaAwarenessAllocationIT.class); + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build(); + } + + public void testAllocationAwarenessZones() { + Settings commonSettings = Settings.builder() + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") + .build(); + + logger.info("--> starting 8 nodes on different zones"); + List nodes = internalCluster().startNodes( + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(), + + Settings.builder().put(commonSettings).put("node.attr.zone", "a").put(searchOnlyNode()).build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").put(searchOnlyNode()).build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").put(searchOnlyNode()).build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "a").put(searchOnlyNode()).build() + ); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("8").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + logger.info("--> create index"); + createIndex( + "test", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2) + .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build() + ); + + logger.info("--> waiting for shards to be allocated"); + ensureGreen("test"); + + ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + final Map counts = new HashMap<>(); + + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum); + } + } + } + + assertThat(counts.get(nodes.get(3)), anyOf(equalTo(2), equalTo(3))); + assertThat(counts.get(nodes.get(2)), anyOf(equalTo(2), equalTo(3))); + assertThat(counts.get(nodes.get(0)), anyOf(equalTo(2), equalTo(3))); + assertThat(counts.get(nodes.get(1)), anyOf(equalTo(2), equalTo(3))); + assertThat(counts.get(nodes.get(4)), anyOf(equalTo(2), equalTo(3))); + assertThat(counts.get(nodes.get(5)), anyOf(equalTo(2), equalTo(3))); + assertThat(counts.get(nodes.get(6)), anyOf(equalTo(2), equalTo(3))); + assertThat(counts.get(nodes.get(7)), anyOf(equalTo(2), equalTo(3))); + } + + public void testAwarenessZonesIncrementalNodes() { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b") + .put("cluster.routing.allocation.awareness.attributes", "zone") + .build(); + + logger.info("--> starting 2 nodes on zones 'a' & 'b'"); + List nodes = internalCluster().startNodes( + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "a").put(searchOnlyNode()).build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").put(searchOnlyNode()).build() + ); + + createIndex( + "test", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2) + .build() + ); + + ensureGreen("test"); + + ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + Map counts = new HashMap<>(); + + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum); + } + } + } + assertThat(counts.get(nodes.get(0)), equalTo(5)); + assertThat(counts.get(nodes.get(1)), equalTo(5)); + assertThat(counts.get(nodes.get(2)), equalTo(5)); + assertThat(counts.get(nodes.get(3)), equalTo(5)); + + logger.info("--> starting another data and search node in zone 'b'"); + + String B_2 = internalCluster().startNode(Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()); + String B_3 = internalCluster().startNode( + Settings.builder().put(commonSettings).put("node.attr.zone", "b").put(searchOnlyNode()).build() + ); + + ensureGreen("test"); + + client().admin().cluster().prepareReroute().get(); + + ensureGreen("test"); + + clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + + counts = new HashMap<>(); + + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum); + } + } + } + + assertThat(counts.get(nodes.get(0)), equalTo(5)); + assertThat(counts.get(nodes.get(1)), equalTo(3)); + assertThat(counts.get(nodes.get(2)), equalTo(5)); + assertThat(counts.get(nodes.get(3)), equalTo(3)); + assertThat(counts.get(B_2), equalTo(2)); + assertThat(counts.get(B_3), equalTo(2)); + + logger.info("--> starting another data node without any zone"); + + String noZoneNode = internalCluster().startNode(); + ensureGreen("test"); + client().admin().cluster().prepareReroute().get(); + ensureGreen("test"); + clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + + counts = new HashMap<>(); + + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum); + } + } + } + + logger.info("--> Ensure there was not rerouting"); + + assertThat(counts.get(nodes.get(0)), equalTo(5)); + assertThat(counts.get(nodes.get(1)), equalTo(3)); + assertThat(counts.get(nodes.get(2)), equalTo(5)); + assertThat(counts.get(nodes.get(3)), equalTo(3)); + assertThat(counts.get(B_2), equalTo(2)); + assertThat(counts.get(B_3), equalTo(2)); + assertThat(counts.containsKey(noZoneNode), equalTo(false)); + + logger.info("--> Remove the awareness attribute setting"); + + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put("cluster.routing.allocation.awareness.attributes", "").build()) + .get(); + + ensureGreen("test"); + clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + + counts = new HashMap<>(); + + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum); + } + } + } + + assertThat(counts.get(nodes.get(0)), equalTo(3)); + assertThat(counts.get(nodes.get(1)), equalTo(3)); + assertThat(counts.get(nodes.get(2)), equalTo(4)); + assertThat(counts.get(nodes.get(3)), equalTo(3)); + assertThat(counts.get(B_2), equalTo(2)); + assertThat(counts.get(B_3), equalTo(3)); + assertThat(counts.get(noZoneNode), equalTo(2)); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 76111f623e0a5..0b7c5981caa76 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -59,7 +59,6 @@ import java.util.ListIterator; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.function.Function; @@ -103,6 +102,7 @@ public class RoutingNodes implements Iterable { private int relocatingShards = 0; private final Map> nodesPerAttributeNames; + private final Map> searchNodesPerAttributeNames; private final Map recoveriesPerNode = new HashMap<>(); private final Map initialReplicaRecoveries = new HashMap<>(); private final Map initialPrimaryRecoveries = new HashMap<>(); @@ -116,6 +116,7 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) { this.readOnly = readOnly; final RoutingTable routingTable = clusterState.routingTable(); this.nodesPerAttributeNames = Collections.synchronizedMap(new HashMap<>()); + this.searchNodesPerAttributeNames = Collections.synchronizedMap(new HashMap<>()); // fill in the nodeToShards with the "live" nodes for (final DiscoveryNode cursor : clusterState.nodes().getDataNodes().values()) { @@ -297,10 +298,35 @@ public Stream stream() { return nodesToShards.values().stream(); } + /** + * Retrieves all unique values for a specific awareness attribute across nodes + * which are not dedicated search nodes. + * Eg: "zone" : ["zone1", "zone2", "zone3"] + * @param attributeName The name of the awareness attribute to collect values for + * @return A set of unique attribute values for the specified attribute + */ public Set nodesPerAttributesCounts(String attributeName) { return nodesPerAttributeNames.computeIfAbsent( attributeName, - ignored -> stream().map(r -> r.node().getAttributes().get(attributeName)).filter(Objects::nonNull).collect(Collectors.toSet()) + ignored -> stream().filter(r -> r.node().isSearchNode() == false) + .map(r -> r.node().getAttributes().get(attributeName)) + .collect(Collectors.toSet()) + ); + } + + /** + * Retrieves all unique values for a specific awareness attribute across nodes + * which are dedicated search nodes. + * Eg: "zone" : ["zone1", "zone2", "zone3"] + * @param attributeName The name of the awareness attribute to collect values for + * @return A set of unique attribute values for the specified attribute + */ + public Set searchNodesPerAttributesCounts(String attributeName) { + return searchNodesPerAttributeNames.computeIfAbsent( + attributeName, + ignored -> stream().filter(r -> r.node().isSearchNode()) + .map(r -> r.node().getAttributes().get(attributeName)) + .collect(Collectors.toSet()) ); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index 16c94acfbb553..6a05ca85ce542 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -32,6 +32,8 @@ package org.opensearch.cluster.routing.allocation.decider; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.ShardRouting; @@ -48,6 +50,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Collections.emptyList; @@ -97,6 +100,8 @@ public class AwarenessAllocationDecider extends AllocationDecider { public static final String NAME = "awareness"; + public static Logger logger = LogManager.getLogger(AwarenessAllocationDecider.class); + public static final Setting> CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING = Setting.listSetting( "cluster.routing.allocation.awareness.attributes", emptyList(), @@ -160,7 +165,10 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout } IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index()); - int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary + int shardCount = shardRouting.isSearchOnly() + ? indexMetadata.getNumberOfSearchOnlyReplicas() + : indexMetadata.getNumberOfReplicas() + 1; // 1 for primary + for (String awarenessAttribute : awarenessAttributes) { // the node the shard exists on must be associated with an awareness attribute. if (isAwarenessAttributeAssociatedWithNode(node, awarenessAttribute) == false) { @@ -175,18 +183,14 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout } int currentNodeCount = getCurrentNodeCountForAttribute(shardRouting, node, allocation, moveToNode, awarenessAttribute); + Set attributeValues = getAttributeValues(shardRouting, allocation, awarenessAttribute); + int numberOfAttributes = attributeValues.size(); - // build attr_value -> nodes map - Set nodesPerAttribute = allocation.routingNodes().nodesPerAttributesCounts(awarenessAttribute); - int numberOfAttributes = nodesPerAttribute.size(); List fullValues = forcedAwarenessAttributes.get(awarenessAttribute); - if (fullValues != null) { // If forced awareness is enabled, numberOfAttributes = count(distinct((union(discovered_attributes, forced_attributes))) Set attributesSet = new HashSet<>(fullValues); - for (String stringObjectCursor : nodesPerAttribute) { - attributesSet.add(stringObjectCursor); - } + attributesSet.addAll(attributeValues); numberOfAttributes = attributesSet.size(); } @@ -211,6 +215,13 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout return allocation.decision(Decision.YES, NAME, "node meets all awareness attribute requirements"); } + private static Set getAttributeValues(ShardRouting shardRouting, RoutingAllocation allocation, String awarenessAttribute) { + if (shardRouting.isSearchOnly()) { + return allocation.routingNodes().searchNodesPerAttributesCounts(awarenessAttribute); + } + return allocation.routingNodes().nodesPerAttributesCounts(awarenessAttribute); + } + private int getCurrentNodeCountForAttribute( ShardRouting shardRouting, RoutingNode node, @@ -218,40 +229,80 @@ private int getCurrentNodeCountForAttribute( boolean moveToNode, String awarenessAttribute ) { - // build the count of shards per attribute value - final String shardAttributeForNode = getAttributeValueForNode(node, awarenessAttribute); - int currentNodeCount = 0; - final List assignedShards = allocation.routingNodes().assignedShards(shardRouting.shardId()); + // Get the attribute value for the current node + String shardAttributeForNode = getAttributeValueForNode(node, awarenessAttribute); + + List assignedShards = getAssignedShards(allocation, shardRouting); + + // Count assigned shards with matching attribute values + int currentNodeCount = countMatchingShards(assignedShards, allocation, awarenessAttribute, shardAttributeForNode); + + // Adjust count for node movement scenarios + currentNodeCount = adjustNodeCount( + shardRouting, + node, + allocation, + moveToNode, + awarenessAttribute, + shardAttributeForNode, + currentNodeCount + ); + + return currentNodeCount; + } + + private List getAssignedShards(RoutingAllocation allocation, ShardRouting shardRouting) { + // only consider assigned shards of the same type (search vs write) + List shardRoutingList = allocation.routingNodes().assignedShards(shardRouting.shardId()); + return shardRouting.isSearchOnly() + ? shardRoutingList.stream().filter(ShardRouting::isSearchOnly).collect(Collectors.toList()) + : shardRoutingList.stream().filter(s -> !s.isSearchOnly()).collect(Collectors.toList()); + } + + private int countMatchingShards( + List assignedShards, + RoutingAllocation allocation, + String awarenessAttribute, + String shardAttributeForNode + ) { + int count = 0; for (ShardRouting assignedShard : assignedShards) { if (assignedShard.started() || assignedShard.initializing()) { - // Note: this also counts relocation targets as that will be the new location of the shard. - // Relocation sources should not be counted as the shard is moving away RoutingNode routingNode = allocation.routingNodes().node(assignedShard.currentNodeId()); - // Increase node count when if (getAttributeValueForNode(routingNode, awarenessAttribute).equals(shardAttributeForNode)) { - ++currentNodeCount; + count++; } } } + return count; + } + private int adjustNodeCount( + ShardRouting shardRouting, + RoutingNode node, + RoutingAllocation allocation, + boolean moveToNode, + String awarenessAttribute, + String shardAttributeForNode, + int currentNodeCount + ) { if (moveToNode) { if (shardRouting.assignedToNode()) { String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId(); - if (node.nodeId().equals(nodeId) == false) { - // we work on different nodes, move counts around - if (getAttributeValueForNode(allocation.routingNodes().node(nodeId), awarenessAttribute).equals(shardAttributeForNode) - && currentNodeCount > 0) { - --currentNodeCount; - } - ++currentNodeCount; + if (!node.nodeId().equals(nodeId)) { + // Adjust count when moving between nodes + RoutingNode sourceNode = allocation.routingNodes().node(nodeId); + if (getAttributeValueForNode(sourceNode, awarenessAttribute).equals(shardAttributeForNode) && currentNodeCount > 0) { + currentNodeCount--; + } + currentNodeCount++; } } else { - ++currentNodeCount; + currentNodeCount++; } } - return currentNodeCount; } @@ -262,5 +313,4 @@ private boolean isAwarenessAttributeAssociatedWithNode(RoutingNode node, String private String getAttributeValueForNode(final RoutingNode node, final String awarenessAttribute) { return node.node().getAttributes().get(awarenessAttribute); } - } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java index 02966b835fae0..d954e4675aa9a 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java @@ -67,7 +67,6 @@ import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED; -import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.sameInstance; @@ -1063,4 +1062,61 @@ public void testAllocationExplainForUnassignedShardsWithUnbalancedZones() { } } + + public void testAllocationAwarenessWhenNotEnabled() { + AllocationService strategy = createAllocationService(Settings.builder().build()); + + logger.info("--> Building initial routing table"); + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding two nodes on same zone and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder().add(newNode("node1", singletonMap("zone", "a"))).add(newNode("node2", singletonMap("zone", "a"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> start the shards (replicas)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + + logger.info("--> add a a nodes without zone and reroute"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3"))).build(); + + logger.info("--> try to move the replica to new node"); + AllocationService.CommandsResult commandsResult = strategy.reroute( + clusterState, + new AllocationCommands(new MoveAllocationCommand("test", 0, "node2", "node3")), + true, + false + ); + + assertEquals(commandsResult.explanations().explanations().size(), 1); + assertEquals(commandsResult.explanations().explanations().get(0).decisions().type(), Decision.Type.YES); + List decisions = commandsResult.explanations() + .explanations() + .get(0) + .decisions() + .getDecisions() + .stream() + .filter(item -> item.getExplanation().startsWith("allocation awareness is not enabled")) + .toList(); + assertEquals( + "allocation awareness is not enabled, set cluster setting " + "[cluster.routing.allocation.awareness.attributes] to enable it", + decisions.get(0).getExplanation() + ); + } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAwarenessAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAwarenessAllocationTests.java new file mode 100644 index 0000000000000..02c636b28e80e --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAwarenessAllocationTests.java @@ -0,0 +1,291 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.command.AllocationCommands; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.singletonMap; +import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; + +public class SearchReplicaAwarenessAllocationTests extends OpenSearchAllocationTestCase { + + private final Logger logger = LogManager.getLogger(SearchReplicaAwarenessAllocationTests.class); + + public void testAllocationAwarenessForIndexWithSearchReplica() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.getKey(), true) + .put("cluster.routing.allocation.awareness.attributes", "zone") + .build() + ); + + logger.info("--> Building initial routing table"); + + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1) + .numberOfSearchReplicas(2) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding four nodes on same zone and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("node1", Map.of("zone", "a"))) + .add(newNode("node2", Map.of("zone", "a"))) + .add(newSearchNode("node3", Map.of("zone", "a"))) + .add(newSearchNode("node4", Map.of("zone", "a"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> start the shards (replicas)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4)); + + logger.info("--> add a two nodes with a new zone and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder(clusterState.nodes()) + .add(newNode("node5", Map.of("zone", "b"))) + .add(newSearchNode("node6", Map.of("zone", "b"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(2)); + + List shardRoutings = clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING); + shardRoutings.sort(Comparator.comparing(ShardRouting::currentNodeId)); + assertThat(shardRoutings.get(0).relocatingNodeId(), equalTo("node5")); + assertThat(shardRoutings.get(1).relocatingNodeId(), equalTo("node6")); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add a new node with a new zone and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node7", Map.of("zone", "c")))) + .build(); + + ClusterState newState = strategy.reroute(clusterState, "reroute"); + assertThat(newState, equalTo(clusterState)); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4)); + } + + public void testMoveShardOnceNewNodeWithOutAwarenessAttributeAdded() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.getKey(), true) + .put("cluster.routing.allocation.awareness.attributes", "zone") + .build() + ); + + logger.info("--> Building initial routing table'"); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .numberOfSearchReplicas(1) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding four nodes on same zone and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1", Map.of("zone", "a"))).add(newSearchNode("node2", Map.of("zone", "a")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> start the shards (replicas)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + + logger.info("--> add a search node without zone and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newSearchNode("node3", Map.of("searchonly", "true")))) + .build(); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> try to move the replica to node without zone attribute"); + AllocationService.CommandsResult commandsResult = strategy.reroute( + clusterState, + new AllocationCommands(new MoveAllocationCommand("test", 0, "node2", "node3")), + true, + false + ); + + assertEquals(commandsResult.explanations().explanations().size(), 1); + assertEquals(commandsResult.explanations().explanations().get(0).decisions().type(), Decision.Type.NO); + List decisions = commandsResult.explanations() + .explanations() + .get(0) + .decisions() + .getDecisions() + .stream() + .filter(item -> item.type() == Decision.Type.NO) + .toList(); + assertEquals( + "node does not contain the awareness attribute [zone]; " + + "required attributes cluster setting [cluster.routing.allocation.awareness.attributes=zone]", + decisions.get(0).getExplanation() + ); + } + + public void testFullAwarenessWithSearchReplica() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.getKey(), true) + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b") + .build() + ); + + logger.info("Building initial routing table"); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .numberOfSearchReplicas(2) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding three nodes on same rack and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1", Map.of("zone", "a"))).add(newSearchNode("node2", Map.of("zone", "a")))) + .build(); + + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> start the shards (replicas)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> one search replica will not start because we have only one zone value"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(1)); + + logger.info("--> add a new node with a new zome and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newSearchNode("node3", Map.of("zone", "2")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo("node3")); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(3)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new zone, make sure nothing moves"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node4", singletonMap("zone", "c")))) + .build(); + ClusterState newState = strategy.reroute(clusterState, "reroute"); + assertThat(newState, equalTo(clusterState)); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(3)); + } +} diff --git a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java index 102a0e5aa2e6d..f0e4502787b28 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java @@ -170,6 +170,10 @@ protected static DiscoveryNode newSearchNode(String nodeId, Version version) { return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), SEARCH_ROLE, version); } + protected static DiscoveryNode newSearchNode(String nodeId, Map attributes) { + return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), attributes, SEARCH_ROLE, Version.CURRENT); + } + protected static DiscoveryNode newNode(String nodeName, String nodeId, Map attributes) { return new DiscoveryNode(nodeName, nodeId, buildNewFakeTransportAddress(), attributes, CLUSTER_MANAGER_DATA_ROLES, Version.CURRENT); } From 4d100fdf638c0c8a0e9eb4d90ab175d25ba25fbd Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Wed, 19 Mar 2025 13:48:41 -0700 Subject: [PATCH 2/8] Support allocation awareness balance for search replica Signed-off-by: Vinay Krishna Pudyodu --- .../allocation/AwarenessAllocationIT.java | 29 ++++++++- .../SearchReplicaAwarenessAllocationIT.java | 63 +++++++++++++++++++ .../metadata/MetadataCreateIndexService.java | 13 ++-- .../MetadataUpdateSettingsService.java | 18 +++++- .../allocation/AwarenessReplicaBalance.java | 15 ++++- .../AwarenessReplicaBalanceTests.java | 49 ++++++++++----- 6 files changed, 164 insertions(+), 23 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java index 522d63b22a0da..f947ca1ccbb85 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java @@ -364,7 +364,6 @@ public void testAwarenessZonesIncrementalNodes() { assertThat(counts.get(noZoneNode), equalTo(2)); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5908") public void testThreeZoneOneReplicaWithForceZoneValueAndLoadAwareness() throws Exception { int nodeCountPerAZ = 5; int numOfShards = 30; @@ -504,4 +503,32 @@ public void testThreeZoneOneReplicaWithForceZoneValueAndLoadAwareness() throws E assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2 * numOfShards * (numOfReplica + 1))); assertThat(health.isTimedOut(), equalTo(false)); } + + public void testAwarenessBalanceWithForcedAwarenessCreateAndUpdateIndex() { + Settings settings = Settings.builder() + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.balance", "true") + .build(); + + logger.info("--> starting 3 nodes on zones a,b,c"); + internalCluster().startNodes( + Settings.builder().put(settings).put("node.attr.zone", "a").build(), + Settings.builder().put(settings).put("node.attr.zone", "b").build(), + Settings.builder().put(settings).put("node.attr.zone", "c").build() + ); + + // Create index with 2 replicas ie total 3 shards + createIndex( + "test-idx", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build() + ); + + // Update the number of replicas to 4 + final Settings newsettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 4).build(); + + assertThrows(IllegalArgumentException.class, () -> { + assertAcked(client().admin().indices().prepareUpdateSettings("test-idx").setSettings(newsettings)); + }); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAwarenessAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAwarenessAllocationIT.java index 4f42d76362d65..bc13cabe11e92 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAwarenessAllocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAwarenessAllocationIT.java @@ -54,6 +54,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; import static org.opensearch.test.NodeRoles.searchOnlyNode; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; @@ -254,4 +255,66 @@ public void testAwarenessZonesIncrementalNodes() { assertThat(counts.get(B_3), equalTo(3)); assertThat(counts.get(noZoneNode), equalTo(2)); } + + public void testAwarenessBalanceWithForcedAwarenessCreateIndex() { + Settings settings = Settings.builder() + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.balance", "true") + .build(); + + logger.info("--> starting 3 nodes on zones a,b,c"); + internalCluster().startNodes( + Settings.builder().put(settings).put("node.attr.zone", "a").build(), + Settings.builder().put(settings).put("node.attr.zone", "b").build(), + Settings.builder().put(settings).put("node.attr.zone", "c").build() + ); + + // Create index with 2 replicas and 2 search replicas + assertThrows(IllegalArgumentException.class, () -> { + createIndex( + "test-idx", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2) + .build() + ); + }); + } + + public void testAwarenessBalanceWithForcedAwarenessUpdateIndex() { + Settings settings = Settings.builder() + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.balance", "true") + .build(); + + logger.info("--> starting 3 nodes on zones a,b,c"); + internalCluster().startNodes( + Settings.builder().put(settings).put("node.attr.zone", "a").build(), + Settings.builder().put(settings).put("node.attr.zone", "b").build(), + Settings.builder().put(settings).put("node.attr.zone", "c").build() + ); + + // Create index with 2 replicas and 3 search replicas + createIndex( + "test-idx", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 3) + .build() + ); + + // Update the number of search replicas to 4 + assertThrows(IllegalArgumentException.class, () -> { + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings("test-idx") + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 4).build()) + ); + }); + } } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index 2bdd31b23aee3..df0a68e50115c 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -1500,11 +1500,16 @@ List getIndexSettingsValidationErrors( IndexMetadata.SETTING_NUMBER_OF_REPLICAS, DEFAULT_REPLICA_COUNT_SETTING.get(this.clusterService.state().metadata().settings()) ); + int searchReplicaCount = settings.getAsInt(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0); AutoExpandReplicas autoExpandReplica = AutoExpandReplicas.SETTING.get(settings); - Optional error = awarenessReplicaBalance.validate(replicaCount, autoExpandReplica); - if (error.isPresent()) { - validationErrors.add(error.get()); - } + + Optional replicaValidationError = awarenessReplicaBalance.validateReplicas(replicaCount, autoExpandReplica); + replicaValidationError.ifPresent(validationErrors::add); + Optional searchReplicaValidationError = awarenessReplicaBalance.validateSearchReplicas( + searchReplicaCount, + autoExpandReplica + ); + searchReplicaValidationError.ifPresent(validationErrors::add); } return validationErrors; } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java index 469bec7220721..0045938a5db96 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -259,7 +259,7 @@ public ClusterState execute(ClusterState currentState) { for (Index index : request.indices()) { if (index.getName().charAt(0) != '.') { // No replica count validation for system indices - Optional error = awarenessReplicaBalance.validate( + Optional error = awarenessReplicaBalance.validateReplicas( updatedNumberOfReplicas, AutoExpandReplicas.SETTING.get(openSettings) ); @@ -301,7 +301,21 @@ public ClusterState execute(ClusterState currentState) { } final int updatedNumberOfSearchReplicas = IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(openSettings); if (preserveExisting == false) { - // TODO: Honor awareness validation to search replicas. + for (Index index : request.indices()) { + if (index.getName().charAt(0) != '.') { + // No replica count validation for system indices + Optional error = awarenessReplicaBalance.validateSearchReplicas( + updatedNumberOfSearchReplicas, + AutoExpandReplicas.SETTING.get(openSettings) + ); + + if (error.isPresent()) { + ValidationException ex = new ValidationException(); + ex.addValidationError(error.get()); + throw ex; + } + } + } // Verify that this won't take us over the cluster shard limit. int totalNewShards = Arrays.stream(request.indices()) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalance.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalance.java index 6fc0e535ef4dc..07bdd9936bc78 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalance.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalance.java @@ -102,7 +102,7 @@ public int maxAwarenessAttributes() { return awarenessAttributes; } - public Optional validate(int replicaCount, AutoExpandReplicas autoExpandReplica) { + public Optional validateReplicas(int replicaCount, AutoExpandReplicas autoExpandReplica) { if (autoExpandReplica.isEnabled()) { if ((autoExpandReplica.getMaxReplicas() != Integer.MAX_VALUE) && ((autoExpandReplica.getMaxReplicas() + 1) % maxAwarenessAttributes() != 0)) { @@ -122,4 +122,17 @@ public Optional validate(int replicaCount, AutoExpandReplicas autoExpand return Optional.empty(); } + public Optional validateSearchReplicas(int searchReplicaCount, AutoExpandReplicas autoExpandReplica) { + if (autoExpandReplica.isEnabled()) { + // TODO: For now Search replicas do not support auto expand, when we add support update this validation + } else { + if (searchReplicaCount > 0 && searchReplicaCount % maxAwarenessAttributes() != 0) { + String errorMessage = "total search replicas needs to be a multiple of total awareness attributes [" + + maxAwarenessAttributes() + + "]"; + return Optional.of(errorMessage); + } + } + return Optional.empty(); + } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalanceTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalanceTests.java index 019db47e74cc3..794745d5fda21 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalanceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalanceTests.java @@ -35,8 +35,11 @@ public void testNoForcedAwarenessAttribute() { AwarenessReplicaBalance awarenessReplicaBalance = new AwarenessReplicaBalance(settings, EMPTY_CLUSTER_SETTINGS); assertThat(awarenessReplicaBalance.maxAwarenessAttributes(), equalTo(1)); - assertEquals(awarenessReplicaBalance.validate(0, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validate(1, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateReplicas(0, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateReplicas(1, autoExpandReplica), Optional.empty()); + + assertEquals(awarenessReplicaBalance.validateSearchReplicas(0, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateSearchReplicas(1, autoExpandReplica), Optional.empty()); } public void testForcedAwarenessAttribute() { @@ -52,9 +55,9 @@ public void testForcedAwarenessAttribute() { AwarenessReplicaBalance awarenessReplicaBalance = new AwarenessReplicaBalance(settings, EMPTY_CLUSTER_SETTINGS); AutoExpandReplicas autoExpandReplica = AutoExpandReplicas.SETTING.get(settings); assertThat(awarenessReplicaBalance.maxAwarenessAttributes(), equalTo(3)); - assertEquals(awarenessReplicaBalance.validate(2, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validate(1, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validate(0, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateReplicas(2, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateReplicas(1, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateReplicas(0, autoExpandReplica), Optional.empty()); // When auto expand replica settings is passed as max cap settings = Settings.builder() @@ -68,9 +71,15 @@ public void testForcedAwarenessAttribute() { awarenessReplicaBalance = new AwarenessReplicaBalance(settings, EMPTY_CLUSTER_SETTINGS); autoExpandReplica = AutoExpandReplicas.SETTING.get(settings); - assertEquals(awarenessReplicaBalance.validate(2, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validate(1, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validate(0, autoExpandReplica), Optional.empty()); + assertThat(awarenessReplicaBalance.maxAwarenessAttributes(), equalTo(3)); + assertEquals(awarenessReplicaBalance.validateReplicas(2, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateReplicas(1, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateReplicas(0, autoExpandReplica), Optional.empty()); + + assertEquals(awarenessReplicaBalance.validateSearchReplicas(3, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateSearchReplicas(2, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateSearchReplicas(1, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateSearchReplicas(0, autoExpandReplica), Optional.empty()); // when auto expand is not valid set as per zone awareness settings = Settings.builder() @@ -85,11 +94,11 @@ public void testForcedAwarenessAttribute() { autoExpandReplica = AutoExpandReplicas.SETTING.get(settings); assertEquals( - awarenessReplicaBalance.validate(1, autoExpandReplica), + awarenessReplicaBalance.validateReplicas(1, autoExpandReplica), Optional.of("expected max cap on auto expand to be a multiple of total awareness attributes [3]") ); assertEquals( - awarenessReplicaBalance.validate(2, autoExpandReplica), + awarenessReplicaBalance.validateReplicas(2, autoExpandReplica), Optional.of("expected max cap on auto expand to be a multiple of total awareness attributes [3]") ); @@ -104,16 +113,26 @@ public void testForcedAwarenessAttribute() { awarenessReplicaBalance = new AwarenessReplicaBalance(settings, EMPTY_CLUSTER_SETTINGS); autoExpandReplica = AutoExpandReplicas.SETTING.get(settings); - assertEquals(awarenessReplicaBalance.validate(2, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateReplicas(2, autoExpandReplica), Optional.empty()); assertEquals( - awarenessReplicaBalance.validate(1, autoExpandReplica), + awarenessReplicaBalance.validateReplicas(1, autoExpandReplica), Optional.of("expected total copies needs to be a multiple of total awareness attributes [3]") ); assertEquals( - awarenessReplicaBalance.validate(0, autoExpandReplica), + awarenessReplicaBalance.validateReplicas(0, autoExpandReplica), Optional.of("expected total copies needs to be a multiple of total awareness attributes [3]") ); + assertEquals(awarenessReplicaBalance.validateSearchReplicas(3, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateSearchReplicas(0, autoExpandReplica), Optional.empty()); + assertEquals( + awarenessReplicaBalance.validateSearchReplicas(2, autoExpandReplica), + Optional.of("total search replicas needs to be a multiple of total awareness attributes [3]") + ); + assertEquals( + awarenessReplicaBalance.validateSearchReplicas(1, autoExpandReplica), + Optional.of("total search replicas needs to be a multiple of total awareness attributes [3]") + ); } public void testForcedAwarenessAttributeDisabled() { @@ -127,8 +146,8 @@ public void testForcedAwarenessAttributeDisabled() { AutoExpandReplicas autoExpandReplica = AutoExpandReplicas.SETTING.get(settings); assertThat(awarenessReplicaBalance.maxAwarenessAttributes(), equalTo(1)); - assertEquals(awarenessReplicaBalance.validate(0, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validate(1, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateReplicas(0, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateReplicas(1, autoExpandReplica), Optional.empty()); } } From 5bdf41da655fca2113845699c513b872fff2f267 Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Fri, 21 Mar 2025 15:01:40 -0700 Subject: [PATCH 3/8] Updated awareness balance condition Signed-off-by: Vinay Krishna Pudyodu --- .../metadata/MetadataCreateIndexService.java | 5 +---- .../metadata/MetadataUpdateSettingsService.java | 5 +---- .../allocation/AwarenessReplicaBalance.java | 17 +++++++---------- .../AwarenessReplicaBalanceTests.java | 17 ++++++----------- 4 files changed, 15 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index df0a68e50115c..9bf90fbba58fa 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -1505,10 +1505,7 @@ List getIndexSettingsValidationErrors( Optional replicaValidationError = awarenessReplicaBalance.validateReplicas(replicaCount, autoExpandReplica); replicaValidationError.ifPresent(validationErrors::add); - Optional searchReplicaValidationError = awarenessReplicaBalance.validateSearchReplicas( - searchReplicaCount, - autoExpandReplica - ); + Optional searchReplicaValidationError = awarenessReplicaBalance.validateSearchReplicas(searchReplicaCount); searchReplicaValidationError.ifPresent(validationErrors::add); } return validationErrors; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java index 0045938a5db96..54c8722186057 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -304,10 +304,7 @@ public ClusterState execute(ClusterState currentState) { for (Index index : request.indices()) { if (index.getName().charAt(0) != '.') { // No replica count validation for system indices - Optional error = awarenessReplicaBalance.validateSearchReplicas( - updatedNumberOfSearchReplicas, - AutoExpandReplicas.SETTING.get(openSettings) - ); + Optional error = awarenessReplicaBalance.validateSearchReplicas(updatedNumberOfSearchReplicas); if (error.isPresent()) { ValidationException ex = new ValidationException(); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalance.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalance.java index 07bdd9936bc78..ecc0184b8714c 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalance.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalance.java @@ -122,16 +122,13 @@ public Optional validateReplicas(int replicaCount, AutoExpandReplicas au return Optional.empty(); } - public Optional validateSearchReplicas(int searchReplicaCount, AutoExpandReplicas autoExpandReplica) { - if (autoExpandReplica.isEnabled()) { - // TODO: For now Search replicas do not support auto expand, when we add support update this validation - } else { - if (searchReplicaCount > 0 && searchReplicaCount % maxAwarenessAttributes() != 0) { - String errorMessage = "total search replicas needs to be a multiple of total awareness attributes [" - + maxAwarenessAttributes() - + "]"; - return Optional.of(errorMessage); - } + public Optional validateSearchReplicas(int searchReplicaCount) { + // TODO: For now Search replicas do not support auto expand, when we add support update this validation + if (searchReplicaCount > 0 && searchReplicaCount % maxAwarenessAttributes() != 0) { + String errorMessage = "total search replicas needs to be a multiple of total awareness attributes [" + + maxAwarenessAttributes() + + "]"; + return Optional.of(errorMessage); } return Optional.empty(); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalanceTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalanceTests.java index 794745d5fda21..f6d1ef7e55191 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalanceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalanceTests.java @@ -38,8 +38,8 @@ public void testNoForcedAwarenessAttribute() { assertEquals(awarenessReplicaBalance.validateReplicas(0, autoExpandReplica), Optional.empty()); assertEquals(awarenessReplicaBalance.validateReplicas(1, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validateSearchReplicas(0, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validateSearchReplicas(1, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateSearchReplicas(0), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateSearchReplicas(1), Optional.empty()); } public void testForcedAwarenessAttribute() { @@ -76,11 +76,6 @@ public void testForcedAwarenessAttribute() { assertEquals(awarenessReplicaBalance.validateReplicas(1, autoExpandReplica), Optional.empty()); assertEquals(awarenessReplicaBalance.validateReplicas(0, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validateSearchReplicas(3, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validateSearchReplicas(2, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validateSearchReplicas(1, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validateSearchReplicas(0, autoExpandReplica), Optional.empty()); - // when auto expand is not valid set as per zone awareness settings = Settings.builder() .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone, rack") @@ -123,14 +118,14 @@ public void testForcedAwarenessAttribute() { Optional.of("expected total copies needs to be a multiple of total awareness attributes [3]") ); - assertEquals(awarenessReplicaBalance.validateSearchReplicas(3, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validateSearchReplicas(0, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateSearchReplicas(3), Optional.empty()); + assertEquals(awarenessReplicaBalance.validateSearchReplicas(0), Optional.empty()); assertEquals( - awarenessReplicaBalance.validateSearchReplicas(2, autoExpandReplica), + awarenessReplicaBalance.validateSearchReplicas(2), Optional.of("total search replicas needs to be a multiple of total awareness attributes [3]") ); assertEquals( - awarenessReplicaBalance.validateSearchReplicas(1, autoExpandReplica), + awarenessReplicaBalance.validateSearchReplicas(1), Optional.of("total search replicas needs to be a multiple of total awareness attributes [3]") ); } From 3864e3a70d32c2afed248f82ad5fea255cc48944 Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Thu, 27 Mar 2025 12:47:53 -0700 Subject: [PATCH 4/8] Address PR comments Signed-off-by: Vinay Krishna Pudyodu --- .../SearchReplicaAwarenessAllocationIT.java | 146 +++++++++--------- .../decider/AwarenessAllocationDecider.java | 62 ++------ ...SearchReplicaAwarenessAllocationTests.java | 24 --- 3 files changed, 88 insertions(+), 144 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAwarenessAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAwarenessAllocationIT.java index bc13cabe11e92..3e610df1887ed 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAwarenessAllocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAwarenessAllocationIT.java @@ -6,30 +6,6 @@ * compatible open source license. */ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - package org.opensearch.cluster.allocation; import org.apache.logging.log4j.LogManager; @@ -40,6 +16,7 @@ import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; @@ -78,9 +55,6 @@ public void testAllocationAwarenessZones() { List nodes = internalCluster().startNodes( Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(), Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(), - Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(), - Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(), - Settings.builder().put(commonSettings).put("node.attr.zone", "a").put(searchOnlyNode()).build(), Settings.builder().put(commonSettings).put("node.attr.zone", "b").put(searchOnlyNode()).build(), Settings.builder().put(commonSettings).put("node.attr.zone", "b").put(searchOnlyNode()).build(), @@ -88,14 +62,14 @@ public void testAllocationAwarenessZones() { ); logger.info("--> waiting for nodes to form a cluster"); - ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("8").execute().actionGet(); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("6").execute().actionGet(); assertThat(health.isTimedOut(), equalTo(false)); logger.info("--> create index"); createIndex( "test", Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2) .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) @@ -116,20 +90,29 @@ public void testAllocationAwarenessZones() { } } - assertThat(counts.get(nodes.get(3)), anyOf(equalTo(2), equalTo(3))); - assertThat(counts.get(nodes.get(2)), anyOf(equalTo(2), equalTo(3))); - assertThat(counts.get(nodes.get(0)), anyOf(equalTo(2), equalTo(3))); - assertThat(counts.get(nodes.get(1)), anyOf(equalTo(2), equalTo(3))); - assertThat(counts.get(nodes.get(4)), anyOf(equalTo(2), equalTo(3))); - assertThat(counts.get(nodes.get(5)), anyOf(equalTo(2), equalTo(3))); - assertThat(counts.get(nodes.get(6)), anyOf(equalTo(2), equalTo(3))); - assertThat(counts.get(nodes.get(7)), anyOf(equalTo(2), equalTo(3))); + /* + * Ensures that shards are distributed across different zones in the cluster. + * Given two zones (a and b) with one data node in each, the shards are evenly distributed, + * resulting in each data node being assigned three shards. + */ + for (int i = 0; i < 2; i++) { + assertThat(counts.get(nodes.get(i)), equalTo(3)); + } + + /* + * There are two search nodes in each zone, totaling four search nodes. + * With six search shards to allocate, they are assigned using a best-effort spread, + * ensuring each search node receives either one or two shards. + */ + for (int i = 2; i < 6; i++) { + assertThat(counts.get(nodes.get(i)), anyOf(equalTo(1), equalTo(2))); + } } public void testAwarenessZonesIncrementalNodes() { Settings commonSettings = Settings.builder() - .put("cluster.routing.allocation.awareness.force.zone.values", "a,b") - .put("cluster.routing.allocation.awareness.attributes", "zone") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") .build(); logger.info("--> starting 2 nodes on zones 'a' & 'b'"); @@ -143,7 +126,7 @@ public void testAwarenessZonesIncrementalNodes() { createIndex( "test", Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2) .build() @@ -161,10 +144,14 @@ public void testAwarenessZonesIncrementalNodes() { } } } - assertThat(counts.get(nodes.get(0)), equalTo(5)); - assertThat(counts.get(nodes.get(1)), equalTo(5)); - assertThat(counts.get(nodes.get(2)), equalTo(5)); - assertThat(counts.get(nodes.get(3)), equalTo(5)); + + /* + * The cluster consists of two zones, each containing one data node and one search node. + * Replicas and search replicas are evenly distributed across these zones. + */ + for (int i = 0; i < 4; i++) { + assertThat(counts.get(nodes.get(i)), equalTo(3)); + } logger.info("--> starting another data and search node in zone 'b'"); @@ -191,12 +178,19 @@ public void testAwarenessZonesIncrementalNodes() { } } - assertThat(counts.get(nodes.get(0)), equalTo(5)); - assertThat(counts.get(nodes.get(1)), equalTo(3)); - assertThat(counts.get(nodes.get(2)), equalTo(5)); - assertThat(counts.get(nodes.get(3)), equalTo(3)); - assertThat(counts.get(B_2), equalTo(2)); - assertThat(counts.get(B_3), equalTo(2)); + /* + * Adding a new data node and a new search node in zone B results in: + * - Zone A: 1 data node, 1 search node + * - Zone B: 2 data nodes, 2 search nodes + * + * As a result, shards are rerouted to maintain a best-effort balanced allocation. + */ + assertThat(counts.get(nodes.get(0)), equalTo(3)); + assertThat(counts.get(nodes.get(1)), equalTo(2)); + assertThat(counts.get(nodes.get(2)), equalTo(3)); + assertThat(counts.get(nodes.get(3)), equalTo(2)); + assertThat(counts.get(B_2), equalTo(1)); + assertThat(counts.get(B_3), equalTo(1)); logger.info("--> starting another data node without any zone"); @@ -218,12 +212,16 @@ public void testAwarenessZonesIncrementalNodes() { logger.info("--> Ensure there was not rerouting"); - assertThat(counts.get(nodes.get(0)), equalTo(5)); - assertThat(counts.get(nodes.get(1)), equalTo(3)); - assertThat(counts.get(nodes.get(2)), equalTo(5)); - assertThat(counts.get(nodes.get(3)), equalTo(3)); - assertThat(counts.get(B_2), equalTo(2)); - assertThat(counts.get(B_3), equalTo(2)); + /* + * Adding another node to the cluster without a zone attribute + * does not trigger shard reallocation; existing shard assignments remain unchanged. + */ + assertThat(counts.get(nodes.get(0)), equalTo(3)); + assertThat(counts.get(nodes.get(1)), equalTo(2)); + assertThat(counts.get(nodes.get(2)), equalTo(3)); + assertThat(counts.get(nodes.get(3)), equalTo(2)); + assertThat(counts.get(B_2), equalTo(1)); + assertThat(counts.get(B_3), equalTo(1)); assertThat(counts.containsKey(noZoneNode), equalTo(false)); logger.info("--> Remove the awareness attribute setting"); @@ -231,7 +229,11 @@ public void testAwarenessZonesIncrementalNodes() { client().admin() .cluster() .prepareUpdateSettings() - .setTransientSettings(Settings.builder().put("cluster.routing.allocation.awareness.attributes", "").build()) + .setTransientSettings( + Settings.builder() + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "") + .build() + ) .get(); ensureGreen("test"); @@ -247,20 +249,24 @@ public void testAwarenessZonesIncrementalNodes() { } } - assertThat(counts.get(nodes.get(0)), equalTo(3)); - assertThat(counts.get(nodes.get(1)), equalTo(3)); - assertThat(counts.get(nodes.get(2)), equalTo(4)); - assertThat(counts.get(nodes.get(3)), equalTo(3)); - assertThat(counts.get(B_2), equalTo(2)); - assertThat(counts.get(B_3), equalTo(3)); - assertThat(counts.get(noZoneNode), equalTo(2)); + /* + * Removing allocation awareness attributes from the cluster disables zone-based distribution. + * Shards are then assigned based solely the other deciders in the cluster manager. + */ + assertThat(counts.get(nodes.get(0)), equalTo(2)); + assertThat(counts.get(nodes.get(1)), equalTo(2)); + assertThat(counts.get(nodes.get(2)), equalTo(2)); + assertThat(counts.get(nodes.get(3)), equalTo(2)); + assertThat(counts.get(B_2), equalTo(1)); + assertThat(counts.get(B_3), equalTo(2)); + assertThat(counts.get(noZoneNode), equalTo(1)); } public void testAwarenessBalanceWithForcedAwarenessCreateIndex() { Settings settings = Settings.builder() - .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") - .put("cluster.routing.allocation.awareness.attributes", "zone") - .put("cluster.routing.allocation.awareness.balance", "true") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b,c") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") + .put(AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.getKey(), "true") .build(); logger.info("--> starting 3 nodes on zones a,b,c"); @@ -285,9 +291,9 @@ public void testAwarenessBalanceWithForcedAwarenessCreateIndex() { public void testAwarenessBalanceWithForcedAwarenessUpdateIndex() { Settings settings = Settings.builder() - .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") - .put("cluster.routing.allocation.awareness.attributes", "zone") - .put("cluster.routing.allocation.awareness.balance", "true") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b,c") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") + .put(AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.getKey(), "true") .build(); logger.info("--> starting 3 nodes on zones a,b,c"); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index 6a05ca85ce542..2a323196bb542 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -32,8 +32,6 @@ package org.opensearch.cluster.routing.allocation.decider; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.ShardRouting; @@ -100,8 +98,6 @@ public class AwarenessAllocationDecider extends AllocationDecider { public static final String NAME = "awareness"; - public static Logger logger = LogManager.getLogger(AwarenessAllocationDecider.class); - public static final Setting> CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING = Setting.listSetting( "cluster.routing.allocation.awareness.attributes", emptyList(), @@ -231,62 +227,20 @@ private int getCurrentNodeCountForAttribute( ) { // Get the attribute value for the current node String shardAttributeForNode = getAttributeValueForNode(node, awarenessAttribute); - + // Get all assigned shards of the same type List assignedShards = getAssignedShards(allocation, shardRouting); - // Count assigned shards with matching attribute values - int currentNodeCount = countMatchingShards(assignedShards, allocation, awarenessAttribute, shardAttributeForNode); - - // Adjust count for node movement scenarios - currentNodeCount = adjustNodeCount( - shardRouting, - node, - allocation, - moveToNode, - awarenessAttribute, - shardAttributeForNode, - currentNodeCount - ); - - return currentNodeCount; - } - - private List getAssignedShards(RoutingAllocation allocation, ShardRouting shardRouting) { - // only consider assigned shards of the same type (search vs write) - List shardRoutingList = allocation.routingNodes().assignedShards(shardRouting.shardId()); - - return shardRouting.isSearchOnly() - ? shardRoutingList.stream().filter(ShardRouting::isSearchOnly).collect(Collectors.toList()) - : shardRoutingList.stream().filter(s -> !s.isSearchOnly()).collect(Collectors.toList()); - } - - private int countMatchingShards( - List assignedShards, - RoutingAllocation allocation, - String awarenessAttribute, - String shardAttributeForNode - ) { - int count = 0; + int currentNodeCount = 0; for (ShardRouting assignedShard : assignedShards) { if (assignedShard.started() || assignedShard.initializing()) { RoutingNode routingNode = allocation.routingNodes().node(assignedShard.currentNodeId()); if (getAttributeValueForNode(routingNode, awarenessAttribute).equals(shardAttributeForNode)) { - count++; + currentNodeCount++; } } } - return count; - } - private int adjustNodeCount( - ShardRouting shardRouting, - RoutingNode node, - RoutingAllocation allocation, - boolean moveToNode, - String awarenessAttribute, - String shardAttributeForNode, - int currentNodeCount - ) { + // Adjust count for node movement scenarios if (moveToNode) { if (shardRouting.assignedToNode()) { String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId(); @@ -306,6 +260,14 @@ private int adjustNodeCount( return currentNodeCount; } + private List getAssignedShards(RoutingAllocation allocation, ShardRouting shardRouting) { + return allocation.routingNodes() + .assignedShards(shardRouting.shardId()) + .stream() + .filter(s -> s.isSearchOnly() == shardRouting.isSearchOnly()) + .collect(Collectors.toList()); + } + private boolean isAwarenessAttributeAssociatedWithNode(RoutingNode node, String awarenessAttribute) { return node.node().getAttributes().containsKey(awarenessAttribute); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAwarenessAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAwarenessAllocationTests.java index 02c636b28e80e..b757d5911d204 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAwarenessAllocationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAwarenessAllocationTests.java @@ -6,30 +6,6 @@ * compatible open source license. */ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - package org.opensearch.cluster.routing.allocation.decider; import org.apache.logging.log4j.LogManager; From 69ad375365ece1eded663af2e5244ce257f8a639 Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Thu, 27 Mar 2025 16:45:34 -0700 Subject: [PATCH 5/8] updated the if condition check Signed-off-by: Vinay Krishna Pudyodu --- .../routing/allocation/decider/AwarenessAllocationDecider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index 2a323196bb542..addce7fb8d5fb 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -245,7 +245,7 @@ private int getCurrentNodeCountForAttribute( if (shardRouting.assignedToNode()) { String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId(); - if (!node.nodeId().equals(nodeId)) { + if (node.nodeId().equals(nodeId) == false) { // Adjust count when moving between nodes RoutingNode sourceNode = allocation.routingNodes().node(nodeId); if (getAttributeValueForNode(sourceNode, awarenessAttribute).equals(shardAttributeForNode) && currentNodeCount > 0) { From d36232577571279dda2690837bd0b46064360232 Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Mon, 31 Mar 2025 13:03:02 -0700 Subject: [PATCH 6/8] changes based on the PR comments, reducing diffs Signed-off-by: Vinay Krishna Pudyodu --- .../metadata/MetadataCreateIndexService.java | 4 +- .../MetadataUpdateSettingsService.java | 4 +- .../cluster/routing/RoutingNodes.java | 23 ++++------ .../allocation/AwarenessReplicaBalance.java | 4 +- .../decider/AwarenessAllocationDecider.java | 30 ++++++------- .../AwarenessReplicaBalanceTests.java | 42 +++++++++---------- 6 files changed, 51 insertions(+), 56 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index 9bf90fbba58fa..3483c14df6272 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -1503,9 +1503,9 @@ List getIndexSettingsValidationErrors( int searchReplicaCount = settings.getAsInt(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0); AutoExpandReplicas autoExpandReplica = AutoExpandReplicas.SETTING.get(settings); - Optional replicaValidationError = awarenessReplicaBalance.validateReplicas(replicaCount, autoExpandReplica); + Optional replicaValidationError = awarenessReplicaBalance.validate(replicaCount, autoExpandReplica); replicaValidationError.ifPresent(validationErrors::add); - Optional searchReplicaValidationError = awarenessReplicaBalance.validateSearchReplicas(searchReplicaCount); + Optional searchReplicaValidationError = awarenessReplicaBalance.validate(searchReplicaCount); searchReplicaValidationError.ifPresent(validationErrors::add); } return validationErrors; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java index 54c8722186057..fff704210ca7a 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -259,7 +259,7 @@ public ClusterState execute(ClusterState currentState) { for (Index index : request.indices()) { if (index.getName().charAt(0) != '.') { // No replica count validation for system indices - Optional error = awarenessReplicaBalance.validateReplicas( + Optional error = awarenessReplicaBalance.validate( updatedNumberOfReplicas, AutoExpandReplicas.SETTING.get(openSettings) ); @@ -304,7 +304,7 @@ public ClusterState execute(ClusterState currentState) { for (Index index : request.indices()) { if (index.getName().charAt(0) != '.') { // No replica count validation for system indices - Optional error = awarenessReplicaBalance.validateSearchReplicas(updatedNumberOfSearchReplicas); + Optional error = awarenessReplicaBalance.validate(updatedNumberOfSearchReplicas); if (error.isPresent()) { ValidationException ex = new ValidationException(); diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 0b7c5981caa76..6db70cc5f4fc5 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -299,34 +299,27 @@ public Stream stream() { } /** - * Retrieves all unique values for a specific awareness attribute across nodes - * which are not dedicated search nodes. + * Retrieves all unique values for a specific awareness attribute across all nodes * Eg: "zone" : ["zone1", "zone2", "zone3"] * @param attributeName The name of the awareness attribute to collect values for * @return A set of unique attribute values for the specified attribute */ public Set nodesPerAttributesCounts(String attributeName) { - return nodesPerAttributeNames.computeIfAbsent( - attributeName, - ignored -> stream().filter(r -> r.node().isSearchNode() == false) - .map(r -> r.node().getAttributes().get(attributeName)) - .collect(Collectors.toSet()) - ); + return nodesPerAttributesCounts(attributeName, routingNode -> true); } /** - * Retrieves all unique values for a specific awareness attribute across nodes - * which are dedicated search nodes. + * Retrieves all unique values for a specific awareness attribute across filtered nodes * Eg: "zone" : ["zone1", "zone2", "zone3"] * @param attributeName The name of the awareness attribute to collect values for + * @param routingNodeFilter filters the routing nodes based on given condition * @return A set of unique attribute values for the specified attribute */ - public Set searchNodesPerAttributesCounts(String attributeName) { - return searchNodesPerAttributeNames.computeIfAbsent( + public Set nodesPerAttributesCounts(String attributeName, Predicate routingNodeFilter) { + + return nodesPerAttributeNames.computeIfAbsent( attributeName, - ignored -> stream().filter(r -> r.node().isSearchNode()) - .map(r -> r.node().getAttributes().get(attributeName)) - .collect(Collectors.toSet()) + ignored -> stream().filter(routingNodeFilter).map(r -> r.node().getAttributes().get(attributeName)).collect(Collectors.toSet()) ); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalance.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalance.java index ecc0184b8714c..d2cf30bd31983 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalance.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalance.java @@ -102,7 +102,7 @@ public int maxAwarenessAttributes() { return awarenessAttributes; } - public Optional validateReplicas(int replicaCount, AutoExpandReplicas autoExpandReplica) { + public Optional validate(int replicaCount, AutoExpandReplicas autoExpandReplica) { if (autoExpandReplica.isEnabled()) { if ((autoExpandReplica.getMaxReplicas() != Integer.MAX_VALUE) && ((autoExpandReplica.getMaxReplicas() + 1) % maxAwarenessAttributes() != 0)) { @@ -122,7 +122,7 @@ public Optional validateReplicas(int replicaCount, AutoExpandReplicas au return Optional.empty(); } - public Optional validateSearchReplicas(int searchReplicaCount) { + public Optional validate(int searchReplicaCount) { // TODO: For now Search replicas do not support auto expand, when we add support update this validation if (searchReplicaCount > 0 && searchReplicaCount % maxAwarenessAttributes() != 0) { String errorMessage = "total search replicas needs to be a multiple of total awareness attributes [" diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index addce7fb8d5fb..1d049141e9f8b 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -212,10 +212,8 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout } private static Set getAttributeValues(ShardRouting shardRouting, RoutingAllocation allocation, String awarenessAttribute) { - if (shardRouting.isSearchOnly()) { - return allocation.routingNodes().searchNodesPerAttributesCounts(awarenessAttribute); - } - return allocation.routingNodes().nodesPerAttributesCounts(awarenessAttribute); + return allocation.routingNodes() + .nodesPerAttributesCounts(awarenessAttribute, routingNode -> routingNode.node().isSearchNode() == shardRouting.isSearchOnly()); } private int getCurrentNodeCountForAttribute( @@ -225,38 +223,42 @@ private int getCurrentNodeCountForAttribute( boolean moveToNode, String awarenessAttribute ) { - // Get the attribute value for the current node - String shardAttributeForNode = getAttributeValueForNode(node, awarenessAttribute); + // build the count of shards per attribute value + final String shardAttributeForNode = getAttributeValueForNode(node, awarenessAttribute); // Get all assigned shards of the same type List assignedShards = getAssignedShards(allocation, shardRouting); // Count assigned shards with matching attribute values int currentNodeCount = 0; for (ShardRouting assignedShard : assignedShards) { if (assignedShard.started() || assignedShard.initializing()) { + // Note: this also counts relocation targets as that will be the new location of the shard. + // Relocation sources should not be counted as the shard is moving away RoutingNode routingNode = allocation.routingNodes().node(assignedShard.currentNodeId()); + // Increase node count when if (getAttributeValueForNode(routingNode, awarenessAttribute).equals(shardAttributeForNode)) { - currentNodeCount++; + ++currentNodeCount; } } } - // Adjust count for node movement scenarios if (moveToNode) { if (shardRouting.assignedToNode()) { String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId(); if (node.nodeId().equals(nodeId) == false) { - // Adjust count when moving between nodes - RoutingNode sourceNode = allocation.routingNodes().node(nodeId); - if (getAttributeValueForNode(sourceNode, awarenessAttribute).equals(shardAttributeForNode) && currentNodeCount > 0) { - currentNodeCount--; + // we work on different nodes, move counts around + if (getAttributeValueForNode(allocation.routingNodes().node(nodeId), awarenessAttribute).equals(shardAttributeForNode) + && currentNodeCount > 0) { + --currentNodeCount; } - currentNodeCount++; + + ++currentNodeCount; } } else { - currentNodeCount++; + ++currentNodeCount; } } + return currentNodeCount; } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalanceTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalanceTests.java index f6d1ef7e55191..c6134330727aa 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalanceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessReplicaBalanceTests.java @@ -35,11 +35,11 @@ public void testNoForcedAwarenessAttribute() { AwarenessReplicaBalance awarenessReplicaBalance = new AwarenessReplicaBalance(settings, EMPTY_CLUSTER_SETTINGS); assertThat(awarenessReplicaBalance.maxAwarenessAttributes(), equalTo(1)); - assertEquals(awarenessReplicaBalance.validateReplicas(0, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validateReplicas(1, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validate(0, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validate(1, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validateSearchReplicas(0), Optional.empty()); - assertEquals(awarenessReplicaBalance.validateSearchReplicas(1), Optional.empty()); + assertEquals(awarenessReplicaBalance.validate(0), Optional.empty()); + assertEquals(awarenessReplicaBalance.validate(1), Optional.empty()); } public void testForcedAwarenessAttribute() { @@ -55,9 +55,9 @@ public void testForcedAwarenessAttribute() { AwarenessReplicaBalance awarenessReplicaBalance = new AwarenessReplicaBalance(settings, EMPTY_CLUSTER_SETTINGS); AutoExpandReplicas autoExpandReplica = AutoExpandReplicas.SETTING.get(settings); assertThat(awarenessReplicaBalance.maxAwarenessAttributes(), equalTo(3)); - assertEquals(awarenessReplicaBalance.validateReplicas(2, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validateReplicas(1, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validateReplicas(0, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validate(2, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validate(1, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validate(0, autoExpandReplica), Optional.empty()); // When auto expand replica settings is passed as max cap settings = Settings.builder() @@ -72,9 +72,9 @@ public void testForcedAwarenessAttribute() { autoExpandReplica = AutoExpandReplicas.SETTING.get(settings); assertThat(awarenessReplicaBalance.maxAwarenessAttributes(), equalTo(3)); - assertEquals(awarenessReplicaBalance.validateReplicas(2, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validateReplicas(1, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validateReplicas(0, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validate(2, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validate(1, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validate(0, autoExpandReplica), Optional.empty()); // when auto expand is not valid set as per zone awareness settings = Settings.builder() @@ -89,11 +89,11 @@ public void testForcedAwarenessAttribute() { autoExpandReplica = AutoExpandReplicas.SETTING.get(settings); assertEquals( - awarenessReplicaBalance.validateReplicas(1, autoExpandReplica), + awarenessReplicaBalance.validate(1, autoExpandReplica), Optional.of("expected max cap on auto expand to be a multiple of total awareness attributes [3]") ); assertEquals( - awarenessReplicaBalance.validateReplicas(2, autoExpandReplica), + awarenessReplicaBalance.validate(2, autoExpandReplica), Optional.of("expected max cap on auto expand to be a multiple of total awareness attributes [3]") ); @@ -108,24 +108,24 @@ public void testForcedAwarenessAttribute() { awarenessReplicaBalance = new AwarenessReplicaBalance(settings, EMPTY_CLUSTER_SETTINGS); autoExpandReplica = AutoExpandReplicas.SETTING.get(settings); - assertEquals(awarenessReplicaBalance.validateReplicas(2, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validate(2, autoExpandReplica), Optional.empty()); assertEquals( - awarenessReplicaBalance.validateReplicas(1, autoExpandReplica), + awarenessReplicaBalance.validate(1, autoExpandReplica), Optional.of("expected total copies needs to be a multiple of total awareness attributes [3]") ); assertEquals( - awarenessReplicaBalance.validateReplicas(0, autoExpandReplica), + awarenessReplicaBalance.validate(0, autoExpandReplica), Optional.of("expected total copies needs to be a multiple of total awareness attributes [3]") ); - assertEquals(awarenessReplicaBalance.validateSearchReplicas(3), Optional.empty()); - assertEquals(awarenessReplicaBalance.validateSearchReplicas(0), Optional.empty()); + assertEquals(awarenessReplicaBalance.validate(3), Optional.empty()); + assertEquals(awarenessReplicaBalance.validate(0), Optional.empty()); assertEquals( - awarenessReplicaBalance.validateSearchReplicas(2), + awarenessReplicaBalance.validate(2), Optional.of("total search replicas needs to be a multiple of total awareness attributes [3]") ); assertEquals( - awarenessReplicaBalance.validateSearchReplicas(1), + awarenessReplicaBalance.validate(1), Optional.of("total search replicas needs to be a multiple of total awareness attributes [3]") ); } @@ -141,8 +141,8 @@ public void testForcedAwarenessAttributeDisabled() { AutoExpandReplicas autoExpandReplica = AutoExpandReplicas.SETTING.get(settings); assertThat(awarenessReplicaBalance.maxAwarenessAttributes(), equalTo(1)); - assertEquals(awarenessReplicaBalance.validateReplicas(0, autoExpandReplica), Optional.empty()); - assertEquals(awarenessReplicaBalance.validateReplicas(1, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validate(0, autoExpandReplica), Optional.empty()); + assertEquals(awarenessReplicaBalance.validate(1, autoExpandReplica), Optional.empty()); } } From 830d190af90d6583f9ec59a517e4b380d499d887 Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Mon, 31 Mar 2025 13:09:15 -0700 Subject: [PATCH 7/8] reducing the diffs Signed-off-by: Vinay Krishna Pudyodu --- .../routing/allocation/decider/AwarenessAllocationDecider.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index 1d049141e9f8b..ec83300aa7f16 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -227,7 +227,6 @@ private int getCurrentNodeCountForAttribute( final String shardAttributeForNode = getAttributeValueForNode(node, awarenessAttribute); // Get all assigned shards of the same type List assignedShards = getAssignedShards(allocation, shardRouting); - // Count assigned shards with matching attribute values int currentNodeCount = 0; for (ShardRouting assignedShard : assignedShards) { if (assignedShard.started() || assignedShard.initializing()) { @@ -244,7 +243,6 @@ private int getCurrentNodeCountForAttribute( if (moveToNode) { if (shardRouting.assignedToNode()) { String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId(); - if (node.nodeId().equals(nodeId) == false) { // we work on different nodes, move counts around if (getAttributeValueForNode(allocation.routingNodes().node(nodeId), awarenessAttribute).equals(shardAttributeForNode) From 23600db05e60163c7d1802dbacd7b7fcf51f483f Mon Sep 17 00:00:00 2001 From: Vinay Krishna Pudyodu Date: Mon, 31 Mar 2025 13:13:40 -0700 Subject: [PATCH 8/8] removed unnecessary static keyword Signed-off-by: Vinay Krishna Pudyodu --- .../routing/allocation/decider/AwarenessAllocationDecider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index ec83300aa7f16..17b8aa1d3cbb5 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -211,7 +211,7 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout return allocation.decision(Decision.YES, NAME, "node meets all awareness attribute requirements"); } - private static Set getAttributeValues(ShardRouting shardRouting, RoutingAllocation allocation, String awarenessAttribute) { + private Set getAttributeValues(ShardRouting shardRouting, RoutingAllocation allocation, String awarenessAttribute) { return allocation.routingNodes() .nodesPerAttributesCounts(awarenessAttribute, routingNode -> routingNode.node().isSearchNode() == shardRouting.isSearchOnly()); }