diff --git a/sample-resource-plugin/build.gradle b/sample-resource-plugin/build.gradle index 39fd7a314e..88d9bab077 100644 --- a/sample-resource-plugin/build.gradle +++ b/sample-resource-plugin/build.gradle @@ -19,7 +19,7 @@ opensearchplugin { name 'opensearch-sample-resource-plugin' description 'Sample plugin that extends OpenSearch Resource Plugin' classname 'org.opensearch.sample.SampleResourcePlugin' - extendedPlugins = ['opensearch-security;optional=true'] + extendedPlugins = ['opensearch-security;optional=true', 'opensearch-job-scheduler'] } dependencyLicenses.enabled = false @@ -69,6 +69,7 @@ configurations.all { dependencies { // Main implementation dependencies compileOnly project(path: ":${rootProject.name}-spi", configuration: 'shadow') + compileOnly "org.opensearch:opensearch-job-scheduler-spi:3.3.0.0-SNAPSHOT" implementation "org.opensearch:common-utils:${common_utils_version}" implementation "org.opensearch.client:opensearch-rest-client:${opensearch_version}" @@ -81,6 +82,8 @@ dependencies { integrationTestImplementation rootProject.sourceSets.main.output integrationTestImplementation "org.opensearch.client:opensearch-rest-high-level-client:${opensearch_version}" integrationTestImplementation 'org.ldaptive:ldaptive:1.2.3' + integrationTestImplementation "org.opensearch:opensearch-job-scheduler-spi:3.3.0.0-SNAPSHOT" + integrationTestImplementation "org.opensearch:opensearch-job-scheduler:3.3.0.0-SNAPSHOT" // To be removed once integration test framework supports extended plugins integrationTestImplementation project(path: ":${rootProject.name}-spi", configuration: 'shadow') diff --git a/sample-resource-plugin/src/integrationTest/java/org/opensearch/sample/secure/SecureJobTests.java b/sample-resource-plugin/src/integrationTest/java/org/opensearch/sample/secure/SecureJobTests.java new file mode 100644 index 0000000000..e524c72073 --- /dev/null +++ b/sample-resource-plugin/src/integrationTest/java/org/opensearch/sample/secure/SecureJobTests.java @@ -0,0 +1,141 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ +package org.opensearch.sample.secure; + +import java.io.IOException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.StringJoiner; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.opensearch.Version; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.jobscheduler.JobSchedulerPlugin; +import org.opensearch.jobscheduler.spi.schedule.CronSchedule; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.plugins.PluginInfo; +import org.opensearch.sample.SampleResourcePlugin; +import org.opensearch.sample.scheduledjob.SampleSecureJobParameter; +import org.opensearch.security.OpenSearchSecurityPlugin; +import org.opensearch.test.framework.TestSecurityConfig.AuthcDomain; +import org.opensearch.test.framework.cluster.ClusterManager; +import org.opensearch.test.framework.cluster.LocalCluster; +import org.opensearch.test.framework.cluster.TestRestClient; +import org.opensearch.test.framework.cluster.TestRestClient.HttpResponse; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.opensearch.security.support.ConfigConstants.SECURITY_SYSTEM_INDICES_ENABLED_KEY; +import static org.opensearch.test.framework.TestSecurityConfig.User.USER_ADMIN; +import static org.awaitility.Awaitility.await; + +@RunWith(com.carrotsearch.randomizedtesting.RandomizedRunner.class) +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +public class SecureJobTests { + + public static final AuthcDomain AUTHC_DOMAIN = new AuthcDomain("basic", 0).httpAuthenticatorWithChallenge("basic").backend("internal"); + + @ClassRule + public static final LocalCluster cluster = new LocalCluster.Builder().clusterManager(ClusterManager.SINGLENODE) + .anonymousAuth(false) + .authc(AUTHC_DOMAIN) + .users(USER_ADMIN) + .plugin(JobSchedulerPlugin.class) + .plugin( + new PluginInfo( + SampleResourcePlugin.class.getName(), + "classpath plugin", + "NA", + Version.CURRENT, + "1.8", + SampleResourcePlugin.class.getName(), + null, + List.of(OpenSearchSecurityPlugin.class.getName(), JobSchedulerPlugin.class.getName()), + false + ) + ) + .nodeSettings(Map.of(SECURITY_SYSTEM_INDICES_ENABLED_KEY, true, "plugins.jobscheduler.sweeper.period", "1s")) + .build(); + + public static String randomAlphaOfLength(int codeUnits) { + return RandomizedTest.randomAsciiOfLength(codeUnits); + } + + protected Map getJobParameterAsMap(String jobId, SampleSecureJobParameter jobParameter) throws IOException { + Map params = new HashMap<>(); + params.put("id", jobId); + params.put("job_name", jobParameter.getName()); + params.put("index", jobParameter.getIndexToWatch()); + params.put("enabled", String.valueOf(jobParameter.isEnabled())); + if (jobParameter.getSchedule() instanceof IntervalSchedule) { + params.put("interval", String.valueOf(((IntervalSchedule) jobParameter.getSchedule()).getInterval())); + } else if (jobParameter.getSchedule() instanceof CronSchedule) { + params.put("cron", ((CronSchedule) jobParameter.getSchedule()).getCronExpression()); + } + params.put("lock_duration_seconds", String.valueOf(jobParameter.getLockDurationSeconds())); + return params; + } + + public static String toUrlParams(Map params) { + StringJoiner joiner = new StringJoiner("&"); + for (Map.Entry entry : params.entrySet()) { + String encodedKey = URLEncoder.encode(entry.getKey(), StandardCharsets.UTF_8); + String encodedValue = URLEncoder.encode(entry.getValue(), StandardCharsets.UTF_8); + joiner.add(encodedKey + "=" + encodedValue); + } + return joiner.toString(); + } + + @Test + public void testThatJobSchedulerIsInstalled() { + try (TestRestClient client = cluster.getRestClient(USER_ADMIN)) { + HttpResponse response = client.get("_cat/plugins"); + + assertThat(response.getStatusCode(), equalTo(RestStatus.OK.getStatus())); + assertThat(response.getBody(), containsString(JobSchedulerPlugin.class.getName())); + } + } + + @Test + public void testCreateAJobAndWaitForCompletion() throws IOException { + SampleSecureJobParameter jobParameter = new SampleSecureJobParameter(); + jobParameter.setJobName("sample-job-it"); + jobParameter.setIndexToWatch("http-logs"); + jobParameter.setSchedule(new IntervalSchedule(Instant.now(), 5, ChronoUnit.SECONDS)); + jobParameter.setLockDurationSeconds(5L); + jobParameter.setEnabled(true); + + // Creates a new watcher job. + String jobId = randomAlphaOfLength(10); + Map params = getJobParameterAsMap(jobId, jobParameter); + try (TestRestClient client = cluster.getRestClient(USER_ADMIN)) { + client.put("http-logs"); + HttpResponse response = client.post("_plugins/scheduler_sample/watch" + "?" + toUrlParams(params)); + + assertThat(response.getStatusCode(), equalTo(RestStatus.OK.getStatus())); + + await().until(() -> { + HttpResponse countResponse = client.get("http-logs/_count"); + return countResponse.getStatusCode() == RestStatus.OK.getStatus() && countResponse.getBody().contains("\"count\":1"); + }); + } + } +} diff --git a/sample-resource-plugin/src/integrationTest/java/org/opensearch/sample/secure/SecurePluginTests.java b/sample-resource-plugin/src/integrationTest/java/org/opensearch/sample/secure/SecurePluginTests.java index ff88739d5c..ae33cada04 100644 --- a/sample-resource-plugin/src/integrationTest/java/org/opensearch/sample/secure/SecurePluginTests.java +++ b/sample-resource-plugin/src/integrationTest/java/org/opensearch/sample/secure/SecurePluginTests.java @@ -19,7 +19,6 @@ import org.opensearch.Version; import org.opensearch.core.rest.RestStatus; -import org.opensearch.painless.PainlessModulePlugin; import org.opensearch.plugins.PluginInfo; import org.opensearch.sample.SampleResourcePlugin; import org.opensearch.security.OpenSearchSecurityPlugin; @@ -47,7 +46,6 @@ public class SecurePluginTests { .anonymousAuth(false) .authc(AUTHC_DOMAIN) .users(USER_ADMIN) - .plugin(PainlessModulePlugin.class) .plugin( new PluginInfo( SampleResourcePlugin.class.getName(), diff --git a/sample-resource-plugin/src/main/java/org/opensearch/sample/SampleResourcePlugin.java b/sample-resource-plugin/src/main/java/org/opensearch/sample/SampleResourcePlugin.java index 2bc32262af..2da2ae0f86 100644 --- a/sample-resource-plugin/src/main/java/org/opensearch/sample/SampleResourcePlugin.java +++ b/sample-resource-plugin/src/main/java/org/opensearch/sample/SampleResourcePlugin.java @@ -53,6 +53,8 @@ import org.opensearch.sample.resource.actions.transport.RevokeResourceAccessTransportAction; import org.opensearch.sample.resource.actions.transport.ShareResourceTransportAction; import org.opensearch.sample.resource.actions.transport.UpdateResourceTransportAction; +import org.opensearch.sample.scheduledjob.SampleSecureJobRestHandler; +import org.opensearch.sample.scheduledjob.SampleSecureJobRunner; import org.opensearch.sample.secure.actions.rest.create.SecurePluginAction; import org.opensearch.sample.secure.actions.rest.create.SecurePluginRestAction; import org.opensearch.sample.secure.actions.transport.SecurePluginTransportAction; @@ -89,6 +91,8 @@ public Collection createComponents( Supplier repositoriesServiceSupplier ) { this.pluginClient = new PluginClient(client); + SampleSecureJobRunner jobRunner = SampleSecureJobRunner.getJobRunnerInstance(); + jobRunner.setClient(client); return List.of(pluginClient); } @@ -110,6 +114,7 @@ public List getRestHandlers( handlers.add(new ShareResourceRestAction()); handlers.add(new RevokeResourceAccessRestAction()); + handlers.add(new SampleSecureJobRestHandler()); return handlers; } diff --git a/sample-resource-plugin/src/main/java/org/opensearch/sample/scheduledjob/SampleJobExtension.java b/sample-resource-plugin/src/main/java/org/opensearch/sample/scheduledjob/SampleJobExtension.java new file mode 100644 index 0000000000..b35db614c6 --- /dev/null +++ b/sample-resource-plugin/src/main/java/org/opensearch/sample/scheduledjob/SampleJobExtension.java @@ -0,0 +1,81 @@ +package org.opensearch.sample.scheduledjob; + +import java.io.IOException; +import java.time.Instant; + +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.jobscheduler.spi.JobSchedulerExtension; +import org.opensearch.jobscheduler.spi.ScheduledJobParser; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; + +public class SampleJobExtension implements JobSchedulerExtension { + @Override + public String getJobType() { + return "sample-secure-job"; + } + + @Override + public String getJobIndex() { + return ".sample-secure-job"; + } + + @Override + public ScheduledJobRunner getJobRunner() { + return SampleSecureJobRunner.getJobRunnerInstance(); + } + + @Override + public ScheduledJobParser getJobParser() { + return (parser, id, jobDocVersion) -> { + SampleSecureJobParameter jobParameter = new SampleSecureJobParameter(); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + + while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case SampleSecureJobParameter.NAME_FIELD: + jobParameter.setJobName(parser.text()); + break; + case SampleSecureJobParameter.ENABLED_FILED: + jobParameter.setEnabled(parser.booleanValue()); + break; + case SampleSecureJobParameter.ENABLED_TIME_FILED: + jobParameter.setEnabledTime(parseInstantValue(parser)); + break; + case SampleSecureJobParameter.LAST_UPDATE_TIME_FIELD: + jobParameter.setLastUpdateTime(parseInstantValue(parser)); + break; + case SampleSecureJobParameter.SCHEDULE_FIELD: + jobParameter.setSchedule(ScheduleParser.parse(parser)); + break; + case SampleSecureJobParameter.INDEX_NAME_FIELD: + jobParameter.setIndexToWatch(parser.text()); + break; + case SampleSecureJobParameter.LOCK_DURATION_SECONDS: + jobParameter.setLockDurationSeconds(parser.longValue()); + break; + case SampleSecureJobParameter.JITTER: + jobParameter.setJitter(parser.doubleValue()); + break; + default: + XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); + } + } + return jobParameter; + }; + } + + private Instant parseInstantValue(XContentParser parser) throws IOException { + if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) { + return null; + } + if (parser.currentToken().isValue()) { + return Instant.ofEpochMilli(parser.longValue()); + } + XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); + return null; + } +} diff --git a/sample-resource-plugin/src/main/java/org/opensearch/sample/scheduledjob/SampleSecureJobParameter.java b/sample-resource-plugin/src/main/java/org/opensearch/sample/scheduledjob/SampleSecureJobParameter.java new file mode 100644 index 0000000000..6b844b7007 --- /dev/null +++ b/sample-resource-plugin/src/main/java/org/opensearch/sample/scheduledjob/SampleSecureJobParameter.java @@ -0,0 +1,160 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.sample.scheduledjob; + +import java.io.IOException; +import java.time.Instant; + +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.schedule.Schedule; + +/** + * A sample job parameter. + *

+ * It adds an additional "indexToWatch" field to {@link ScheduledJobParameter}, which stores the index + * the job runner will watch. + */ +public class SampleSecureJobParameter implements ScheduledJobParameter { + public static final String NAME_FIELD = "name"; + public static final String ENABLED_FILED = "enabled"; + public static final String LAST_UPDATE_TIME_FIELD = "last_update_time"; + public static final String LAST_UPDATE_TIME_FIELD_READABLE = "last_update_time_field"; + public static final String SCHEDULE_FIELD = "schedule"; + public static final String ENABLED_TIME_FILED = "enabled_time"; + public static final String ENABLED_TIME_FILED_READABLE = "enabled_time_field"; + public static final String INDEX_NAME_FIELD = "index_name_to_watch"; + public static final String LOCK_DURATION_SECONDS = "lock_duration_seconds"; + public static final String JITTER = "jitter"; + + private String jobName; + private Instant lastUpdateTime; + private Instant enabledTime; + private boolean isEnabled; + private Schedule schedule; + private String indexToWatch; + private Long lockDurationSeconds; + private Double jitter; + + public SampleSecureJobParameter() {} + + public SampleSecureJobParameter( + String id, + String name, + String indexToWatch, + Schedule schedule, + Long lockDurationSeconds, + Double jitter + ) { + this.jobName = name; + this.indexToWatch = indexToWatch; + this.schedule = schedule; + + Instant now = Instant.now(); + this.isEnabled = true; + this.enabledTime = now; + this.lastUpdateTime = now; + this.lockDurationSeconds = lockDurationSeconds; + this.jitter = jitter; + } + + @Override + public String getName() { + return this.jobName; + } + + @Override + public Instant getLastUpdateTime() { + return this.lastUpdateTime; + } + + @Override + public Instant getEnabledTime() { + return this.enabledTime; + } + + @Override + public Schedule getSchedule() { + return this.schedule; + } + + @Override + public boolean isEnabled() { + return this.isEnabled; + } + + @Override + public Long getLockDurationSeconds() { + return this.lockDurationSeconds; + } + + @Override + public Double getJitter() { + return jitter; + } + + public String getIndexToWatch() { + return this.indexToWatch; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public void setLastUpdateTime(Instant lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + } + + public void setEnabledTime(Instant enabledTime) { + this.enabledTime = enabledTime; + } + + public void setEnabled(boolean enabled) { + isEnabled = enabled; + } + + public void setSchedule(Schedule schedule) { + this.schedule = schedule; + } + + public void setIndexToWatch(String indexToWatch) { + this.indexToWatch = indexToWatch; + } + + public void setLockDurationSeconds(Long lockDurationSeconds) { + this.lockDurationSeconds = lockDurationSeconds; + } + + public void setJitter(Double jitter) { + this.jitter = jitter; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(NAME_FIELD, this.jobName) + .field(ENABLED_FILED, this.isEnabled) + .field(SCHEDULE_FIELD, this.schedule) + .field(INDEX_NAME_FIELD, this.indexToWatch); + if (this.enabledTime != null) { + builder.timeField(ENABLED_TIME_FILED, ENABLED_TIME_FILED_READABLE, this.enabledTime.toEpochMilli()); + } + if (this.lastUpdateTime != null) { + builder.timeField(LAST_UPDATE_TIME_FIELD, LAST_UPDATE_TIME_FIELD_READABLE, this.lastUpdateTime.toEpochMilli()); + } + if (this.lockDurationSeconds != null) { + builder.field(LOCK_DURATION_SECONDS, this.lockDurationSeconds); + } + if (this.jitter != null) { + builder.field(JITTER, this.jitter); + } + builder.endObject(); + return builder; + } +} diff --git a/sample-resource-plugin/src/main/java/org/opensearch/sample/scheduledjob/SampleSecureJobRestHandler.java b/sample-resource-plugin/src/main/java/org/opensearch/sample/scheduledjob/SampleSecureJobRestHandler.java new file mode 100644 index 0000000000..d60970735f --- /dev/null +++ b/sample-resource-plugin/src/main/java/org/opensearch/sample/scheduledjob/SampleSecureJobRestHandler.java @@ -0,0 +1,159 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.sample.scheduledjob; + +import java.io.IOException; +import java.time.Instant; +import java.time.ZoneId; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.jobscheduler.spi.schedule.CronSchedule; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.jobscheduler.spi.schedule.Schedule; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.transport.client.node.NodeClient; + +/** + * A sample rest handler that supports schedule and deschedule job operation + * + * Users need to provide "id", "index", "job_name", and either "interval" or "cron" parameter to schedule + * a job. e.g. + * {@code + * POST /_plugins/scheduler_sample/watch?id=dashboards-job-id&job_name=watch dashboards index&index=.opensearch_dashboards_1&interval=1 + * POST /_plugins/scheduler_sample/watch?id=dashboards-job-id&job_name=watch dashboards index&index=.opensearch_dashboards_1&cron=0 9 * * MON + * } + * + * creates a job with id "dashboards-job-id" and job name "watch dashboards index", + * which logs ".opensearch_dashboards_1" index's shards info every 1 minute or every Monday at 9 AM + * + * Users can remove that job by calling + * {@code DELETE /_plugins/scheduler_sample/watch?id=dashboards-job-id} + */ +public class SampleSecureJobRestHandler extends BaseRestHandler { + public static final String WATCH_INDEX_URI = "/_plugins/scheduler_sample/watch"; + + @Override + public String getName() { + return "Sample JobScheduler extension handler"; + } + + @Override + public List routes() { + return Collections.unmodifiableList( + Arrays.asList(new Route(RestRequest.Method.POST, WATCH_INDEX_URI), new Route(RestRequest.Method.DELETE, WATCH_INDEX_URI)) + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + if (request.method().equals(RestRequest.Method.POST)) { + // compose SampleJobParameter object from request + String id = request.param("id"); + String indexName = request.param("index"); + String jobName = request.param("job_name"); + String interval = request.param("interval"); + String cron = request.param("cron"); + String enabled = request.param("enabled", "true"); + String lockDurationSecondsString = request.param("lock_duration_seconds"); + Long lockDurationSeconds = lockDurationSecondsString != null ? Long.parseLong(lockDurationSecondsString) : null; + String jitterString = request.param("jitter"); + Double jitter = jitterString != null ? Double.parseDouble(jitterString) : null; + + if (id == null || indexName == null) { + throw new IllegalArgumentException("Must specify id and index parameter"); + } + if (interval == null && cron == null) { + throw new IllegalArgumentException("Must specify either interval or cron parameter"); + } + if (interval != null && cron != null) { + throw new IllegalArgumentException("Cannot specify both interval and cron parameters"); + } + + Schedule schedule; + if (interval != null) { + schedule = new IntervalSchedule(Instant.now(), Integer.parseInt(interval), ChronoUnit.SECONDS); + } else { + schedule = new CronSchedule(cron, ZoneId.systemDefault()); + } + + SampleSecureJobParameter jobParameter = new SampleSecureJobParameter( + id, + jobName, + indexName, + schedule, + lockDurationSeconds, + jitter + ); + jobParameter.setEnabled(Boolean.parseBoolean(enabled)); + IndexRequest indexRequest = new IndexRequest().index(".sample-secure-job") + .id(id) + .source(jobParameter.toXContent(JsonXContent.contentBuilder(), null)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + return restChannel -> { + // index the job parameter + client.index(indexRequest, new ActionListener() { + @Override + public void onResponse(IndexResponse indexResponse) { + try { + RestResponse restResponse = new BytesRestResponse( + RestStatus.OK, + indexResponse.toXContent(JsonXContent.contentBuilder(), null) + ); + restChannel.sendResponse(restResponse); + } catch (IOException e) { + restChannel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + } + + @Override + public void onFailure(Exception e) { + restChannel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + }); + }; + } else if (request.method().equals(RestRequest.Method.DELETE)) { + // delete job parameter doc from index + String id = request.param("id"); + DeleteRequest deleteRequest = new DeleteRequest().index(".sample-secure-job").id(id); + + return restChannel -> { + client.delete(deleteRequest, new ActionListener() { + @Override + public void onResponse(DeleteResponse deleteResponse) { + restChannel.sendResponse(new BytesRestResponse(RestStatus.OK, "Job deleted.")); + } + + @Override + public void onFailure(Exception e) { + restChannel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + }); + }; + } else { + return restChannel -> { + restChannel.sendResponse(new BytesRestResponse(RestStatus.METHOD_NOT_ALLOWED, request.method() + " is not allowed.")); + }; + } + } +} diff --git a/sample-resource-plugin/src/main/java/org/opensearch/sample/scheduledjob/SampleSecureJobRunner.java b/sample-resource-plugin/src/main/java/org/opensearch/sample/scheduledjob/SampleSecureJobRunner.java new file mode 100644 index 0000000000..6447e3558d --- /dev/null +++ b/sample-resource-plugin/src/main/java/org/opensearch/sample/scheduledjob/SampleSecureJobRunner.java @@ -0,0 +1,97 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.sample.scheduledjob; + +import java.util.UUID; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.utils.LockService; +import org.opensearch.plugins.Plugin; +import org.opensearch.transport.client.Client; + +/** + * A sample job runner class. + * + * The job runner should be a singleton class if it uses OpenSearch client or other objects passed + * from OpenSearch. Because when registering the job runner to JobScheduler plugin, OpenSearch has + * not invoke plugins' createComponents() method. That is saying the plugin is not completely initalized, + * and the OpenSearch {@link Client}, {@link ClusterService} and other objects + * are not available to plugin and this job runner. + * + * So we have to move this job runner intialization to {@link Plugin} createComponents() method, and using + * singleton job runner to ensure we register a usable job runner instance to JobScheduler plugin. + * + * This sample job runner takes the "indexToWatch" from job parameter and logs that index's shards. + */ +public class SampleSecureJobRunner implements ScheduledJobRunner { + + private static final Logger log = LogManager.getLogger(ScheduledJobRunner.class); + + private static SampleSecureJobRunner INSTANCE; + + public static SampleSecureJobRunner getJobRunnerInstance() { + if (INSTANCE != null) { + return INSTANCE; + } + synchronized (SampleSecureJobRunner.class) { + if (INSTANCE != null) { + return INSTANCE; + } + INSTANCE = new SampleSecureJobRunner(); + return INSTANCE; + } + } + + private Client client; + + private SampleSecureJobRunner() { + // Singleton class, use getJobRunner method instead of constructor + } + + public void setClient(Client client) { + this.client = client; + } + + @Override + public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { + final LockService lockService = context.getLockService(); + + if (jobParameter.getLockDurationSeconds() != null) { + lockService.acquireLock(jobParameter, context, ActionListener.wrap(lock -> { + if (lock == null) { + return; + } + + SampleSecureJobParameter parameter = (SampleSecureJobParameter) jobParameter; + this.client.indexAsync( + new IndexRequest(parameter.getIndexToWatch()).id(UUID.randomUUID().toString()) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .source("{\"message\": \"message\"}", XContentType.JSON) + ).thenAccept(indexResponse -> { + lockService.release( + lock, + ActionListener.wrap(released -> { log.info("Released lock for job {}", jobParameter.getName()); }, exception -> { + throw new IllegalStateException("Failed to release lock."); + }) + ); + }).exceptionally(exception -> { throw new IllegalStateException("Failed to index sample doc."); }); + }, exception -> { throw new IllegalStateException("Failed to acquire lock."); })); + } + } +} diff --git a/sample-resource-plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension b/sample-resource-plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension new file mode 100644 index 0000000000..399d212aa6 --- /dev/null +++ b/sample-resource-plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension @@ -0,0 +1 @@ +org.opensearch.sample.scheduledjob.SampleJobExtension \ No newline at end of file