Skip to content

Commit 56cdb85

Browse files
ylwu-amzn and jngz-es authored
Add native memory circuit breaker. (opensearch-project#689) (opensearch-project#777)
* Add native memory circuit breaker.
  Refactor all breakers from common to plugin.
  Add dynamic setting for native memory circuit breaker.
* Address the comments 1.
* Spotless changes.
* Address the comments 2.

Signed-off-by: Jing Zhang <[email protected]>
Co-authored-by: Jing Zhang <[email protected]>
1 parent eddf095 commit 56cdb85

31 files changed

+257
-55
lines changed

plugin/build.gradle

+2-1
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,8 @@ List<String> jacocoExclusions = [
266266
'org.opensearch.ml.task.MLExecuteTaskRunner',
267267
'org.opensearch.ml.action.profile.MLProfileTransportAction',
268268
'org.opensearch.ml.action.models.DeleteModelTransportAction.1',
269-
'org.opensearch.ml.rest.RestMLPredictionAction'
269+
'org.opensearch.ml.rest.RestMLPredictionAction',
270+
'org.opensearch.ml.breaker.DiskCircuitBreaker'
270271
]
271272

272273
jacocoTestCoverageVerification {

plugin/src/main/java/org/opensearch/ml/action/load/TransportLoadModelOnNodeAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import org.opensearch.common.io.stream.StreamInput;
2929
import org.opensearch.common.settings.Settings;
3030
import org.opensearch.core.xcontent.NamedXContentRegistry;
31+
import org.opensearch.ml.breaker.MLCircuitBreakerService;
3132
import org.opensearch.ml.common.FunctionName;
3233
import org.opensearch.ml.common.MLTask;
33-
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
3434
import org.opensearch.ml.common.transport.forward.MLForwardAction;
3535
import org.opensearch.ml.common.transport.forward.MLForwardInput;
3636
import org.opensearch.ml.common.transport.forward.MLForwardRequest;

common/src/main/java/org/opensearch/ml/common/breaker/BreakerName.java → plugin/src/main/java/org/opensearch/ml/breaker/BreakerName.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.ml.common.breaker;
6+
package org.opensearch.ml.breaker;
77

88
public enum BreakerName {
99
MEMORY,
10-
DISK
10+
DISK,
11+
NATIVE_MEMORY
1112
}

common/src/main/java/org/opensearch/ml/common/breaker/CircuitBreaker.java → plugin/src/main/java/org/opensearch/ml/breaker/CircuitBreaker.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.ml.common.breaker;
6+
package org.opensearch.ml.breaker;
77

88
/**
99
* An interface for circuit breaker.

common/src/main/java/org/opensearch/ml/common/breaker/DiskCircuitBreaker.java → plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,23 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.ml.common.breaker;
7-
8-
import org.opensearch.ml.common.exception.MLException;
6+
package org.opensearch.ml.breaker;
97

108
import java.io.File;
119
import java.security.AccessController;
1210
import java.security.PrivilegedActionException;
1311
import java.security.PrivilegedExceptionAction;
1412

13+
import org.opensearch.ml.common.exception.MLException;
14+
1515
/**
1616
* A circuit breaker for disk usage.
1717
*/
1818
public class DiskCircuitBreaker extends ThresholdCircuitBreaker<Long> {
19+
// TODO: make this value configurable as cluster setting
1920
private static final String ML_DISK_CB = "Disk Circuit Breaker";
2021
public static final long DEFAULT_DISK_SHORTAGE_THRESHOLD = 5L;
22+
private static final long GB = 1024 * 1024 * 1024;
2123
private String diskDir;
2224

2325
public DiskCircuitBreaker(String diskDir) {
@@ -32,17 +34,17 @@ public DiskCircuitBreaker(long threshold, String diskDir) {
3234

3335
@Override
3436
public String getName() {
35-
return ML_DISK_CB;
37+
return ML_DISK_CB;
3638
}
3739

3840
@Override
3941
public boolean isOpen() {
4042
try {
4143
return AccessController.doPrivileged((PrivilegedExceptionAction<Boolean>) () -> {
42-
return (new File(diskDir).getFreeSpace()/1024/1024/1024) < getThreshold(); // in GB
44+
return (new File(diskDir).getFreeSpace() / GB) < getThreshold(); // in GB
4345
});
4446
} catch (PrivilegedActionException e) {
4547
throw new MLException("Failed to run disk circuit breaker");
4648
}
4749
}
48-
}
50+
}

common/src/main/java/org/opensearch/ml/common/breaker/MLCircuitBreakerService.java → plugin/src/main/java/org/opensearch/ml/breaker/MLCircuitBreakerService.java

+24-8
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,19 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.ml.common.breaker;
7-
8-
import lombok.extern.log4j.Log4j2;
9-
import org.opensearch.monitor.jvm.JvmService;
6+
package org.opensearch.ml.breaker;
107

118
import java.nio.file.Path;
129
import java.util.concurrent.ConcurrentHashMap;
1310
import java.util.concurrent.ConcurrentMap;
1411

12+
import lombok.extern.log4j.Log4j2;
13+
14+
import org.opensearch.cluster.service.ClusterService;
15+
import org.opensearch.common.settings.Settings;
16+
import org.opensearch.monitor.jvm.JvmService;
17+
import org.opensearch.monitor.os.OsService;
18+
1519
/**
1620
* This service registers internal system breakers and provide API for users to register their own breakers.
1721
*/
@@ -20,14 +24,23 @@ public class MLCircuitBreakerService {
2024

2125
private final ConcurrentMap<BreakerName, CircuitBreaker> breakers = new ConcurrentHashMap<>();
2226
private final JvmService jvmService;
27+
private final OsService osService;
28+
private final Settings settings;
29+
private final ClusterService clusterService;
2330

2431
/**
2532
* Constructor.
2633
*
2734
* @param jvmService jvm info
35+
* @param osService os info
36+
* @param settings settings
37+
* @param clusterService clusterService
2838
*/
29-
public MLCircuitBreakerService(JvmService jvmService) {
39+
public MLCircuitBreakerService(JvmService jvmService, OsService osService, Settings settings, ClusterService clusterService) {
3040
this.jvmService = jvmService;
41+
this.osService = osService;
42+
this.settings = settings;
43+
this.clusterService = clusterService;
3144
}
3245

3346
public void registerBreaker(BreakerName name, CircuitBreaker breaker) {
@@ -65,18 +78,21 @@ public MLCircuitBreakerService init(Path path) {
6578
log.info("Registered ML memory breaker.");
6679
registerBreaker(BreakerName.DISK, new DiskCircuitBreaker(path.toString()));
6780
log.info("Registered ML disk breaker.");
81+
// Register native memory circuit breaker
82+
registerBreaker(BreakerName.NATIVE_MEMORY, new NativeMemoryCircuitBreaker(this.osService, this.settings, this.clusterService));
83+
log.info("Registered ML native memory breaker.");
6884

6985
return this;
7086
}
7187

7288
/**
7389
*
74-
* @return the name of any open circuit breaker; otherwise return null
90+
* @return any open circuit breaker; otherwise return null
7591
*/
76-
public String checkOpenCB() {
92+
public ThresholdCircuitBreaker checkOpenCB() {
7793
for (CircuitBreaker breaker : breakers.values()) {
7894
if (breaker.isOpen()) {
79-
return breaker.getName();
95+
return (ThresholdCircuitBreaker) breaker;
8096
}
8197
}
8298

common/src/main/java/org/opensearch/ml/common/breaker/MemoryCircuitBreaker.java → plugin/src/main/java/org/opensearch/ml/breaker/MemoryCircuitBreaker.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.ml.common.breaker;
6+
package org.opensearch.ml.breaker;
77

88
import org.opensearch.monitor.jvm.JvmService;
99

1010
/**
1111
* A circuit breaker for memory usage.
1212
*/
1313
public class MemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {
14-
//TODO: make this value configurable as cluster setting
14+
// TODO: make this value configurable as cluster setting
1515
private static final String ML_MEMORY_CB = "Memory Circuit Breaker";
1616
public static final short DEFAULT_JVM_HEAP_USAGE_THRESHOLD = 85;
1717
private final JvmService jvmService;
plugin/src/main/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreaker.java

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.ml.breaker;
7+
8+
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD;
9+
10+
import org.opensearch.cluster.service.ClusterService;
11+
import org.opensearch.common.settings.Settings;
12+
import org.opensearch.monitor.os.OsService;
13+
14+
/**
15+
* A circuit breaker for native memory usage.
16+
*/
17+
public class NativeMemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {
18+
private static final String ML_MEMORY_CB = "Native Memory Circuit Breaker";
19+
public static final short DEFAULT_NATIVE_MEM_USAGE_THRESHOLD = 90;
20+
private final OsService osService;
21+
private volatile Integer nativeMemThreshold = 90;
22+
23+
public NativeMemoryCircuitBreaker(OsService osService, Settings settings, ClusterService clusterService) {
24+
super(DEFAULT_NATIVE_MEM_USAGE_THRESHOLD);
25+
this.osService = osService;
26+
this.nativeMemThreshold = ML_COMMONS_NATIVE_MEM_THRESHOLD.get(settings);
27+
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_NATIVE_MEM_THRESHOLD, it -> nativeMemThreshold = it);
28+
}
29+
30+
public NativeMemoryCircuitBreaker(Integer threshold, OsService osService) {
31+
super(threshold.shortValue());
32+
this.nativeMemThreshold = threshold;
33+
this.osService = osService;
34+
}
35+
36+
@Override
37+
public String getName() {
38+
return ML_MEMORY_CB;
39+
}
40+
41+
@Override
42+
public Short getThreshold() {
43+
return this.nativeMemThreshold.shortValue();
44+
}
45+
46+
@Override
47+
public boolean isOpen() {
48+
return osService.stats().getMem().getUsedPercent() > this.nativeMemThreshold.shortValue();
49+
}
50+
}

common/src/main/java/org/opensearch/ml/common/breaker/ThresholdCircuitBreaker.java → plugin/src/main/java/org/opensearch/ml/breaker/ThresholdCircuitBreaker.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.ml.common.breaker;
6+
package org.opensearch.ml.breaker;
77

88
/**
99
* An abstract class for all breakers with threshold.

plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,10 @@
7373
import org.opensearch.index.query.TermQueryBuilder;
7474
import org.opensearch.index.reindex.DeleteByQueryAction;
7575
import org.opensearch.index.reindex.DeleteByQueryRequest;
76+
import org.opensearch.ml.breaker.MLCircuitBreakerService;
7677
import org.opensearch.ml.common.FunctionName;
7778
import org.opensearch.ml.common.MLModel;
7879
import org.opensearch.ml.common.MLTask;
79-
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
8080
import org.opensearch.ml.common.exception.MLException;
8181
import org.opensearch.ml.common.exception.MLResourceNotFoundException;
8282
import org.opensearch.ml.common.model.MLModelState;

plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import java.util.concurrent.ConcurrentHashMap;
1616
import java.util.function.Supplier;
1717

18+
import lombok.SneakyThrows;
19+
1820
import org.opensearch.action.ActionRequest;
1921
import org.opensearch.action.ActionResponse;
2022
import org.opensearch.client.Client;
@@ -56,11 +58,11 @@
5658
import org.opensearch.ml.action.upload_chunk.MLModelMetaCreate;
5759
import org.opensearch.ml.action.upload_chunk.TransportCreateModelMetaAction;
5860
import org.opensearch.ml.action.upload_chunk.TransportUploadModelChunkAction;
61+
import org.opensearch.ml.breaker.MLCircuitBreakerService;
5962
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
6063
import org.opensearch.ml.cluster.MLCommonsClusterEventListener;
6164
import org.opensearch.ml.cluster.MLCommonsClusterManagerEventListener;
6265
import org.opensearch.ml.common.FunctionName;
63-
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
6466
import org.opensearch.ml.common.input.execute.anomalylocalization.AnomalyLocalizationInput;
6567
import org.opensearch.ml.common.input.execute.samplecalculator.LocalSampleCalculatorInput;
6668
import org.opensearch.ml.common.input.parameter.ad.AnomalyDetectionLibSVMParams;
@@ -131,6 +133,7 @@
131133
import org.opensearch.ml.task.MLTrainingTaskRunner;
132134
import org.opensearch.ml.utils.IndexUtils;
133135
import org.opensearch.monitor.jvm.JvmService;
136+
import org.opensearch.monitor.os.OsService;
134137
import org.opensearch.plugins.ActionPlugin;
135138
import org.opensearch.plugins.Plugin;
136139
import org.opensearch.repositories.RepositoriesService;
@@ -207,6 +210,7 @@ public class MachineLearningPlugin extends Plugin implements ActionPlugin {
207210
);
208211
}
209212

213+
@SneakyThrows
210214
@Override
211215
public Collection<Object> createComponents(
212216
Client client,
@@ -232,7 +236,9 @@ public Collection<Object> createComponents(
232236
modelCacheHelper = new MLModelCacheHelper(clusterService, settings);
233237

234238
JvmService jvmService = new JvmService(environment.settings());
235-
MLCircuitBreakerService mlCircuitBreakerService = new MLCircuitBreakerService(jvmService).init(environment.dataFiles()[0]);
239+
OsService osService = new OsService(environment.settings());
240+
MLCircuitBreakerService mlCircuitBreakerService = new MLCircuitBreakerService(jvmService, osService, settings, clusterService)
241+
.init(environment.dataFiles()[0]);
236242

237243
Map<Enum, MLStat<?>> stats = new ConcurrentHashMap<>();
238244
// cluster level stats
@@ -505,7 +511,8 @@ public List<Setting<?>> getSettings() {
505511
MLCommonsSettings.ML_COMMONS_MAX_UPLOAD_TASKS_PER_NODE,
506512
MLCommonsSettings.ML_COMMONS_MAX_ML_TASK_PER_NODE,
507513
MLCommonsSettings.ML_COMMONS_MAX_LOAD_MODEL_TASKS_PER_NODE,
508-
MLCommonsSettings.ML_COMMONS_TRUSTED_URL_REGEX
514+
MLCommonsSettings.ML_COMMONS_TRUSTED_URL_REGEX,
515+
MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD
509516
);
510517
return settings;
511518
}

plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java

+3
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,7 @@ private MLCommonsSettings() {}
5353
Setting.Property.NodeScope,
5454
Setting.Property.Dynamic
5555
);
56+
57+
public static final Setting<Integer> ML_COMMONS_NATIVE_MEM_THRESHOLD = Setting
58+
.intSetting("plugins.ml_commons.native_memory_threshold", 90, 0, 100, Setting.Property.NodeScope, Setting.Property.Dynamic);
5659
}

plugin/src/main/java/org/opensearch/ml/task/MLExecuteTaskRunner.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
import org.opensearch.action.ActionListenerResponseHandler;
1414
import org.opensearch.client.Client;
1515
import org.opensearch.cluster.service.ClusterService;
16+
import org.opensearch.ml.breaker.MLCircuitBreakerService;
1617
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
1718
import org.opensearch.ml.common.FunctionName;
18-
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
1919
import org.opensearch.ml.common.input.Input;
2020
import org.opensearch.ml.common.output.Output;
2121
import org.opensearch.ml.common.transport.execute.MLExecuteTaskAction;

plugin/src/main/java/org/opensearch/ml/task/MLPredictTaskRunner.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@
3232
import org.opensearch.commons.authuser.User;
3333
import org.opensearch.core.xcontent.NamedXContentRegistry;
3434
import org.opensearch.core.xcontent.XContentParser;
35+
import org.opensearch.ml.breaker.MLCircuitBreakerService;
3536
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
3637
import org.opensearch.ml.common.FunctionName;
3738
import org.opensearch.ml.common.MLModel;
3839
import org.opensearch.ml.common.MLTask;
3940
import org.opensearch.ml.common.MLTaskState;
4041
import org.opensearch.ml.common.MLTaskType;
41-
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
4242
import org.opensearch.ml.common.dataset.MLInputDataType;
4343
import org.opensearch.ml.common.dataset.MLInputDataset;
4444
import org.opensearch.ml.common.exception.MLException;

plugin/src/main/java/org/opensearch/ml/task/MLTaskRunner.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414

1515
import org.opensearch.action.ActionListener;
1616
import org.opensearch.cluster.service.ClusterService;
17+
import org.opensearch.ml.breaker.MLCircuitBreakerService;
1718
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
1819
import org.opensearch.ml.common.MLTask;
1920
import org.opensearch.ml.common.MLTaskState;
20-
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
2121
import org.opensearch.ml.common.transport.MLTaskRequest;
2222
import org.opensearch.ml.common.transport.MLTaskResponse;
2323
import org.opensearch.ml.stats.MLNodeLevelStat;

plugin/src/main/java/org/opensearch/ml/task/MLTrainAndPredictTaskRunner.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
import org.opensearch.action.support.ThreadedActionListener;
1818
import org.opensearch.client.Client;
1919
import org.opensearch.cluster.service.ClusterService;
20+
import org.opensearch.ml.breaker.MLCircuitBreakerService;
2021
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
2122
import org.opensearch.ml.common.MLTask;
2223
import org.opensearch.ml.common.MLTaskState;
2324
import org.opensearch.ml.common.MLTaskType;
24-
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
2525
import org.opensearch.ml.common.dataset.MLInputDataType;
2626
import org.opensearch.ml.common.dataset.MLInputDataset;
2727
import org.opensearch.ml.common.input.MLInput;

plugin/src/main/java/org/opensearch/ml/task/MLTrainingTaskRunner.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@
2525
import org.opensearch.common.xcontent.XContentType;
2626
import org.opensearch.core.xcontent.ToXContent;
2727
import org.opensearch.core.xcontent.XContentBuilder;
28+
import org.opensearch.ml.breaker.MLCircuitBreakerService;
2829
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
2930
import org.opensearch.ml.common.MLModel;
3031
import org.opensearch.ml.common.MLTask;
3132
import org.opensearch.ml.common.MLTaskState;
3233
import org.opensearch.ml.common.MLTaskType;
33-
import org.opensearch.ml.common.breaker.MLCircuitBreakerService;
3434
import org.opensearch.ml.common.dataset.MLInputDataType;
3535
import org.opensearch.ml.common.dataset.MLInputDataset;
3636
import org.opensearch.ml.common.input.MLInput;

0 commit comments

Comments
 (0)