Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.cluster.routing.allocation.decider;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
Expand Down Expand Up @@ -141,6 +142,127 @@ private void setAwarenessAttributes(List<String> awarenessAttributes) {
this.awarenessAttributes = awarenessAttributes;
}

@Override
public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
if (awarenessAttributes.isEmpty()) {
return allocation.decision(
Decision.YES,
NAME,
"allocation awareness is not enabled, set cluster setting [%s] to enable it",
CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey()
);
}

if (INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(indexMetadata.getSettings()).autoExpandToAll()) {
return allocation.decision(Decision.YES, NAME, "allocation awareness is ignored, this index is set to auto-expand to all");
}

for (String awarenessAttribute : awarenessAttributes) {
String nodeAttributeValue = node.getAttributes().get(awarenessAttribute);
if (nodeAttributeValue == null) {
return allocation.decision(
Decision.NO,
NAME,
"node does not contain the awareness attribute [%s]; required attributes cluster setting [%s=%s]",
awarenessAttribute,
CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(),
allocation.debugDecision() ? Strings.collectionToCommaDelimitedString(awarenessAttributes) : null
);
}

Map<String, Integer> nodeCountPerAttributeValue = new HashMap<>();
for (DiscoveryNode dataNode : allocation.nodes().getDataNodes().values()) {
if (dataNode.isSearchNode()) {
continue;
}
String attrValue = dataNode.getAttributes().get(awarenessAttribute);
if (attrValue != null) {
nodeCountPerAttributeValue.merge(attrValue, 1, Integer::sum);
}
}

Set<String> allAttributeValues = new HashSet<>(nodeCountPerAttributeValue.keySet());
List<String> forcedValues = forcedAwarenessAttributes.get(awarenessAttribute);
if (forcedValues != null) {
allAttributeValues.addAll(forcedValues);
}
int numberOfAttributeValues = allAttributeValues.size();

if (numberOfAttributeValues <= 1) {
continue;
}

int totalDataNodes = nodeCountPerAttributeValue.values().stream().mapToInt(Integer::intValue).sum();
int autoExpandMaxReplicas = INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(indexMetadata.getSettings()).getMaxReplicas();
int upperBound = Math.min(totalDataNodes, autoExpandMaxReplicas + 1);
int maxCopies = computeMaxAchievableCopies(nodeCountPerAttributeValue, numberOfAttributeValues, upperBound);
int maxPerAttributeValue = (maxCopies + numberOfAttributeValues - 1) / numberOfAttributeValues;

int nodesInSameGroup = nodeCountPerAttributeValue.getOrDefault(nodeAttributeValue, 0);
if (nodesInSameGroup > maxPerAttributeValue) {
long position = 0;
for (DiscoveryNode dataNode : allocation.nodes().getDataNodes().values()) {
if (dataNode.isSearchNode()) {
continue;
}
String attrValue = dataNode.getAttributes().get(awarenessAttribute);
if (nodeAttributeValue.equals(attrValue) && dataNode.getId().compareTo(node.getId()) < 0) {
position++;
}
}
if (position >= maxPerAttributeValue) {
return allocation.decision(
Decision.NO,
NAME,
"too many nodes in awareness group [%s=%s] for auto-expand replicas; "
+ "[%d] nodes in group but only [%d] are needed to satisfy awareness with [%d] attribute values",
awarenessAttribute,
nodeAttributeValue,
nodesInSameGroup,
maxPerAttributeValue,
numberOfAttributeValues
);
}
}
}

return allocation.decision(Decision.YES, NAME, "node meets all awareness attribute requirements for auto-expand");
}

/**
* Computes the maximum number of shard copies that can be placed while respecting
* awareness constraints. With K awareness attribute values and varying numbers of
* nodes per value, finds the largest S such that sum(min(nodesPerValue_i, ceil(S/K))) &gt;= S.
*/
public static int computeMaxAchievableCopies(Map<String, Integer> nodeCountPerAttributeValue, int numberOfAttributeValues) {
int totalNodes = nodeCountPerAttributeValue.values().stream().mapToInt(Integer::intValue).sum();
return computeMaxAchievableCopies(nodeCountPerAttributeValue, numberOfAttributeValues, totalNodes);
}

/**
* Computes the maximum number of shard copies that can be placed while respecting
* awareness constraints, with an upper bound on the number of copies to consider.
* The upper bound accounts for auto_expand_replicas max setting so we don't compute
* a node count that would be reduced by the max setting to a value awareness can't handle.
*/
public static int computeMaxAchievableCopies(
Map<String, Integer> nodeCountPerAttributeValue,
int numberOfAttributeValues,
int upperBound
) {
for (int s = upperBound; s >= 1; s--) {
int maxPerValue = (s + numberOfAttributeValues - 1) / numberOfAttributeValues;
int capacity = 0;
for (int nodeCount : nodeCountPerAttributeValue.values()) {
capacity += Math.min(nodeCount, maxPerValue);
}
if (capacity >= s) {
return s;
}
}
return 1;
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1154,4 +1154,262 @@ public void testIgnoredByAutoExpandReplicasToAll() {

assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));
}

public void testAutoExpandReplicasWithAwarenessEqualZones() {
final Settings settings = Settings.builder()
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
.build();

final AllocationService strategy = createAllocationService(settings);

final IndexMetadata.Builder metadataBuilder = IndexMetadata.builder("test")
.settings(
settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-20")
);

final Metadata metadata = Metadata.builder().put(metadataBuilder).build();

final DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(newNode("A-0", singletonMap("zone", "a")))
.add(newNode("A-1", singletonMap("zone", "a")))
.add(newNode("A-2", singletonMap("zone", "a")))
.add(newNode("B-0", singletonMap("zone", "b")))
.add(newNode("B-1", singletonMap("zone", "b")))
.add(newNode("B-2", singletonMap("zone", "b")))
.add(newNode("C-0", singletonMap("zone", "c")))
.add(newNode("C-1", singletonMap("zone", "c")))
.add(newNode("C-2", singletonMap("zone", "c")))
.build();

final ClusterState clusterState = applyStartedShardsUntilNoChange(
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(Settings.EMPTY))
.metadata(metadata)
.routingTable(RoutingTable.builder().addAsNew(metadata.index("test")).build())
.nodes(nodes)
.build(),
strategy
);

assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(9));
}

public void testAutoExpandReplicasWithAwarenessUnequalZones() {
final Settings settings = Settings.builder()
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
.build();

final AllocationService strategy = createAllocationService(settings);

final IndexMetadata.Builder metadataBuilder = IndexMetadata.builder("test")
.settings(
settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-20")
);

final Metadata metadata = Metadata.builder().put(metadataBuilder).build();

// Zone a: 2 nodes, Zone b: 5 nodes — unequal distribution
final DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(newNode("A-0", singletonMap("zone", "a")))
.add(newNode("A-1", singletonMap("zone", "a")))
.add(newNode("B-0", singletonMap("zone", "b")))
.add(newNode("B-1", singletonMap("zone", "b")))
.add(newNode("B-2", singletonMap("zone", "b")))
.add(newNode("B-3", singletonMap("zone", "b")))
.add(newNode("B-4", singletonMap("zone", "b")))
.build();

final ClusterState clusterState = applyStartedShardsUntilNoChange(
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(Settings.EMPTY))
.metadata(metadata)
.routingTable(RoutingTable.builder().addAsNew(metadata.index("test")).build())
.nodes(nodes)
.build(),
strategy
);

// With 2 zones: zone a has 2 nodes, zone b has 5 nodes
// Max achievable copies = 5: ceil(5/2)=3 per zone, a:min(2,3)=2, b:min(5,3)=3, total=5
// So 5 copies = 4 replicas, but capped by auto_expand_replicas max, all should be assigned
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));
}

public void testAutoExpandReplicasWithForcedAwarenessAndEmptyZone() {
final Settings settings = Settings.builder()
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b,c")
.build();

final AllocationService strategy = createAllocationService(settings);

final IndexMetadata.Builder metadataBuilder = IndexMetadata.builder("test")
.settings(
settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-20")
);

final Metadata metadata = Metadata.builder().put(metadataBuilder).build();

// Only 2 zones have nodes, zone c is forced but empty
final DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(newNode("A-0", singletonMap("zone", "a")))
.add(newNode("A-1", singletonMap("zone", "a")))
.add(newNode("A-2", singletonMap("zone", "a")))
.add(newNode("B-0", singletonMap("zone", "b")))
.add(newNode("B-1", singletonMap("zone", "b")))
.add(newNode("B-2", singletonMap("zone", "b")))
.build();

final ClusterState clusterState = applyStartedShardsUntilNoChange(
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(Settings.EMPTY))
.metadata(metadata)
.routingTable(RoutingTable.builder().addAsNew(metadata.index("test")).build())
.nodes(nodes)
.build(),
strategy
);

// 3 forced zones (a, b, c), zone c is empty
// Max achievable: S=4 → ceil(4/3)=2 → a:2, b:2, c:0 → 4 ✓
// S=5 → ceil(5/3)=2 → a:2, b:2, c:0 → 4 < 5 ✗
// So max 4 copies = 3 replicas, all assigned
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));
}

public void testAutoExpandReplicasWithAwarenessComputeMaxCopies() {
// Unit test for computeMaxAchievableCopies helper
Map<String, Integer> nodeCountPerAttr;

// Equal zones: 3 zones × 3 nodes = 9 max copies
nodeCountPerAttr = new HashMap<>();
nodeCountPerAttr.put("a", 3);
nodeCountPerAttr.put("b", 3);
nodeCountPerAttr.put("c", 3);
assertThat(AwarenessAllocationDecider.computeMaxAchievableCopies(nodeCountPerAttr, 3), equalTo(9));

// Unequal: zone a=2, zone b=5. Max copies=5 (ceil(5/2)=3, a:min(2,3)+b:min(5,3)=2+3=5)
nodeCountPerAttr = new HashMap<>();
nodeCountPerAttr.put("a", 2);
nodeCountPerAttr.put("b", 5);
assertThat(AwarenessAllocationDecider.computeMaxAchievableCopies(nodeCountPerAttr, 2), equalTo(5));

// Forced empty zone: 3 zones with values, 1 forced empty zone (4 total)
// zone a=5, b=6, c=6, null=0 → max S where sum(min(n,ceil(S/4)))>=S
// S=9: ceil(9/4)=3 → 3+3+3+0=9 ✓
nodeCountPerAttr = new HashMap<>();
nodeCountPerAttr.put("a", 5);
nodeCountPerAttr.put("b", 6);
nodeCountPerAttr.put("c", 6);
assertThat(AwarenessAllocationDecider.computeMaxAchievableCopies(nodeCountPerAttr, 4), equalTo(9));

// Single zone: 5 nodes, 1 zone → all can be used
nodeCountPerAttr = new HashMap<>();
nodeCountPerAttr.put("a", 5);
assertThat(AwarenessAllocationDecider.computeMaxAchievableCopies(nodeCountPerAttr, 1), equalTo(5));

// With upper bound (auto_expand max): 7 racks, 40 nodes, max=20 replicas (21 copies)
// Without bound: maxCopies=30. With bound=21: S=21 → cap=20 < 21; S=20 → cap=20 ✓
nodeCountPerAttr = new HashMap<>();
nodeCountPerAttr.put("r1", 10);
nodeCountPerAttr.put("r2", 8);
nodeCountPerAttr.put("r3", 7);
nodeCountPerAttr.put("r4", 5);
nodeCountPerAttr.put("r5", 4);
nodeCountPerAttr.put("r6", 4);
nodeCountPerAttr.put("r7", 2);
assertThat(AwarenessAllocationDecider.computeMaxAchievableCopies(nodeCountPerAttr, 7), equalTo(30));
assertThat(AwarenessAllocationDecider.computeMaxAchievableCopies(nodeCountPerAttr, 7, 21), equalTo(20));
}

public void testAutoExpandReplicasWithManyRackIds() {
// Simulates issue #2984: 9 pods across 7 k8s nodes used as rack awareness
final Settings settings = Settings.builder()
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "rack_id")
.build();

final AllocationService strategy = createAllocationService(settings);

final IndexMetadata.Builder metadataBuilder = IndexMetadata.builder("test")
.settings(
settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "1-20")
);

final Metadata metadata = Metadata.builder().put(metadataBuilder).build();

// 9 pods across 7 k8s nodes (2 nodes have 2 pods each)
final DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(newNode("pod-0", singletonMap("rack_id", "node1")))
.add(newNode("pod-1", singletonMap("rack_id", "node1")))
.add(newNode("pod-2", singletonMap("rack_id", "node2")))
.add(newNode("pod-3", singletonMap("rack_id", "node2")))
.add(newNode("pod-4", singletonMap("rack_id", "node3")))
.add(newNode("pod-5", singletonMap("rack_id", "node4")))
.add(newNode("pod-6", singletonMap("rack_id", "node5")))
.add(newNode("pod-7", singletonMap("rack_id", "node6")))
.add(newNode("pod-8", singletonMap("rack_id", "node7")))
.build();

final ClusterState clusterState = applyStartedShardsUntilNoChange(
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(Settings.EMPTY))
.metadata(metadata)
.routingTable(RoutingTable.builder().addAsNew(metadata.index("test")).build())
.nodes(nodes)
.build(),
strategy
);

assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));
}

public void testAutoExpandReplicasWithUnevenRacksAndExplicitMax() {
// Edge case: many nodes, uneven racks, auto_expand max constrains copies
final Settings settings = Settings.builder()
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "rack_id")
.build();

final AllocationService strategy = createAllocationService(settings);

final IndexMetadata.Builder metadataBuilder = IndexMetadata.builder("test")
.settings(
settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-5")
);

final Metadata metadata = Metadata.builder().put(metadataBuilder).build();

// 3 racks: r1 has 5 nodes, r2 has 1, r3 has 1 = 7 total
// Without fix: replicas=min(6,5)=5, copies=6, ceil(6/3)=2 per rack
// r2 can hold 1, r3 can hold 1, r1 can hold 2 → capacity=4 < 6 → YELLOW
// With fix: upper=min(7,6)=6, maxAchievable from 6:
// S=6: ceil(6/3)=2, cap=2+1+1=4 < 6; S=4: cap=2+1+1=4 ✓
// replicas=min(3,5)=3, copies=4 → all assigned
final DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(newNode("R1-0", singletonMap("rack_id", "r1")))
.add(newNode("R1-1", singletonMap("rack_id", "r1")))
.add(newNode("R1-2", singletonMap("rack_id", "r1")))
.add(newNode("R1-3", singletonMap("rack_id", "r1")))
.add(newNode("R1-4", singletonMap("rack_id", "r1")))
.add(newNode("R2-0", singletonMap("rack_id", "r2")))
.add(newNode("R3-0", singletonMap("rack_id", "r3")))
.build();

final ClusterState clusterState = applyStartedShardsUntilNoChange(
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(Settings.EMPTY))
.metadata(metadata)
.routingTable(RoutingTable.builder().addAsNew(metadata.index("test")).build())
.nodes(nodes)
.build(),
strategy
);

assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));
}
}
Loading