Skip to content

Commit d4b391d

Browse files
authored
Fix ESQL async get while task is being cancelled (#119897) (#119907)
ES|QL and EQL async queries do not store the initial response in the async index. This means that async-get should retrieve results from the task, not the async index, when the query task is being canceled, as no document exists there.
1 parent 37450c4 commit d4b391d

File tree

3 files changed

+64
-1
lines changed

3 files changed

+64
-1
lines changed

docs/changelog/119897.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 119897
2+
summary: Fix ESQL async get while task is being cancelled
3+
area: ES|QL
4+
type: bug
5+
issues: []

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ private void getSearchResponseFromTask(
121121
) {
122122
try {
123123
final Task task = store.getTaskAndCheckAuthentication(taskManager, searchId, asyncTaskClass);
124-
if (task == null || task.isCancelled()) {
124+
if (task == null || (updateInitialResultsInStore && task.isCancelled())) {
125125
getSearchResponseFromIndex(searchId, request, nowInMillis, listener);
126126
return;
127127
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java

+58
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
import org.elasticsearch.ResourceNotFoundException;
1111
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1212
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.compute.operator.DriverTaskRunner;
1314
import org.elasticsearch.compute.operator.exchange.ExchangeService;
1415
import org.elasticsearch.core.TimeValue;
1516
import org.elasticsearch.plugins.Plugin;
17+
import org.elasticsearch.tasks.TaskCancelledException;
1618
import org.elasticsearch.tasks.TaskInfo;
1719
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
1820
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
@@ -34,8 +36,11 @@
3436
import static org.elasticsearch.test.hamcrest.OptionalMatchers.isEmpty;
3537
import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent;
3638
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
39+
import static org.hamcrest.Matchers.empty;
3740
import static org.hamcrest.Matchers.equalTo;
41+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
3842
import static org.hamcrest.Matchers.is;
43+
import static org.hamcrest.Matchers.not;
3944
import static org.hamcrest.Matchers.notNullValue;
4045

4146
/**
@@ -112,6 +117,59 @@ public void testBasicAsyncExecution() throws Exception {
112117
}
113118
}
114119

120+
public void testGetAsyncWhileQueryTaskIsBeingCancelled() throws Exception {
121+
try (var initialResponse = sendAsyncQuery()) {
122+
assertThat(initialResponse.asyncExecutionId(), isPresent());
123+
assertThat(initialResponse.isRunning(), is(true));
124+
String id = initialResponse.asyncExecutionId().get();
125+
// ensure we have started Lucene operators
126+
assertBusy(() -> {
127+
var tasks = client().admin()
128+
.cluster()
129+
.prepareListTasks()
130+
.setActions(DriverTaskRunner.ACTION_NAME)
131+
.setDetailed(true)
132+
.get()
133+
.getTasks()
134+
.stream()
135+
.filter(t -> t.description().contains("_LuceneSourceOperator"))
136+
.toList();
137+
assertThat(tasks.size(), greaterThanOrEqualTo(1));
138+
});
139+
client().admin().cluster().prepareCancelTasks().setActions(EsqlQueryAction.NAME + "[a]").get();
140+
assertBusy(() -> {
141+
List<TaskInfo> tasks = getEsqlQueryTasks().stream().filter(TaskInfo::cancelled).toList();
142+
assertThat(tasks, not(empty()));
143+
});
144+
// get the result while the query is being cancelled
145+
{
146+
var getResultsRequest = new GetAsyncResultRequest(id);
147+
getResultsRequest.setWaitForCompletionTimeout(timeValueMillis(10));
148+
getResultsRequest.setKeepAlive(randomKeepAlive());
149+
var future = client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest);
150+
try (var resp = future.get()) {
151+
assertThat(initialResponse.asyncExecutionId(), isPresent());
152+
assertThat(resp.asyncExecutionId().get(), equalTo(id));
153+
assertThat(resp.isRunning(), is(true));
154+
}
155+
}
156+
// release the permits to allow the query to proceed
157+
scriptPermits.release(numberOfDocs());
158+
// get the result after the cancellation is done
159+
{
160+
var getResultsRequest = new GetAsyncResultRequest(id);
161+
getResultsRequest.setWaitForCompletionTimeout(timeValueSeconds(10));
162+
getResultsRequest.setKeepAlive(randomKeepAlive());
163+
var future = client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest);
164+
TaskCancelledException error = expectThrows(TaskCancelledException.class, future::actionGet);
165+
assertThat(error.getMessage(), equalTo("by user request"));
166+
}
167+
assertTrue(deleteAsyncId(id).isAcknowledged());
168+
} finally {
169+
scriptPermits.drainPermits();
170+
}
171+
}
172+
115173
public void testAsyncCancellation() throws Exception {
116174
try (var initialResponse = sendAsyncQuery()) {
117175
assertThat(initialResponse.asyncExecutionId(), isPresent());

0 commit comments

Comments
 (0)