Skip to content

Commit 7aa3bc8

Browse files
committed
Enabled default throttling for all tasks submitted to cluster manager
Signed-off-by: Manik Garg <[email protected]>
1 parent d066743 commit 7aa3bc8

File tree

3 files changed

+34
-20
lines changed

3 files changed

+34
-20
lines changed

CHANGELOG.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
99
- [Rule Based Auto-tagging] Add rule schema for auto tagging ([#17238](https://github.com/opensearch-project/OpenSearch/pull/17238))
1010
- Renaming the node role search to warm ([#17573](https://github.com/opensearch-project/OpenSearch/pull/17573))
1111
- Introduce a new search node role to hold search only shards ([#17620](https://github.com/opensearch-project/OpenSearch/pull/17620))
12-
- Fix systemd integTest on deb regarding path ownership check ([#17641](https://github.com/opensearch-project/OpenSearch/pull/17641))
12+
- Fix systemd integTest on deb regarding path ownership check ([#17641](https://github.com/opensearch-project/OpenSearch/pull/17641))
1313
- Add dfs transformation function in XContentMapValues ([#17612](https://github.com/opensearch-project/OpenSearch/pull/17612))
1414
- [Security Manager Replacement] Add support of Java policies ([#17663](https://github.com/opensearch-project/OpenSearch/pull/17663))
1515
- Added Kinesis support as a plugin for the pull-based ingestion ([#17615](https://github.com/opensearch-project/OpenSearch/pull/17615)
16+
- Enabled default throttling for all tasks submitted to cluster manager ([#17711](https://github.com/opensearch-project/OpenSearch/pull/17711))
1617

1718
### Changed
1819
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))

server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java

+15-4
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.HashSet;
2222
import java.util.List;
2323
import java.util.Map;
24+
import java.util.Objects;
2425
import java.util.Set;
2526
import java.util.concurrent.ConcurrentHashMap;
2627
import java.util.concurrent.ConcurrentMap;
@@ -31,9 +32,11 @@
3132
* This class does throttling on task submission to cluster manager node, it uses throttling key defined in various executors
3233
* as key for throttling. Throttling will be performed over task executor's class level, different task types have different executors class.
3334
* <p>
34-
* Set specific setting to for setting the threshold of throttling of particular task type.
35+
* Set specific setting for setting the threshold of throttling of a particular task type.
3536
* e.g : Set "cluster_manager.throttling.thresholds.put_mapping" to set throttling limit of "put mapping" tasks,
36-
* Set it to default value(-1) to disable the throttling for this task type.
37+
* <p>
38+
* Set it to (-1) to disable the throttling for this task type.
39+
* If no setting is set, default threshold of (50) will be used for throttling when a task is registered with throttling enabled.
3740
*/
3841
public class ClusterManagerTaskThrottler implements TaskBatcherListener {
3942
private static final Logger logger = LogManager.getLogger(ClusterManagerTaskThrottler.class);
@@ -67,6 +70,8 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener {
6770
protected Map<String, ThrottlingKey> THROTTLING_TASK_KEYS = new ConcurrentHashMap<>();
6871

6972
private final int MIN_THRESHOLD_VALUE = -1; // Disabled throttling
73+
// Note: Add different thresholds based on task type if required in the future.
74+
private final int DEFAULT_THRESHOLD_VALUE = 50;
7075
private final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener;
7176

7277
final ConcurrentMap<String, Long> tasksCount;
@@ -122,13 +127,19 @@ public static TimeValue getMaxDelayForRetry() {
122127
* Added retry mechanism in TransportClusterManagerNodeAction, so it would be retried for customer generated tasks.
123128
* <p>
124129
* If tasks are not getting retried then we can register with false flag, so user won't be able to configure threshold limits for it.
130+
* <p>
131+
* If throttling is enabled, default threshold of (50) will be used if not specified in settings.
125132
*/
126133
protected ThrottlingKey registerClusterManagerTask(String taskKey, boolean throttlingEnabled) {
127134
ThrottlingKey throttlingKey = new ThrottlingKey(taskKey, throttlingEnabled);
128135
if (THROTTLING_TASK_KEYS.containsKey(taskKey)) {
129136
throw new IllegalArgumentException("There is already a Throttling key registered with same name: " + taskKey);
130137
}
131138
THROTTLING_TASK_KEYS.put(taskKey, throttlingKey);
139+
140+
if (throttlingEnabled && Objects.isNull(getThrottlingLimit(taskKey))) {
141+
tasksThreshold.put(taskKey, (long) DEFAULT_THRESHOLD_VALUE);
142+
}
132143
return throttlingKey;
133144
}
134145

@@ -176,7 +187,7 @@ void validateSetting(final Settings settings) {
176187
if (!THROTTLING_TASK_KEYS.get(key).isThrottlingEnabled()) {
177188
throw new IllegalArgumentException("Throttling is not enabled for given task type: " + key);
178189
}
179-
int threshold = groups.get(key).getAsInt("value", MIN_THRESHOLD_VALUE);
190+
int threshold = groups.get(key).getAsInt("value", DEFAULT_THRESHOLD_VALUE);
180191
if (threshold < MIN_THRESHOLD_VALUE) {
181192
throw new IllegalArgumentException("Provide positive integer for limit or -1 for disabling throttling");
182193
}
@@ -192,7 +203,7 @@ void updateSetting(final Settings newSettings) {
192203
settingKeys.addAll(tasksThreshold.keySet());
193204
for (String key : settingKeys) {
194205
Settings setting = groups.get(key);
195-
updateLimit(key, setting == null ? MIN_THRESHOLD_VALUE : setting.getAsInt("value", MIN_THRESHOLD_VALUE));
206+
updateLimit(key, setting == null ? DEFAULT_THRESHOLD_VALUE : setting.getAsInt("value", DEFAULT_THRESHOLD_VALUE));
196207
}
197208
}
198209

server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java

+17-15
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.concurrent.CountDownLatch;
3535
import java.util.concurrent.TimeUnit;
3636

37+
import static org.opensearch.cluster.service.ClusterManagerTaskThrottler.THRESHOLD_SETTINGS;
3738
import static org.opensearch.test.ClusterServiceUtils.setState;
3839

3940
/**
@@ -75,8 +76,9 @@ public void testDefaults() {
7576
}, new ClusterManagerThrottlingStats());
7677
throttler.registerClusterManagerTask("put-mapping", true);
7778
throttler.registerClusterManagerTask("create-index", true);
79+
7880
for (String key : throttler.THROTTLING_TASK_KEYS.keySet()) {
79-
assertNull(throttler.getThrottlingLimit(key));
81+
assertEquals(50, throttler.getThrottlingLimit(key).intValue());
8082
}
8183
}
8284

@@ -94,11 +96,11 @@ public void testValidateSettingsForDifferentVersion() {
9496
}, new ClusterManagerThrottlingStats());
9597
throttler.registerClusterManagerTask("put-mapping", true);
9698

97-
// set some limit for update snapshot tasks
99+
// set some limit for put-mapping tasks
98100
int newLimit = randomIntBetween(1, 10);
99101

100102
Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", newLimit).build();
101-
assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings));
103+
assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(THRESHOLD_SETTINGS.get(newSettings)));
102104

103105
// validate for empty setting, it shouldn't throw exception
104106
Settings emptySettings = Settings.builder().build();
@@ -124,11 +126,11 @@ public void testValidateSettingsForTaskWithoutRetryOnDataNode() {
124126
}, new ClusterManagerThrottlingStats());
125127
throttler.registerClusterManagerTask("put-mapping", false);
126128

127-
// set some limit for update snapshot tasks
129+
// set some limit for put-mapping tasks
128130
int newLimit = randomIntBetween(1, 10);
129131

130132
Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", newLimit).build();
131-
assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings));
133+
assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(THRESHOLD_SETTINGS.get(newSettings)));
132134
}
133135

134136
public void testUpdateSettingsForNullValue() {
@@ -154,7 +156,7 @@ public void testUpdateSettingsForNullValue() {
154156
// set limit to null
155157
Settings nullSettings = Settings.builder().build();
156158
clusterSettings.applySettings(nullSettings);
157-
assertNull(throttler.getThrottlingLimit("put-mapping"));
159+
assertEquals(50, throttler.getThrottlingLimit("put-mapping").intValue());
158160
}
159161

160162
public void testSettingsOnBootstrap() {
@@ -222,10 +224,10 @@ public void testValidateSettingsForUnknownTask() {
222224
return clusterService.getClusterManagerService().getMinNodeVersion();
223225
}, new ClusterManagerThrottlingStats());
224226

225-
// set some limit for update snapshot tasks
227+
// set some limit for random tasks
226228
int newLimit = randomIntBetween(1, 10);
227229
Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.random-task.value", newLimit).build();
228-
assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings));
230+
assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(THRESHOLD_SETTINGS.get(newSettings)));
229231
}
230232

231233
public void testUpdateThrottlingLimitForBasicSanity() {
@@ -242,17 +244,17 @@ public void testUpdateThrottlingLimitForBasicSanity() {
242244
}, new ClusterManagerThrottlingStats());
243245
throttler.registerClusterManagerTask("put-mapping", true);
244246

245-
// set some limit for update snapshot tasks
247+
// set some limit for put-mapping tasks
246248
long newLimit = randomLongBetween(1, 10);
247249

248250
Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", newLimit).build();
249251
clusterSettings.applySettings(newSettings);
250252
assertEquals(newLimit, throttler.getThrottlingLimit("put-mapping").intValue());
251253

252-
// set update snapshot task limit to default
253-
newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", -1).build();
254+
// set put-mapping task limit to 20
255+
newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", 20).build();
254256
clusterSettings.applySettings(newSettings);
255-
assertNull(throttler.getThrottlingLimit("put-mapping"));
257+
assertEquals(20, throttler.getThrottlingLimit("put-mapping").intValue());
256258
}
257259

258260
public void testValidateSettingForLimit() {
@@ -269,8 +271,8 @@ public void testValidateSettingForLimit() {
269271
}, new ClusterManagerThrottlingStats());
270272
throttler.registerClusterManagerTask("put-mapping", true);
271273

272-
Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.values", -5).build();
273-
assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings));
274+
Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", -5).build();
275+
assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(THRESHOLD_SETTINGS.get(newSettings)));
274276
}
275277

276278
public void testUpdateLimit() {
@@ -348,7 +350,7 @@ public void testThrottlingForInitialStaticSettingAndVersionCheck() {
348350
}, throttlingStats);
349351
ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask("put-mapping", true);
350352

351-
// verifying adding more tasks then threshold passes
353+
// verifying adding more tasks than threshold passes
352354
throttler.onBeginSubmit(getMockUpdateTaskList("put-mapping", throttlingKey, put_mapping_threshold_value + 5));
353355
assertEquals(0L, throttlingStats.getThrottlingCount("put-mapping"));
354356

0 commit comments

Comments
 (0)