Skip to content

Commit 5704a16

Browse files
authored
forward port flaky test fix in PR #1319 and add forecasting security tests (#1329)
Signed-off-by: Kaituo Li <[email protected]>
1 parent 96f9fdc commit 5704a16

40 files changed

+93558
-391
lines changed

build.gradle

+4-4
Original file line numberDiff line numberDiff line change
@@ -339,12 +339,15 @@ integTest {
339339
filter {
340340
includeTestsMatching "org.opensearch.ad.rest.*IT"
341341
includeTestsMatching "org.opensearch.ad.e2e.*IT"
342+
includeTestsMatching "org.opensearch.forecast.rest.*IT"
343+
includeTestsMatching "org.opensearch.forecast.e2e.*IT"
342344
}
343345
}
344346

345347
if (System.getProperty("https") == null || System.getProperty("https") == "false") {
346348
filter {
347349
excludeTestsMatching "org.opensearch.ad.rest.SecureADRestIT"
350+
excludeTestsMatching "org.opensearch.forecast.rest.SecureForecastRestIT"
348351
}
349352
}
350353

@@ -468,6 +471,7 @@ task integTestRemote(type: RestIntegTestTask) {
468471
if (System.getProperty("https") == null || System.getProperty("https") == "false") {
469472
filter {
470473
excludeTestsMatching "org.opensearch.ad.rest.SecureADRestIT"
474+
excludeTestsMatching "org.opensearch.forecast.rest.SecureForecastRestIT"
471475
}
472476
}
473477
}
@@ -696,10 +700,7 @@ List<String> jacocoExclusions = [
696700

697701
// TODO: add test coverage (kaituo)
698702
'org.opensearch.forecast.*',
699-
'org.opensearch.timeseries.ml.TimeSeriesSingleStreamCheckpointDao',
700-
'org.opensearch.timeseries.transport.JobRequest',
701703
'org.opensearch.timeseries.transport.handler.ResultBulkIndexingHandler',
702-
'org.opensearch.timeseries.ml.Inferencer',
703704
'org.opensearch.timeseries.transport.SingleStreamResultRequest',
704705
'org.opensearch.timeseries.rest.handler.IndexJobActionHandler.1',
705706
'org.opensearch.timeseries.transport.SuggestConfigParamResponse',
@@ -727,7 +728,6 @@ List<String> jacocoExclusions = [
727728
'org.opensearch.timeseries.ratelimit.RateLimitedRequestWorker',
728729
'org.opensearch.timeseries.util.TimeUtil',
729730
'org.opensearch.ad.transport.ADHCImputeTransportAction',
730-
'org.opensearch.timeseries.ml.RealTimeInferencer',
731731
]
732732

733733

src/main/java/org/opensearch/ad/ml/ADRealTimeInferencer.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
import static org.opensearch.timeseries.TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME;
99

10+
import java.time.Clock;
11+
1012
import org.opensearch.ad.caching.ADCacheProvider;
1113
import org.opensearch.ad.caching.ADPriorityCache;
1214
import org.opensearch.ad.indices.ADIndex;
@@ -32,7 +34,8 @@ public ADRealTimeInferencer(
3234
ADColdStartWorker coldStartWorker,
3335
ADSaveResultStrategy resultWriteWorker,
3436
ADCacheProvider cache,
35-
ThreadPool threadPool
37+
ThreadPool threadPool,
38+
Clock clock
3639
) {
3740
super(
3841
modelManager,
@@ -43,7 +46,8 @@ public ADRealTimeInferencer(
4346
resultWriteWorker,
4447
cache,
4548
threadPool,
46-
AD_THREAD_POOL_NAME
49+
AD_THREAD_POOL_NAME,
50+
clock
4751
);
4852
}
4953

src/main/java/org/opensearch/ad/transport/ADHCImputeTransportAction.java

+7-10
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.opensearch.timeseries.ml.ModelState;
3232
import org.opensearch.timeseries.ml.Sample;
3333
import org.opensearch.timeseries.model.Config;
34-
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
3534
import org.opensearch.timeseries.util.ActionListenerExecutor;
3635
import org.opensearch.transport.TransportService;
3736

@@ -129,14 +128,12 @@ protected ADHCImputeNodeResponse nodeOperation(ADHCImputeNodeRequest nodeRequest
129128
return;
130129
}
131130
Config config = configOptional.get();
132-
long windowDelayMillis = ((IntervalTimeConfiguration) config.getWindowDelay()).toDuration().toMillis();
133131
int featureSize = config.getEnabledFeatureIds().size();
134132
long dataEndMillis = nodeRequest.getRequest().getDataEndMillis();
135133
long dataStartMillis = nodeRequest.getRequest().getDataStartMillis();
136-
long executionEndTime = dataEndMillis + windowDelayMillis;
137134
String taskId = nodeRequest.getRequest().getTaskId();
138135
for (ModelState<ThresholdedRandomCutForest> modelState : cache.get().getAllModels(configId)) {
139-
if (shouldProcessModelState(modelState, executionEndTime, clusterService, hashRing)) {
136+
if (shouldProcessModelState(modelState, dataEndMillis, clusterService, hashRing)) {
140137
double[] nanArray = new double[featureSize];
141138
Arrays.fill(nanArray, Double.NaN);
142139
adInferencer
@@ -163,8 +160,8 @@ protected ADHCImputeNodeResponse nodeOperation(ADHCImputeNodeRequest nodeRequest
163160
* Determines whether the model state should be processed based on various conditions.
164161
*
165162
* Conditions checked:
166-
* - The model's last seen execution end time is not the minimum Instant value.
167-
* - The current execution end time is greater than or equal to the model's last seen execution end time,
163+
* - The model's last seen data end time is not the minimum Instant value. This means the model hasn't been initialized yet.
164+
* - The current data end time is greater than the model's last seen data end time,
168165
* indicating that the model state was updated in previous intervals.
169166
* - The entity associated with the model state is present.
170167
* - The owning node for real-time processing of the entity, with the same local version, is present in the hash ring.
@@ -175,14 +172,14 @@ protected ADHCImputeNodeResponse nodeOperation(ADHCImputeNodeRequest nodeRequest
175172
* concurrently (e.g., during tests when multiple threads may operate quickly).
176173
*
177174
* @param modelState The current state of the model.
178-
* @param executionEndTime The end time of the current execution interval.
175+
* @param dataEndTime The data end time of current interval.
179176
* @param clusterService The service providing information about the current cluster node.
180177
* @param hashRing The hash ring used to determine the owning node for real-time processing of entities.
181178
* @return true if the model state should be processed; otherwise, false.
182179
*/
183180
private boolean shouldProcessModelState(
184181
ModelState<ThresholdedRandomCutForest> modelState,
185-
long executionEndTime,
182+
long dataEndTime,
186183
ClusterService clusterService,
187184
HashRing hashRing
188185
) {
@@ -194,8 +191,8 @@ private boolean shouldProcessModelState(
194191
// Check if the model state conditions are met for processing
195192
// We cannot use last used time as it will be updated whenever we update its priority in CacheBuffer.update when there is a
196193
// PriorityCache.get.
197-
return modelState.getLastSeenExecutionEndTime() != Instant.MIN
198-
&& executionEndTime >= modelState.getLastSeenExecutionEndTime().toEpochMilli()
194+
return modelState.getLastSeenDataEndTime() != Instant.MIN
195+
&& dataEndTime > modelState.getLastSeenDataEndTime().toEpochMilli()
199196
&& modelState.getEntity().isPresent()
200197
&& owningNode.isPresent()
201198
&& owningNode.get().getId().equals(clusterService.localNode().getId());

src/main/java/org/opensearch/forecast/ml/ForecastRealTimeInferencer.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
import static org.opensearch.timeseries.TimeSeriesAnalyticsPlugin.FORECAST_THREAD_POOL_NAME;
99

10+
import java.time.Clock;
11+
1012
import org.opensearch.forecast.caching.ForecastCacheProvider;
1113
import org.opensearch.forecast.caching.ForecastPriorityCache;
1214
import org.opensearch.forecast.indices.ForecastIndex;
@@ -32,7 +34,8 @@ public ForecastRealTimeInferencer(
3234
ForecastColdStartWorker coldStartWorker,
3335
ForecastSaveResultStrategy resultWriteWorker,
3436
ForecastCacheProvider cache,
35-
ThreadPool threadPool
37+
ThreadPool threadPool,
38+
Clock clock
3639
) {
3740
super(
3841
modelManager,
@@ -43,7 +46,8 @@ public ForecastRealTimeInferencer(
4346
resultWriteWorker,
4447
cache,
4548
threadPool,
46-
FORECAST_THREAD_POOL_NAME
49+
FORECAST_THREAD_POOL_NAME,
50+
clock
4751
);
4852
}
4953

src/main/java/org/opensearch/forecast/rest/RestGetForecasterAction.java

-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
7171
all,
7272
RestHandlerUtils.buildEntity(request, forecasterId)
7373
);
74-
7574
return channel -> client.execute(GetForecasterAction.INSTANCE, getForecasterRequest, new RestToXContentListener<>(channel));
7675
} catch (IllegalArgumentException e) {
7776
throw new IllegalArgumentException(Encode.forHtml(e.getMessage()));

src/main/java/org/opensearch/forecast/transport/SearchTopForecastResultTransportAction.java

-1
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,6 @@ private QueryBuilder generateBuildInSubFilter(SearchTopForecastResultRequest req
467467
*/
468468
private RangeQueryBuilder generateDateFilter(SearchTopForecastResultRequest request, Forecaster forecaster) {
469469
// forecast from is data end time for forecast
470-
// return QueryBuilders.termQuery(CommonName.DATA_END_TIME_FIELD, request.getForecastFrom().toEpochMilli());
471470
long startInclusive = request.getForecastFrom().toEpochMilli();
472471
long endExclusive = startInclusive + forecaster.getIntervalInMilliseconds();
473472
return QueryBuilders.rangeQuery(CommonName.DATA_END_TIME_FIELD).gte(startInclusive).lt(endExclusive);

src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -840,7 +840,8 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
840840
adColdstartQueue,
841841
adSaveResultStrategy,
842842
adCacheProvider,
843-
threadPool
843+
threadPool,
844+
getClock()
844845
);
845846

846847
ADCheckpointReadWorker adCheckpointReadQueue = new ADCheckpointReadWorker(
@@ -1230,7 +1231,8 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
12301231
forecastColdstartQueue,
12311232
forecastSaveResultStrategy,
12321233
forecastCacheProvider,
1233-
threadPool
1234+
threadPool,
1235+
getClock()
12341236
);
12351237

12361238
ForecastCheckpointReadWorker forecastCheckpointReadQueue = new ForecastCheckpointReadWorker(

src/main/java/org/opensearch/timeseries/caching/PriorityCache.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public ModelState<RCFModelType> get(String modelId, Config config) {
174174
// reset every 60 intervals
175175
return new DoorKeeper(
176176
TimeSeriesSettings.DOOR_KEEPER_FOR_CACHE_MAX_INSERTION,
177-
config.getIntervalDuration().multipliedBy(TimeSeriesSettings.DOOR_KEEPER_MAINTENANCE_FREQ),
177+
config.getIntervalDuration().multipliedBy(TimeSeriesSettings.EXPIRING_VALUE_MAINTENANCE_FREQ),
178178
clock,
179179
TimeSeriesSettings.CACHE_DOOR_KEEPER_COUNT_THRESHOLD
180180
);

src/main/java/org/opensearch/timeseries/ml/ModelColdStart.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ private void coldStart(
241241
// reset every 60 intervals
242242
return new DoorKeeper(
243243
TimeSeriesSettings.DOOR_KEEPER_FOR_COLD_STARTER_MAX_INSERTION,
244-
config.getIntervalDuration().multipliedBy(TimeSeriesSettings.DOOR_KEEPER_MAINTENANCE_FREQ),
244+
config.getIntervalDuration().multipliedBy(TimeSeriesSettings.EXPIRING_VALUE_MAINTENANCE_FREQ),
245245
clock,
246246
TimeSeriesSettings.COLD_START_DOOR_KEEPER_COUNT_THRESHOLD
247247
);
@@ -251,7 +251,7 @@ private void coldStart(
251251
logger
252252
.info(
253253
"Won't retry real-time cold start within {} intervals for model {}",
254-
TimeSeriesSettings.DOOR_KEEPER_MAINTENANCE_FREQ,
254+
TimeSeriesSettings.EXPIRING_VALUE_MAINTENANCE_FREQ,
255255
modelId
256256
);
257257
return;

src/main/java/org/opensearch/timeseries/ml/ModelManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ public <RCFDescriptor extends AnomalyDescriptor> IntermediateResultType score(
169169
throw e;
170170
} finally {
171171
modelState.setLastUsedTime(clock.instant());
172-
modelState.setLastSeenExecutionEndTime(clock.instant());
172+
modelState.setLastSeenDataEndTime(sample.getDataEndTime());
173173
}
174174
return createEmptyResult();
175175
}

src/main/java/org/opensearch/timeseries/ml/ModelState.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class ModelState<T> implements org.opensearch.timeseries.ExpiringState {
3636
// time when the ML model was used last time
3737
protected Instant lastUsedTime;
3838
protected Instant lastCheckpointTime;
39-
protected Instant lastSeenExecutionEndTime;
39+
protected Instant lastSeenDataEndTime;
4040
protected Clock clock;
4141
protected float priority;
4242
protected Deque<Sample> samples;
@@ -75,7 +75,7 @@ public ModelState(
7575
this.priority = priority;
7676
this.entity = entity;
7777
this.samples = samples;
78-
this.lastSeenExecutionEndTime = Instant.MIN;
78+
this.lastSeenDataEndTime = Instant.MIN;
7979
}
8080

8181
/**
@@ -252,11 +252,11 @@ public Map<String, Object> getModelStateAsMap() {
252252
};
253253
}
254254

255-
public Instant getLastSeenExecutionEndTime() {
256-
return lastSeenExecutionEndTime;
255+
public Instant getLastSeenDataEndTime() {
256+
return lastSeenDataEndTime;
257257
}
258258

259-
public void setLastSeenExecutionEndTime(Instant lastSeenExecutionEndTime) {
260-
this.lastSeenExecutionEndTime = lastSeenExecutionEndTime;
259+
public void setLastSeenDataEndTime(Instant lastSeenExecutionEndTime) {
260+
this.lastSeenDataEndTime = lastSeenExecutionEndTime;
261261
}
262262
}

0 commit comments

Comments
 (0)