Skip to content

Commit 3daf64f

Browse files
authored
[Feature] Flint query scheduler part1 - integrate job scheduler plugin (#2834)
* [Feature] Flint query scheduler part1 - integrate job scheduler plugin Signed-off-by: Louis Chu <[email protected]> * Add comments Signed-off-by: Louis Chu <[email protected]> * Add unit test Signed-off-by: Louis Chu <[email protected]> * Remove test rest API Signed-off-by: Louis Chu <[email protected]> * Fix doc test Signed-off-by: Louis Chu <[email protected]> * Add more tests Signed-off-by: Louis Chu <[email protected]> * Fix IT Signed-off-by: Louis Chu <[email protected]> * Fix IT with security Signed-off-by: Louis Chu <[email protected]> * Improve test coverage Signed-off-by: Louis Chu <[email protected]> * Fix integTest cluster Signed-off-by: Louis Chu <[email protected]> * Fix UT Signed-off-by: Louis Chu <[email protected]> * Update UT Signed-off-by: Louis Chu <[email protected]> * Fix bwc test Signed-off-by: Louis Chu <[email protected]> * Resolve comments Signed-off-by: Louis Chu <[email protected]> * Fix bwc test Signed-off-by: Louis Chu <[email protected]> * clean up doc test Signed-off-by: Louis Chu <[email protected]> * Resolve comments Signed-off-by: Louis Chu <[email protected]> * Fix UT Signed-off-by: Louis Chu <[email protected]> --------- Signed-off-by: Louis Chu <[email protected]>
1 parent 1b17520 commit 3daf64f

24 files changed

+1357
-29
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,5 @@ gen
4949
.worktrees
5050
http-client.env.json
5151
/doctest/sql-cli/
52+
/doctest/opensearch-job-scheduler/
5253
.factorypath

async-query-core/src/main/antlr/SqlBaseParser.g4

+11-6
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ compoundStatement
6666
;
6767

6868
setStatementWithOptionalVarKeyword
69-
: SET (VARIABLE | VAR)? assignmentList #setVariableWithOptionalKeyword
70-
| SET (VARIABLE | VAR)? LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ
69+
: SET variable? assignmentList #setVariableWithOptionalKeyword
70+
| SET variable? LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ
7171
LEFT_PAREN query RIGHT_PAREN #setVariableWithOptionalKeyword
7272
;
7373

@@ -215,9 +215,9 @@ statement
215215
routineCharacteristics
216216
RETURN (query | expression) #createUserDefinedFunction
217217
| DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference #dropFunction
218-
| DECLARE (OR REPLACE)? VARIABLE?
218+
| DECLARE (OR REPLACE)? variable?
219219
identifierReference dataType? variableDefaultExpression? #createVariable
220-
| DROP TEMPORARY VARIABLE (IF EXISTS)? identifierReference #dropVariable
220+
| DROP TEMPORARY variable (IF EXISTS)? identifierReference #dropVariable
221221
| EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN | COST)?
222222
(statement|setResetStatement) #explain
223223
| SHOW TABLES ((FROM | IN) identifierReference)?
@@ -272,8 +272,8 @@ setResetStatement
272272
| SET TIME ZONE interval #setTimeZone
273273
| SET TIME ZONE timezone #setTimeZone
274274
| SET TIME ZONE .*? #setTimeZone
275-
| SET (VARIABLE | VAR) assignmentList #setVariable
276-
| SET (VARIABLE | VAR) LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ
275+
| SET variable assignmentList #setVariable
276+
| SET variable LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ
277277
LEFT_PAREN query RIGHT_PAREN #setVariable
278278
| SET configKey EQ configValue #setQuotedConfiguration
279279
| SET configKey (EQ .*?)? #setConfiguration
@@ -438,6 +438,11 @@ namespaces
438438
| SCHEMAS
439439
;
440440

441+
// Matches the variable keyword in either spelling: the VARIABLE token or the VAR token.
// Referenced by the SET / DECLARE / DROP TEMPORARY alternatives in place of the inline
// (VARIABLE | VAR) groups.
variable
442+
: VARIABLE
443+
| VAR
444+
;
445+
441446
describeFuncName
442447
: identifierReference
443448
| stringLit

async-query/build.gradle

+3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ repositories {
1616

1717

1818
dependencies {
19+
implementation "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
20+
1921
api project(':core')
2022
api project(':async-query-core')
2123
implementation project(':protocol')
@@ -97,6 +99,7 @@ jacocoTestCoverageVerification {
9799
// ignore because XContext IOException
98100
'org.opensearch.sql.spark.execution.statestore.StateStore',
99101
'org.opensearch.sql.spark.rest.*',
102+
'org.opensearch.sql.spark.scheduler.OpenSearchRefreshIndexJobRequestParser',
100103
'org.opensearch.sql.spark.transport.model.*'
101104
]
102105
limit {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.spark.scheduler;
7+
8+
import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;
9+
10+
import com.google.common.annotations.VisibleForTesting;
11+
import java.io.IOException;
12+
import java.io.InputStream;
13+
import java.nio.charset.StandardCharsets;
14+
import java.time.Instant;
15+
import org.apache.commons.io.IOUtils;
16+
import org.apache.logging.log4j.LogManager;
17+
import org.apache.logging.log4j.Logger;
18+
import org.opensearch.action.DocWriteRequest;
19+
import org.opensearch.action.DocWriteResponse;
20+
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
21+
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
22+
import org.opensearch.action.delete.DeleteRequest;
23+
import org.opensearch.action.delete.DeleteResponse;
24+
import org.opensearch.action.index.IndexRequest;
25+
import org.opensearch.action.index.IndexResponse;
26+
import org.opensearch.action.support.WriteRequest;
27+
import org.opensearch.action.update.UpdateRequest;
28+
import org.opensearch.action.update.UpdateResponse;
29+
import org.opensearch.client.Client;
30+
import org.opensearch.cluster.service.ClusterService;
31+
import org.opensearch.common.action.ActionFuture;
32+
import org.opensearch.common.xcontent.XContentType;
33+
import org.opensearch.common.xcontent.json.JsonXContent;
34+
import org.opensearch.index.engine.DocumentMissingException;
35+
import org.opensearch.index.engine.VersionConflictEngineException;
36+
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
37+
import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJob;
38+
import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest;
39+
import org.opensearch.threadpool.ThreadPool;
40+
41+
/** Scheduler class for managing asynchronous query jobs. */
42+
public class OpenSearchAsyncQueryScheduler {
43+
public static final String SCHEDULER_INDEX_NAME = ".async-query-scheduler";
44+
public static final String SCHEDULER_PLUGIN_JOB_TYPE = "async-query-scheduler";
45+
private static final String SCHEDULER_INDEX_MAPPING_FILE_NAME =
46+
"async-query-scheduler-index-mapping.yml";
47+
private static final String SCHEDULER_INDEX_SETTINGS_FILE_NAME =
48+
"async-query-scheduler-index-settings.yml";
49+
private static final Logger LOG = LogManager.getLogger();
50+
51+
private Client client;
52+
private ClusterService clusterService;
53+
54+
/** Loads job resources, setting up required services and job runner instance. */
55+
public void loadJobResource(Client client, ClusterService clusterService, ThreadPool threadPool) {
56+
this.client = client;
57+
this.clusterService = clusterService;
58+
OpenSearchRefreshIndexJob openSearchRefreshIndexJob =
59+
OpenSearchRefreshIndexJob.getJobRunnerInstance();
60+
openSearchRefreshIndexJob.setClusterService(clusterService);
61+
openSearchRefreshIndexJob.setThreadPool(threadPool);
62+
openSearchRefreshIndexJob.setClient(client);
63+
}
64+
65+
/** Schedules a new job by indexing it into the job index. */
66+
public void scheduleJob(OpenSearchRefreshIndexJobRequest request) {
67+
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
68+
createAsyncQuerySchedulerIndex();
69+
}
70+
IndexRequest indexRequest = new IndexRequest(SCHEDULER_INDEX_NAME);
71+
indexRequest.id(request.getName());
72+
indexRequest.opType(DocWriteRequest.OpType.CREATE);
73+
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
74+
IndexResponse indexResponse;
75+
try {
76+
indexRequest.source(request.toXContent(JsonXContent.contentBuilder(), EMPTY_PARAMS));
77+
ActionFuture<IndexResponse> indexResponseActionFuture = client.index(indexRequest);
78+
indexResponse = indexResponseActionFuture.actionGet();
79+
} catch (VersionConflictEngineException exception) {
80+
throw new IllegalArgumentException("A job already exists with name: " + request.getName());
81+
} catch (Throwable e) {
82+
LOG.error("Failed to schedule job : {}", request.getName(), e);
83+
throw new RuntimeException(e);
84+
}
85+
86+
if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
87+
LOG.debug("Job : {} successfully created", request.getName());
88+
} else {
89+
throw new RuntimeException(
90+
"Schedule job failed with result : " + indexResponse.getResult().getLowercase());
91+
}
92+
}
93+
94+
/** Unschedules a job by marking it as disabled and updating its last update time. */
95+
public void unscheduleJob(String jobId) throws IOException {
96+
assertIndexExists();
97+
OpenSearchRefreshIndexJobRequest request =
98+
OpenSearchRefreshIndexJobRequest.builder()
99+
.jobName(jobId)
100+
.enabled(false)
101+
.lastUpdateTime(Instant.now())
102+
.build();
103+
updateJob(request);
104+
}
105+
106+
/** Updates an existing job with new parameters. */
107+
public void updateJob(OpenSearchRefreshIndexJobRequest request) throws IOException {
108+
assertIndexExists();
109+
UpdateRequest updateRequest = new UpdateRequest(SCHEDULER_INDEX_NAME, request.getName());
110+
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
111+
updateRequest.doc(request.toXContent(JsonXContent.contentBuilder(), EMPTY_PARAMS));
112+
UpdateResponse updateResponse;
113+
try {
114+
ActionFuture<UpdateResponse> updateResponseActionFuture = client.update(updateRequest);
115+
updateResponse = updateResponseActionFuture.actionGet();
116+
} catch (DocumentMissingException exception) {
117+
throw new IllegalArgumentException("Job: " + request.getName() + " doesn't exist");
118+
} catch (Throwable e) {
119+
LOG.error("Failed to update job : {}", request.getName(), e);
120+
throw new RuntimeException(e);
121+
}
122+
123+
if (updateResponse.getResult().equals(DocWriteResponse.Result.UPDATED)
124+
|| updateResponse.getResult().equals(DocWriteResponse.Result.NOOP)) {
125+
LOG.debug("Job : {} successfully updated", request.getName());
126+
} else {
127+
throw new RuntimeException(
128+
"Update job failed with result : " + updateResponse.getResult().getLowercase());
129+
}
130+
}
131+
132+
/** Removes a job by deleting its document from the index. */
133+
public void removeJob(String jobId) {
134+
assertIndexExists();
135+
DeleteRequest deleteRequest = new DeleteRequest(SCHEDULER_INDEX_NAME, jobId);
136+
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
137+
ActionFuture<DeleteResponse> deleteResponseActionFuture = client.delete(deleteRequest);
138+
DeleteResponse deleteResponse = deleteResponseActionFuture.actionGet();
139+
140+
if (deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED)) {
141+
LOG.debug("Job : {} successfully deleted", jobId);
142+
} else if (deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) {
143+
throw new IllegalArgumentException("Job : " + jobId + " doesn't exist");
144+
} else {
145+
throw new RuntimeException(
146+
"Remove job failed with result : " + deleteResponse.getResult().getLowercase());
147+
}
148+
}
149+
150+
/** Creates the async query scheduler index with specified mappings and settings. */
151+
@VisibleForTesting
152+
void createAsyncQuerySchedulerIndex() {
153+
try {
154+
InputStream mappingFileStream =
155+
OpenSearchAsyncQueryScheduler.class
156+
.getClassLoader()
157+
.getResourceAsStream(SCHEDULER_INDEX_MAPPING_FILE_NAME);
158+
InputStream settingsFileStream =
159+
OpenSearchAsyncQueryScheduler.class
160+
.getClassLoader()
161+
.getResourceAsStream(SCHEDULER_INDEX_SETTINGS_FILE_NAME);
162+
CreateIndexRequest createIndexRequest = new CreateIndexRequest(SCHEDULER_INDEX_NAME);
163+
createIndexRequest.mapping(
164+
IOUtils.toString(mappingFileStream, StandardCharsets.UTF_8), XContentType.YAML);
165+
createIndexRequest.settings(
166+
IOUtils.toString(settingsFileStream, StandardCharsets.UTF_8), XContentType.YAML);
167+
ActionFuture<CreateIndexResponse> createIndexResponseActionFuture =
168+
client.admin().indices().create(createIndexRequest);
169+
CreateIndexResponse createIndexResponse = createIndexResponseActionFuture.actionGet();
170+
171+
if (createIndexResponse.isAcknowledged()) {
172+
LOG.debug("Index: {} creation Acknowledged", SCHEDULER_INDEX_NAME);
173+
} else {
174+
throw new RuntimeException("Index creation is not acknowledged.");
175+
}
176+
} catch (Throwable e) {
177+
LOG.error("Error creating index: {}", SCHEDULER_INDEX_NAME, e);
178+
throw new RuntimeException(
179+
"Internal server error while creating "
180+
+ SCHEDULER_INDEX_NAME
181+
+ " index: "
182+
+ e.getMessage(),
183+
e);
184+
}
185+
}
186+
187+
private void assertIndexExists() {
188+
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
189+
throw new IllegalStateException("Job index does not exist.");
190+
}
191+
}
192+
193+
/** Returns the job runner instance for the scheduler. */
194+
public static ScheduledJobRunner getJobRunner() {
195+
return OpenSearchRefreshIndexJob.getJobRunnerInstance();
196+
}
197+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.spark.scheduler;
7+
8+
import java.io.IOException;
9+
import java.time.Instant;
10+
import org.opensearch.core.xcontent.XContentParser;
11+
import org.opensearch.core.xcontent.XContentParserUtils;
12+
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
13+
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
14+
import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest;
15+
16+
public class OpenSearchRefreshIndexJobRequestParser {
17+
18+
private static Instant parseInstantValue(XContentParser parser) throws IOException {
19+
if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) {
20+
return null;
21+
}
22+
if (parser.currentToken().isValue()) {
23+
return Instant.ofEpochMilli(parser.longValue());
24+
}
25+
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
26+
return null;
27+
}
28+
29+
public static ScheduledJobParser getJobParser() {
30+
return (parser, id, jobDocVersion) -> {
31+
OpenSearchRefreshIndexJobRequest.OpenSearchRefreshIndexJobRequestBuilder builder =
32+
OpenSearchRefreshIndexJobRequest.builder();
33+
XContentParserUtils.ensureExpectedToken(
34+
XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
35+
36+
while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) {
37+
String fieldName = parser.currentName();
38+
parser.nextToken();
39+
switch (fieldName) {
40+
case OpenSearchRefreshIndexJobRequest.JOB_NAME_FIELD:
41+
builder.jobName(parser.text());
42+
break;
43+
case OpenSearchRefreshIndexJobRequest.JOB_TYPE_FIELD:
44+
builder.jobType(parser.text());
45+
break;
46+
case OpenSearchRefreshIndexJobRequest.ENABLED_FIELD:
47+
builder.enabled(parser.booleanValue());
48+
break;
49+
case OpenSearchRefreshIndexJobRequest.ENABLED_TIME_FIELD:
50+
builder.enabledTime(parseInstantValue(parser));
51+
break;
52+
case OpenSearchRefreshIndexJobRequest.LAST_UPDATE_TIME_FIELD:
53+
builder.lastUpdateTime(parseInstantValue(parser));
54+
break;
55+
case OpenSearchRefreshIndexJobRequest.SCHEDULE_FIELD:
56+
builder.schedule(ScheduleParser.parse(parser));
57+
break;
58+
case OpenSearchRefreshIndexJobRequest.LOCK_DURATION_SECONDS:
59+
builder.lockDurationSeconds(parser.longValue());
60+
break;
61+
case OpenSearchRefreshIndexJobRequest.JITTER:
62+
builder.jitter(parser.doubleValue());
63+
break;
64+
default:
65+
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
66+
}
67+
}
68+
return builder.build();
69+
};
70+
}
71+
}

0 commit comments

Comments
 (0)