Skip to content

Commit e33ff46

Browse files
add a feature that flattens custom result index when enabled (#1401)
* add a feature that flattens custom result index when enabled Signed-off-by: Jackie Han <jkhanjob@gmail.com> * clean up Signed-off-by: Jackie Han <jkhanjob@gmail.com> * add IT Signed-off-by: Jackie Han <hnyng@amazon.com> * cleanup Signed-off-by: Jackie Han <hnyng@amazon.com> * address comments Signed-off-by: Jackie Han <hnyng@amazon.com> * update IT Signed-off-by: Jackie Han <hnyng@amazon.com> * update IT Signed-off-by: Jackie Han <hnyng@amazon.com> * clean up Signed-off-by: Jackie Han <hnyng@amazon.com> * add more IT Signed-off-by: Jackie Han <hnyng@amazon.com> * add more IT Signed-off-by: Jackie Han <hnyng@amazon.com> * add more IT Signed-off-by: Jackie Han <hnyng@amazon.com> * address comments * address comments Signed-off-by: Jackie Han <hnyng@amazon.com> * address comments Signed-off-by: Jackie Han <hnyng@amazon.com> * utlizing a node state manager when writing results into flattened result index Signed-off-by: Jackie Han <hnyng@amazon.com> * build flatten resuilt index enabled into the ResultWriteRequest Signed-off-by: Jackie Han <hnyng@amazon.com> * unbind ingest pipeline with flattened result index when it's disabled Signed-off-by: Jackie Han <hnyng@amazon.com> * test * address comments * cleanup Signed-off-by: Jackie Han <hnyng@amazon.com> * make flatten result index use detector name Signed-off-by: Jackie Han <hnyng@amazon.com> --------- Signed-off-by: Jackie Han <jkhanjob@gmail.com> Signed-off-by: Jackie Han <hnyng@amazon.com>
1 parent d2a30d8 commit e33ff46

27 files changed

Lines changed: 858 additions & 72 deletions

src/main/java/org/opensearch/ad/indices/ADIndexManagement.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.io.IOException;
2424
import java.util.EnumMap;
25+
import java.util.Map;
2526

2627
import org.apache.logging.log4j.LogManager;
2728
import org.apache.logging.log4j.Logger;
@@ -45,6 +46,8 @@
4546
import org.opensearch.timeseries.indices.IndexManagement;
4647
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
4748

49+
import com.fasterxml.jackson.databind.ObjectMapper;
50+
4851
/**
4952
* This class provides utility methods for various anomaly detection indices.
5053
*/
@@ -122,6 +125,22 @@ public static String getResultMappings() throws IOException {
122125
return getMappings(ANOMALY_RESULTS_INDEX_MAPPING_FILE);
123126
}
124127

128+
/**
129+
* Retrieves the JSON mapping for the flattened result index with the "dynamic" field set to true
130+
* @return JSON mapping for the flattened result index.
131+
* @throws IOException if the mapping file cannot be read.
132+
*/
133+
public static String getFlattenedResultMappings() throws IOException {
134+
ObjectMapper objectMapper = new ObjectMapper();
135+
136+
Map<String, Object> mapping = objectMapper
137+
.readValue(ADIndexManagement.class.getClassLoader().getResourceAsStream(ANOMALY_RESULTS_INDEX_MAPPING_FILE), Map.class);
138+
139+
mapping.put("dynamic", true);
140+
141+
return objectMapper.writeValueAsString(mapping);
142+
}
143+
125144
/**
126145
* Get anomaly detector state index mapping json content.
127146
*

src/main/java/org/opensearch/ad/model/AnomalyDetector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,7 @@ public static AnomalyDetector parse(
592592
case RESULT_INDEX_FIELD_TTL:
593593
customResultIndexTTL = onlyParseNumberValue(parser);
594594
break;
595-
case FLATTEN_RESULT_INDEX_MAPPING:
595+
case FLATTEN_CUSTOM_RESULT_INDEX:
596596
flattenResultIndexMapping = onlyParseBooleanValue(parser);
597597
break;
598598
case BREAKING_UI_CHANGE_TIME:

src/main/java/org/opensearch/ad/ratelimit/ADResultWriteRequest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ public ADResultWriteRequest(
2525
String detectorId,
2626
RequestPriority priority,
2727
AnomalyResult result,
28-
String resultIndex
28+
String resultIndex,
29+
String flattenResultIndex
2930
) {
30-
super(expirationEpochMs, detectorId, priority, result, resultIndex);
31+
super(expirationEpochMs, detectorId, priority, result, resultIndex, flattenResultIndex);
3132
}
3233

3334
public ADResultWriteRequest(StreamInput in) throws IOException {

src/main/java/org/opensearch/ad/ratelimit/ADResultWriteWorker.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,9 @@ protected ADResultWriteRequest createResultWriteRequest(
103103
String configId,
104104
RequestPriority priority,
105105
AnomalyResult result,
106-
String resultIndex
106+
String resultIndex,
107+
String flattenResultIndex
107108
) {
108-
return new ADResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex);
109+
return new ADResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex, flattenResultIndex);
109110
}
110111
}

src/main/java/org/opensearch/ad/ratelimit/ADSaveResultStrategy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ public void saveResult(AnomalyResult result, Config config) {
8686
config.getId(),
8787
result.getAnomalyGrade() > 0 ? RequestPriority.HIGH : RequestPriority.MEDIUM,
8888
result,
89-
config.getCustomResultIndexOrAlias()
89+
config.getCustomResultIndexOrAlias(),
90+
config.getFlattenResultIndexAlias()
9091
)
9192
);
9293
}

src/main/java/org/opensearch/ad/transport/ADResultBulkTransportAction.java

Lines changed: 57 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
public class ADResultBulkTransportAction extends ResultBulkTransportAction<AnomalyResult, ADResultWriteRequest, ADResultBulkRequest> {
4040

4141
private static final Logger LOG = LogManager.getLogger(ADResultBulkTransportAction.class);
42+
private final ClusterService clusterService;
43+
private final Client client;
4244

4345
@Inject
4446
public ADResultBulkTransportAction(
@@ -61,39 +63,77 @@ public ADResultBulkTransportAction(
6163
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
6264
ADResultBulkRequest::new
6365
);
66+
this.clusterService = clusterService;
67+
this.client = client;
6468
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_INDEX_PRESSURE_SOFT_LIMIT, it -> softLimit = it);
6569
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_INDEX_PRESSURE_HARD_LIMIT, it -> hardLimit = it);
6670
}
6771

72+
/**
73+
* Prepares a {@link BulkRequest} for indexing anomaly detection results.
74+
*
75+
* This method processes a list of anomaly detection results provided in the {@link ADResultBulkRequest}.
76+
* Each result is evaluated based on the current indexing pressure and result priority. If a flattened
77+
* result index exists for the result, the result is also added to the flattened index.
78+
*
79+
* @param indexingPressurePercent the current percentage of indexing pressure. This value influences
80+
* whether a result is indexed based on predefined thresholds and probabilities.
81+
* @param request the {@link ADResultBulkRequest} containing anomaly detection results
82+
* to be processed.
83+
* @return a {@link BulkRequest} containing all results that are eligible for indexing.
84+
*
85+
* <p><b>Behavior:</b></p>
86+
* <ul>
87+
* <li>Results are added to the bulk request if the indexing pressure is within acceptable limits
88+
* or the result has high priority.</li>
89+
* <li>If a flattened result index exists for a result, it is added to the flattened index in addition
90+
* to the primary index.</li>
91+
* </ul>
92+
*
93+
* <p><b>Indexing Pressure Thresholds:</b></p>
94+
* <ul>
95+
* <li>Below the soft limit: All results are added.</li>
96+
* <li>Between the soft limit and the hard limit: High-priority results are always added, and
97+
* other results are added based on a probability that decreases with increasing pressure.</li>
98+
* <li>Above the hard limit: Only high-priority results are added.</li>
99+
* </ul>
100+
*
101+
* @see ADResultBulkRequest
102+
* @see BulkRequest
103+
* @see ADResultWriteRequest
104+
*/
68105
@Override
69106
protected BulkRequest prepareBulkRequest(float indexingPressurePercent, ADResultBulkRequest request) {
70107
BulkRequest bulkRequest = new BulkRequest();
71108
List<ADResultWriteRequest> results = request.getResults();
72109

73-
if (indexingPressurePercent <= softLimit) {
74-
for (ADResultWriteRequest resultWriteRequest : results) {
75-
addResult(bulkRequest, resultWriteRequest.getResult(), resultWriteRequest.getResultIndex());
110+
for (ADResultWriteRequest resultWriteRequest : results) {
111+
AnomalyResult result = resultWriteRequest.getResult();
112+
String resultIndex = resultWriteRequest.getResultIndex();
113+
114+
if (shouldAddResult(indexingPressurePercent, result)) {
115+
addResult(bulkRequest, result, resultIndex);
116+
if (resultWriteRequest.getFlattenResultIndex() != null) {
117+
addResult(bulkRequest, result, resultWriteRequest.getFlattenResultIndex());
118+
}
76119
}
120+
}
121+
122+
return bulkRequest;
123+
}
124+
125+
private boolean shouldAddResult(float indexingPressurePercent, AnomalyResult result) {
126+
if (indexingPressurePercent <= softLimit) {
127+
// Always add when below soft limit
128+
return true;
77129
} else if (indexingPressurePercent <= hardLimit) {
78130
// exceed soft limit (60%) but smaller than hard limit (90%)
79131
float acceptProbability = 1 - indexingPressurePercent;
80-
for (ADResultWriteRequest resultWriteRequest : results) {
81-
AnomalyResult result = resultWriteRequest.getResult();
82-
if (result.isHighPriority() || random.nextFloat() < acceptProbability) {
83-
addResult(bulkRequest, result, resultWriteRequest.getResultIndex());
84-
}
85-
}
132+
return result.isHighPriority() || random.nextFloat() < acceptProbability;
86133
} else {
87134
// if exceeding hard limit, only index non-zero grade or error result
88-
for (ADResultWriteRequest resultWriteRequest : results) {
89-
AnomalyResult result = resultWriteRequest.getResult();
90-
if (result.isHighPriority()) {
91-
addResult(bulkRequest, result, resultWriteRequest.getResultIndex());
92-
}
93-
}
135+
return result.isHighPriority();
94136
}
95-
96-
return bulkRequest;
97137
}
98138

99139
private void addResult(BulkRequest bulkRequest, AnomalyResult result, String resultIndex) {

src/main/java/org/opensearch/ad/transport/ADSingleStreamResultTransportAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ public ADResultWriteRequest createResultWriteRequest(Config config, AnomalyResul
7272
config.getId(),
7373
RequestPriority.MEDIUM,
7474
result,
75-
config.getCustomResultIndexOrAlias()
75+
config.getCustomResultIndexOrAlias(),
76+
config.getFlattenResultIndexAlias()
7677
);
7778
}
7879

src/main/java/org/opensearch/forecast/model/Forecaster.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ public static Forecaster parse(
437437
case RESULT_INDEX_FIELD_TTL:
438438
customResultIndexTTL = parser.intValue();
439439
break;
440-
case FLATTEN_RESULT_INDEX_MAPPING:
440+
case FLATTEN_CUSTOM_RESULT_INDEX:
441441
flattenResultIndexMapping = parser.booleanValue();
442442
break;
443443
case BREAKING_UI_CHANGE_TIME:

src/main/java/org/opensearch/forecast/ratelimit/ForecastResultWriteRequest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ public ForecastResultWriteRequest(
2525
String forecasterId,
2626
RequestPriority priority,
2727
ForecastResult result,
28-
String resultIndex
28+
String resultIndex,
29+
String flattenResultIndex
2930
) {
30-
super(expirationEpochMs, forecasterId, priority, result, resultIndex);
31+
super(expirationEpochMs, forecasterId, priority, result, resultIndex, flattenResultIndex);
3132
}
3233

3334
public ForecastResultWriteRequest(StreamInput in) throws IOException {

src/main/java/org/opensearch/forecast/ratelimit/ForecastResultWriteWorker.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,9 @@ protected ForecastResultWriteRequest createResultWriteRequest(
103103
String configId,
104104
RequestPriority priority,
105105
ForecastResult result,
106-
String resultIndex
106+
String resultIndex,
107+
String flattenResultIndex
107108
) {
108-
return new ForecastResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex);
109+
return new ForecastResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex, flattenResultIndex);
109110
}
110111
}

0 commit comments

Comments
 (0)