Skip to content

Commit 3dd1729

Browse files
authored
Change JobExecutionResponseReader to an interface (opensearch-project#2693)
* Change JobExecutionResponseReader to an interface Signed-off-by: Tomoyuki Morita <[email protected]> * Fix comment Signed-off-by: Tomoyuki Morita <[email protected]> --------- Signed-off-by: Tomoyuki Morita <[email protected]>
1 parent 5a2c199 commit 3dd1729

File tree

8 files changed

+121
-91
lines changed

8 files changed

+121
-91
lines changed

spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class BatchQueryHandler extends AsyncQueryHandler {
4242
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
4343
// either empty json when the result is not available or data with status
4444
// Fetch from Result Index
45-
return jobExecutionResponseReader.getResultFromOpensearchIndex(
45+
return jobExecutionResponseReader.getResultWithJobId(
4646
asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex());
4747
}
4848

spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java

+15-65
Original file line numberDiff line numberDiff line change
@@ -5,75 +5,25 @@
55

66
package org.opensearch.sql.spark.response;
77

8-
import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX;
9-
import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD;
10-
import static org.opensearch.sql.spark.data.constants.SparkConstants.JOB_ID_FIELD;
11-
12-
import org.apache.logging.log4j.LogManager;
13-
import org.apache.logging.log4j.Logger;
148
import org.json.JSONObject;
15-
import org.opensearch.action.search.SearchRequest;
16-
import org.opensearch.action.search.SearchResponse;
17-
import org.opensearch.client.Client;
18-
import org.opensearch.common.action.ActionFuture;
19-
import org.opensearch.index.IndexNotFoundException;
20-
import org.opensearch.index.query.QueryBuilder;
21-
import org.opensearch.index.query.QueryBuilders;
22-
import org.opensearch.search.SearchHit;
23-
import org.opensearch.search.builder.SearchSourceBuilder;
24-
25-
public class JobExecutionResponseReader {
26-
private final Client client;
27-
private static final Logger LOG = LogManager.getLogger();
289

10+
/** Interface for reading job execution result */
11+
public interface JobExecutionResponseReader {
2912
/**
30-
* JobExecutionResponseReader for spark query.
13+
* Retrieves the job execution result based on the job ID.
3114
*
32-
* @param client Opensearch client
15+
* @param jobId The job ID.
16+
* @param resultLocation The location identifier where the result is stored (optional).
17+
* @return A JSONObject containing the result data.
3318
*/
34-
public JobExecutionResponseReader(Client client) {
35-
this.client = client;
36-
}
37-
38-
public JSONObject getResultFromOpensearchIndex(String jobId, String resultIndex) {
39-
return searchInSparkIndex(QueryBuilders.termQuery(JOB_ID_FIELD, jobId), resultIndex);
40-
}
41-
42-
public JSONObject getResultWithQueryId(String queryId, String resultIndex) {
43-
return searchInSparkIndex(QueryBuilders.termQuery("queryId", queryId), resultIndex);
44-
}
19+
JSONObject getResultWithJobId(String jobId, String resultLocation);
4520

46-
private JSONObject searchInSparkIndex(QueryBuilder query, String resultIndex) {
47-
SearchRequest searchRequest = new SearchRequest();
48-
String searchResultIndex = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex;
49-
searchRequest.indices(searchResultIndex);
50-
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
51-
searchSourceBuilder.query(query);
52-
searchRequest.source(searchSourceBuilder);
53-
ActionFuture<SearchResponse> searchResponseActionFuture;
54-
JSONObject data = new JSONObject();
55-
try {
56-
searchResponseActionFuture = client.search(searchRequest);
57-
} catch (IndexNotFoundException e) {
58-
// if there is no result index (e.g., EMR-S hasn't created the index yet), we return empty
59-
// json
60-
LOG.info(resultIndex + " is not created yet.");
61-
return data;
62-
} catch (Exception e) {
63-
throw new RuntimeException(e);
64-
}
65-
SearchResponse searchResponse = searchResponseActionFuture.actionGet();
66-
if (searchResponse.status().getStatus() != 200) {
67-
throw new RuntimeException(
68-
"Fetching result from "
69-
+ searchResultIndex
70-
+ " index failed with status : "
71-
+ searchResponse.status());
72-
} else {
73-
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
74-
data.put(DATA_FIELD, searchHit.getSourceAsMap());
75-
}
76-
return data;
77-
}
78-
}
21+
/**
22+
* Retrieves the job execution result based on the query ID.
23+
*
24+
* @param queryId The query ID.
25+
* @param resultLocation The location identifier where the result is stored (optional).
26+
* @return A JSONObject containing the result data.
27+
*/
28+
JSONObject getResultWithQueryId(String queryId, String resultLocation);
7929
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.spark.response;
7+
8+
import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX;
9+
import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD;
10+
import static org.opensearch.sql.spark.data.constants.SparkConstants.JOB_ID_FIELD;
11+
12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
14+
import org.json.JSONObject;
15+
import org.opensearch.action.search.SearchRequest;
16+
import org.opensearch.action.search.SearchResponse;
17+
import org.opensearch.client.Client;
18+
import org.opensearch.common.action.ActionFuture;
19+
import org.opensearch.index.IndexNotFoundException;
20+
import org.opensearch.index.query.QueryBuilder;
21+
import org.opensearch.index.query.QueryBuilders;
22+
import org.opensearch.search.SearchHit;
23+
import org.opensearch.search.builder.SearchSourceBuilder;
24+
25+
/** JobExecutionResponseReader implementation for reading response from OpenSearch index. */
26+
public class OpenSearchJobExecutionResponseReader implements JobExecutionResponseReader {
27+
private final Client client;
28+
private static final Logger LOG = LogManager.getLogger();
29+
30+
public OpenSearchJobExecutionResponseReader(Client client) {
31+
this.client = client;
32+
}
33+
34+
@Override
35+
public JSONObject getResultWithJobId(String jobId, String resultLocation) {
36+
return searchInSparkIndex(QueryBuilders.termQuery(JOB_ID_FIELD, jobId), resultLocation);
37+
}
38+
39+
@Override
40+
public JSONObject getResultWithQueryId(String queryId, String resultLocation) {
41+
return searchInSparkIndex(QueryBuilders.termQuery("queryId", queryId), resultLocation);
42+
}
43+
44+
private JSONObject searchInSparkIndex(QueryBuilder query, String resultIndex) {
45+
SearchRequest searchRequest = new SearchRequest();
46+
String searchResultIndex = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex;
47+
searchRequest.indices(searchResultIndex);
48+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
49+
searchSourceBuilder.query(query);
50+
searchRequest.source(searchSourceBuilder);
51+
ActionFuture<SearchResponse> searchResponseActionFuture;
52+
JSONObject data = new JSONObject();
53+
try {
54+
searchResponseActionFuture = client.search(searchRequest);
55+
} catch (IndexNotFoundException e) {
56+
// if there is no result index (e.g., EMR-S hasn't created the index yet), we return empty
57+
// json
58+
LOG.info(resultIndex + " is not created yet.");
59+
return data;
60+
} catch (Exception e) {
61+
throw new RuntimeException(e);
62+
}
63+
SearchResponse searchResponse = searchResponseActionFuture.actionGet();
64+
if (searchResponse.status().getStatus() != 200) {
65+
throw new RuntimeException(
66+
"Fetching result from "
67+
+ searchResultIndex
68+
+ " index failed with status : "
69+
+ searchResponse.status());
70+
} else {
71+
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
72+
data.put(DATA_FIELD, searchHit.getSourceAsMap());
73+
}
74+
return data;
75+
}
76+
}
77+
}

spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
4646
import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager;
4747
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
48+
import org.opensearch.sql.spark.response.OpenSearchJobExecutionResponseReader;
4849

4950
@RequiredArgsConstructor
5051
public class AsyncExecutorServiceModule extends AbstractModule {
@@ -87,15 +88,15 @@ public SparkQueryDispatcher sparkQueryDispatcher(
8788

8889
@Provides
8990
public QueryHandlerFactory queryhandlerFactory(
90-
JobExecutionResponseReader jobExecutionResponseReader,
91+
JobExecutionResponseReader openSearchJobExecutionResponseReader,
9192
FlintIndexMetadataServiceImpl flintIndexMetadataReader,
9293
SessionManager sessionManager,
9394
DefaultLeaseManager defaultLeaseManager,
9495
IndexDMLResultStorageService indexDMLResultStorageService,
9596
FlintIndexOpFactory flintIndexOpFactory,
9697
EMRServerlessClientFactory emrServerlessClientFactory) {
9798
return new QueryHandlerFactory(
98-
jobExecutionResponseReader,
99+
openSearchJobExecutionResponseReader,
99100
flintIndexMetadataReader,
100101
sessionManager,
101102
defaultLeaseManager,
@@ -172,7 +173,7 @@ public FlintIndexMetadataServiceImpl flintIndexMetadataReader(NodeClient client)
172173

173174
@Provides
174175
public JobExecutionResponseReader jobExecutionResponseReader(NodeClient client) {
175-
return new JobExecutionResponseReader(client);
176+
return new OpenSearchJobExecutionResponseReader(client);
176177
}
177178

178179
private void registerStateStoreMetrics(StateStore stateStore) {

spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
8282
import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager;
8383
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
84+
import org.opensearch.sql.spark.response.OpenSearchJobExecutionResponseReader;
8485
import org.opensearch.sql.storage.DataSourceFactory;
8586
import org.opensearch.test.OpenSearchIntegTestCase;
8687

@@ -225,7 +226,7 @@ private DataSourceServiceImpl createDataSourceService() {
225226
protected AsyncQueryExecutorService createAsyncQueryExecutorService(
226227
EMRServerlessClientFactory emrServerlessClientFactory) {
227228
return createAsyncQueryExecutorService(
228-
emrServerlessClientFactory, new JobExecutionResponseReader(client));
229+
emrServerlessClientFactory, new OpenSearchJobExecutionResponseReader(client));
229230
}
230231

231232
/** Pass a custom response reader which can mock interaction between PPL plugin and EMR-S job. */

spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.opensearch.sql.spark.execution.statement.StatementState;
3434
import org.opensearch.sql.spark.flint.FlintIndexType;
3535
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
36+
import org.opensearch.sql.spark.response.OpenSearchJobExecutionResponseReader;
3637
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
3738
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse;
3839
import org.opensearch.sql.spark.rest.model.LangType;
@@ -425,9 +426,9 @@ private class AssertionHelper {
425426
* current interaction. Intercept both get methods for different query handler which
426427
* will only call either of them.
427428
*/
428-
new JobExecutionResponseReader(client) {
429+
new JobExecutionResponseReader() {
429430
@Override
430-
public JSONObject getResultFromOpensearchIndex(String jobId, String resultIndex) {
431+
public JSONObject getResultWithJobId(String jobId, String resultIndex) {
431432
return interaction.interact(new InteractionStep(emrClient, jobId, resultIndex));
432433
}
433434

@@ -497,7 +498,8 @@ private InteractionStep(LocalEMRSClient emrClient, String queryId, String result
497498

498499
/** Simulate PPL plugin search query_execution_result */
499500
JSONObject pluginSearchQueryResult() {
500-
return new JobExecutionResponseReader(client).getResultWithQueryId(queryId, resultIndex);
501+
return new OpenSearchJobExecutionResponseReader(client)
502+
.getResultWithQueryId(queryId, resultIndex);
501503
}
502504

503505
/** Simulate EMR-S bulk writes query_execution_result with refresh = wait_for */

spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -906,7 +906,7 @@ void testGetQueryResponse() {
906906
when(emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, EMR_JOB_ID))
907907
.thenReturn(new GetJobRunResult().withJobRun(new JobRun().withState(JobRunState.PENDING)));
908908
// simulate result index is not created yet
909-
when(jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null))
909+
when(jobExecutionResponseReader.getResultWithJobId(EMR_JOB_ID, null))
910910
.thenReturn(new JSONObject());
911911

912912
JSONObject result = sparkQueryDispatcher.getQueryResponse(asyncQueryJobMetadata());
@@ -978,12 +978,11 @@ void testGetQueryResponseWithSuccess() {
978978
resultMap.put(STATUS_FIELD, "SUCCESS");
979979
resultMap.put(ERROR_FIELD, "");
980980
queryResult.put(DATA_FIELD, resultMap);
981-
when(jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null))
982-
.thenReturn(queryResult);
981+
when(jobExecutionResponseReader.getResultWithJobId(EMR_JOB_ID, null)).thenReturn(queryResult);
983982

984983
JSONObject result = sparkQueryDispatcher.getQueryResponse(asyncQueryJobMetadata());
985984

986-
verify(jobExecutionResponseReader, times(1)).getResultFromOpensearchIndex(EMR_JOB_ID, null);
985+
verify(jobExecutionResponseReader, times(1)).getResultWithJobId(EMR_JOB_ID, null);
987986
Assertions.assertEquals(
988987
new HashSet<>(Arrays.asList(DATA_FIELD, STATUS_FIELD, ERROR_FIELD)), result.keySet());
989988
JSONObject dataJson = new JSONObject();

spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java spark/src/test/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReaderTest.java

+14-14
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.junit.jupiter.api.Assertions;
1919
import org.junit.jupiter.api.Test;
2020
import org.junit.jupiter.api.extension.ExtendWith;
21+
import org.mockito.InjectMocks;
2122
import org.mockito.Mock;
2223
import org.mockito.Mockito;
2324
import org.mockito.junit.jupiter.MockitoExtension;
@@ -30,12 +31,14 @@
3031
import org.opensearch.search.SearchHits;
3132

3233
@ExtendWith(MockitoExtension.class)
33-
public class AsyncQueryExecutionResponseReaderTest {
34+
public class OpenSearchJobExecutionResponseReaderTest {
3435
@Mock private Client client;
3536
@Mock private SearchResponse searchResponse;
3637
@Mock private SearchHit searchHit;
3738
@Mock private ActionFuture<SearchResponse> searchResponseActionFuture;
3839

40+
@InjectMocks OpenSearchJobExecutionResponseReader jobExecutionResponseReader;
41+
3942
@Test
4043
public void testGetResultFromOpensearchIndex() {
4144
when(client.search(any())).thenReturn(searchResponseActionFuture);
@@ -46,9 +49,8 @@ public void testGetResultFromOpensearchIndex() {
4649
new SearchHits(
4750
new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F));
4851
Mockito.when(searchHit.getSourceAsMap()).thenReturn(Map.of("stepId", EMR_JOB_ID));
49-
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
50-
assertFalse(
51-
jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null).isEmpty());
52+
53+
assertFalse(jobExecutionResponseReader.getResultWithJobId(EMR_JOB_ID, null).isEmpty());
5254
}
5355

5456
@Test
@@ -61,9 +63,8 @@ public void testGetResultFromCustomIndex() {
6163
new SearchHits(
6264
new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F));
6365
Mockito.when(searchHit.getSourceAsMap()).thenReturn(Map.of("stepId", EMR_JOB_ID));
64-
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
65-
assertFalse(
66-
jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, "foo").isEmpty());
66+
67+
assertFalse(jobExecutionResponseReader.getResultWithJobId(EMR_JOB_ID, "foo").isEmpty());
6768
}
6869

6970
@Test
@@ -72,11 +73,11 @@ public void testInvalidSearchResponse() {
7273
when(searchResponseActionFuture.actionGet()).thenReturn(searchResponse);
7374
when(searchResponse.status()).thenReturn(RestStatus.NO_CONTENT);
7475

75-
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
7676
RuntimeException exception =
7777
assertThrows(
7878
RuntimeException.class,
79-
() -> jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null));
79+
() -> jobExecutionResponseReader.getResultWithJobId(EMR_JOB_ID, null));
80+
8081
Assertions.assertEquals(
8182
"Fetching result from "
8283
+ DEFAULT_RESULT_INDEX
@@ -88,17 +89,16 @@ public void testInvalidSearchResponse() {
8889
@Test
8990
public void testSearchFailure() {
9091
when(client.search(any())).thenThrow(RuntimeException.class);
91-
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
92+
9293
assertThrows(
9394
RuntimeException.class,
94-
() -> jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null));
95+
() -> jobExecutionResponseReader.getResultWithJobId(EMR_JOB_ID, null));
9596
}
9697

9798
@Test
9899
public void testIndexNotFoundException() {
99100
when(client.search(any())).thenThrow(IndexNotFoundException.class);
100-
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
101-
assertTrue(
102-
jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, "foo").isEmpty());
101+
102+
assertTrue(jobExecutionResponseReader.getResultWithJobId(EMR_JOB_ID, "foo").isEmpty());
103103
}
104104
}

0 commit comments

Comments
 (0)