Skip to content

Enabled default throttling for all tasks submitted to cluster manager #17711

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add TermsQuery support to Search GRPC endpoint ([#17888](https://github.com/opensearch-project/OpenSearch/pull/17888))
- Support sub agg in filter rewrite optimization ([#17447](https://github.com/opensearch-project/OpenSearch/pull/17447)
- Disable scoring of keyword term search by default, fallback logic with new use_similarity:true parameter ([#17889](https://github.com/opensearch-project/OpenSearch/pull/17889))
- Enabled default throttling for all tasks submitted to cluster manager ([#17711](https://github.com/opensearch-project/OpenSearch/pull/17711))

### Changed
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.WorkloadGroup.updateExistingWorkloadGroup;
import static org.opensearch.cluster.service.ClusterManagerTaskConfigurations.TaskKeys.CREATE_QUERY_GROUP_KEY;
import static org.opensearch.cluster.service.ClusterManagerTaskConfigurations.TaskKeys.DELETE_QUERY_GROUP_KEY;
import static org.opensearch.cluster.service.ClusterManagerTaskConfigurations.TaskKeys.UPDATE_QUERY_GROUP_KEY;

/**
* This class defines the functions for WorkloadGroup persistence
*/
public class WorkloadGroupPersistenceService {
static final String SOURCE = "query-group-persistence-service";
private static final String CREATE_QUERY_GROUP_THROTTLING_KEY = "create-query-group";
private static final String DELETE_QUERY_GROUP_THROTTLING_KEY = "delete-query-group";
private static final String UPDATE_QUERY_GROUP_THROTTLING_KEY = "update-query-group";
private static final Logger logger = LogManager.getLogger(WorkloadGroupPersistenceService.class);
/**
* max WorkloadGroup count setting name
Expand Down Expand Up @@ -94,9 +94,9 @@ public WorkloadGroupPersistenceService(
final ClusterSettings clusterSettings
) {
this.clusterService = clusterService;
this.createWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(CREATE_QUERY_GROUP_THROTTLING_KEY, true);
this.deleteWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(DELETE_QUERY_GROUP_THROTTLING_KEY, true);
this.updateWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(UPDATE_QUERY_GROUP_THROTTLING_KEY, true);
this.createWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(CREATE_QUERY_GROUP_KEY, true);
this.deleteWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(DELETE_QUERY_GROUP_KEY, true);
this.updateWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(UPDATE_QUERY_GROUP_KEY, true);
setMaxWorkloadGroupCount(MAX_QUERY_GROUP_COUNT.get(settings));
clusterSettings.addSettingsUpdateConsumer(MAX_QUERY_GROUP_COUNT, this::setMaxWorkloadGroupCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.opensearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand;
import org.opensearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.opensearch.cluster.routing.allocation.command.AllocationCommand;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
Expand All @@ -70,6 +69,8 @@
import java.util.List;
import java.util.Map;

import static org.opensearch.cluster.service.ClusterManagerTaskConfigurations.TaskKeys.CLUSTER_REROUTE_API_KEY;

/**
* Transport action for rerouting cluster allocation commands
*
Expand Down Expand Up @@ -102,7 +103,7 @@ public TransportClusterRerouteAction(
);
this.allocationService = allocationService;
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
clusterRerouteTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CLUSTER_REROUTE_API_KEY, true);
clusterRerouteTaskKey = clusterService.registerClusterManagerTask(CLUSTER_REROUTE_API_KEY, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
Expand All @@ -65,6 +64,7 @@
import java.io.IOException;

import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
import static org.opensearch.cluster.service.ClusterManagerTaskConfigurations.TaskKeys.CLUSTER_UPDATE_SETTINGS_KEY;
import static org.opensearch.index.remote.RemoteStoreUtils.checkAndFinalizeRemoteStoreMigration;

/**
Expand Down Expand Up @@ -108,7 +108,7 @@ public TransportClusterUpdateSettingsAction(
this.clusterSettings = clusterSettings;

// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
clusterUpdateSettingTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CLUSTER_UPDATE_SETTINGS_KEY, true);
clusterUpdateSettingTaskKey = clusterService.registerClusterManagerTask(CLUSTER_UPDATE_SETTINGS_KEY, true);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
Expand All @@ -51,6 +50,8 @@

import java.io.IOException;

import static org.opensearch.cluster.service.ClusterManagerTaskConfigurations.TaskKeys.DELETE_SCRIPT_KEY;

/**
* Transport action for deleting stored script
*
Expand Down Expand Up @@ -81,7 +82,7 @@ public TransportDeleteStoredScriptAction(
);
this.scriptService = scriptService;
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
deleteScriptTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SCRIPT_KEY, true);
deleteScriptTaskKey = clusterService.registerClusterManagerTask(DELETE_SCRIPT_KEY, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
Expand All @@ -51,6 +50,8 @@

import java.io.IOException;

import static org.opensearch.cluster.service.ClusterManagerTaskConfigurations.TaskKeys.PUT_SCRIPT_KEY;

/**
* Transport action for putting stored script
*
Expand Down Expand Up @@ -81,7 +82,7 @@ public TransportPutStoredScriptAction(
);
this.scriptService = scriptService;
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
putScriptTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_SCRIPT_KEY, true);
putScriptTaskKey = clusterService.registerClusterManagerTask(PUT_SCRIPT_KEY, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.opensearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest;
import org.opensearch.cluster.metadata.MetadataCreateIndexService;
import org.opensearch.cluster.metadata.MetadataIndexTemplateService;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
Expand All @@ -62,6 +61,8 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

import static org.opensearch.cluster.service.ClusterManagerTaskConfigurations.TaskKeys.AUTO_CREATE_KEY;

/**
* Api that auto creates an index or data stream that originate from requests that write into an index that doesn't yet exist.
*
Expand Down Expand Up @@ -104,7 +105,7 @@ public TransportAction(
this.metadataCreateDataStreamService = metadataCreateDataStreamService;

// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
autoCreateTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.AUTO_CREATE_KEY, true);
autoCreateTaskKey = clusterService.registerClusterManagerTask(AUTO_CREATE_KEY, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
Expand All @@ -67,6 +66,8 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.opensearch.cluster.service.ClusterManagerTaskConfigurations.TaskKeys.DELETE_DANGLING_INDEX_KEY;

/**
* Implements the deletion of a dangling index. When handling a {@link DeleteDanglingIndexAction},
* this class first checks that such a dangling index exists. It then submits a cluster state update
Expand Down Expand Up @@ -105,7 +106,7 @@ public TransportDeleteDanglingIndexAction(
this.settings = settings;
this.nodeClient = nodeClient;
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
deleteDanglingIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_DANGLING_INDEX_KEY, true);
deleteDanglingIndexTaskKey = clusterService.registerClusterManagerTask(DELETE_DANGLING_INDEX_KEY, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.MetadataDeleteIndexService;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
Expand All @@ -75,6 +74,7 @@
import java.util.Set;

import static org.opensearch.action.ValidateActions.addValidationError;
import static org.opensearch.cluster.service.ClusterManagerTaskConfigurations.TaskKeys.REMOVE_DATA_STREAM_KEY;

/**
* Transport action for deleting a datastream
Expand Down Expand Up @@ -186,7 +186,7 @@ public TransportAction(
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
this.deleteIndexService = deleteIndexService;
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
removeDataStreamTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.REMOVE_DATA_STREAM_KEY, true);
removeDataStreamTaskKey = clusterService.registerClusterManagerTask(REMOVE_DATA_STREAM_KEY, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
Expand All @@ -71,6 +70,8 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static org.opensearch.cluster.service.ClusterManagerTaskConfigurations.TaskKeys.ROLLOVER_INDEX_KEY;

/**
* Main class to swap the index pointed to by an alias, given some conditions
*
Expand Down Expand Up @@ -106,7 +107,7 @@ public TransportRolloverAction(
this.client = client;
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
rolloverIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.ROLLOVER_INDEX_KEY, true);
rolloverIndexTaskKey = clusterService.registerClusterManagerTask(ROLLOVER_INDEX_KEY, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ack.ClusterStateUpdateRequest;
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
Expand All @@ -63,6 +62,8 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.opensearch.cluster.service.ClusterManagerTaskConfigurations.TaskKeys.CREATE_DATA_STREAM_KEY;

/**
* Creates a data stream of metadata
*
Expand All @@ -86,7 +87,7 @@ public MetadataCreateDataStreamService(
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
this.metadataCreateIndexService = metadataCreateIndexService;
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
createDataStreamTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_DATA_STREAM_KEY, true);
createDataStreamTaskKey = clusterService.registerClusterManagerTask(CREATE_DATA_STREAM_KEY, true);
}

public void createDataStream(CreateDataStreamClusterStateUpdateRequest request, ActionListener<AcknowledgedResponse> finalListener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
Expand Down Expand Up @@ -156,6 +155,7 @@
import static org.opensearch.cluster.metadata.Metadata.DEFAULT_REPLICA_COUNT_SETTING;
import static org.opensearch.cluster.metadata.MetadataIndexTemplateService.findContextTemplateName;
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
import static org.opensearch.cluster.service.ClusterManagerTaskConfigurations.TaskKeys.CREATE_INDEX_KEY;
import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
Expand Down Expand Up @@ -225,7 +225,7 @@ public MetadataCreateIndexService(
this.awarenessReplicaBalance = awarenessReplicaBalance;

// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true);
createIndexTaskKey = clusterService.registerClusterManagerTask(CREATE_INDEX_KEY, true);
Supplier<Version> minNodeVersionSupplier = () -> clusterService.state().nodes().getMinNodeVersion();
remoteStoreCustomMetadataResolver = isRemoteDataAttributePresent(settings)
? new RemoteStoreCustomMetadataResolver(remoteStoreSettings, minNodeVersionSupplier, repositoriesServiceSupplier, settings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
Expand All @@ -62,6 +61,8 @@
import java.util.Map;
import java.util.Set;

import static org.opensearch.cluster.service.ClusterManagerTaskConfigurations.TaskKeys.DELETE_INDEX_KEY;

/**
* Deletes indices.
*
Expand All @@ -84,7 +85,7 @@ public MetadataDeleteIndexService(Settings settings, ClusterService clusterServi
this.allocationService = allocationService;

// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
deleteIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_INDEX_KEY, true);
deleteIndexTaskKey = clusterService.registerClusterManagerTask(DELETE_INDEX_KEY, true);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
import org.opensearch.cluster.metadata.AliasAction.NewAliasValidator;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
Expand All @@ -62,6 +61,7 @@
import java.util.function.Function;

import static java.util.Collections.emptyList;
import static org.opensearch.cluster.service.ClusterManagerTaskConfigurations.TaskKeys.INDEX_ALIASES_KEY;
import static org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.NO_LONGER_ASSIGNED;

/**
Expand Down Expand Up @@ -97,7 +97,7 @@ public MetadataIndexAliasesService(
this.xContentRegistry = xContentRegistry;

// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
indexAliasTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.INDEX_ALIASES_KEY, true);
indexAliasTaskKey = clusterService.registerClusterManagerTask(INDEX_ALIASES_KEY, true);

}

Expand Down
Loading