Skip to content

Commit 041d6ce

Browse files
authored
Code Refactoring for CommonName (#901)
In this pull request, I have refactored the code related to shared names in both AD and forecasting modules to CommonNames. Additionally, the previously used CommonName has been renamed to ADCommonName. For the Forecasting module, I have introduced new names in ForecastCommonNames. Testing done: * gradle build Signed-off-by: Kaituo Li <[email protected]>
1 parent e2fee9d commit 041d6ce

File tree

152 files changed

+919
-819
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

152 files changed

+919
-819
lines changed

build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,8 @@ List<String> jacocoExclusions = [
665665

666666
// Class containing just constants. Don't need to test
667667
'org.opensearch.ad.constant.*',
668+
'org.opensearch.forecast.constant.*',
669+
'org.opensearch.timeseries.constant.*',
668670

669671
//'org.opensearch.ad.common.exception.AnomalyDetectionException',
670672
'org.opensearch.ad.util.ClientUtil',

src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
6363
import org.opensearch.jobscheduler.spi.utils.LockService;
6464
import org.opensearch.threadpool.ThreadPool;
65+
import org.opensearch.timeseries.constant.CommonName;
6566

6667
import com.google.common.base.Throwables;
6768

@@ -514,7 +515,7 @@ private void stopAdJobForEndRunException(
514515
}
515516

516517
private void stopAdJob(String detectorId, AnomalyDetectorFunction function) {
517-
GetRequest getRequest = new GetRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX).id(detectorId);
518+
GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX).id(detectorId);
518519
ActionListener<GetResponse> listener = ActionListener.wrap(response -> {
519520
if (response.isExists()) {
520521
try (
@@ -537,7 +538,7 @@ private void stopAdJob(String detectorId, AnomalyDetectorFunction function) {
537538
job.getUser(),
538539
job.getResultIndex()
539540
);
540-
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX)
541+
IndexRequest indexRequest = new IndexRequest(CommonName.JOB_INDEX)
541542
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
542543
.source(newJob.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), XCONTENT_WITH_TYPE))
543544
.id(detectorId);

src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java

+13-12
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import org.opensearch.ad.cluster.ADDataMigrator;
4343
import org.opensearch.ad.cluster.ClusterManagerEventListener;
4444
import org.opensearch.ad.cluster.HashRing;
45-
import org.opensearch.ad.constant.CommonName;
45+
import org.opensearch.ad.constant.ADCommonName;
4646
import org.opensearch.ad.dataprocessor.IntegerSensitiveSingleFeatureLinearUniformInterpolator;
4747
import org.opensearch.ad.dataprocessor.Interpolator;
4848
import org.opensearch.ad.dataprocessor.LinearUniformInterpolator;
@@ -85,7 +85,6 @@
8585
import org.opensearch.ad.settings.NumericSetting;
8686
import org.opensearch.ad.stats.ADStat;
8787
import org.opensearch.ad.stats.ADStats;
88-
import org.opensearch.ad.stats.StatNames;
8988
import org.opensearch.ad.stats.suppliers.CounterSupplier;
9089
import org.opensearch.ad.stats.suppliers.IndexStatusSupplier;
9190
import org.opensearch.ad.stats.suppliers.ModelsOnNodeCountSupplier;
@@ -195,6 +194,8 @@
195194
import org.opensearch.threadpool.ExecutorBuilder;
196195
import org.opensearch.threadpool.ScalingExecutorBuilder;
197196
import org.opensearch.threadpool.ThreadPool;
197+
import org.opensearch.timeseries.constant.CommonName;
198+
import org.opensearch.timeseries.stats.StatNames;
198199
import org.opensearch.watcher.ResourceWatcherService;
199200

200201
import com.amazon.randomcutforest.parkservices.state.ThresholdedRandomCutForestMapper;
@@ -429,7 +430,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
429430
CheckpointDao checkpoint = new CheckpointDao(
430431
client,
431432
clientUtil,
432-
CommonName.CHECKPOINT_INDEX_NAME,
433+
ADCommonName.CHECKPOINT_INDEX_NAME,
433434
gson,
434435
mapper,
435436
converter,
@@ -454,7 +455,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
454455
CheckPointMaintainRequestAdapter adapter = new CheckPointMaintainRequestAdapter(
455456
cacheProvider,
456457
checkpoint,
457-
CommonName.CHECKPOINT_INDEX_NAME,
458+
ADCommonName.CHECKPOINT_INDEX_NAME,
458459
AnomalyDetectorSettings.CHECKPOINT_SAVING_FREQ,
459460
getClock(),
460461
clusterService,
@@ -477,7 +478,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
477478
AnomalyDetectorSettings.MAINTENANCE_FREQ_CONSTANT,
478479
AnomalyDetectorSettings.QUEUE_MAINTENANCE,
479480
checkpoint,
480-
CommonName.CHECKPOINT_INDEX_NAME,
481+
ADCommonName.CHECKPOINT_INDEX_NAME,
481482
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
482483
stateManager,
483484
AnomalyDetectorSettings.HOURLY_MAINTENANCE
@@ -625,23 +626,23 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
625626
)
626627
.put(
627628
StatNames.ANOMALY_DETECTORS_INDEX_STATUS.getName(),
628-
new ADStat<>(true, new IndexStatusSupplier(indexUtils, AnomalyDetector.ANOMALY_DETECTORS_INDEX))
629+
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.CONFIG_INDEX))
629630
)
630631
.put(
631632
StatNames.ANOMALY_RESULTS_INDEX_STATUS.getName(),
632-
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.ANOMALY_RESULT_INDEX_ALIAS))
633+
new ADStat<>(true, new IndexStatusSupplier(indexUtils, ADCommonName.ANOMALY_RESULT_INDEX_ALIAS))
633634
)
634635
.put(
635636
StatNames.MODELS_CHECKPOINT_INDEX_STATUS.getName(),
636-
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.CHECKPOINT_INDEX_NAME))
637+
new ADStat<>(true, new IndexStatusSupplier(indexUtils, ADCommonName.CHECKPOINT_INDEX_NAME))
637638
)
638639
.put(
639640
StatNames.ANOMALY_DETECTION_JOB_INDEX_STATUS.getName(),
640-
new ADStat<>(true, new IndexStatusSupplier(indexUtils, AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX))
641+
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.JOB_INDEX))
641642
)
642643
.put(
643644
StatNames.ANOMALY_DETECTION_STATE_STATUS.getName(),
644-
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.DETECTION_STATE_INDEX))
645+
new ADStat<>(true, new IndexStatusSupplier(indexUtils, ADCommonName.DETECTION_STATE_INDEX))
645646
)
646647
.put(StatNames.DETECTOR_COUNT.getName(), new ADStat<>(true, new SettableSupplier()))
647648
.put(StatNames.SINGLE_ENTITY_DETECTOR_COUNT.getName(), new ADStat<>(true, new SettableSupplier()))
@@ -752,7 +753,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
752753
client,
753754
settings,
754755
threadPool,
755-
CommonName.ANOMALY_RESULT_INDEX_ALIAS,
756+
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
756757
anomalyDetectionIndices,
757758
this.clientUtil,
758759
this.indexUtils,
@@ -1010,7 +1011,7 @@ public String getJobType() {
10101011

10111012
@Override
10121013
public String getJobIndex() {
1013-
return AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
1014+
return CommonName.JOB_INDEX;
10141015
}
10151016

10161017
@Override

src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java

+9-10
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313

1414
import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG;
1515
import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_PARSE_DETECTOR_MSG;
16-
import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
17-
import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
1816
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
1917
import static org.opensearch.rest.RestStatus.BAD_REQUEST;
2018
import static org.opensearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
@@ -35,8 +33,8 @@
3533
import org.opensearch.action.search.SearchResponse;
3634
import org.opensearch.ad.common.exception.NotSerializedADExceptionName;
3735
import org.opensearch.ad.common.exception.ResourceNotFoundException;
36+
import org.opensearch.ad.constant.ADCommonName;
3837
import org.opensearch.ad.constant.CommonErrorMessages;
39-
import org.opensearch.ad.constant.CommonName;
4038
import org.opensearch.ad.model.ADTaskType;
4139
import org.opensearch.ad.model.AnomalyDetector;
4240
import org.opensearch.ad.model.AnomalyDetectorJob;
@@ -74,6 +72,7 @@
7472
import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder;
7573
import org.opensearch.search.aggregations.metrics.InternalCardinality;
7674
import org.opensearch.search.builder.SearchSourceBuilder;
75+
import org.opensearch.timeseries.constant.CommonName;
7776
import org.opensearch.transport.TransportService;
7877

7978
public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
@@ -121,7 +120,7 @@ private void calculateTotalResponsesToWait(
121120
Set<DetectorProfileName> profilesToCollect,
122121
ActionListener<DetectorProfile> listener
123122
) {
124-
GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId);
123+
GetRequest getDetectorRequest = new GetRequest(CommonName.CONFIG_INDEX, detectorId);
125124
client.get(getDetectorRequest, ActionListener.wrap(getDetectorResponse -> {
126125
if (getDetectorResponse != null && getDetectorResponse.isExists()) {
127126
try (
@@ -151,7 +150,7 @@ private void prepareProfile(
151150
Set<DetectorProfileName> profilesToCollect
152151
) {
153152
String detectorId = detector.getDetectorId();
154-
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
153+
GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX, detectorId);
155154
client.get(getRequest, ActionListener.wrap(getResponse -> {
156155
if (getResponse != null && getResponse.isExists()) {
157156
try (
@@ -292,14 +291,14 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
292291
if (categoryField.size() == 1) {
293292
// Run a cardinality aggregation to count the cardinality of single category fields
294293
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
295-
CardinalityAggregationBuilder aggBuilder = new CardinalityAggregationBuilder(CommonName.TOTAL_ENTITIES);
294+
CardinalityAggregationBuilder aggBuilder = new CardinalityAggregationBuilder(ADCommonName.TOTAL_ENTITIES);
296295
aggBuilder.field(categoryField.get(0));
297296
searchSourceBuilder.aggregation(aggBuilder);
298297

299298
SearchRequest request = new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder);
300299
final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(searchResponse -> {
301300
Map<String, Aggregation> aggMap = searchResponse.getAggregations().asMap();
302-
InternalCardinality totalEntities = (InternalCardinality) aggMap.get(CommonName.TOTAL_ENTITIES);
301+
InternalCardinality totalEntities = (InternalCardinality) aggMap.get(ADCommonName.TOTAL_ENTITIES);
303302
long value = totalEntities.getValue();
304303
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
305304
DetectorProfile profile = profileBuilder.totalEntities(value).build();
@@ -322,7 +321,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
322321
// Run a composite query and count the number of buckets to decide cardinality of multiple category fields
323322
AggregationBuilder bucketAggs = AggregationBuilders
324323
.composite(
325-
CommonName.TOTAL_ENTITIES,
324+
ADCommonName.TOTAL_ENTITIES,
326325
detector.getCategoryField().stream().map(f -> new TermsValuesSourceBuilder(f).field(f)).collect(Collectors.toList())
327326
)
328327
.size(maxTotalEntitiesToTrack);
@@ -344,7 +343,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
344343
return;
345344
}
346345

347-
Aggregation aggrResult = aggs.get(CommonName.TOTAL_ENTITIES);
346+
Aggregation aggrResult = aggs.get(ADCommonName.TOTAL_ENTITIES);
348347
if (aggrResult == null) {
349348
listener.onFailure(new IllegalArgumentException("Fail to find valid aggregation result"));
350349
return;
@@ -558,7 +557,7 @@ private ActionListener<RCFPollingResponse> onPollRCFUpdates(
558557
NotSerializedADExceptionName.RESOURCE_NOT_FOUND_EXCEPTION_NAME_UNDERSCORE.getName()
559558
)
560559
|| (ExceptionUtil.isIndexNotAvailable(causeException)
561-
&& causeException.getMessage().contains(CommonName.CHECKPOINT_INDEX_NAME))) {
560+
&& causeException.getMessage().contains(ADCommonName.CHECKPOINT_INDEX_NAME))) {
562561
// cannot find checkpoint
563562
// We don't want to show the estimated time remaining to initialize
564563
// a detector before cold start finishes, where the actual

src/main/java/org/opensearch/ad/EntityProfileRunner.java

+7-8
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111

1212
package org.opensearch.ad;
1313

14-
import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
15-
import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
1614
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
1715

1816
import java.util.List;
@@ -27,8 +25,8 @@
2725
import org.opensearch.action.get.GetRequest;
2826
import org.opensearch.action.search.SearchRequest;
2927
import org.opensearch.action.search.SearchResponse;
28+
import org.opensearch.ad.constant.ADCommonName;
3029
import org.opensearch.ad.constant.CommonErrorMessages;
31-
import org.opensearch.ad.constant.CommonName;
3230
import org.opensearch.ad.model.AnomalyDetector;
3331
import org.opensearch.ad.model.AnomalyDetectorJob;
3432
import org.opensearch.ad.model.AnomalyResult;
@@ -58,6 +56,7 @@
5856
import org.opensearch.index.query.TermQueryBuilder;
5957
import org.opensearch.search.aggregations.AggregationBuilders;
6058
import org.opensearch.search.builder.SearchSourceBuilder;
59+
import org.opensearch.timeseries.constant.CommonName;
6160

6261
public class EntityProfileRunner extends AbstractProfileRunner {
6362
private final Logger logger = LogManager.getLogger(EntityProfileRunner.class);
@@ -94,7 +93,7 @@ public void profile(
9493
listener.onFailure(new IllegalArgumentException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
9594
return;
9695
}
97-
GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId);
96+
GetRequest getDetectorRequest = new GetRequest(CommonName.CONFIG_INDEX, detectorId);
9897

9998
client.get(getDetectorRequest, ActionListener.wrap(getResponse -> {
10099
if (getResponse != null && getResponse.isExists()) {
@@ -220,7 +219,7 @@ private void getJob(
220219
EntityProfileResponse entityProfileResponse,
221220
ActionListener<EntityProfile> listener
222221
) {
223-
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
222+
GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX, detectorId);
224223
client.get(getRequest, ActionListener.wrap(getResponse -> {
225224
if (getResponse != null && getResponse.isExists()) {
226225
try (
@@ -457,15 +456,15 @@ private SearchRequest createLastSampleTimeRequest(String detectorId, long enable
457456

458457
boolQueryBuilder.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId));
459458

460-
boolQueryBuilder.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime));
459+
boolQueryBuilder.filter(QueryBuilders.rangeQuery(CommonName.EXECUTION_END_TIME_FIELD).gte(enabledTime));
461460

462461
SearchSourceBuilder source = new SearchSourceBuilder()
463462
.query(boolQueryBuilder)
464-
.aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX_TIME).field(AnomalyResult.EXECUTION_END_TIME_FIELD))
463+
.aggregation(AggregationBuilders.max(ADCommonName.AGG_NAME_MAX_TIME).field(CommonName.EXECUTION_END_TIME_FIELD))
465464
.trackTotalHits(false)
466465
.size(0);
467466

468-
SearchRequest request = new SearchRequest(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
467+
SearchRequest request = new SearchRequest(ADCommonName.ANOMALY_RESULT_INDEX_ALIAS);
469468
request.source(source);
470469
if (resultIndex != null) {
471470
request.indices(resultIndex);

src/main/java/org/opensearch/ad/NodeStateManager.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131
import org.opensearch.action.get.GetRequest;
3232
import org.opensearch.action.get.GetResponse;
3333
import org.opensearch.ad.common.exception.EndRunException;
34+
import org.opensearch.ad.constant.ADCommonName;
3435
import org.opensearch.ad.constant.CommonErrorMessages;
35-
import org.opensearch.ad.constant.CommonName;
3636
import org.opensearch.ad.ml.SingleStreamModelIdMapper;
3737
import org.opensearch.ad.model.AnomalyDetector;
3838
import org.opensearch.ad.model.AnomalyDetectorJob;
@@ -48,6 +48,7 @@
4848
import org.opensearch.common.xcontent.XContentType;
4949
import org.opensearch.core.xcontent.NamedXContentRegistry;
5050
import org.opensearch.core.xcontent.XContentParser;
51+
import org.opensearch.timeseries.constant.CommonName;
5152

5253
/**
5354
* NodeStateManager is used to manage states shared by transport and ml components
@@ -130,7 +131,7 @@ public void getAnomalyDetector(String adID, ActionListener<Optional<AnomalyDetec
130131
if (state != null && state.getDetectorDef() != null) {
131132
listener.onResponse(Optional.of(state.getDetectorDef()));
132133
} else {
133-
GetRequest request = new GetRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX, adID);
134+
GetRequest request = new GetRequest(CommonName.CONFIG_INDEX, adID);
134135
clientUtil.<GetRequest, GetResponse>asyncRequest(request, client::get, onGetDetectorResponse(adID, listener));
135136
}
136137
}
@@ -182,7 +183,7 @@ public void getDetectorCheckpoint(String adID, ActionListener<Boolean> listener)
182183
return;
183184
}
184185

185-
GetRequest request = new GetRequest(CommonName.CHECKPOINT_INDEX_NAME, SingleStreamModelIdMapper.getRcfModelId(adID, 0));
186+
GetRequest request = new GetRequest(ADCommonName.CHECKPOINT_INDEX_NAME, SingleStreamModelIdMapper.getRcfModelId(adID, 0));
186187

187188
clientUtil.<GetRequest, GetResponse>asyncRequest(request, client::get, onGetCheckpointResponse(adID, listener));
188189
}
@@ -375,7 +376,7 @@ public void getAnomalyDetectorJob(String adID, ActionListener<Optional<AnomalyDe
375376
if (state != null && state.getDetectorJob() != null) {
376377
listener.onResponse(Optional.of(state.getDetectorJob()));
377378
} else {
378-
GetRequest request = new GetRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, adID);
379+
GetRequest request = new GetRequest(CommonName.JOB_INDEX, adID);
379380
clientUtil.<GetRequest, GetResponse>asyncRequest(request, client::get, onGetDetectorJobResponse(adID, listener));
380381
}
381382
}

0 commit comments

Comments
 (0)