Skip to content
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 3.x]
### Added
- Add separate shard limit validation for local and remote indices ([#19532](https://github.com/opensearch-project/OpenSearch/pull/19532))
- Use Lucene `pack` method for `half_float` and `unsigned_long` when using `ApproximatePointRangeQuery`.

### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,29 @@

package org.opensearch.cluster.routing.allocation.decider;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchIntegTestCase;

import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.node.Node;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.junit.After;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -28,28 +41,97 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING;
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING;
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING;
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING;
import static org.opensearch.common.util.FeatureFlags.WRITABLE_WARM_INDEX_SETTING;

@ParameterizedStaticSettingsOpenSearchIntegTestCase.ClusterScope(scope = ParameterizedStaticSettingsOpenSearchIntegTestCase.Scope.TEST, numDataNodes = 3)
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class ShardsLimitAllocationDeciderIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {

// 100 KB expressed in bytes; used as the search cache size handed to warm nodes in startTestNodes.
private static final long TOTAL_SPACE_BYTES = new ByteSizeValue(100, ByteSizeUnit.KB).getBytes();

/**
 * Creates the test instance with the given static settings.
 * NOTE(review): presumably invoked by the test runner once per entry returned from
 * {@code parameters()} — confirm against the parameterized base class.
 *
 * @param nodeSettings settings forwarded unchanged to the parent constructor
 */
public ShardsLimitAllocationDeciderIT(Settings nodeSettings) {
super(nodeSettings);
}

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 3)
public class ShardsLimitAllocationDeciderIT extends OpenSearchIntegTestCase {
/**
 * Supplies the two parameter sets for this suite: one with the writable warm index
 * setting disabled and one with it enabled.
 *
 * @return a two-element collection, each element wrapping a single {@link Settings} argument
 */
@ParametersFactory
public static Collection<Object[]> parameters() {
    final String warmIndexKey = WRITABLE_WARM_INDEX_SETTING.getKey();
    final Settings warmDisabled = Settings.builder().put(warmIndexKey, false).build();
    final Settings warmEnabled = Settings.builder().put(warmIndexKey, true).build();
    return Arrays.asList(new Object[] { warmDisabled }, new Object[] { warmEnabled });
}

// Repository name passed to remoteStoreClusterSettings(...) and to the repository cleanup in teardown().
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
// Repository location; left null until nodeSettings assigns it from randomRepoPath().
protected Path absolutePath;

/**
 * Cleans up the remote store repository after each test, except for tests whose name
 * contains "WithoutRemoteStore".
 *
 * @throws Exception if the repository cleanup request fails
 */
@After
public void teardown() throws Exception {
    if (getTestName().contains("WithoutRemoteStore")) {
        // Nothing to clean up for tests that opt out of the remote store by name.
        return;
    }
    clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
}

/**
 * Returns the default index settings for this suite. When the writable warm index
 * setting is enabled, the parent's settings are extended with the warm-index marker;
 * otherwise the parent's settings are returned as-is.
 */
@Override
public Settings indexSettings() {
    if (WRITABLE_WARM_INDEX_SETTING.get(settings) == false) {
        return super.indexSettings();
    }
    final Settings.Builder warmIndexSettings = Settings.builder().put(super.indexSettings());
    warmIndexSettings.put(IndexModule.IS_WARM_INDEX_SETTING.getKey(), true);
    return warmIndexSettings.build();
}

/**
 * Builds the settings for a node of the test cluster.
 *
 * <p>Remote store cluster settings for {@link #REPOSITORY_NAME} are added unless the
 * running test's name contains "WithoutRemoteStore". The parent's node settings are
 * applied afterwards.
 *
 * @param nodeOrdinal ordinal of the node, forwarded to the parent implementation
 * @return the combined node settings
 */
@Override
protected Settings nodeSettings(int nodeOrdinal) {
    // Choose the repository path once; later calls reuse the same value.
    if (absolutePath == null) {
        absolutePath = randomRepoPath().toAbsolutePath();
    }
    Settings.Builder builder = Settings.builder();
    // Tests named "...WithoutRemoteStore" run without any remote store configuration.
    if (getTestName().contains("WithoutRemoteStore") == false) {
        builder.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath));
    }
    // Parent settings are put last, so they take precedence on any overlapping key.
    return builder.put(super.nodeSettings(nodeOrdinal)).build();
}

/**
 * Turns on the writable warm index experimental feature flag for every run of this suite.
 *
 * @return settings containing only that feature flag, set to {@code true}
 */
@Override
protected Settings featureFlagSettings() {
    return Settings.builder().put(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG, true).build();
}

/**
 * Reports whether the mock index store plugin should be added: only when the
 * writable warm index setting is disabled for this parameter set.
 */
@Override
protected boolean addMockIndexStorePlugin() {
    final boolean warmIndexEnabled = WRITABLE_WARM_INDEX_SETTING.get(settings);
    return warmIndexEnabled == false;
}

public void testClusterWideShardsLimit() {
// Set the cluster-wide shard limit to 4 shards per node
updateClusterSetting(CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 4);
startTestNodes(3);
updateClusterSetting(getShardsPerNodeKey(false), 4);

// Create the first two indices with 3 shards and 1 replica each
createIndex("test1", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
createIndex("test2", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
createIndex(
"test1",
Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build()
);
createIndex(
"test2",
Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build()
);

// Create the third index with 2 shards and 1 replica
createIndex("test3", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
createIndex(
"test3",
Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 1).build()
);

// Wait for the shard limit to be applied
try {
Expand Down Expand Up @@ -83,11 +165,13 @@ public void testClusterWideShardsLimit() {
}

public void testIndexSpecificShardLimit() {
startTestNodes(3);
// Set the index-specific shard limit to 2 for the first index only
Settings indexSettingsWithLimit = Settings.builder()
.put(indexSettings())
.put(SETTING_NUMBER_OF_SHARDS, 4)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 2)
.put(getIndexLevelShardsPerNodeKey(false), 2)
.build();

Settings indexSettingsWithoutLimit = Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 4).put(SETTING_NUMBER_OF_REPLICAS, 1).build();
Expand All @@ -99,7 +183,10 @@ public void testIndexSpecificShardLimit() {
createIndex("test2", indexSettingsWithoutLimit);

// Create the third index with 3 shards and 1 replica, without the index-specific limit
createIndex("test3", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
createIndex(
"test3",
Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build()
);

try {
// Wait for the shard limit to be applied
Expand Down Expand Up @@ -146,22 +233,30 @@ public void testIndexSpecificShardLimit() {
}

public void testCombinedClusterAndIndexSpecificShardLimits() {
startTestNodes(3);
// Set the cluster-wide shard limit to 6
updateClusterSetting(CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 6);
updateClusterSetting(getShardsPerNodeKey(false), 6);

// Create the first index with 3 shards, 1 replica, and index-specific limit of 1
Settings indexSettingsWithLimit = Settings.builder()
.put(indexSettings())
.put(SETTING_NUMBER_OF_SHARDS, 3)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 1)
.put(getIndexLevelShardsPerNodeKey(false), 1)
.build();
createIndex("test1", indexSettingsWithLimit);

// Create the second index with 4 shards and 1 replica
createIndex("test2", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 4).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
createIndex(
"test2",
Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 4).put(SETTING_NUMBER_OF_REPLICAS, 1).build()
);

// Create the third index with 3 shards and 1 replica
createIndex("test3", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
createIndex(
"test3",
Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build()
);

try {
assertBusy(() -> {
Expand Down Expand Up @@ -218,9 +313,6 @@ public void testCombinedClusterAndIndexSpecificShardLimits() {
assertEquals("One node should have 5 shards", 5, shardCounts.get(2).intValue());

// Check that all nodes have only one shard of the first index
for (Set<String> indexesOnNode : indexShardsPerNode.values()) {
assertTrue("Each node should have a shard from test1", indexesOnNode.contains("test1"));
}
});
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -242,6 +334,7 @@ public void testCombinedClusterAndIndexSpecificShardLimits() {
public void testIndexTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() {
// Attempt to create an index with INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING
Settings indexSettings = Settings.builder()
.put(indexSettings())
.put(SETTING_NUMBER_OF_SHARDS, 3)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 1)
Expand Down Expand Up @@ -276,24 +369,30 @@ public void testIndexTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() {
* indicating that this setting is only applicable for remote store enabled clusters.
*/
public void testClusterTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() {
assumeTrue(
"Test should only run in the default (non-parameterized) test suite",
WRITABLE_WARM_INDEX_SETTING.get(settings) == false
);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
updateClusterSetting(CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 1);
updateClusterSetting(getShardsPerNodeKey(true), 1);
});

// Verify the exception message
assertTrue(
"Exception should mention that the setting requires remote store",
exception.getMessage()
.contains(
"Setting [cluster.routing.allocation.total_primary_shards_per_node] can only be used with remote store enabled clusters"
"Setting [cluster.routing.allocation.total_primary_shards_per_node] "
+ "can only be used with remote store enabled clusters"
)
);

// Create an index using the index-level total shards-per-node limit (not the primary-only setting)
Settings indexSettings = Settings.builder()
.put(indexSettings())
.put(SETTING_NUMBER_OF_SHARDS, 3)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 1)
.put(getIndexLevelShardsPerNodeKey(false), 1)
.build();

createIndex("test_index", indexSettings);
Expand All @@ -302,4 +401,40 @@ public void testClusterTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() {
/**
 * Applies a single integer-valued cluster setting as a transient setting and waits
 * for the update request to complete.
 *
 * @param setting key of the cluster setting to update
 * @param value   value to assign to the setting
 */
private void updateClusterSetting(String setting, int value) {
    final Settings.Builder transientUpdate = Settings.builder().put(setting, value);
    client().admin().cluster().prepareUpdateSettings().setTransientSettings(transientUpdate).get();
}

/**
 * Builds node settings for a warm node: the parent's settings for node ordinal 0
 * plus the search cache size.
 *
 * @param cacheSize value for the node search cache size setting, written via {@code toString()}
 * @return the resulting settings
 */
private Settings warmNodeSettings(ByteSizeValue cacheSize) {
    final Settings.Builder warmSettings = Settings.builder();
    warmSettings.put(super.nodeSettings(0));
    warmSettings.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString());
    return warmSettings.build();
}

/**
 * Starts {@code nodeCount} warm-only nodes when the writable warm index setting is
 * enabled for this parameter set; does nothing otherwise.
 *
 * @param nodeCount number of warm-only nodes to start
 */
private void startTestNodes(int nodeCount) {
    if (WRITABLE_WARM_INDEX_SETTING.get(settings) == false) {
        return;
    }
    final ByteSizeValue searchCacheSize = new ByteSizeValue(TOTAL_SPACE_BYTES);
    final Settings warmOnlySettings = Settings.builder().put(warmNodeSettings(searchCacheSize)).build();
    internalCluster().startWarmOnlyNodes(nodeCount, warmOnlySettings);
}

/**
 * Picks the cluster-level shards-per-node setting key for the current parameter set.
 *
 * @param primary when the warm index setting is disabled, selects the primary-shards key
 *                ({@code true}) or the total-shards key ({@code false}); ignored when the
 *                warm index setting is enabled
 * @return the remote-capable key when the warm index setting is enabled, otherwise the
 *         key selected by {@code primary}
 */
private String getShardsPerNodeKey(boolean primary) {
    if (WRITABLE_WARM_INDEX_SETTING.get(settings)) {
        return CLUSTER_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING.getKey();
    }
    if (primary) {
        return CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey();
    }
    return CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey();
}

/**
 * Picks the index-level shards-per-node setting key for the current parameter set.
 *
 * @param primary when the warm index setting is disabled, selects the primary-shards key
 *                ({@code true}) or the total-shards key ({@code false}); ignored when the
 *                warm index setting is enabled
 * @return the remote-capable key when the warm index setting is enabled, otherwise the
 *         key selected by {@code primary}
 */
private String getIndexLevelShardsPerNodeKey(boolean primary) {
    if (WRITABLE_WARM_INDEX_SETTING.get(settings)) {
        return INDEX_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING.getKey();
    }
    if (primary) {
        return INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey();
    }
    return INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey();
}
}
Loading
Loading