Skip to content

Commit c332506

Browse files
Add search backpressure service check for query group tasks (#17576) (#17591)
* add validation check for SBP service * add SBP should not handle the task tracking UT * add CHANGELOG entry * fix broken UT * address comments --------- (cherry picked from commit 8ee5eeb) Signed-off-by: Kaushal Kumar <[email protected]> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 87b4b91 commit c332506

File tree

3 files changed

+16
-4
lines changed

3 files changed

+16
-4
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3939
- Fix visit of inner query for FunctionScoreQueryBuilder ([#16776](https://github.com/opensearch-project/OpenSearch/pull/16776))
4040
- Fix case insensitive and escaped query on wildcard ([#16827](https://github.com/opensearch-project/OpenSearch/pull/16827))
4141
- Fix illegal argument exception when creating a PIT ([#16781](https://github.com/opensearch-project/OpenSearch/pull/16781))
42+
- Fix NPE in node stats due to QueryGroupTasks ([#17576](https://github.com/opensearch-project/OpenSearch/pull/17576))
4243

4344
### Security
4445

server/src/main/java/org/opensearch/wlm/QueryGroupService.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ public Set<QueryGroup> getDeletedQueryGroups() {
331331
public boolean shouldSBPHandle(Task t) {
332332
QueryGroupTask task = (QueryGroupTask) t;
333333
boolean isInvalidQueryGroupTask = true;
334-
if (!task.getQueryGroupId().equals(QueryGroupTask.DEFAULT_QUERY_GROUP_ID_SUPPLIER.get())) {
334+
if (task.isQueryGroupSet() && !QueryGroupTask.DEFAULT_QUERY_GROUP_ID_SUPPLIER.get().equals(task.getQueryGroupId())) {
335335
isInvalidQueryGroupTask = activeQueryGroups.stream()
336336
.noneMatch(queryGroup -> queryGroup.get_id().equals(task.getQueryGroupId()));
337337
}
@@ -340,7 +340,7 @@ public boolean shouldSBPHandle(Task t) {
340340

341341
@Override
342342
public void onTaskCompleted(Task task) {
343-
if (!(task instanceof QueryGroupTask)) {
343+
if (!(task instanceof QueryGroupTask) || !((QueryGroupTask) task).isQueryGroupSet()) {
344344
return;
345345
}
346346
final QueryGroupTask queryGroupTask = (QueryGroupTask) task;

server/src/test/java/org/opensearch/wlm/QueryGroupServiceTests.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ public void testRejectIfNeeded_whenFeatureIsNotEnabled() {
395395
}
396396

397397
public void testOnTaskCompleted() {
398-
Task task = createMockTaskWithResourceStats(SearchTask.class, 100, 200, 0, 12);
398+
Task task = new SearchTask(12, "", "", () -> "", null, null);
399399
mockThreadPool = new TestThreadPool("queryGroupServiceTests");
400400
mockThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, "testId");
401401
QueryGroupState queryGroupState = new QueryGroupState();
@@ -442,7 +442,7 @@ public void testOnTaskCompleted() {
442442
}
443443

444444
public void testShouldSBPHandle() {
445-
QueryGroupTask task = createMockTaskWithResourceStats(SearchTask.class, 100, 200, 0, 12);
445+
SearchTask task = createMockTaskWithResourceStats(SearchTask.class, 100, 200, 0, 12);
446446
QueryGroupState queryGroupState = new QueryGroupState();
447447
Set<QueryGroup> activeQueryGroups = new HashSet<>();
448448
mockQueryGroupStateMap.put("testId", queryGroupState);
@@ -464,6 +464,8 @@ public void testShouldSBPHandle() {
464464
mockThreadPool = new TestThreadPool("queryGroupServiceTests");
465465
mockThreadPool.getThreadContext()
466466
.putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, QueryGroupTask.DEFAULT_QUERY_GROUP_ID_SUPPLIER.get());
467+
// we haven't set the queryGroupId yet SBP should still track the task for cancellation
468+
assertTrue(queryGroupService.shouldSBPHandle(task));
467469
task.setQueryGroupId(mockThreadPool.getThreadContext());
468470
assertTrue(queryGroupService.shouldSBPHandle(task));
469471

@@ -490,6 +492,15 @@ public void testShouldSBPHandle() {
490492
);
491493
assertTrue(queryGroupService.shouldSBPHandle(task));
492494

495+
mockThreadPool.shutdownNow();
496+
497+
// test the case when SBP should not track the task
498+
when(mockWorkloadManagementSettings.getWlmMode()).thenReturn(WlmMode.ENABLED);
499+
task = new SearchTask(1, "", "test", () -> "", null, null);
500+
mockThreadPool = new TestThreadPool("queryGroupServiceTests");
501+
mockThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, "testId");
502+
task.setQueryGroupId(mockThreadPool.getThreadContext());
503+
assertFalse(queryGroupService.shouldSBPHandle(task));
493504
}
494505

495506
private static Set<QueryGroup> getActiveQueryGroups(

0 commit comments

Comments
 (0)