Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion sample-resource-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ opensearchplugin {
name 'opensearch-sample-resource-plugin'
description 'Sample plugin that extends OpenSearch Resource Plugin'
classname 'org.opensearch.sample.SampleResourcePlugin'
extendedPlugins = ['opensearch-security;optional=true']
extendedPlugins = ['opensearch-security;optional=true', 'opensearch-job-scheduler']
}

dependencyLicenses.enabled = false
Expand Down Expand Up @@ -69,6 +69,7 @@ configurations.all {
dependencies {
// Main implementation dependencies
compileOnly project(path: ":${rootProject.name}-spi", configuration: 'shadow')
compileOnly "org.opensearch:opensearch-job-scheduler-spi:3.3.0.0-SNAPSHOT"

implementation "org.opensearch:common-utils:${common_utils_version}"
implementation "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
Expand All @@ -81,6 +82,8 @@ dependencies {
integrationTestImplementation rootProject.sourceSets.main.output
integrationTestImplementation "org.opensearch.client:opensearch-rest-high-level-client:${opensearch_version}"
integrationTestImplementation 'org.ldaptive:ldaptive:1.2.3'
integrationTestImplementation "org.opensearch:opensearch-job-scheduler-spi:3.3.0.0-SNAPSHOT"
integrationTestImplementation "org.opensearch:opensearch-job-scheduler:3.3.0.0-SNAPSHOT"

// To be removed once integration test framework supports extended plugins
integrationTestImplementation project(path: ":${rootProject.name}-spi", configuration: 'shadow')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/
package org.opensearch.sample.secure;

import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.opensearch.Version;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.jobscheduler.JobSchedulerPlugin;
import org.opensearch.jobscheduler.spi.schedule.CronSchedule;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.sample.SampleResourcePlugin;
import org.opensearch.sample.scheduledjob.SampleSecureJobParameter;
import org.opensearch.security.OpenSearchSecurityPlugin;
import org.opensearch.test.framework.TestSecurityConfig.AuthcDomain;
import org.opensearch.test.framework.cluster.ClusterManager;
import org.opensearch.test.framework.cluster.LocalCluster;
import org.opensearch.test.framework.cluster.TestRestClient;
import org.opensearch.test.framework.cluster.TestRestClient.HttpResponse;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.security.support.ConfigConstants.SECURITY_SYSTEM_INDICES_ENABLED_KEY;
import static org.opensearch.test.framework.TestSecurityConfig.User.USER_ADMIN;
import static org.awaitility.Awaitility.await;

@RunWith(com.carrotsearch.randomizedtesting.RandomizedRunner.class)
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
public class SecureJobTests {

public static final AuthcDomain AUTHC_DOMAIN = new AuthcDomain("basic", 0).httpAuthenticatorWithChallenge("basic").backend("internal");

@ClassRule
public static final LocalCluster cluster = new LocalCluster.Builder().clusterManager(ClusterManager.SINGLENODE)
.anonymousAuth(false)
.authc(AUTHC_DOMAIN)
.users(USER_ADMIN)
.plugin(JobSchedulerPlugin.class)
.plugin(
new PluginInfo(
SampleResourcePlugin.class.getName(),
"classpath plugin",
"NA",
Version.CURRENT,
"1.8",
SampleResourcePlugin.class.getName(),
null,
List.of(OpenSearchSecurityPlugin.class.getName(), JobSchedulerPlugin.class.getName()),
false
)
)
.nodeSettings(Map.of(SECURITY_SYSTEM_INDICES_ENABLED_KEY, true, "plugins.jobscheduler.sweeper.period", "1s"))
.build();

public static String randomAlphaOfLength(int codeUnits) {
return RandomizedTest.randomAsciiOfLength(codeUnits);
}

protected Map<String, String> getJobParameterAsMap(String jobId, SampleSecureJobParameter jobParameter) throws IOException {
Map<String, String> params = new HashMap<>();
params.put("id", jobId);
params.put("job_name", jobParameter.getName());
params.put("index", jobParameter.getIndexToWatch());
params.put("enabled", String.valueOf(jobParameter.isEnabled()));
if (jobParameter.getSchedule() instanceof IntervalSchedule) {
params.put("interval", String.valueOf(((IntervalSchedule) jobParameter.getSchedule()).getInterval()));
} else if (jobParameter.getSchedule() instanceof CronSchedule) {
params.put("cron", ((CronSchedule) jobParameter.getSchedule()).getCronExpression());
}
params.put("lock_duration_seconds", String.valueOf(jobParameter.getLockDurationSeconds()));
return params;
}

public static String toUrlParams(Map<String, String> params) {
StringJoiner joiner = new StringJoiner("&");
for (Map.Entry<String, String> entry : params.entrySet()) {
String encodedKey = URLEncoder.encode(entry.getKey(), StandardCharsets.UTF_8);
String encodedValue = URLEncoder.encode(entry.getValue(), StandardCharsets.UTF_8);
joiner.add(encodedKey + "=" + encodedValue);
}
return joiner.toString();
}

@Test
public void testThatJobSchedulerIsInstalled() {
try (TestRestClient client = cluster.getRestClient(USER_ADMIN)) {
HttpResponse response = client.get("_cat/plugins");

assertThat(response.getStatusCode(), equalTo(RestStatus.OK.getStatus()));
assertThat(response.getBody(), containsString(JobSchedulerPlugin.class.getName()));
}
}

@Test
public void testCreateAJobAndWaitForCompletion() throws IOException {
SampleSecureJobParameter jobParameter = new SampleSecureJobParameter();
jobParameter.setJobName("sample-job-it");
jobParameter.setIndexToWatch("http-logs");
jobParameter.setSchedule(new IntervalSchedule(Instant.now(), 5, ChronoUnit.SECONDS));
jobParameter.setLockDurationSeconds(5L);
jobParameter.setEnabled(true);

// Creates a new watcher job.
String jobId = randomAlphaOfLength(10);
Map<String, String> params = getJobParameterAsMap(jobId, jobParameter);
try (TestRestClient client = cluster.getRestClient(USER_ADMIN)) {
client.put("http-logs");
HttpResponse response = client.post("_plugins/scheduler_sample/watch" + "?" + toUrlParams(params));

assertThat(response.getStatusCode(), equalTo(RestStatus.OK.getStatus()));

await().until(() -> {
HttpResponse countResponse = client.get("http-logs/_count");
return countResponse.getStatusCode() == RestStatus.OK.getStatus() && countResponse.getBody().contains("\"count\":1");
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.opensearch.Version;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.painless.PainlessModulePlugin;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.sample.SampleResourcePlugin;
import org.opensearch.security.OpenSearchSecurityPlugin;
Expand Down Expand Up @@ -47,7 +46,6 @@ public class SecurePluginTests {
.anonymousAuth(false)
.authc(AUTHC_DOMAIN)
.users(USER_ADMIN)
.plugin(PainlessModulePlugin.class)
.plugin(
new PluginInfo(
SampleResourcePlugin.class.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import org.opensearch.sample.resource.actions.transport.RevokeResourceAccessTransportAction;
import org.opensearch.sample.resource.actions.transport.ShareResourceTransportAction;
import org.opensearch.sample.resource.actions.transport.UpdateResourceTransportAction;
import org.opensearch.sample.scheduledjob.SampleSecureJobRestHandler;
import org.opensearch.sample.scheduledjob.SampleSecureJobRunner;
import org.opensearch.sample.secure.actions.rest.create.SecurePluginAction;
import org.opensearch.sample.secure.actions.rest.create.SecurePluginRestAction;
import org.opensearch.sample.secure.actions.transport.SecurePluginTransportAction;
Expand Down Expand Up @@ -89,6 +91,8 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.pluginClient = new PluginClient(client);
SampleSecureJobRunner jobRunner = SampleSecureJobRunner.getJobRunnerInstance();
jobRunner.setClient(client);
return List.of(pluginClient);
}

Expand All @@ -110,6 +114,7 @@ public List<RestHandler> getRestHandlers(

handlers.add(new ShareResourceRestAction());
handlers.add(new RevokeResourceAccessRestAction());
handlers.add(new SampleSecureJobRestHandler());
return handlers;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package org.opensearch.sample.scheduledjob;

import java.io.IOException;
import java.time.Instant;

import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;

public class SampleJobExtension implements JobSchedulerExtension {
@Override
public String getJobType() {
return "sample-secure-job";
}

@Override
public String getJobIndex() {
return ".sample-secure-job";
}

@Override
public ScheduledJobRunner getJobRunner() {
return SampleSecureJobRunner.getJobRunnerInstance();
}

@Override
public ScheduledJobParser getJobParser() {
return (parser, id, jobDocVersion) -> {
SampleSecureJobParameter jobParameter = new SampleSecureJobParameter();
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case SampleSecureJobParameter.NAME_FIELD:
jobParameter.setJobName(parser.text());
break;
case SampleSecureJobParameter.ENABLED_FILED:
jobParameter.setEnabled(parser.booleanValue());
break;
case SampleSecureJobParameter.ENABLED_TIME_FILED:
jobParameter.setEnabledTime(parseInstantValue(parser));
break;
case SampleSecureJobParameter.LAST_UPDATE_TIME_FIELD:
jobParameter.setLastUpdateTime(parseInstantValue(parser));
break;
case SampleSecureJobParameter.SCHEDULE_FIELD:
jobParameter.setSchedule(ScheduleParser.parse(parser));
break;
case SampleSecureJobParameter.INDEX_NAME_FIELD:
jobParameter.setIndexToWatch(parser.text());
break;
case SampleSecureJobParameter.LOCK_DURATION_SECONDS:
jobParameter.setLockDurationSeconds(parser.longValue());
break;
case SampleSecureJobParameter.JITTER:
jobParameter.setJitter(parser.doubleValue());
break;
default:
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
}
}
return jobParameter;
};
}

private Instant parseInstantValue(XContentParser parser) throws IOException {
if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) {
return null;
}
if (parser.currentToken().isValue()) {
return Instant.ofEpochMilli(parser.longValue());
}
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
return null;
}
}
Loading
Loading