Skip to content

Commit 3078649

Browse files
Add task cancellation check in aggregation code paths (#18426)
--------- Signed-off-by: Kaushal Kumar <[email protected]>
1 parent 674de10 commit 3078649

20 files changed

+238
-1
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
101101
- DocValues-only IP field supports `terms_query` with more than 1025 IP masks ([#18357](https://github.com/opensearch-project/OpenSearch/pull/18357)
102102
- Fix MatrixStatsAggregator reuse when mode parameter changes ([#18242](https://github.com/opensearch-project/OpenSearch/issues/18242))
103103
- Replace the deprecated construction method of TopScoreDocCollectorManager with the new method ([#18395](https://github.com/opensearch-project/OpenSearch/pull/18395))
104-
- Fixed Approximate Framework regression with Lucene 10.2.1 by updating `intersectRight` BKD walk and `IntRef` visit method ([#18358](https://github.com/opensearch-project/OpenSearch/issues/18358
104+
- Fixed Approximate Framework regression with Lucene 10.2.1 by updating `intersectRight` BKD walk and `IntRef` visit method ([#18358](https://github.com/opensearch-project/OpenSearch/issues/18358))
105+
- Add task cancellation checks in aggregators ([#18426](https://github.com/opensearch-project/OpenSearch/pull/18426))
105106

106107
### Security
107108

server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.opensearch.core.common.breaker.CircuitBreaker;
3939
import org.opensearch.core.common.breaker.CircuitBreakingException;
4040
import org.opensearch.core.indices.breaker.CircuitBreakerService;
41+
import org.opensearch.core.tasks.TaskCancelledException;
4142
import org.opensearch.search.SearchShardTarget;
4243
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
4344
import org.opensearch.search.internal.SearchContext;
@@ -328,4 +329,10 @@ protected final InternalAggregations buildEmptySubAggregations() {
328329
public String toString() {
329330
return name;
330331
}
332+
333+
protected void checkCancelled() {
334+
if (context.isCancelled()) {
335+
throw new TaskCancelledException("The query has been cancelled");
336+
}
337+
}
331338
}

server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,9 +235,11 @@ protected void beforeBuildingBuckets(long[] ordsToCollect) throws IOException {}
235235
* array of ordinals
236236
*/
237237
protected final InternalAggregations[] buildSubAggsForBuckets(long[] bucketOrdsToCollect) throws IOException {
238+
checkCancelled();
238239
beforeBuildingBuckets(bucketOrdsToCollect);
239240
InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][];
240241
for (int i = 0; i < subAggregators.length; i++) {
242+
checkCancelled();
241243
aggregations[i] = subAggregators[i].buildAggregations(bucketOrdsToCollect);
242244
}
243245
InternalAggregations[] result = new InternalAggregations[bucketOrdsToCollect.length];
@@ -323,6 +325,7 @@ protected final <B> InternalAggregation[] buildAggregationsForFixedBucketCount(
323325
BucketBuilderForFixedCount<B> bucketBuilder,
324326
Function<List<B>, InternalAggregation> resultBuilder
325327
) throws IOException {
328+
checkCancelled();
326329
int totalBuckets = owningBucketOrds.length * bucketsPerOwningBucketOrd;
327330
long[] bucketOrdsToCollect = new long[totalBuckets];
328331
int bucketOrdIdx = 0;
@@ -373,6 +376,7 @@ protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] ow
373376
* `consumeBucketsAndMaybeBreak(owningBucketOrds.length)`
374377
* here but we don't because single bucket aggs never have.
375378
*/
379+
checkCancelled();
376380
InternalAggregations[] subAggregationResults = buildSubAggsForBuckets(owningBucketOrds);
377381
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
378382
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
@@ -403,6 +407,7 @@ protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(
403407
BucketBuilderForVariable<B> bucketBuilder,
404408
ResultBuilderForVariable<B> resultBuilder
405409
) throws IOException {
410+
checkCancelled();
406411
long totalOrdsToCollect = 0;
407412
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
408413
totalOrdsToCollect += bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);

server/src/main/java/org/opensearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ public void collect(int doc, long bucket) throws IOException {
208208

209209
@Override
210210
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
211+
checkCancelled();
211212
// Buckets are ordered into groups - [keyed filters] [key1&key2 intersects]
212213
int maxOrd = owningBucketOrds.length * totalNumKeys;
213214
int totalBucketsToBuild = 0;

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ protected void doPostCollection() throws IOException {
260260

261261
@Override
262262
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
263+
checkCancelled();
263264
// Composite aggregator must be at the top of the aggregation tree
264265
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0L;
265266
if (deferredCollectors != NO_OP_COLLECTOR) {

server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FiltersAggregator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
200200
owningBucketOrds,
201201
keys.length + (showOtherBucket ? 1 : 0),
202202
(offsetInOwningOrd, docCount, subAggregationResults) -> {
203+
checkCancelled();
203204
if (offsetInOwningOrd < keys.length) {
204205
return new InternalFilters.InternalBucket(keys[offsetInOwningOrd], docCount, subAggregationResults, keyed);
205206
}

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AbstractHistogramAggregator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,12 @@ public AbstractHistogramAggregator(
104104
@Override
105105
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
106106
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> {
107+
checkCancelled();
107108
double roundKey = Double.longBitsToDouble(bucketValue);
108109
double key = roundKey * interval + offset;
109110
return new InternalHistogram.Bucket(key, docCount, keyed, formatter, subAggregationResults);
110111
}, (owningBucketOrd, buckets) -> {
112+
checkCancelled();
111113
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
112114
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
113115

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ protected final InternalAggregation[] buildAggregations(
285285
subAggregationResults
286286
),
287287
(owningBucketOrd, buckets) -> {
288+
checkCancelled();
288289
// the contract of the histogram aggregation is that shards must return
289290
// buckets ordered by key in ascending order
290291
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
@@ -733,6 +734,7 @@ private int increaseRoundingIfNeeded(long owningBucketOrd, int oldEstimatedBucke
733734
private void rebucket() {
734735
rebucketCount++;
735736
try (LongKeyedBucketOrds oldOrds = bucketOrds) {
737+
checkCancelled();
736738
long[] mergeMap = new long[Math.toIntExact(oldOrds.size())];
737739
bucketOrds = new LongKeyedBucketOrds.FromMany(context.bigArrays());
738740
for (long owningBucketOrd = 0; owningBucketOrd <= oldOrds.maxOwningBucketOrd(); owningBucketOrd++) {

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
207207
subAggregationResults
208208
),
209209
(owningBucketOrd, buckets) -> {
210+
checkCancelled();
210211
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
211212
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
212213

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
585585

586586
List<InternalVariableWidthHistogram.Bucket> buckets = new ArrayList<>(numClusters);
587587
for (int bucketOrd = 0; bucketOrd < numClusters; bucketOrd++) {
588+
checkCancelled();
588589
buckets.add(collector.buildBucket(bucketOrd, subAggregationResults[bucketOrd]));
589590
}
590591

0 commit comments

Comments
 (0)