diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fcd41e600707..16345daf15c01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 3.x] ### Added +- Add separate shard limit validation for local and remote indices ([#19532](https://github.com/opensearch-project/OpenSearch/pull/19532)) - Use Lucene `pack` method for `half_float` and `unsigned_long` when using `ApproximatePointRangeQuery`. ### Changed diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDeciderIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDeciderIT.java index fdc6a7e6b96b2..91106ac4a3a68 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDeciderIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDeciderIT.java @@ -8,6 +8,9 @@ package org.opensearch.cluster.routing.allocation.decider; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; @@ -15,9 +18,19 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.common.settings.Settings; -import org.opensearch.test.OpenSearchIntegTestCase; - +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.index.IndexModule; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; +import org.opensearch.node.Node; +import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; +import org.junit.After; + +import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -28,28 +41,98 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING; +import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING; import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING; import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING; +import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING; +import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING; import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING; import static
org.opensearch.common.util.FeatureFlags.WRITABLE_WARM_INDEX_SETTING; -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 3) -public class ShardsLimitAllocationDeciderIT extends OpenSearchIntegTestCase { +@ParameterizedStaticSettingsOpenSearchIntegTestCase.ClusterScope(scope = ParameterizedStaticSettingsOpenSearchIntegTestCase.Scope.TEST, numDataNodes = 3) +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) +public class ShardsLimitAllocationDeciderIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { + + private static final long TOTAL_SPACE_BYTES = new ByteSizeValue(100, ByteSizeUnit.KB).getBytes(); + + public ShardsLimitAllocationDeciderIT(Settings nodeSettings) { + super(nodeSettings); + } + + @ParametersFactory + public static Collection<Object[]> parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), true).build() } + ); + } + + protected static final String REPOSITORY_NAME = "test-remote-store-repo"; + protected Path absolutePath; + + @After + public void teardown() throws Exception { + String testName = getTestName(); + if (testName.contains("WithoutRemoteStore") == false) { + clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get(); + } + } + + @Override + public Settings indexSettings() { + Boolean isWarmEnabled = WRITABLE_WARM_INDEX_SETTING.get(settings); + if (isWarmEnabled) { + return Settings.builder().put(super.indexSettings()).put(IndexModule.IS_WARM_INDEX_SETTING.getKey(), true).build(); + } else { + return super.indexSettings(); + } + } @Override protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build(); + if (absolutePath == null) { + absolutePath = randomRepoPath().toAbsolutePath(); + } + String testName = getTestName(); + Settings.Builder builder = Settings.builder(); + if (testName.contains("WithoutRemoteStore") == false) { + builder.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath)); + } + return builder.put(super.nodeSettings(nodeOrdinal)).build(); + } + + @Override + protected Settings featureFlagSettings() { + Settings.Builder featureSettings = Settings.builder(); + featureSettings.put(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG, true); + return featureSettings.build(); + } + + @Override + protected boolean addMockIndexStorePlugin() { + return WRITABLE_WARM_INDEX_SETTING.get(settings) == false; } public void testClusterWideShardsLimit() { // Set the cluster-wide shard limit to 4 - updateClusterSetting(CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 4); + startTestNodes(3); + updateClusterSetting(getShardsPerNodeKey(false), 4); // Create the first two indices with 3 shards and 1 replica each - createIndex("test1", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build()); - createIndex("test2", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build()); + createIndex( + "test1", + Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build() + ); + createIndex( + "test2", + Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build() + ); // Create the third index with 2 shards and 1 replica - createIndex("test3", Settings.builder().put(SETTING_NUMBER_OF_SHARDS,
2).put(SETTING_NUMBER_OF_REPLICAS, 1).build()); + createIndex( + "test3", + Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 1).build() + ); // Wait for the shard limit to be applied try { @@ -83,11 +166,13 @@ public void testClusterWideShardsLimit() { } public void testIndexSpecificShardLimit() { + startTestNodes(3); // Set the index-specific shard limit to 2 for the first index only Settings indexSettingsWithLimit = Settings.builder() + .put(indexSettings()) .put(SETTING_NUMBER_OF_SHARDS, 4) .put(SETTING_NUMBER_OF_REPLICAS, 1) - .put(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 2) + .put(getIndexLevelShardsPerNodeKey(false), 2) .build(); Settings indexSettingsWithoutLimit = Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 4).put(SETTING_NUMBER_OF_REPLICAS, 1).build(); @@ -99,7 +184,10 @@ public void testIndexSpecificShardLimit() { createIndex("test2", indexSettingsWithoutLimit); // Create the third index with 3 shards and 1 replica, without the index-specific limit - createIndex("test3", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build()); + createIndex( + "test3", + Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build() + ); try { // Wait for the shard limit to be applied @@ -146,22 +234,30 @@ public void testIndexSpecificShardLimit() { } public void testCombinedClusterAndIndexSpecificShardLimits() { + startTestNodes(3); // Set the cluster-wide shard limit to 6 - updateClusterSetting(CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 6); + updateClusterSetting(getShardsPerNodeKey(false), 6); // Create the first index with 3 shards, 1 replica, and index-specific limit of 1 Settings indexSettingsWithLimit = Settings.builder() + .put(indexSettings()) .put(SETTING_NUMBER_OF_SHARDS, 3) .put(SETTING_NUMBER_OF_REPLICAS, 1) - .put(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 1) + .put(getIndexLevelShardsPerNodeKey(false), 1) .build(); createIndex("test1", indexSettingsWithLimit); // Create the second index with 4 shards and 1 replica - createIndex("test2", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 4).put(SETTING_NUMBER_OF_REPLICAS, 1).build()); + createIndex( + "test2", + Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 4).put(SETTING_NUMBER_OF_REPLICAS, 1).build() + ); // Create the third index with 3 shards and 1 replica - createIndex("test3", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build()); + createIndex( + "test3", + Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build() + ); try { assertBusy(() -> { @@ -218,9 +314,6 @@ public void testCombinedClusterAndIndexSpecificShardLimits() { assertEquals("One node should have 5 shards", 5, shardCounts.get(2).intValue()); // Check that all nodes have only one shard of the first index - for (Set indexesOnNode : indexShardsPerNode.values()) { - assertTrue("Each node should have a shard from test1", indexesOnNode.contains("test1")); - } }); } catch (Exception e) { throw new RuntimeException(e); @@ -229,11 +322,11 @@ public void testCombinedClusterAndIndexSpecificShardLimits() { /** * Integration test to verify the behavior of INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING - * in a non-remote store environment. + * or INDEX_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING in a non-remote store environment. 
* * Scenario: * An end-user attempts to create an index with INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING - * on a cluster where remote store is not enabled. + * or INDEX_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING on a cluster where remote store is not enabled. * * Expected Outcome: * The system should reject the index creation request and throw an appropriate exception, @@ -242,9 +335,10 @@ public void testCombinedClusterAndIndexSpecificShardLimits() { public void testIndexTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() { // Attempt to create an index with INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING Settings indexSettings = Settings.builder() + .put(indexSettings()) .put(SETTING_NUMBER_OF_SHARDS, 3) .put(SETTING_NUMBER_OF_REPLICAS, 1) - .put(INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 1) + .put(getIndexLevelShardsPerNodeKey(true), 1) .build(); // Assert that creating the index throws an exception @@ -258,7 +352,9 @@ public void testIndexTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() { "Exception should mention that the setting requires remote store", exception.getMessage() .contains( - "Setting [index.routing.allocation.total_primary_shards_per_node] can only be used with remote store enabled clusters" + "Setting [index.routing.allocation.total_primary_shards_per_node] or " + + "[index.routing.allocation.total_remote_capable_primary_shards_per_node] " + + "can only be used with remote store enabled clusters" ) ); } @@ -276,8 +372,12 @@ public void testIndexTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() { * indicating that this setting is only applicable for remote store enabled clusters. */ public void testClusterTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() { + assumeTrue( + "Test should only run in the default (non-parameterized) test suite", + WRITABLE_WARM_INDEX_SETTING.get(settings) == false + ); IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> { - updateClusterSetting(CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 1); + updateClusterSetting(getShardsPerNodeKey(true), 1); }); // Verify the exception message @@ -285,15 +385,17 @@ public void testClusterTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() { "Exception should mention that the setting requires remote store", exception.getMessage() .contains( - "Setting [cluster.routing.allocation.total_primary_shards_per_node] can only be used with remote store enabled clusters" + "Setting [cluster.routing.allocation.total_primary_shards_per_node] " + + "can only be used with remote store enabled clusters" ) ); - // Attempt to create an index with INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING + // Attempt to create an index with INDEX_TOTAL_SHARDS_PER_NODE_SETTING Settings indexSettings = Settings.builder() + .put(indexSettings()) .put(SETTING_NUMBER_OF_SHARDS, 3) .put(SETTING_NUMBER_OF_REPLICAS, 1) - .put(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 1) + .put(getIndexLevelShardsPerNodeKey(false), 1) .build(); createIndex("test_index", indexSettings); @@ -302,4 +404,42 @@ public void testClusterTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() { private void updateClusterSetting(String setting, int value) { client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(setting, value)).get(); } + + private Settings warmNodeSettings(ByteSizeValue cacheSize) { + return Settings.builder() + .put(super.nodeSettings(0)) + .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString()) + .build(); + } + + /** + 
* Helper method to start warm-only nodes when the warm index setting is enabled; the cluster scope already provides the data nodes + */ + private void startTestNodes(int nodeCount) { + boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings); + if (isWarmIndex) { + Settings nodeSettings = Settings.builder().put(warmNodeSettings(new ByteSizeValue(TOTAL_SPACE_BYTES))).build(); + internalCluster().startWarmOnlyNodes(nodeCount, nodeSettings); + } + } + + private String getShardsPerNodeKey(boolean primary) { + boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings); + if (isWarmIndex) { + return CLUSTER_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING.getKey(); + } else { + return primary ? CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey() : CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(); + } + } + + private String getIndexLevelShardsPerNodeKey(boolean primary) { + boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings); + if (isWarmIndex) { + return primary + ? INDEX_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING.getKey() + : INDEX_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING.getKey(); + } else { + return primary ? INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey() : INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(); + } + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/shards/ClusterShardLimitIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/shards/ClusterShardLimitIT.java index e1f688dae32b7..7859182a2a48c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/shards/ClusterShardLimitIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/shards/ClusterShardLimitIT.java @@ -32,6 +32,9 @@ package org.opensearch.cluster.shards; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + import org.opensearch.Version; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; @@ -41,12 +44,18 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingPool; import org.opensearch.common.Priority; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.index.IndexModule; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.indices.ShardLimitValidator; +import org.opensearch.node.Node; import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.snapshots.SnapshotState; import org.opensearch.snapshots.mockstore.MockRepository; @@ -54,20 +63,24 @@ import org.opensearch.test.InternalTestCluster; import org.opensearch.test.MockHttpTransport; import org.opensearch.test.NodeConfigurationSource; -import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.client.Client; import org.opensearch.transport.nio.MockNioTransportPlugin; +import org.junit.After; import java.io.IOException; import java.nio.file.Path; import java.util.Arrays; +import java.util.Collection;
import java.util.Collections; import java.util.List; import java.util.function.Function; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.common.util.FeatureFlags.WRITABLE_WARM_INDEX_SETTING; +import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_REMOTE_CAPABLE_SHARDS_PER_NODE; import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE; import static org.opensearch.indices.ShardLimitValidator.SETTING_MAX_SHARDS_PER_CLUSTER_KEY; import static org.opensearch.test.NodeRoles.dataNode; @@ -75,14 +88,69 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST) -public class ClusterShardLimitIT extends OpenSearchIntegTestCase { - private static final String shardsPerNodeKey = SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(); +@ParameterizedStaticSettingsOpenSearchIntegTestCase.ClusterScope(scope = ParameterizedStaticSettingsOpenSearchIntegTestCase.Scope.TEST) +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) +public class ClusterShardLimitIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { + + public ClusterShardLimitIT(Settings nodeSettings) { + super(nodeSettings); + } + + @ParametersFactory + public static Collection<Object[]> parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(WRITABLE_WARM_INDEX_SETTING.getKey(), true).build() } + ); + } + private static final String ignoreDotIndexKey = ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES.getKey(); + private static final long TOTAL_SPACE_BYTES = new ByteSizeValue(100, ByteSizeUnit.KB).getBytes(); + + protected static final String REPOSITORY_NAME = "test-remote-store-repo"; + protected Path absolutePath; + + @After + public void teardown() throws Exception { + clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get(); + } + + @Override + public Settings indexSettings() { + Boolean isWarmEnabled = WRITABLE_WARM_INDEX_SETTING.get(settings); + if (isWarmEnabled) { + return Settings.builder().put(super.indexSettings()).put(IndexModule.IS_WARM_INDEX_SETTING.getKey(), true).build(); + } else { + return super.indexSettings(); + } + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + if (absolutePath == null) { + absolutePath = randomRepoPath().toAbsolutePath(); + } + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath)) + .build(); + } + + @Override + protected Settings featureFlagSettings() { + Settings.Builder featureSettings = Settings.builder(); + featureSettings.put(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG, true); + return featureSettings.build(); + } + + @Override + protected boolean addMockIndexStorePlugin() { + return WRITABLE_WARM_INDEX_SETTING.get(settings) == false; + } public void testSettingClusterMaxShards() { int shardsPerNode = between(1, 500_000); - setMaxShardLimit(shardsPerNode, shardsPerNodeKey); + setMaxShardLimit(shardsPerNode, getShardsPerNodeKey()); } public void testSettingIgnoreDotIndexes() { @@ -91,36 +159,37 @@ public void testSettingIgnoreDotIndexes() { } public void testMinimumPerNode() { + startTestNodes(3); int negativeShardsPerNode =
between(-50_000, 0); try { if (frequently()) { client().admin() .cluster() .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(shardsPerNodeKey, negativeShardsPerNode).build()) + .setPersistentSettings(Settings.builder().put(getShardsPerNodeKey(), negativeShardsPerNode).build()) .get(); } else { client().admin() .cluster() .prepareUpdateSettings() - .setTransientSettings(Settings.builder().put(shardsPerNodeKey, negativeShardsPerNode).build()) + .setTransientSettings(Settings.builder().put(getShardsPerNodeKey(), negativeShardsPerNode).build()) .get(); } fail("should not be able to set negative shards per node"); } catch (IllegalArgumentException ex) { assertEquals( - "Failed to parse value [" + negativeShardsPerNode + "] for setting [cluster.max_shards_per_node] must be >= 1", + "Failed to parse value [" + negativeShardsPerNode + "] for setting [" + getShardsPerNodeKey() + "] must be >= 1", ex.getMessage() ); } } public void testIndexCreationOverLimit() { - int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size(); + int dataNodes = startTestNodes(3); ShardCounts counts = ShardCounts.forDataNodeCount(dataNodes); - setMaxShardLimit(counts.getShardsPerNode(), shardsPerNodeKey); + setMaxShardLimit(counts.getShardsPerNode(), getShardsPerNodeKey()); // Create an index that will bring us up to the limit createIndex( "test", @@ -154,10 +223,10 @@ public void testIndexCreationOverLimit() { * indexes starting with dot would succeed. */ public void testIndexCreationOverLimitForDotIndexesSucceeds() { - int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size(); + int dataNodes = startTestNodes(3); // Setting the cluster.max_shards_per_node setting according to the data node count. - setMaxShardLimit(dataNodes, shardsPerNodeKey); + setMaxShardLimit(dataNodes, getShardsPerNodeKey()); setIgnoreDotIndex(true); /* @@ -178,7 +247,7 @@ public void testIndexCreationOverLimitForDotIndexesSucceeds() { // Getting cluster.max_shards_per_node setting ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - String maxShardsPerNode = clusterState.getMetadata().settings().get(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey()); + String maxShardsPerNode = clusterState.getMetadata().settings().get(getShardsPerNodeKey()); // Checking if the total shards created are equivalent to dataNodes * cluster.max_shards_per_node assertEquals(dataNodes * Integer.parseInt(maxShardsPerNode), currentActiveShards); @@ -199,11 +268,11 @@ public void testIndexCreationOverLimitForDotIndexesSucceeds() { * indexes starting with dot would fail as well. */ public void testIndexCreationOverLimitForDotIndexesFail() { - int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size(); + int dataNodes = startTestNodes(3); int maxAllowedShards = dataNodes * dataNodes; // Setting the cluster.max_shards_per_node setting according to the data node count. - setMaxShardLimit(dataNodes, shardsPerNodeKey); + setMaxShardLimit(dataNodes, getShardsPerNodeKey()); /* Create an index that will bring us up to the limit. 
It would create index with primary equal to the @@ -223,7 +292,7 @@ public void testIndexCreationOverLimitForDotIndexesFail() { // Getting cluster.max_shards_per_node setting ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - String maxShardsPerNode = clusterState.getMetadata().settings().get(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey()); + String maxShardsPerNode = clusterState.getMetadata().settings().get(getShardsPerNodeKey()); // Checking if the total shards created are equivalent to dataNodes * cluster.max_shards_per_node assertEquals(dataNodes * Integer.parseInt(maxShardsPerNode), currentActiveShards); @@ -246,8 +315,8 @@ public void testIndexCreationOverLimitForDotIndexesFail() { } public void testCreateIndexWithMaxClusterShardSetting() { - int maxAllowedShardsPerNode = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size(); - setMaxShardLimit(maxAllowedShardsPerNode, shardsPerNodeKey); + int maxAllowedShardsPerNode = startTestNodes(3); + setMaxShardLimit(maxAllowedShardsPerNode, getShardsPerNodeKey()); // Always keep int maxAllowedShardsPerCluster = maxAllowedShardsPerNode * 1000; @@ -273,11 +342,11 @@ public void testCreateIndexWithMaxClusterShardSetting() { * indexes starting with dot would only succeed and dataStream indexes would still have validation applied. */ public void testIndexCreationOverLimitForDataStreamIndexes() { - int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size(); + int dataNodes = startTestNodes(3); int maxAllowedShards = dataNodes * dataNodes; // Setting the cluster.max_shards_per_node setting according to the data node count. - setMaxShardLimit(dataNodes, shardsPerNodeKey); + setMaxShardLimit(dataNodes, getShardsPerNodeKey()); setIgnoreDotIndex(true); /* @@ -298,7 +367,7 @@ public void testIndexCreationOverLimitForDataStreamIndexes() { // Getting cluster.max_shards_per_node setting ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - String maxShardsPerNode = clusterState.getMetadata().settings().get(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey()); + String maxShardsPerNode = clusterState.getMetadata().settings().get(getShardsPerNodeKey()); // Checking if the total shards created are equivalent to dataNodes * cluster.max_shards_per_node assertEquals(dataNodes * Integer.parseInt(maxShardsPerNode), currentActiveShards); @@ -321,11 +390,11 @@ public void testIndexCreationOverLimitForDataStreamIndexes() { } public void testIndexCreationOverLimitFromTemplate() { - int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size(); + int dataNodes = startTestNodes(3); final ShardCounts counts = ShardCounts.forDataNodeCount(dataNodes); - setMaxShardLimit(counts.getShardsPerNode(), shardsPerNodeKey); + setMaxShardLimit(counts.getShardsPerNode(), getShardsPerNodeKey()); if (counts.getFirstIndexShards() > 0) { createIndex( @@ -346,6 +415,7 @@ public void testIndexCreationOverLimitFromTemplate() { .setOrder(1) .setSettings( Settings.builder() + .put(indexSettings()) .put(SETTING_NUMBER_OF_SHARDS, counts.getFailingIndexShards()) .put(SETTING_NUMBER_OF_REPLICAS, counts.getFailingIndexReplicas()) ) @@ -362,13 +432,13 @@ public void testIndexCreationOverLimitFromTemplate() { } public void testIncreaseReplicasOverLimit() { - int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size(); + int dataNodes = startTestNodes(3); 
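For context on the assertions that follow: with the split limits, the expected over-limit error is now parameterized by routing pool (via getShardType(), which resolves to LOCAL_ONLY or REMOTE_CAPABLE). A minimal sketch of the message shape these tests assert, using a hypothetical helper that is not part of this patch:

```java
// Illustrative only: the shard-limit validation message format asserted below.
// "pool" mirrors getShardType(); the maximum is dataNodes * shardsPerNode.
static String expectedOverLimitError(Object pool, int addedShards, int currentShards, int dataNodes, int shardsPerNode) {
    return "Validation Failed: 1: this action would add ["
        + addedShards
        + "] total " + pool + " shards, but this cluster currently has ["
        + currentShards + "]/[" + (dataNodes * shardsPerNode)
        + "] maximum " + pool + " shards open;";
}
```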
dataNodes = ensureMultipleDataNodes(dataNodes); int firstShardCount = between(2, 10); int shardsPerNode = firstShardCount - 1; - setMaxShardLimit(shardsPerNode, shardsPerNodeKey); + setMaxShardLimit(shardsPerNode, getShardsPerNodeKey()); prepareCreate( "growing-should-fail", @@ -385,11 +455,15 @@ public void testIncreaseReplicasOverLimit() { } catch (IllegalArgumentException e) { String expectedError = "Validation Failed: 1: this action would add [" + (dataNodes * firstShardCount) - + "] total shards, but this cluster currently has [" + + "] total " + + getShardType() + + " shards, but this cluster currently has [" + firstShardCount + "]/[" + dataNodes * shardsPerNode - + "] maximum shards open;"; + + "] maximum " + + getShardType() + + " shards open;"; assertEquals(expectedError, e.getMessage()); } Metadata clusterState = client().admin().cluster().prepareState().get().getState().metadata(); @@ -397,7 +471,7 @@ public void testIncreaseReplicasOverLimit() { } public void testChangingMultipleIndicesOverLimit() { - int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size(); + int dataNodes = startTestNodes(3); dataNodes = ensureMultipleDataNodes(dataNodes); @@ -414,7 +488,7 @@ public void testChangingMultipleIndicesOverLimit() { int secondIndexReplicas = dataNodes; int shardsPerNode = firstIndexFactor + (secondIndexFactor * (1 + secondIndexReplicas)); - setMaxShardLimit(shardsPerNode, shardsPerNodeKey); + setMaxShardLimit(shardsPerNode, getShardsPerNodeKey()); createIndex( "test-1-index", @@ -446,11 +520,15 @@ public void testChangingMultipleIndicesOverLimit() { String expectedError = "Validation Failed: 1: this action would add [" + difference - + "] total shards, but this cluster currently has [" + + "] total " + + getShardType() + + " shards, but this cluster currently has [" + totalShardsBefore + "]/[" + dataNodes * shardsPerNode - + "] maximum shards open;"; + + "] maximum " + + getShardType() + + " shards open;"; assertEquals(expectedError, e.getMessage()); } Metadata clusterState = client().admin().cluster().prepareState().get().getState().metadata(); @@ -459,13 +537,13 @@ public void testChangingMultipleIndicesOverLimit() { } public void testPreserveExistingSkipsCheck() { - int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size(); + int dataNodes = startTestNodes(3); dataNodes = ensureMultipleDataNodes(dataNodes); int firstShardCount = between(2, 10); int shardsPerNode = firstShardCount - 1; - setMaxShardLimit(shardsPerNode, shardsPerNodeKey); + setMaxShardLimit(shardsPerNode, getShardsPerNodeKey()); prepareCreate( "test-index", @@ -487,6 +565,7 @@ public void testPreserveExistingSkipsCheck() { } public void testRestoreSnapshotOverLimit() { + int dataNodes = startTestNodes(3); Client client = client(); logger.info("--> creating repository"); @@ -496,7 +575,6 @@ public void testRestoreSnapshotOverLimit() { repoSettings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES); createRepository("test-repo", "fs", repoSettings); - int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size(); ShardCounts counts = ShardCounts.forDataNodeCount(dataNodes); createIndex( "snapshot-index", @@ -537,7 +615,7 @@ public void testRestoreSnapshotOverLimit() { cluster().wipeIndices("snapshot-index"); // Reduce the shard limit and fill it up - setMaxShardLimit(counts.getShardsPerNode(), shardsPerNodeKey); + 
setMaxShardLimit(counts.getShardsPerNode(), getShardsPerNodeKey()); createIndex( "test-fill", Settings.builder() @@ -566,8 +644,8 @@ public void testRestoreSnapshotOverLimit() { } public void testOpenIndexOverLimit() { + int dataNodes = startTestNodes(3); Client client = client(); - int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size(); ShardCounts counts = ShardCounts.forDataNodeCount(dataNodes); createIndex( @@ -586,7 +664,7 @@ public void testOpenIndexOverLimit() { assertTrue(closeIndexResponse.isAcknowledged()); // Fill up the cluster - setMaxShardLimit(counts.getShardsPerNode(), shardsPerNodeKey); + setMaxShardLimit(counts.getShardsPerNode(), getShardsPerNodeKey()); createIndex( "test-fill", Settings.builder() @@ -609,9 +687,10 @@ public void testOpenIndexOverLimit() { public void testIgnoreDotSettingOnMultipleNodes() throws IOException, InterruptedException { int maxAllowedShardsPerNode = 10, indexPrimaryShards = 11, indexReplicaShards = 1; + Path tempDir = absolutePath.getParent().getParent(); InternalTestCluster cluster = new InternalTestCluster( randomLong(), - createTempDir(), + tempDir, true, true, 0, @@ -659,7 +738,7 @@ public Path nodeConfigPath(int nodeOrdinal) { .admin() .cluster() .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(shardsPerNodeKey, maxAllowedShardsPerNode)) + .setPersistentSettings(Settings.builder().put(getShardsPerNodeKey(), maxAllowedShardsPerNode)) .get(); // Creating an index starting with dot having shards greater thn the desired node limit @@ -779,23 +858,67 @@ private void verifyException(int dataNodes, ShardCounts counts, IllegalArgumentE int maxShards = counts.getShardsPerNode() * dataNodes; String expectedError = "Validation Failed: 1: this action would add [" + totalShards - + "] total shards, but this cluster currently has [" + + "] total " + + getShardType() + + " shards, but this cluster currently has [" + currentShards + "]/[" + maxShards - + "] maximum shards open;"; + + "] maximum " + + getShardType() + + " shards open;"; assertEquals(expectedError, e.getMessage()); } private void verifyException(int maxShards, int currentShards, int extraShards, IllegalArgumentException e) { String expectedError = "Validation Failed: 1: this action would add [" + extraShards - + "] total shards, but this cluster currently has [" + + "] total " + + getShardType() + + " shards, but this cluster currently has [" + currentShards + "]/[" + maxShards - + "] maximum shards open;"; + + "] maximum " + + getShardType() + + " shards open;"; assertEquals(expectedError, e.getMessage()); } + private RoutingPool getShardType() { + Boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings); + return isWarmIndex ? 
RoutingPool.REMOTE_CAPABLE : RoutingPool.LOCAL_ONLY; + } + + private Settings warmNodeSettings(ByteSizeValue cacheSize) { + return Settings.builder() + .put(super.nodeSettings(0)) + .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString()) + .build(); + } + + /** + * Helper method to start warm-only or data-only nodes, depending on the warm index setting, and return the started node count + */ + private int startTestNodes(int nodeCount) { + boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings); + if (isWarmIndex) { + Settings nodeSettings = Settings.builder().put(warmNodeSettings(new ByteSizeValue(TOTAL_SPACE_BYTES))).build(); + internalCluster().startWarmOnlyNodes(nodeCount, nodeSettings); + return client().admin().cluster().prepareState().get().getState().getNodes().getWarmNodes().size(); + } else { + internalCluster().startDataOnlyNodes(nodeCount); + return client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size(); + } + } + + private String getShardsPerNodeKey() { + boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings); + if (isWarmIndex) { + return SETTING_CLUSTER_MAX_REMOTE_CAPABLE_SHARDS_PER_NODE.getKey(); + } else { + return SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(); + } + } + } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index 4d53a547db714..f5e0a4a8cb211 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -1046,6 +1046,8 @@ public Iterator<Setting<?>> settings() { private final int indexTotalShardsPerNodeLimit; private final int indexTotalPrimaryShardsPerNodeLimit; + private final int indexTotalRemoteCapableShardsPerNodeLimit; + private final int indexTotalRemoteCapablePrimaryShardsPerNodeLimit; private final boolean isAppendOnlyIndex; private final Context context; @@ -1080,6 +1082,8 @@ private IndexMetadata( final boolean isSystem, final int indexTotalShardsPerNodeLimit, final int indexTotalPrimaryShardsPerNodeLimit, + final int indexTotalRemoteCapableShardsPerNodeLimit, + final int indexTotalRemoteCapablePrimaryShardsPerNodeLimit, boolean isAppendOnlyIndex, final Context context, final IngestionStatus ingestionStatus @@ -1120,6 +1124,8 @@ private IndexMetadata( this.isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings); this.indexTotalShardsPerNodeLimit = indexTotalShardsPerNodeLimit; this.indexTotalPrimaryShardsPerNodeLimit = indexTotalPrimaryShardsPerNodeLimit; + this.indexTotalRemoteCapableShardsPerNodeLimit = indexTotalRemoteCapableShardsPerNodeLimit; + this.indexTotalRemoteCapablePrimaryShardsPerNodeLimit = indexTotalRemoteCapablePrimaryShardsPerNodeLimit; this.isAppendOnlyIndex = isAppendOnlyIndex; this.context = context; this.ingestionStatus = ingestionStatus; @@ -1334,10 +1340,18 @@ public int getIndexTotalShardsPerNodeLimit() { return this.indexTotalShardsPerNodeLimit; } + public int getIndexTotalRemoteCapableShardsPerNodeLimit() { + return this.indexTotalRemoteCapableShardsPerNodeLimit; + } + public int getIndexTotalPrimaryShardsPerNodeLimit() { return this.indexTotalPrimaryShardsPerNodeLimit; } + public int getIndexTotalRemoteCapablePrimaryShardsPerNodeLimit() { + return this.indexTotalRemoteCapablePrimaryShardsPerNodeLimit; + } + public boolean isAppendOnlyIndex() { return this.isAppendOnlyIndex; } @@ -2175,6 +2189,10 @@ public IndexMetadata build() { final int indexTotalPrimaryShardsPerNodeLimit =
ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.get( settings ); + final int indexTotalRemoteCapableShardsPerNodeLimit = + ShardsLimitAllocationDecider.INDEX_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING.get(settings); + final int indexTotalRemoteCapablePrimaryShardsPerNodeLimit = + ShardsLimitAllocationDecider.INDEX_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING.get(settings); final boolean isAppendOnlyIndex = INDEX_APPEND_ONLY_ENABLED_SETTING.get(settings); final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE); @@ -2212,6 +2230,8 @@ public IndexMetadata build() { isSystem, indexTotalShardsPerNodeLimit, indexTotalPrimaryShardsPerNodeLimit, + indexTotalRemoteCapableShardsPerNodeLimit, + indexTotalRemoteCapablePrimaryShardsPerNodeLimit, isAppendOnlyIndex, context, ingestionStatus diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java index 687b7fa1edef2..03b343e94b5ab 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; +import org.opensearch.cluster.routing.RoutingPool; import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; import org.opensearch.common.annotation.PublicApi; @@ -270,7 +271,8 @@ static Custom fromXContent(XContentParser parser, String name) throws IOExceptio private final Map<String, Custom> customs; private final transient int totalNumberOfShards; // Transient ? not serializable anyway? - private final int totalOpenIndexShards; + private final int totalOpenLocalOnlyIndexShards; + private final int totalOpenRemoteCapableIndexShards; private final String[] allIndices; private final String[] visibleIndices; @@ -315,15 +317,21 @@ static Custom fromXContent(XContentParser parser, String name) throws IOExceptio this.customs = Collections.unmodifiableMap(customs); this.templates = new TemplatesMetadata(templates); int totalNumberOfShards = 0; - int totalOpenIndexShards = 0; + int totalOpenLocalOnlyIndexShards = 0; + int totalOpenRemoteCapableIndexShards = 0; for (IndexMetadata cursor : indices.values()) { totalNumberOfShards += cursor.getTotalNumberOfShards(); if (IndexMetadata.State.OPEN.equals(cursor.getState())) { - totalOpenIndexShards += cursor.getTotalNumberOfShards(); + if (RoutingPool.getIndexPool(cursor) == RoutingPool.REMOTE_CAPABLE) { + totalOpenRemoteCapableIndexShards += cursor.getTotalNumberOfShards(); + } else { + totalOpenLocalOnlyIndexShards += cursor.getTotalNumberOfShards(); + } } } this.totalNumberOfShards = totalNumberOfShards; - this.totalOpenIndexShards = totalOpenIndexShards; + this.totalOpenLocalOnlyIndexShards = totalOpenLocalOnlyIndexShards; + this.totalOpenRemoteCapableIndexShards = totalOpenRemoteCapableIndexShards; this.allIndices = allIndices; this.visibleIndices = visibleIndices; @@ -905,7 +913,16 @@ public int getTotalNumberOfShards() { * @return The total number of open shards from all indices. */ public int getTotalOpenIndexShards() { - return this.totalOpenIndexShards; + return this.totalOpenLocalOnlyIndexShards; + } + + /** + * Gets the total number of open remote capable shards from all indices. Includes + * replicas, but does not include shards that are part of closed indices.
+ * @return The total number of open remote capable shards from all indices. + */ + public int getTotalOpenRemoteCapableIndexShards() { + return this.totalOpenRemoteCapableIndexShards; } /** diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index a889091140d12..2b2266db2cecd 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -155,6 +155,7 @@ import static org.opensearch.cluster.metadata.Metadata.DEFAULT_REPLICA_COUNT_SETTING; import static org.opensearch.cluster.metadata.MetadataIndexTemplateService.findContextTemplateName; import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING; +import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING; import static org.opensearch.cluster.service.ClusterManagerTask.CREATE_INDEX; import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; import static org.opensearch.index.IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING; @@ -1867,9 +1868,10 @@ public static void validateRefreshIntervalSettings(Settings requestSettings, Clu public static void validateIndexTotalPrimaryShardsPerNodeSetting(Settings indexSettings) { // Get the setting value int indexPrimaryShardsPerNode = INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.get(indexSettings); + int indexRemoteCapablePrimaryShardsPerNode = INDEX_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING.get(indexSettings); // If default value (-1), no validation needed - if (indexPrimaryShardsPerNode == -1) { + if (indexPrimaryShardsPerNode == -1 && indexRemoteCapablePrimaryShardsPerNode == -1) { return; } @@ -1877,7 +1879,11 @@ public static void validateIndexTotalPrimaryShardsPerNodeSetting(Settings indexS boolean isRemoteStoreEnabled = IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexSettings); if (!isRemoteStoreEnabled) { throw new IllegalArgumentException( - "Setting [" + INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey() + "] can only be used with remote store enabled clusters" + "Setting [" + + INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey() + + "] or [" + + INDEX_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING.getKey() + + "] can only be used with remote store enabled clusters" ); } } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java index 9f594e9ad2ff8..fb512a0f7ea13 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -83,6 +83,7 @@ import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateTranslogFlushIntervalSettingsForCompositeIndex; import static org.opensearch.cluster.metadata.MetadataIndexTemplateService.findComponentTemplate; import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING; +import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING; import static
org.opensearch.cluster.service.ClusterManagerTask.UPDATE_SETTINGS; import static org.opensearch.common.settings.AbstractScopedSettings.ARCHIVED_SETTINGS_PREFIX; import static org.opensearch.index.IndexSettings.same; @@ -272,15 +273,11 @@ public ClusterState execute(ClusterState currentState) { } // Verify that this won't take us over the cluster shard limit. - int totalNewShards = Arrays.stream(request.indices()) - .mapToInt(i -> getTotalNewShards(i, currentState, updatedNumberOfReplicas)) - .sum(); - Optional<String> error = shardLimitValidator.checkShardLimit(totalNewShards, currentState); - if (error.isPresent()) { - ValidationException ex = new ValidationException(); - ex.addValidationError(error.get()); - throw ex; - } + shardLimitValidator.validateShardLimitForIndices( + request.indices(), + currentState, + index -> getTotalNewShards(index, currentState, updatedNumberOfReplicas) + ); /* * We do not update the in-sync allocation IDs as they will be removed upon the first index operation which makes @@ -315,15 +312,12 @@ public ClusterState execute(ClusterState currentState) { } // Verify that this won't take us over the cluster shard limit. - int totalNewShards = Arrays.stream(request.indices()) - .mapToInt(i -> getTotalNewShards(i, currentState, updatedNumberOfSearchReplicas)) - .sum(); - Optional<String> error = shardLimitValidator.checkShardLimit(totalNewShards, currentState); - if (error.isPresent()) { - ValidationException ex = new ValidationException(); - ex.addValidationError(error.get()); - throw ex; - } + shardLimitValidator.validateShardLimitForIndices( + request.indices(), + currentState, + index -> getTotalNewShards(index, currentState, updatedNumberOfSearchReplicas) + ); + routingTableBuilder.updateNumberOfSearchReplicas(updatedNumberOfSearchReplicas, actualIndices); metadataBuilder.updateNumberOfSearchReplicas(updatedNumberOfSearchReplicas, actualIndices); logger.info( @@ -571,9 +565,10 @@ private void validateSearchReplicaCountSettings(Settings requestSettings, Index[ public static void validateIndexTotalPrimaryShardsPerNodeSetting(Settings indexSettings, ClusterService clusterService) { // Get the setting value int indexPrimaryShardsPerNode = INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.get(indexSettings); + int indexRemoteCapablePrimaryShardsPerNode = INDEX_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING.get(indexSettings); // If default value (-1), no validation needed - if (indexPrimaryShardsPerNode == -1) { + if (indexPrimaryShardsPerNode == -1 && indexRemoteCapablePrimaryShardsPerNode == -1) { return; } @@ -586,7 +581,11 @@ public static void validateIndexTotalPrimaryShardsPerNodeSetting(Settings indexS .allMatch(DiscoveryNode::isRemoteStoreNode); if (!isRemoteStoreEnabled) { throw new IllegalArgumentException( - "Setting [" + INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey() + "] can only be used with remote store enabled clusters" + "Setting [" + + INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey() + + "] or [" + + INDEX_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING.getKey() + + "] can only be used with remote store enabled clusters" ); } } diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java index b9169169703d4..196658d2211a0 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java @@ -74,6 +74,7 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
private final Map<String, DiscoveryNode> nodes; private final Map<String, DiscoveryNode> dataNodes; + private final Map<String, DiscoveryNode> warmNodes; private final Map<String, DiscoveryNode> clusterManagerNodes; private final Map<String, DiscoveryNode> ingestNodes; @@ -87,6 +88,7 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements private DiscoveryNodes( final Map<String, DiscoveryNode> nodes, final Map<String, DiscoveryNode> dataNodes, + final Map<String, DiscoveryNode> warmNodes, final Map<String, DiscoveryNode> clusterManagerNodes, final Map<String, DiscoveryNode> ingestNodes, String clusterManagerNodeId, @@ -98,6 +100,7 @@ private DiscoveryNodes( ) { this.nodes = Collections.unmodifiableMap(nodes); this.dataNodes = Collections.unmodifiableMap(dataNodes); + this.warmNodes = Collections.unmodifiableMap(warmNodes); this.clusterManagerNodes = Collections.unmodifiableMap(clusterManagerNodes); this.ingestNodes = Collections.unmodifiableMap(ingestNodes); this.clusterManagerNodeId = clusterManagerNodeId; @@ -151,6 +154,15 @@ public Map<String, DiscoveryNode> getDataNodes() { return this.dataNodes; } + /** + * Get a {@link Map} of the discovered warm nodes arranged by their ids + * + * @return {@link Map} of the discovered warm nodes arranged by their ids + */ + public Map<String, DiscoveryNode> getWarmNodes() { + return this.warmNodes; + } + /** * Get a {@link Map} of the discovered cluster-manager nodes arranged by their ids * @@ -802,6 +814,7 @@ private String validateAdd(DiscoveryNode node) { public DiscoveryNodes build() { final Map<String, DiscoveryNode> dataNodesBuilder = new HashMap<>(); + final Map<String, DiscoveryNode> warmNodesBuilder = new HashMap<>(); final Map<String, DiscoveryNode> clusterManagerNodesBuilder = new HashMap<>(); final Map<String, DiscoveryNode> ingestNodesBuilder = new HashMap<>(); Version minNodeVersion = null; @@ -812,6 +825,9 @@ public DiscoveryNodes build() { if (nodeEntry.getValue().isDataNode()) { dataNodesBuilder.put(nodeEntry.getKey(), nodeEntry.getValue()); } + if (nodeEntry.getValue().isWarmNode()) { + warmNodesBuilder.put(nodeEntry.getKey(), nodeEntry.getValue()); + } if (nodeEntry.getValue().isClusterManagerNode()) { clusterManagerNodesBuilder.put(nodeEntry.getKey(), nodeEntry.getValue()); } @@ -835,6 +851,7 @@ public DiscoveryNodes build() { return new DiscoveryNodes( nodes, dataNodesBuilder, + warmNodesBuilder, clusterManagerNodesBuilder, ingestNodesBuilder, clusterManagerNodeId, diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java index ad77aed4e4fd5..f3459bccea3b0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java @@ -34,6 +34,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingPool; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.RoutingAllocation; @@ -73,6 +74,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider { private volatile int clusterShardLimit; private volatile int clusterPrimaryShardLimit; + private volatile int clusterRemoteCapableShardLimit; /** * Controls the maximum number of shards per index on a single OpenSearch * node. Negative values are interpreted as unlimited. @@ -98,6 +100,30 @@ public class ShardsLimitAllocationDecider extends AllocationDecider { Property.IndexScope ); + /** + * Controls the maximum number of remote capable shards per index on a single OpenSearch + * node. Negative values are interpreted as unlimited.
+ */ + public static final Setting<Integer> INDEX_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING = Setting.intSetting( + "index.routing.allocation.total_remote_capable_shards_per_node", + -1, + -1, + Property.Dynamic, + Property.IndexScope + ); + + /** + * Controls the maximum number of remote capable primary shards per index on a single OpenSearch + * node. Negative values are interpreted as unlimited. + */ + public static final Setting<Integer> INDEX_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING = Setting.intSetting( + "index.routing.allocation.total_remote_capable_primary_shards_per_node", + -1, + -1, + Property.Dynamic, + Property.IndexScope + ); + /** * Controls the maximum number of shards per node on a cluster level. * Negative values are interpreted as unlimited. @@ -122,6 +148,18 @@ public class ShardsLimitAllocationDecider extends AllocationDecider { Property.NodeScope ); + /** + * Controls the maximum number of remote capable shards per node on a cluster level. + * Negative values are interpreted as unlimited. + */ + public static final Setting<Integer> CLUSTER_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING = Setting.intSetting( + "cluster.routing.allocation.total_remote_capable_shards_per_node", + -1, + -1, + Property.Dynamic, + Property.NodeScope + ); + private final Settings settings; public ShardsLimitAllocationDecider(Settings settings, ClusterSettings clusterSettings) { @@ -130,6 +168,10 @@ public ShardsLimitAllocationDecider(Settings settings, ClusterSettings clusterSe this.clusterPrimaryShardLimit = CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING, this::setClusterShardLimit); clusterSettings.addSettingsUpdateConsumer(CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING, this::setClusterPrimaryShardLimit); + clusterSettings.addSettingsUpdateConsumer( + CLUSTER_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING, + this::setClusterRemoteCapableShardLimit + ); } private void setClusterShardLimit(int clusterShardLimit) { @@ -140,6 +182,10 @@ private void setClusterPrimaryShardLimit(int clusterPrimaryShardLimit) { this.clusterPrimaryShardLimit = clusterPrimaryShardLimit; } + private void setClusterRemoteCapableShardLimit(int clusterRemoteCapableShardLimit) { + this.clusterRemoteCapableShardLimit = clusterRemoteCapableShardLimit; + } + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { return doDecide(shardRouting, node, allocation, (count, limit) -> count >= limit); @@ -156,13 +202,31 @@ private Decision doDecide( RoutingAllocation allocation, BiPredicate<Integer, Integer> decider ) { + RoutingPool shardRoutingPool = RoutingPool.getShardPool(shardRouting, allocation); + RoutingPool nodeRoutingPool = RoutingPool.getNodePool(node); + // TargetPoolAllocationDecider will handle this case, hence short-circuiting from here + if (shardRoutingPool != nodeRoutingPool) { + return Decision.ALWAYS; + } + IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index()); - final int indexShardLimit = indexMetadata.getIndexTotalShardsPerNodeLimit(); - final int indexPrimaryShardLimit = indexMetadata.getIndexTotalPrimaryShardsPerNodeLimit(); - // Capture the limit here in case it changes during this method's - // execution - final int clusterShardLimit = this.clusterShardLimit; - final int clusterPrimaryShardLimit = this.clusterPrimaryShardLimit; + final int indexShardLimit; + final int indexPrimaryShardLimit; + final int clusterShardLimit; + final int
clusterPrimaryShardLimit; + // Capture the limit here in case it changes during this method's execution + if (nodeRoutingPool == RoutingPool.REMOTE_CAPABLE) { + indexShardLimit = indexMetadata.getIndexTotalRemoteCapableShardsPerNodeLimit(); + indexPrimaryShardLimit = indexMetadata.getIndexTotalRemoteCapablePrimaryShardsPerNodeLimit(); + clusterShardLimit = this.clusterRemoteCapableShardLimit; + clusterPrimaryShardLimit = -1; // No primary shard limit for remote capable nodes + } else { + indexShardLimit = indexMetadata.getIndexTotalShardsPerNodeLimit(); + indexPrimaryShardLimit = indexMetadata.getIndexTotalPrimaryShardsPerNodeLimit(); + clusterShardLimit = this.clusterShardLimit; + clusterPrimaryShardLimit = this.clusterPrimaryShardLimit; + } + if (indexShardLimit <= 0 && indexPrimaryShardLimit <= 0 && clusterShardLimit <= 0 && clusterPrimaryShardLimit <= 0) { return allocation.decision( Decision.YES, @@ -183,7 +247,9 @@ private Decision doDecide( NAME, "too many shards [%d] allocated to this node, cluster setting [%s=%d]", nodeShardCount, - CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), + nodeRoutingPool == RoutingPool.REMOTE_CAPABLE + ? CLUSTER_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING.getKey() + : CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), clusterShardLimit ); } @@ -209,7 +275,9 @@ private Decision doDecide( "too many shards [%d] allocated to this node for index [%s], index setting [%s=%d]", indexShardCount, shardRouting.getIndexName(), - INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), + shardRoutingPool == RoutingPool.REMOTE_CAPABLE + ? INDEX_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING.getKey() + : INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), indexShardLimit ); } @@ -223,7 +291,9 @@ private Decision doDecide( "too many primary shards [%d] allocated to this node for index [%s], index setting [%s=%d]", indexPrimaryShardCount, shardRouting.getIndexName(), - INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), + shardRoutingPool == RoutingPool.REMOTE_CAPABLE + ? 
@@ -223,7 +291,9 @@ private Decision doDecide(
             "too many primary shards [%d] allocated to this node for index [%s], index setting [%s=%d]",
             indexPrimaryShardCount,
             shardRouting.getIndexName(),
-            INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(),
+            shardRoutingPool == RoutingPool.REMOTE_CAPABLE
+                ? INDEX_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING.getKey()
+                : INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(),
             indexPrimaryShardLimit
         );
     }
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index 572d8ab2914c7..bd1c8213597ff 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -321,6 +321,8 @@ public void apply(Settings value, Settings current, Settings previous) {
                 Metadata.SETTING_CREATE_INDEX_BLOCK_SETTING,
                 ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
                 ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER,
+                ShardLimitValidator.SETTING_CLUSTER_MAX_REMOTE_CAPABLE_SHARDS_PER_NODE,
+                ShardLimitValidator.SETTING_CLUSTER_MAX_REMOTE_CAPABLE_SHARDS_PER_CLUSTER,
                 ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
                 RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
                 RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
@@ -456,6 +458,7 @@ public void apply(Settings value, Settings current, Settings previous) {
                 TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,
                 ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING,
                 ShardsLimitAllocationDecider.CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING,
+                ShardsLimitAllocationDecider.CLUSTER_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING,
                 NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING,
                 HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING,
                 HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING,
diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
index 7bbcdec25ce12..e34976ebeb60e 100644
--- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
@@ -171,6 +171,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
         IndexSettings.MAX_REGEX_LENGTH_SETTING,
         ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,
         ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING,
+        ShardsLimitAllocationDecider.INDEX_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING,
+        ShardsLimitAllocationDecider.INDEX_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING,
         IndexSettings.INDEX_GC_DELETES_SETTING,
         IndexSettings.INDEX_SOFT_DELETES_SETTING,
         IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING,
diff --git a/server/src/main/java/org/opensearch/indices/ShardLimitValidator.java b/server/src/main/java/org/opensearch/indices/ShardLimitValidator.java
index 94e91e2d4c3ac..8416e7f232e43 100644
--- a/server/src/main/java/org/opensearch/indices/ShardLimitValidator.java
+++ b/server/src/main/java/org/opensearch/indices/ShardLimitValidator.java
@@ -35,12 +35,16 @@
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.metadata.DataStream;
 import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.routing.RoutingPool;
 import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.ValidationException;
 import org.opensearch.common.settings.Setting;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.FeatureFlags;
 import org.opensearch.core.index.Index;
+import org.opensearch.index.IndexModule;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -48,6 +52,8 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
 
@@ -82,6 +88,24 @@ public class ShardLimitValidator {
         Setting.Property.NodeScope
     );
 
+    public static final Setting<Integer> SETTING_CLUSTER_MAX_REMOTE_CAPABLE_SHARDS_PER_NODE = Setting.intSetting(
+        "cluster.max_remote_capable_shards_per_node",
+        1000,
+        1,
+        new MaxRemoteCapableShardPerNodeLimitValidator(),
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
+    public static final Setting<Integer> SETTING_CLUSTER_MAX_REMOTE_CAPABLE_SHARDS_PER_CLUSTER = Setting.intSetting(
+        "cluster.routing.allocation.total_remote_capable_shards_limit",
+        -1,
+        -1,
+        new MaxRemoteCapableShardPerClusterLimitValidator(),
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
     public static final Setting<Boolean> SETTING_CLUSTER_IGNORE_DOT_INDEXES = Setting.boolSetting(
         "cluster.ignore_dot_indexes",
         false,
@@ -91,16 +115,27 @@ public class ShardLimitValidator {
 
     protected final AtomicInteger shardLimitPerNode = new AtomicInteger();
     protected final AtomicInteger shardLimitPerCluster = new AtomicInteger();
+    protected final AtomicInteger remoteCapableShardLimitPerNode = new AtomicInteger();
+    protected final AtomicInteger remoteCapableShardLimitPerCluster = new AtomicInteger();
     private final SystemIndices systemIndices;
     private volatile boolean ignoreDotIndexes;
 
     public ShardLimitValidator(final Settings settings, ClusterService clusterService, SystemIndices systemIndices) {
         this.shardLimitPerNode.set(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(settings));
         this.shardLimitPerCluster.set(SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER.get(settings));
+        this.remoteCapableShardLimitPerNode.set(SETTING_CLUSTER_MAX_REMOTE_CAPABLE_SHARDS_PER_NODE.get(settings));
+        this.remoteCapableShardLimitPerCluster.set(SETTING_CLUSTER_MAX_REMOTE_CAPABLE_SHARDS_PER_CLUSTER.get(settings));
         this.ignoreDotIndexes = SETTING_CLUSTER_IGNORE_DOT_INDEXES.get(settings);
         clusterService.getClusterSettings().addSettingsUpdateConsumer(SETTING_CLUSTER_MAX_SHARDS_PER_NODE, this::setShardLimitPerNode);
         clusterService.getClusterSettings()
             .addSettingsUpdateConsumer(SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER, this::setShardLimitPerCluster);
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(SETTING_CLUSTER_MAX_REMOTE_CAPABLE_SHARDS_PER_NODE, this::setRemoteCapableShardLimitPerNode);
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(
+                SETTING_CLUSTER_MAX_REMOTE_CAPABLE_SHARDS_PER_CLUSTER,
+                this::setRemoteCapableShardLimitPerCluster
+            );
         clusterService.getClusterSettings().addSettingsUpdateConsumer(SETTING_CLUSTER_IGNORE_DOT_INDEXES, this::setIgnoreDotIndexes);
         this.systemIndices = systemIndices;
     }
@@ -113,6 +148,14 @@ private void setShardLimitPerCluster(int newValue) {
         this.shardLimitPerCluster.set(newValue);
     }
 
+    private void setRemoteCapableShardLimitPerNode(int newValue) {
+        this.remoteCapableShardLimitPerNode.set(newValue);
+    }
+
+    private void setRemoteCapableShardLimitPerCluster(int newValue) {
+        this.remoteCapableShardLimitPerCluster.set(newValue);
+    }
+
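Both new limits are dynamic; the consumers registered above keep the backing AtomicIntegers current (the setters are renamed here from the doubled `setRemoteCapableShardShardLimit…` to match the fields they set). A test-style sketch of the update path, assuming `clusterSettings` and `validator` come from a fixture:

```java
// Hedged sketch: ClusterSettings#applySettings drives the registered consumers,
// which is how a cluster-settings API update such as
//   PUT _cluster/settings
//   { "persistent": { "cluster.max_remote_capable_shards_per_node": 2000 } }
// eventually lands in remoteCapableShardLimitPerNode.
clusterSettings.applySettings(
    Settings.builder().put("cluster.max_remote_capable_shards_per_node", 2000).build()
);
assert validator.getRemoteCapableShardLimitPerNode() == 2000;
```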
     /**
      * Gets the currently configured value of the {@link ShardLimitValidator#SETTING_CLUSTER_MAX_SHARDS_PER_NODE} setting.
      * @return the current value of the setting
@@ -129,6 +172,22 @@ public int getShardLimitPerCluster() {
         return shardLimitPerCluster.get();
     }
 
+    /**
+     * Gets the currently configured value of the {@link ShardLimitValidator#SETTING_CLUSTER_MAX_REMOTE_CAPABLE_SHARDS_PER_NODE} setting.
+     * @return the current value of the setting
+     */
+    public int getRemoteCapableShardLimitPerNode() {
+        return remoteCapableShardLimitPerNode.get();
+    }
+
+    /**
+     * Gets the currently configured value of the {@link ShardLimitValidator#SETTING_CLUSTER_MAX_REMOTE_CAPABLE_SHARDS_PER_CLUSTER} setting.
+     * @return the current value of the setting.
+     */
+    public int getRemoteCapableShardLimitPerCluster() {
+        return remoteCapableShardLimitPerCluster.get();
+    }
+
     private void setIgnoreDotIndexes(boolean newValue) {
         this.ignoreDotIndexes = newValue;
     }
@@ -158,7 +217,8 @@ public void validateShardLimit(final String indexName, final Settings settings,
         final int numberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings);
         final int shardsToCreate = numberOfShards * (1 + numberOfReplicas);
 
-        final Optional<String> shardLimit = checkShardLimit(shardsToCreate, state);
+        final RoutingPool shardRoutingPool = getShardRoutingPool(settings);
+        final Optional<String> shardLimit = checkShardLimit(shardsToCreate, state, shardRoutingPool);
         if (shardLimit.isPresent()) {
             final ValidationException e = new ValidationException();
             e.addValidationError(shardLimit.get());
@@ -177,7 +237,7 @@ public void validateShardLimit(final String indexName, final Settings settings,
      * @throws ValidationException If this operation would take the cluster over the limit and enforcement is enabled.
      */
     public void validateShardLimit(ClusterState currentState, Index[] indicesToOpen) {
-        int shardsToOpen = Arrays.stream(indicesToOpen)
+        Index[] filteredIndices = Arrays.stream(indicesToOpen)
             /*
             Validate shard limit only for non system indices as it is not hard limit anyways.
             Further also validates if the cluster.ignore_dot_indexes is set to true.
@@ -186,14 +246,10 @@ public void validateShardLimit(ClusterState currentState, Index[] indicesToOpen)
             */
             .filter(index -> !shouldIndexBeIgnored(index.getName()))
             .filter(index -> currentState.metadata().index(index).getState().equals(IndexMetadata.State.CLOSE))
-            .mapToInt(index -> getTotalShardCount(currentState, index))
-            .sum();
+            .toArray(Index[]::new);
 
-        Optional<String> error = checkShardLimit(shardsToOpen, currentState);
-        if (error.isPresent()) {
-            ValidationException ex = new ValidationException();
-            ex.addValidationError(error.get());
-            throw ex;
+        if (filteredIndices.length > 0) {
+            validateShardLimitForIndices(filteredIndices, currentState, index -> getTotalShardCount(currentState, index));
         }
     }
 
@@ -233,6 +289,36 @@ private boolean isDataStreamIndex(String indexName) {
         return indexName.startsWith(DataStream.BACKING_INDEX_PREFIX);
     }
 
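The helper added just below folds per-index shard counts into one total per routing pool using `Collectors.toMap` with `Integer::sum` as the merge function. The merge semantics in isolation, with a hypothetical `Pool` enum standing in for RoutingPool:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PoolMergeSketch {
    enum Pool { LOCAL_ONLY, REMOTE_CAPABLE }

    public static void main(String[] args) {
        // Each entry is (pool of an index, shards that index would open).
        List<Map.Entry<Pool, Integer>> perIndex = List.of(
            Map.entry(Pool.LOCAL_ONLY, 4),
            Map.entry(Pool.REMOTE_CAPABLE, 6),
            Map.entry(Pool.LOCAL_ONLY, 2)
        );

        // Same shape as the validator's collector: colliding keys are merged
        // with Integer::sum instead of throwing on duplicates.
        Map<Pool, Integer> byPool = perIndex.stream()
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Integer::sum));

        System.out.println(byPool); // {LOCAL_ONLY=6, REMOTE_CAPABLE=6} (order may vary)
    }
}
```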
+    /**
+     * Validates shard limits for a collection of indices using a provided shard calculation function.
+     *
+     * @param indices The indices to validate
+     * @param currentState The current cluster state
+     * @param shardCalculator Function to calculate shard count for each index
+     * @throws ValidationException If any routing pool would exceed shard limits
+     */
+    public void validateShardLimitForIndices(Index[] indices, ClusterState currentState, Function<Index, Integer> shardCalculator)
+        throws ValidationException {
+        Map<RoutingPool, Integer> shardCountByRoutingPool = Arrays.stream(indices)
+            .collect(
+                Collectors.toMap(index -> RoutingPool.getIndexPool(currentState.metadata().index(index)), shardCalculator, Integer::sum)
+            );
+
+        List<String> violations = new ArrayList<>();
+        for (Map.Entry<RoutingPool, Integer> entry : shardCountByRoutingPool.entrySet()) {
+            Optional<String> error = checkShardLimit(entry.getValue(), currentState, entry.getKey());
+            if (error.isPresent()) {
+                violations.add(error.get());
+            }
+        }
+
+        if (!violations.isEmpty()) {
+            ValidationException ex = new ValidationException();
+            ex.addValidationErrors(violations);
+            throw ex;
+        }
+    }
+
     /**
      * Checks to see if an operation can be performed without taking the cluster over the cluster-wide shard limit.
      * Returns an error message if appropriate, or an empty {@link Optional} otherwise.
@@ -242,19 +328,35 @@ private boolean isDataStreamIndex(String indexName) {
      * @return If present, an error message to be given as the reason for failing
      * an operation. If empty, a sign that the operation is valid.
      */
-    public Optional<String> checkShardLimit(int newShards, ClusterState state) {
-        return checkShardLimit(newShards, state, getShardLimitPerNode(), getShardLimitPerCluster());
+    public Optional<String> checkShardLimit(int newShards, ClusterState state, RoutingPool shardPool) {
+        return shardPool == RoutingPool.REMOTE_CAPABLE
+            ? checkShardLimit(
+                newShards,
+                state.getMetadata().getTotalOpenRemoteCapableIndexShards(),
+                getRemoteCapableShardLimitPerNode(),
+                getRemoteCapableShardLimitPerCluster(),
+                state.getNodes().getWarmNodes().size(),
+                shardPool
+            )
+            : checkShardLimit(
+                newShards,
+                state.getMetadata().getTotalOpenIndexShards(),
+                getShardLimitPerNode(),
+                getShardLimitPerCluster(),
+                state.getNodes().getDataNodes().size(),
+                shardPool
+            );
     }
 
     // package-private for testing
     static Optional<String> checkShardLimit(
         int newShards,
-        ClusterState state,
+        long currentOpenShards,
         int maxShardsPerNodeSetting,
-        int maxShardsPerClusterSetting
+        int maxShardsPerClusterSetting,
+        int nodeCount,
+        RoutingPool shardPool
     ) {
-        int nodeCount = state.getNodes().getDataNodes().size();
-
         // Only enforce the shard limit if we have at least one data node, so that we don't block
         // index creation during cluster setup
         if (nodeCount == 0 || newShards < 0) {
@@ -269,20 +371,30 @@ static Optional<String> checkShardLimit(
             maxShardsInCluster = Math.min(maxShardsInCluster, computedMaxShards);
         }
 
-        long currentOpenShards = state.getMetadata().getTotalOpenIndexShards();
         if ((currentOpenShards + newShards) > maxShardsInCluster) {
             String errorMessage = "this action would add ["
                 + newShards
-                + "] total shards, but this cluster currently has ["
+                + "] total "
+                + shardPool
+                + " shards, but this cluster currently has ["
                 + currentOpenShards
                 + "]/["
                 + maxShardsInCluster
-                + "] maximum shards open";
+                + "] maximum "
+                + shardPool
+                + " shards open";
             return Optional.of(errorMessage);
         }
         return Optional.empty();
     }
 
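The cap arithmetic inside the hunk above is mostly pre-existing context (the computation of `computedMaxShards` is elided). A hedged reconstruction of the effective cluster-wide cap, consistent with the clamping exercised by `testComputedMaxShardsOfClusterIntOverFlow` further down, but not the shipped code:

```java
// Hedged sketch: per-node budget scaled by node count, clamped to
// Integer.MAX_VALUE to avoid int overflow, then tightened by the explicit
// cluster-wide limit when it is set (-1 means unset).
static long effectiveClusterCap(int maxShardsPerNode, int nodeCount, int maxShardsPerCluster) {
    long computedMaxShards = Math.min((long) maxShardsPerNode * nodeCount, Integer.MAX_VALUE);
    return maxShardsPerCluster < 0 ? computedMaxShards : Math.min(computedMaxShards, maxShardsPerCluster);
}

// effectiveClusterCap(500_000_000, 15, -1) == Integer.MAX_VALUE  (no overflow)
// effectiveClusterCap(1_000, 3, 2_500)     == 2_500              (cluster limit wins)
```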
+
+    static RoutingPool getShardRoutingPool(Settings indexSettings) {
+        Boolean isRemoteCapableIndex = IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings)
+            || (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG)
+                && indexSettings.getAsBoolean(IndexModule.IS_WARM_INDEX_SETTING.getKey(), false));
+        return isRemoteCapableIndex ? RoutingPool.REMOTE_CAPABLE : RoutingPool.LOCAL_ONLY;
+    }
+
     /**
      * Validates the MaxShardPerCluster threshold.
      */
@@ -325,6 +437,48 @@ public Iterator<Setting<?>> settings() {
         }
     }
 
+    /**
+     * Validates the MaxRemoteCapableShardPerCluster threshold.
+     */
+    static final class MaxRemoteCapableShardPerClusterLimitValidator implements Setting.Validator<Integer> {
+
+        @Override
+        public void validate(Integer value) {}
+
+        @Override
+        public void validate(Integer maxShardPerCluster, Map<Setting<?>, Object> settings) {
+            final int maxShardPerNode = (int) settings.get(SETTING_CLUSTER_MAX_REMOTE_CAPABLE_SHARDS_PER_NODE);
+            doValidate(maxShardPerCluster, maxShardPerNode);
+        }
+
+        @Override
+        public Iterator<Setting<?>> settings() {
+            final List<Setting<?>> settings = Collections.singletonList(SETTING_CLUSTER_MAX_REMOTE_CAPABLE_SHARDS_PER_NODE);
+            return settings.iterator();
+        }
+    }
+
+    /**
+     * Validates the MaxRemoteCapableShardPerNode threshold.
+     */
+    static final class MaxRemoteCapableShardPerNodeLimitValidator implements Setting.Validator<Integer> {
+
+        @Override
+        public void validate(Integer value) {}
+
+        @Override
+        public void validate(Integer maxShardPerNode, Map<Setting<?>, Object> settings) {
+            final int maxShardPerCluster = (int) settings.get(SETTING_CLUSTER_MAX_REMOTE_CAPABLE_SHARDS_PER_CLUSTER);
+            doValidate(maxShardPerCluster, maxShardPerNode);
+        }
+
+        @Override
+        public Iterator<Setting<?>> settings() {
+            final List<Setting<?>> settings = Collections.singletonList(SETTING_CLUSTER_MAX_REMOTE_CAPABLE_SHARDS_PER_CLUSTER);
+            return settings.iterator();
+        }
+    }
+
     private static void doValidate(final int maxShardPerCluster, final int maxShardPerNode) {
         if (maxShardPerCluster != -1 && maxShardPerCluster < maxShardPerNode) {
             throw new IllegalArgumentException(
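Both validators funnel into the shared `doValidate` shown above (its message argument is cut off at the hunk boundary). The rule it enforces, restated standalone; the exception text here is illustrative only since the real message is elided:

```java
// Restatement of the guard shown above, not a copy of the shipped code.
static void doValidate(final int maxShardPerCluster, final int maxShardPerNode) {
    if (maxShardPerCluster != -1 && maxShardPerCluster < maxShardPerNode) {
        throw new IllegalArgumentException("per-cluster shard limit must not be lower than the per-node limit");
    }
}

// doValidate(-1, 1000);    // OK: cluster-wide limit unset
// doValidate(5000, 1000);  // OK
// doValidate(500, 1000);   // throws IllegalArgumentException
```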
diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java
index 785ba35afaf4d..4fe94590bcf9b 100644
--- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java
+++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java
@@ -2610,7 +2610,7 @@ public void testIndexTotalPrimaryShardsPerNodeSettingValidationWithoutRemoteStor
 
         // Verify error message
         assertEquals(
-            "Setting [index.routing.allocation.total_primary_shards_per_node] can only be used with remote store enabled clusters",
+            "Setting [index.routing.allocation.total_primary_shards_per_node] or [index.routing.allocation.total_remote_capable_primary_shards_per_node] can only be used with remote store enabled clusters",
             exception.getMessage()
         );
     }
diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java
index a91e724ec279d..41598b2d1efaa 100644
--- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java
+++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java
@@ -2458,7 +2458,7 @@ public void testIndexPrimaryShardsSetting() {
         List<Throwable> throwables = putTemplate(xContentRegistry(), request, clusterSettings);
         assertThat(throwables.get(0), instanceOf(IllegalArgumentException.class));
         assertEquals(
-            "Setting [index.routing.allocation.total_primary_shards_per_node] can only be used with remote store enabled clusters",
+            "Setting [index.routing.allocation.total_primary_shards_per_node] or [index.routing.allocation.total_remote_capable_primary_shards_per_node] can only be used with remote store enabled clusters",
             throwables.get(0).getMessage()
         );
     }
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDeciderTests.java
index ffc42d11d3696..1d253728a447b 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDeciderTests.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDeciderTests.java
@@ -13,6 +13,7 @@
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.metadata.Metadata;
 import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.node.DiscoveryNodeRole;
 import org.opensearch.cluster.node.DiscoveryNodes;
 import org.opensearch.cluster.routing.IndexRoutingTable;
 import org.opensearch.cluster.routing.RoutingTable;
@@ -25,6 +26,10 @@
 import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.test.OpenSearchTestCase;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
 import static org.opensearch.cluster.routing.allocation.decider.Decision.Type.NO;
 import static org.opensearch.cluster.routing.allocation.decider.Decision.Type.YES;
 
@@ -344,6 +349,7 @@ public void testIndexPrimaryShardLimit() {
     }
 
     private DiscoveryNode newNode(String nodeId) {
-        return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), Version.CURRENT);
+        Set<DiscoveryNodeRole> roles = new HashSet<>(Collections.singleton(DiscoveryNodeRole.DATA_ROLE));
+        return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), Collections.emptyMap(), roles, Version.CURRENT);
     }
 }
diff --git a/server/src/test/java/org/opensearch/indices/ShardLimitValidatorTests.java b/server/src/test/java/org/opensearch/indices/ShardLimitValidatorTests.java
index 0b1ec8fd85ae5..185c6138b4ec7 100644
--- a/server/src/test/java/org/opensearch/indices/ShardLimitValidatorTests.java
+++ b/server/src/test/java/org/opensearch/indices/ShardLimitValidatorTests.java
@@ -39,6 +39,7 @@
 import org.opensearch.cluster.metadata.Metadata;
 import org.opensearch.cluster.node.DiscoveryNode;
 import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.cluster.routing.RoutingPool;
 import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.cluster.shards.ShardCounts;
 import org.opensearch.common.ValidationException;
@@ -77,7 +78,14 @@ public void testOverShardLimit() {
         ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas());
 
         int shardsToAdd = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
-        Optional<String> errorMessage = ShardLimitValidator.checkShardLimit(shardsToAdd, state, counts.getShardsPerNode(), -1);
+        Optional<String> errorMessage = ShardLimitValidator.checkShardLimit(
+            shardsToAdd,
+            state.getMetadata().getTotalOpenIndexShards(),
+            counts.getShardsPerNode(),
+            -1,
+            state.getNodes().getDataNodes().size(),
+            RoutingPool.LOCAL_ONLY
+        );
 
         int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
         int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
@@ -86,11 +94,11 @@ public void testOverShardLimit() {
         assertEquals(
             "this action would add ["
                 + totalShards
-                + "] total shards, but this cluster currently has ["
+                + "] total LOCAL_ONLY shards, but this cluster currently has ["
                 + currentShards
                 + "]/["
                 + maxShards
-                + "] maximum shards open",
+                + "] maximum LOCAL_ONLY shards open",
             errorMessage.get()
         );
     }
@@ -104,9 +112,11 @@ public void testOverShardLimitWithMaxShardCountLimit() {
         int maxShardLimitOnCluster = shardsToAdd - 1;
         Optional<String> errorMessage = ShardLimitValidator.checkShardLimit(
             shardsToAdd,
-            state,
+            state.getMetadata().getTotalOpenIndexShards(),
             counts.getShardsPerNode(),
-            maxShardLimitOnCluster
+            maxShardLimitOnCluster,
+            state.getNodes().getDataNodes().size(),
+            RoutingPool.LOCAL_ONLY
         );
 
         int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
@@ -116,11 +126,11 @@ public void testOverShardLimitWithMaxShardCountLimit() {
         assertEquals(
             "this action would add ["
                 + totalShards
-                + "] total shards, but this cluster currently has ["
+                + "] total LOCAL_ONLY shards, but this cluster currently has ["
                 + currentShards
                 + "]/["
                 + maxShards
-                + "] maximum shards open",
+                + "] maximum LOCAL_ONLY shards open",
             errorMessage.get()
         );
     }
@@ -136,7 +146,14 @@ public void testUnderShardLimit() {
         int existingShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
         int shardsToAdd = randomIntBetween(1, (counts.getShardsPerNode() * nodesInCluster) - existingShards);
-        Optional<String> errorMessage = ShardLimitValidator.checkShardLimit(shardsToAdd, state, counts.getShardsPerNode(), -1);
+        Optional<String> errorMessage = ShardLimitValidator.checkShardLimit(
+            shardsToAdd,
+            state.getMetadata().getTotalOpenIndexShards(),
+            counts.getShardsPerNode(),
+            -1,
+            state.getNodes().getDataNodes().size(),
+            RoutingPool.LOCAL_ONLY
+        );
 
         assertFalse(errorMessage.isPresent());
     }
@@ -175,11 +192,11 @@ public void testNonSystemIndexCreationFails() {
         assertEquals(
             "Validation Failed: 1: this action would add ["
                 + 2
-                + "] total shards, but this cluster currently has ["
+                + "] total LOCAL_ONLY shards, but this cluster currently has ["
                 + 1
                 + "]/["
                 + 1
-                + "] maximum shards open;",
+                + "] maximum LOCAL_ONLY shards open;",
             exception.getMessage()
         );
     }
@@ -205,11 +222,11 @@ public void testNonSystemIndexCreationFailsWithMaxShardLimitOnCluster() {
         assertEquals(
             "Validation Failed: 1: this action would add ["
                 + 2
-                + "] total shards, but this cluster currently has ["
+                + "] total LOCAL_ONLY shards, but this cluster currently has ["
                 + 1
                 + "]/["
                 + maxShardLimitOnCluster
-                + "] maximum shards open;",
+                + "] maximum LOCAL_ONLY shards open;",
             exception.getMessage()
         );
     }
@@ -217,18 +234,32 @@ public void testNonSystemIndexCreationFailsWithMaxShardLimitOnCluster() {
     public void testComputedMaxShardsOfClusterIntOverFlow() {
         final int maxShardLimitPerNode = 500_000_000;
         ClusterState state = createClusterForShardLimitTest(15, 1, 1);
-        Optional<String> errorMessage = ShardLimitValidator.checkShardLimit(2, state, maxShardLimitPerNode, -1);
+        Optional<String> errorMessage = ShardLimitValidator.checkShardLimit(
+            2,
+            state.getMetadata().getTotalOpenIndexShards(),
+            maxShardLimitPerNode,
+            -1,
+            state.getNodes().getDataNodes().size(),
+            RoutingPool.LOCAL_ONLY
+        );
         assertFalse(errorMessage.isPresent());
 
-        errorMessage = ShardLimitValidator.checkShardLimit(Integer.MAX_VALUE - 1, state, maxShardLimitPerNode, -1);
+        errorMessage = ShardLimitValidator.checkShardLimit(
+            Integer.MAX_VALUE - 1,
+            state.getMetadata().getTotalOpenIndexShards(),
+            maxShardLimitPerNode,
+            -1,
+            state.getNodes().getDataNodes().size(),
+            RoutingPool.LOCAL_ONLY
+        );
         assertEquals(
             "this action would add ["
                 + (Integer.MAX_VALUE - 1)
-                + "] total shards, but this cluster currently has ["
+                + "] total LOCAL_ONLY shards, but this cluster currently has ["
                 + 2
                 + "]/["
                 + Integer.MAX_VALUE
-                + "] maximum shards open",
+                + "] maximum LOCAL_ONLY shards open",
             errorMessage.get()
         );
     }
@@ -284,11 +315,11 @@ public void testDotIndexCreationFails() {
         assertEquals(
             "Validation Failed: 1: this action would add ["
                 + 2
-                + "] total shards, but this cluster currently has ["
+                + "] total LOCAL_ONLY shards, but this cluster currently has ["
                 + 1
                 + "]/["
                 + 1
-                + "] maximum shards open;",
+                + "] maximum LOCAL_ONLY shards open;",
             exception.getMessage()
         );
     }
@@ -312,11 +343,11 @@ public void testDataStreamIndexCreationFails() {
         assertEquals(
             "Validation Failed: 1: this action would add ["
                 + 2
-                + "] total shards, but this cluster currently has ["
+                + "] total LOCAL_ONLY shards, but this cluster currently has ["
                 + 1
                 + "]/["
                 + 1
-                + "] maximum shards open;",
+                + "] maximum LOCAL_ONLY shards open;",
             exception.getMessage()
         );
     }
@@ -367,11 +398,11 @@ public void testNonSystemIndexOpeningFails() {
         assertEquals(
             "Validation Failed: 1: this action would add ["
                 + totalShards
-                + "] total shards, but this cluster currently has ["
+                + "] total LOCAL_ONLY shards, but this cluster currently has ["
                 + currentShards
                 + "]/["
                 + maxShards
-                + "] maximum shards open;",
+                + "] maximum LOCAL_ONLY shards open;",
             exception.getMessage()
         );
     }
@@ -465,11 +496,11 @@ public void testDotIndexOpeningFails() {
         assertEquals(
             "Validation Failed: 1: this action would add ["
                 + totalShards
-                + "] total shards, but this cluster currently has ["
+                + "] total LOCAL_ONLY shards, but this cluster currently has ["
                 + currentShards
                 + "]/["
                 + maxShards
-                + "] maximum shards open;",
+                + "] maximum LOCAL_ONLY shards open;",
             exception.getMessage()
         );
     }
@@ -508,11 +539,11 @@ public void testDataStreamIndexOpeningFails() {
         assertEquals(
             "Validation Failed: 1: this action would add ["
                 + totalShards
-                + "] total shards, but this cluster currently has ["
+                + "] total LOCAL_ONLY shards, but this cluster currently has ["
                 + currentShards
                 + "]/["
                 + maxShards
-                + "] maximum shards open;",
+                + "] maximum LOCAL_ONLY shards open;",
             exception.getMessage()
         );
     }
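Taken together: callers that previously passed only a ClusterState now pick the routing pool explicitly, and the error strings are pool-qualified, as the updated assertions above verify. A closing usage sketch against the new public entry point; `validator` (a ShardLimitValidator) and `clusterState` are assumed fixtures, and the quoted message is illustrative:

```java
import java.util.Optional;

import org.opensearch.cluster.routing.RoutingPool;

// Hedged sketch of the new call shape introduced by this change.
Optional<String> error = validator.checkShardLimit(shardsToAdd, clusterState, RoutingPool.REMOTE_CAPABLE);
error.ifPresent(message -> {
    // e.g. "this action would add [10] total REMOTE_CAPABLE shards, but this
    //       cluster currently has [4]/[8] maximum REMOTE_CAPABLE shards open"
    throw new IllegalArgumentException(message);
});
```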