diff --git a/build.gradle b/build.gradle index 450be32e..fbe8a99e 100644 --- a/build.gradle +++ b/build.gradle @@ -241,11 +241,34 @@ repositories { dependencies { implementation project(path: ":${rootProject.name}-spi", configuration: 'shadow') + implementation ("org.opensearch:opensearch-remote-metadata-sdk-ddb-client:${opensearch_build}") + implementation "org.opensearch:opensearch-remote-metadata-sdk:${opensearch_build}" testImplementation group: 'org.mockito', name: 'mockito-core', version: "${versions.mockito}" opensearchPlugin "org.opensearch.plugin:opensearch-security:${security_plugin_version}@zip" } +def commonResolutionStrategy = { + force "org.slf4j:slf4j-api:${versions.slf4j}" + force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" + force "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}" + force "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}" + force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}" + force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${versions.jackson}" + force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}" + force "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson}" + force "commons-codec:commons-codec:${versions.commonscodec}" + force "org.apache.httpcomponents:httpcore:${versions.httpcore}" + force "jakarta.json:jakarta.json-api:2.1.3" + force "commons-logging:commons-logging:${versions.commonslogging}" + force "org.apache.httpcomponents.client5:httpclient5:${versions.httpclient5}" +} + +configurations { + runtimeClasspath.resolutionStrategy(commonResolutionStrategy) + testRuntimeClasspath.resolutionStrategy(commonResolutionStrategy) +} + // RPM & Debian build apply plugin: 'com.netflix.nebula.ospackage' @@ -294,9 +317,9 @@ integTest { } } Zip bundle = (Zip) project.getTasks().getByName("bundlePlugin"); -integTest.getClusters().forEach{c -> { +integTest.getClusters().forEach{ c -> c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile())) -}} +} testClusters.integTest { testDistribution = 'INTEG_TEST' diff --git a/sample-extension-plugin/build.gradle b/sample-extension-plugin/build.gradle index b4ffd0f0..7c02b632 100644 --- a/sample-extension-plugin/build.gradle +++ b/sample-extension-plugin/build.gradle @@ -111,10 +111,10 @@ integTest { Zip bundle = (Zip) project.getTasks().getByName("bundlePlugin"); Zip rootBundle = (Zip) rootProject.getTasks().getByName("bundlePlugin"); -integTest.getClusters().forEach{c -> { +integTest.getClusters().forEach{c -> c.plugin(rootProject.getObjects().fileProperty().value(rootBundle.getArchiveFile())) c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile())) -}} +} testClusters.integTest { testDistribution = 'INTEG_TEST' diff --git a/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java b/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java index 2ad38f94..82998700 100644 --- a/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java +++ b/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java @@ -56,6 +56,8 @@ import org.opensearch.plugins.IdentityAwarePlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.SystemIndexPlugin; +import org.opensearch.remote.metadata.client.SdkClient; +import org.opensearch.remote.metadata.client.impl.SdkClientFactory; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; import org.opensearch.script.ScriptService; @@ -70,14 +72,23 @@ import java.util.Map; import java.util.Set; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.ArrayList; import java.util.function.Supplier; +import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_ENDPOINT_KEY; +import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_REGION_KEY; +import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_SERVICE_NAME_KEY; +import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_TYPE_KEY; +import static org.opensearch.remote.metadata.common.CommonValue.TENANT_AWARE_KEY; +import static org.opensearch.remote.metadata.common.CommonValue.TENANT_ID_FIELD_KEY; + public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, IdentityAwarePlugin { public static final String OPEN_DISTRO_JOB_SCHEDULER_THREAD_POOL_NAME = "open_distro_job_scheduler"; public static final String JS_BASE_URI = "/_plugins/_job_scheduler"; + public static final String TENANT_ID_FIELD = "tenant_id"; private static final Logger log = LogManager.getLogger(JobSchedulerPlugin.class); private JobSweeper sweeper; @@ -87,6 +98,7 @@ public class JobSchedulerPlugin extends Plugin implements ActionPlugin, Extensib private Map indexToJobProviders; private Set indicesToListen; private PluginClient pluginClient; + private SdkClient sdkClient; private JobDetailsService jobDetailsService; @@ -130,10 +142,36 @@ public Collection createComponents( IndexNameExpressionResolver indexNameExpressionResolver, Supplier repositoriesServiceSupplier ) { - Supplier statusHistoryEnabled = () -> JobSchedulerSettings.STATUS_HISTORY.get(environment.settings()); + Settings settings = environment.settings(); + Boolean isMultiTenancyEnabled = JobSchedulerSettings.JOB_SCHEDULER_MULTI_TENANCY_ENABLED.get(settings); this.pluginClient = new PluginClient(client); + + // Initialize SDK client for remote metadata storage + this.sdkClient = SdkClientFactory.createSdkClient( + pluginClient, + xContentRegistry, + isMultiTenancyEnabled + ? Map.ofEntries( + Map.entry(REMOTE_METADATA_TYPE_KEY, JobSchedulerSettings.REMOTE_METADATA_TYPE.get(settings)), + Map.entry(REMOTE_METADATA_ENDPOINT_KEY, JobSchedulerSettings.REMOTE_METADATA_ENDPOINT.get(settings)), + Map.entry(REMOTE_METADATA_REGION_KEY, JobSchedulerSettings.REMOTE_METADATA_REGION.get(settings)), + Map.entry(REMOTE_METADATA_SERVICE_NAME_KEY, JobSchedulerSettings.REMOTE_METADATA_SERVICE_NAME.get(settings)), + Map.entry(TENANT_AWARE_KEY, "true"), + Map.entry(TENANT_ID_FIELD_KEY, TENANT_ID_FIELD) + ) + : Collections.emptyMap() + ); + + Supplier statusHistoryEnabled = () -> JobSchedulerSettings.STATUS_HISTORY.get(environment.settings()); this.historyService = new JobHistoryService(pluginClient, clusterService); - this.lockService = new LockServiceImpl(pluginClient, clusterService, historyService, statusHistoryEnabled); + this.lockService = new LockServiceImpl( + pluginClient, + clusterService, + historyService, + statusHistoryEnabled, + this.sdkClient, + isMultiTenancyEnabled + ); this.jobDetailsService = new JobDetailsService(client, clusterService, this.indicesToListen, this.indexToJobProviders); this.scheduler = new JobScheduler(threadPool, this.lockService); this.sweeper = initSweeper( @@ -149,7 +187,7 @@ public Collection createComponents( clusterService.addListener(this.sweeper); clusterService.addLifecycleListener(this.sweeper); - return List.of(this.lockService, this.scheduler, this.jobDetailsService, this.pluginClient); + return List.of(this.lockService, this.scheduler, this.jobDetailsService, this.pluginClient, this.sdkClient); } @Override @@ -168,6 +206,11 @@ public List> getSettings() { settingList.add(JobSchedulerSettings.SWEEP_PERIOD); settingList.add(JobSchedulerSettings.JITTER_LIMIT); settingList.add(JobSchedulerSettings.STATUS_HISTORY); + settingList.add(JobSchedulerSettings.REMOTE_METADATA_TYPE); + settingList.add(JobSchedulerSettings.REMOTE_METADATA_ENDPOINT); + settingList.add(JobSchedulerSettings.REMOTE_METADATA_REGION); + settingList.add(JobSchedulerSettings.REMOTE_METADATA_SERVICE_NAME); + settingList.add(JobSchedulerSettings.JOB_SCHEDULER_MULTI_TENANCY_ENABLED); return settingList; } diff --git a/src/main/java/org/opensearch/jobscheduler/JobSchedulerSettings.java b/src/main/java/org/opensearch/jobscheduler/JobSchedulerSettings.java index 0a5ed5dd..b4187509 100644 --- a/src/main/java/org/opensearch/jobscheduler/JobSchedulerSettings.java +++ b/src/main/java/org/opensearch/jobscheduler/JobSchedulerSettings.java @@ -11,6 +11,12 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.TimeValue; +import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_ENDPOINT_KEY; +import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_REGION_KEY; +import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_SERVICE_NAME_KEY; +import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_TYPE_KEY; +import static org.opensearch.remote.metadata.common.CommonValue.TENANT_AWARE_KEY; + public class JobSchedulerSettings { public static final Setting REQUEST_TIMEOUT = Setting.positiveTimeSetting( "plugins.jobscheduler.request_timeout", @@ -60,4 +66,40 @@ public class JobSchedulerSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ); + + /** This setting sets the remote metadata type */ + public static final Setting REMOTE_METADATA_TYPE = Setting.simpleString( + "plugins.jobscheduler." + REMOTE_METADATA_TYPE_KEY, + Setting.Property.NodeScope, + Setting.Property.Final + ); + + /** This setting sets the remote metadata endpoint */ + public static final Setting REMOTE_METADATA_ENDPOINT = Setting.simpleString( + "plugins.jobscheduler." + REMOTE_METADATA_ENDPOINT_KEY, + Setting.Property.NodeScope, + Setting.Property.Final + ); + + /** This setting sets the remote metadata region */ + public static final Setting REMOTE_METADATA_REGION = Setting.simpleString( + "plugins.jobscheduler." + REMOTE_METADATA_REGION_KEY, + Setting.Property.NodeScope, + Setting.Property.Final + ); + + /** This setting sets the remote metadata service name */ + public static final Setting REMOTE_METADATA_SERVICE_NAME = Setting.simpleString( + "plugins.jobscheduler." + REMOTE_METADATA_SERVICE_NAME_KEY, + Setting.Property.NodeScope, + Setting.Property.Final + ); + + /** This setting enables multi-tenancy for job scheduler */ + public static final Setting JOB_SCHEDULER_MULTI_TENANCY_ENABLED = Setting.boolSetting( + "plugins.jobscheduler." + TENANT_AWARE_KEY, + false, + Setting.Property.NodeScope, + Setting.Property.Final + ); } diff --git a/src/main/java/org/opensearch/jobscheduler/utils/JobDetailsService.java b/src/main/java/org/opensearch/jobscheduler/utils/JobDetailsService.java index 7da56923..6d07a4ce 100644 --- a/src/main/java/org/opensearch/jobscheduler/utils/JobDetailsService.java +++ b/src/main/java/org/opensearch/jobscheduler/utils/JobDetailsService.java @@ -398,7 +398,7 @@ public void processJobDetails( /** * Create Job details entry - * @param tempJobDetails new job details object that need to be inserted as document in the index= + * @param tempJobDetails new job details object that need to be inserted as document in the index * @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the job details if it was created * or else null. */ diff --git a/src/main/java/org/opensearch/jobscheduler/utils/LockServiceImpl.java b/src/main/java/org/opensearch/jobscheduler/utils/LockServiceImpl.java index 7af9c586..a90bf447 100644 --- a/src/main/java/org/opensearch/jobscheduler/utils/LockServiceImpl.java +++ b/src/main/java/org/opensearch/jobscheduler/utils/LockServiceImpl.java @@ -8,7 +8,6 @@ */ package org.opensearch.jobscheduler.utils; -import org.opensearch.action.support.WriteRequest; import org.opensearch.jobscheduler.spi.JobExecutionContext; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; @@ -18,16 +17,14 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.indices.create.CreateIndexRequest; -import org.opensearch.action.delete.DeleteRequest; -import org.opensearch.action.get.GetRequest; -import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.update.UpdateRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.get.GetResponse; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.update.UpdateResponse; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.core.xcontent.MediaType; import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.core.xcontent.ToXContent; -import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.IndexNotFoundException; @@ -36,6 +33,12 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.jobscheduler.spi.utils.LockService; import org.opensearch.transport.client.Client; +import org.opensearch.remote.metadata.client.SdkClient; +import org.opensearch.remote.metadata.common.SdkClientUtils; +import org.opensearch.remote.metadata.client.PutDataObjectRequest; +import org.opensearch.remote.metadata.client.GetDataObjectRequest; +import org.opensearch.remote.metadata.client.UpdateDataObjectRequest; +import org.opensearch.remote.metadata.client.DeleteDataObjectRequest; import java.io.BufferedReader; import java.io.IOException; @@ -55,6 +58,8 @@ public class LockServiceImpl implements LockService { final static Map INDEX_SETTINGS = Map.of("index.number_of_shards", 1, "index.auto_expand_replicas", "0-1"); private final JobHistoryService historyService; private final Supplier statusHistoryEnabled; + private final SdkClient sdkClient; + private final Boolean isMultiTenancyEnabled; // This is used in tests to control time. private Instant testInstant = null; @@ -63,19 +68,25 @@ public LockServiceImpl( final Client client, final ClusterService clusterService, JobHistoryService historyService, - Supplier statusHistoryEnabled + Supplier statusHistoryEnabled, + SdkClient sdkClient, + Boolean isMultiTenancyEnabled ) { this.client = client; this.clusterService = clusterService; this.historyService = historyService; this.statusHistoryEnabled = statusHistoryEnabled; + this.sdkClient = sdkClient; + this.isMultiTenancyEnabled = isMultiTenancyEnabled; } - public LockServiceImpl(final Client client, final ClusterService clusterService) { + public LockServiceImpl(final Client client, final ClusterService clusterService, SdkClient sdkClient, Boolean isMultiTenancyEnabled) { this.client = client; this.clusterService = clusterService; this.historyService = null; this.statusHistoryEnabled = () -> false; + this.sdkClient = sdkClient; + this.isMultiTenancyEnabled = isMultiTenancyEnabled; } private String lockMapping() { @@ -98,7 +109,8 @@ public boolean lockIndexExist() { @VisibleForTesting void createLockIndex(ActionListener listener) { - if (lockIndexExist()) { + // When multi-tenancy is enabled, we assume remote index or DDB table is pre-created prior to JS plugin being started. + if (this.isMultiTenancyEnabled || lockIndexExist()) { listener.onResponse(true); } else { final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping(lockMapping(), (MediaType) XContentType.JSON) @@ -213,90 +225,111 @@ private boolean isLockReleasedOrExpired(final LockModel lock) { } private void updateLock(final LockModel updateLock, ActionListener listener) { + // Use SdkClient with UpdateDataObjectRequest try { - UpdateRequest updateRequest = new UpdateRequest().index(LOCK_INDEX_NAME) + UpdateDataObjectRequest updateDataObjectRequest = UpdateDataObjectRequest.builder() + .index(LOCK_INDEX_NAME) .id(updateLock.getLockId()) - .setIfSeqNo(updateLock.getSeqNo()) - .setIfPrimaryTerm(updateLock.getPrimaryTerm()) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .doc(updateLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) - .fetchSource(true); - - client.update( - updateRequest, - ActionListener.wrap( - response -> listener.onResponse(new LockModel(updateLock, response.getSeqNo(), response.getPrimaryTerm())), - exception -> { - if (exception instanceof VersionConflictEngineException) { - logger.debug("could not acquire lock {}", exception.getMessage()); - } - if (exception instanceof DocumentMissingException) { - logger.debug( - "Document is deleted. This happens if the job is already removed and" + " this is the last run." + "{}", - exception.getMessage() - ); - } - if (exception instanceof IOException) { - logger.error("IOException occurred updating lock.", exception); - } - listener.onResponse(null); - } - ) - ); - } catch (IOException e) { - logger.error("IOException occurred updating lock.", e); + .ifSeqNo(updateLock.getSeqNo()) + .ifPrimaryTerm(updateLock.getPrimaryTerm()) + .dataObject(updateLock) + .build(); + + sdkClient.updateDataObjectAsync(updateDataObjectRequest).thenAccept(response -> { + UpdateResponse updateResponse = response.updateResponse(); + if (updateResponse != null) { + listener.onResponse(new LockModel(updateLock, updateResponse.getSeqNo(), updateResponse.getPrimaryTerm())); + } else { + listener.onResponse(null); + } + }).exceptionally(throwable -> { + Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable); + if (cause instanceof VersionConflictEngineException) { + logger.debug("could not acquire lock {}", cause.getMessage()); + } + if (cause instanceof DocumentMissingException) { + logger.debug( + "Document is deleted. This happens if the job is already removed and" + " this is the last run." + "{}", + cause.getMessage() + ); + } + if (cause instanceof IOException) { + logger.error("IOException occurred updating lock.", cause); + } + listener.onResponse(null); + return null; + }); + + } catch (Exception e) { + logger.error("Exception occurred updating lock.", e); listener.onResponse(null); } } private void createLock(final LockModel tempLock, ActionListener listener) { try { - final IndexRequest request = new IndexRequest(LOCK_INDEX_NAME).id(tempLock.getLockId()) - .source(tempLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) - .setIfSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO) - .setIfPrimaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .create(true); - client.index( - request, - ActionListener.wrap( - response -> listener.onResponse(new LockModel(tempLock, response.getSeqNo(), response.getPrimaryTerm())), - exception -> { - if (exception instanceof VersionConflictEngineException) { - logger.debug("Lock is already created. {}", exception.getMessage()); - listener.onResponse(null); - return; - } - listener.onFailure(exception); - } - ) - ); - } catch (IOException e) { - logger.error("IOException occurred creating lock", e); + PutDataObjectRequest putDataObjectRequest = PutDataObjectRequest.builder() + .index(LOCK_INDEX_NAME) + .id(tempLock.getLockId()) + .ifSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO) + .ifPrimaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM) + .dataObject(tempLock) + .build(); + + sdkClient.putDataObjectAsync(putDataObjectRequest).thenAccept(response -> { + IndexResponse indexResponse = response.indexResponse(); + if (indexResponse != null) { + listener.onResponse(new LockModel(tempLock, indexResponse.getSeqNo(), indexResponse.getPrimaryTerm())); + } else { + listener.onResponse(null); + } + }).exceptionally(throwable -> { + Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable); + if (cause instanceof VersionConflictEngineException) { + logger.debug("Lock is already created. {}", cause.getMessage()); + listener.onResponse(null); + } else { + listener.onFailure(cause); + } + return null; + }); + + } catch (Exception e) { + logger.error("Exception occurred creating lock.", e); listener.onFailure(e); } } public void findLock(final String lockId, ActionListener listener) { - GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId); - client.get(getRequest, ActionListener.wrap(response -> { - if (!response.isExists()) { - listener.onResponse(null); - } else { - try { - XContentParser parser = XContentType.JSON.xContent() - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString()); - parser.nextToken(); - listener.onResponse(LockModel.parse(parser, response.getSeqNo(), response.getPrimaryTerm())); - } catch (IOException e) { - logger.error("IOException occurred finding lock", e); + try { + GetDataObjectRequest getDataObjectRequest = GetDataObjectRequest.builder().index(LOCK_INDEX_NAME).id(lockId).build(); + + sdkClient.getDataObjectAsync(getDataObjectRequest).thenAccept(response -> { + GetResponse getResponse = response.getResponse(); + if (getResponse == null || !getResponse.isExists()) { listener.onResponse(null); + } else { + try { + XContentParser parser = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString()); + parser.nextToken(); + listener.onResponse(LockModel.parse(parser, getResponse.getSeqNo(), getResponse.getPrimaryTerm())); + } catch (IOException e) { + logger.error("IOException occurred parsing GetResponse.", e); + listener.onResponse(null); + } } - } - }, exception -> { - logger.error("Exception occurred finding lock", exception); - listener.onFailure(exception); - })); + }).exceptionally(throwable -> { + Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable); + logger.error("Exception occurred finding lock", cause); + listener.onFailure(cause); + return null; + }); + + } catch (Exception e) { + logger.error("Exception occurred finding lock.", e); + listener.onFailure(e); + } } /** @@ -339,19 +372,34 @@ public void release(final LockModel lock, ActionListener listener) { * or not the delete was successful */ public void deleteLock(final String lockId, ActionListener listener) { - DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX_NAME).id(lockId); - client.delete(deleteRequest, ActionListener.wrap(response -> { - listener.onResponse( - response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND - ); - }, exception -> { - if (exception instanceof IndexNotFoundException || exception.getCause() instanceof IndexNotFoundException) { - logger.debug("Index is not found to delete lock. {}", exception.getMessage()); - listener.onResponse(true); - } else { - listener.onFailure(exception); - } - })); + try { + DeleteDataObjectRequest deleteDataObjectRequest = DeleteDataObjectRequest.builder().index(LOCK_INDEX_NAME).id(lockId).build(); + + sdkClient.deleteDataObjectAsync(deleteDataObjectRequest).thenAccept(response -> { + DeleteResponse deleteResponse = response.deleteResponse(); + if (deleteResponse != null) { + listener.onResponse( + deleteResponse.getResult() == DocWriteResponse.Result.DELETED + || deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND + ); + } else { + listener.onResponse(false); + } + }).exceptionally(throwable -> { + Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable); + if (cause instanceof IndexNotFoundException || cause.getCause() instanceof IndexNotFoundException) { + logger.debug("Index is not found to delete lock. {}", cause.getMessage()); + listener.onResponse(true); + } else { + listener.onFailure(cause); + } + return null; + }); + + } catch (Exception e) { + logger.error("Exception occurred deleting lock.", e); + listener.onFailure(e); + } } /** diff --git a/src/test/java/org/opensearch/jobscheduler/JobSchedulerPluginTests.java b/src/test/java/org/opensearch/jobscheduler/JobSchedulerPluginTests.java index 0ee4b816..5768db96 100644 --- a/src/test/java/org/opensearch/jobscheduler/JobSchedulerPluginTests.java +++ b/src/test/java/org/opensearch/jobscheduler/JobSchedulerPluginTests.java @@ -137,7 +137,7 @@ public void testLoadExtensions() { public void testGetSettings_returnsSettingsList() { List> settings = plugin.getSettings(); assertNotNull(settings); - assertEquals(13, settings.size()); + assertEquals(18, settings.size()); assertTrue(settings.contains(LegacyOpenDistroJobSchedulerSettings.SWEEP_PAGE_SIZE)); assertTrue(settings.contains(LegacyOpenDistroJobSchedulerSettings.REQUEST_TIMEOUT)); assertTrue(settings.contains(LegacyOpenDistroJobSchedulerSettings.SWEEP_BACKOFF_MILLIS)); diff --git a/src/test/java/org/opensearch/jobscheduler/JobSchedulerSettingsTests.java b/src/test/java/org/opensearch/jobscheduler/JobSchedulerSettingsTests.java index d0642d35..b3295dbb 100644 --- a/src/test/java/org/opensearch/jobscheduler/JobSchedulerSettingsTests.java +++ b/src/test/java/org/opensearch/jobscheduler/JobSchedulerSettingsTests.java @@ -17,6 +17,8 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.test.OpenSearchTestCase; +import static org.opensearch.remote.metadata.common.CommonValue.AWS_DYNAMO_DB; + @SuppressWarnings({ "rawtypes" }) public class JobSchedulerSettingsTests extends OpenSearchTestCase { @@ -101,4 +103,46 @@ public void testSettingsGetValueWithLegacyFallback() { LegacyOpenDistroJobSchedulerSettings.JITTER_LIMIT } ); } + + public void testRemoteMetadataSettingsReturned() { + List> settings = plugin.getSettings(); + assertTrue( + "remote metadata settings must be returned from settings", + settings.containsAll( + Arrays.asList( + JobSchedulerSettings.REMOTE_METADATA_TYPE, + JobSchedulerSettings.REMOTE_METADATA_ENDPOINT, + JobSchedulerSettings.REMOTE_METADATA_REGION, + JobSchedulerSettings.REMOTE_METADATA_SERVICE_NAME, + JobSchedulerSettings.JOB_SCHEDULER_MULTI_TENANCY_ENABLED + ) + ) + ); + } + + public void testRemoteMetadataSettingsDefaults() { + Settings settings = Settings.EMPTY; + + assertEquals("", JobSchedulerSettings.REMOTE_METADATA_TYPE.get(settings)); + assertEquals("", JobSchedulerSettings.REMOTE_METADATA_ENDPOINT.get(settings)); + assertEquals("", JobSchedulerSettings.REMOTE_METADATA_REGION.get(settings)); + assertEquals("", JobSchedulerSettings.REMOTE_METADATA_SERVICE_NAME.get(settings)); + assertFalse(JobSchedulerSettings.JOB_SCHEDULER_MULTI_TENANCY_ENABLED.get(settings)); + } + + public void testRemoteMetadataSettingsValues() { + Settings settings = Settings.builder() + .put("plugins.jobscheduler.remote_metadata_type", AWS_DYNAMO_DB) + .put("plugins.jobscheduler.remote_metadata_endpoint", "https://dynamodb.us-east-1.amazonaws.com") + .put("plugins.jobscheduler.remote_metadata_region", "us-east-1") + .put("plugins.jobscheduler.remote_metadata_service_name", "es") + .put("plugins.jobscheduler.tenant_aware", true) + .build(); + + assertEquals(AWS_DYNAMO_DB, JobSchedulerSettings.REMOTE_METADATA_TYPE.get(settings)); + assertEquals("https://dynamodb.us-east-1.amazonaws.com", JobSchedulerSettings.REMOTE_METADATA_ENDPOINT.get(settings)); + assertEquals("us-east-1", JobSchedulerSettings.REMOTE_METADATA_REGION.get(settings)); + assertEquals("es", JobSchedulerSettings.REMOTE_METADATA_SERVICE_NAME.get(settings)); + assertTrue(JobSchedulerSettings.JOB_SCHEDULER_MULTI_TENANCY_ENABLED.get(settings)); + } } diff --git a/src/test/java/org/opensearch/jobscheduler/rest/action/RestGetLockActionTests.java b/src/test/java/org/opensearch/jobscheduler/rest/action/RestGetLockActionTests.java index a8788e19..bb2a0960 100644 --- a/src/test/java/org/opensearch/jobscheduler/rest/action/RestGetLockActionTests.java +++ b/src/test/java/org/opensearch/jobscheduler/rest/action/RestGetLockActionTests.java @@ -30,6 +30,7 @@ import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.utils.LockServiceImpl; import org.opensearch.jobscheduler.transport.AcquireLockRequest; +import org.opensearch.remote.metadata.client.SdkClient; import org.opensearch.rest.RestHandler; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.rest.RestRequest; @@ -54,7 +55,7 @@ public void setUp() throws Exception { super.setUp(); this.clusterService = Mockito.mock(ClusterService.class, Mockito.RETURNS_DEEP_STUBS); Mockito.when(this.clusterService.state().routingTable().hasIndex(".opendistro-job-scheduler-lock")).thenReturn(true); - this.lockService = new LockServiceImpl(Mockito.mock(NodeClient.class), clusterService); + this.lockService = new LockServiceImpl(Mockito.mock(NodeClient.class), clusterService, Mockito.mock(SdkClient.class), false); this.getLockAction = new RestGetLockAction(this.lockService); this.getLockPath = String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_lock"); this.testJobId = "testJobId"; diff --git a/src/test/java/org/opensearch/jobscheduler/rest/action/RestReleaseLockActionTests.java b/src/test/java/org/opensearch/jobscheduler/rest/action/RestReleaseLockActionTests.java index 70a928e8..23ca1695 100644 --- a/src/test/java/org/opensearch/jobscheduler/rest/action/RestReleaseLockActionTests.java +++ b/src/test/java/org/opensearch/jobscheduler/rest/action/RestReleaseLockActionTests.java @@ -20,6 +20,7 @@ import org.opensearch.jobscheduler.JobSchedulerPlugin; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.utils.LockServiceImpl; +import org.opensearch.remote.metadata.client.SdkClient; import org.opensearch.rest.RestHandler; import org.opensearch.rest.RestRequest; import org.opensearch.test.OpenSearchTestCase; @@ -45,7 +46,7 @@ public void setUp() throws Exception { super.setUp(); this.clusterService = Mockito.mock(ClusterService.class, Mockito.RETURNS_DEEP_STUBS); this.client = Mockito.mock(Client.class); - this.lockService = new LockServiceImpl(client, clusterService); + this.lockService = new LockServiceImpl(client, clusterService, Mockito.mock(SdkClient.class), false); restReleaseLockAction = new RestReleaseLockAction(this.lockService); this.releaseLockPath = String.format(Locale.ROOT, "%s/%s/{%s}", JobSchedulerPlugin.JS_BASE_URI, "_release_lock", LockModel.LOCK_ID); diff --git a/src/test/java/org/opensearch/jobscheduler/sweeper/JobSweeperTests.java b/src/test/java/org/opensearch/jobscheduler/sweeper/JobSweeperTests.java index a2bc405f..9742507c 100644 --- a/src/test/java/org/opensearch/jobscheduler/sweeper/JobSweeperTests.java +++ b/src/test/java/org/opensearch/jobscheduler/sweeper/JobSweeperTests.java @@ -45,6 +45,7 @@ import org.opensearch.index.mapper.ParseContext; import org.opensearch.index.mapper.ParsedDocument; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.remote.metadata.client.SdkClient; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.Scheduler; @@ -127,7 +128,7 @@ public void setup() throws IOException { xContentRegistry, jobProviderMap, scheduler, - new LockServiceImpl(client, clusterService), + new LockServiceImpl(client, clusterService, Mockito.mock(SdkClient.class), false), jobDetailsService ); } diff --git a/src/test/java/org/opensearch/jobscheduler/utils/JobDetailsServiceIT.java b/src/test/java/org/opensearch/jobscheduler/utils/JobDetailsServiceIT.java index afd39c74..c4dcf35e 100644 --- a/src/test/java/org/opensearch/jobscheduler/utils/JobDetailsServiceIT.java +++ b/src/test/java/org/opensearch/jobscheduler/utils/JobDetailsServiceIT.java @@ -45,6 +45,7 @@ import org.opensearch.jobscheduler.transport.response.JobParameterResponse; import org.opensearch.jobscheduler.transport.request.JobRunnerRequest; import org.opensearch.jobscheduler.transport.response.JobRunnerResponse; +import org.opensearch.remote.metadata.client.SdkClient; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.jobscheduler.ScheduledJobProvider; @@ -324,7 +325,7 @@ private void compareExtensionJobParameters( public void testJobRunnerExtensionJobActionRequest() throws IOException { - LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService); + LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService, Mockito.mock(SdkClient.class), false); JobExecutionContext jobExecutionContext = new JobExecutionContext( Instant.now(), new JobDocVersion(0, 0, 0), diff --git a/src/test/java/org/opensearch/jobscheduler/utils/LockServiceIT.java b/src/test/java/org/opensearch/jobscheduler/utils/LockServiceIT.java index 30df5258..665aa266 100644 --- a/src/test/java/org/opensearch/jobscheduler/utils/LockServiceIT.java +++ b/src/test/java/org/opensearch/jobscheduler/utils/LockServiceIT.java @@ -13,6 +13,7 @@ import org.mockito.Mockito; import org.opensearch.core.action.ActionListener; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.jobscheduler.spi.JobDocVersion; import org.opensearch.jobscheduler.spi.JobExecutionContext; @@ -20,12 +21,15 @@ import org.opensearch.jobscheduler.spi.ScheduledJobParameter; import org.opensearch.jobscheduler.spi.schedule.Schedule; import org.opensearch.jobscheduler.spi.utils.LockService; +import org.opensearch.remote.metadata.client.SdkClient; +import org.opensearch.remote.metadata.client.impl.SdkClientFactory; import org.opensearch.test.OpenSearchIntegTestCase; import java.io.IOException; import java.time.Duration; import java.time.Instant; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -79,6 +83,7 @@ public Long getLockDurationSeconds() { }; private ClusterService clusterService; + private SdkClient sdkClient; @Before public void setup() { @@ -89,12 +94,13 @@ public void setup() { Mockito.when(this.clusterService.state().routingTable().hasIndex(".opendistro-job-scheduler-lock")) .thenReturn(false) .thenReturn(true); + this.sdkClient = SdkClientFactory.createSdkClient(client(), NamedXContentRegistry.EMPTY, Collections.emptyMap()); } public void testSanity() throws Exception { String uniqSuffix = "_sanity"; CountDownLatch latch = new CountDownLatch(1); - LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService); + LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService, sdkClient, false); final JobExecutionContext context = new JobExecutionContext( Instant.now(), new JobDocVersion(0, 0, 0), @@ -132,7 +138,7 @@ public void testSanityWithCustomLockID() throws Exception { String lockID = "sanity_test_lock"; String uniqSuffix = "_sanity"; CountDownLatch latch = new CountDownLatch(1); - LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService); + LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService, sdkClient, false); final JobExecutionContext context = new JobExecutionContext( Instant.now(), new JobDocVersion(0, 0, 0), @@ -166,7 +172,7 @@ public void testSecondAcquireLockFail() throws Exception { String uniqSuffix = "_second_acquire"; String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); - LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService); + LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService, sdkClient, false); final JobExecutionContext context = new JobExecutionContext( Instant.now(), new JobDocVersion(0, 0, 0), @@ -195,7 +201,7 @@ public void testAcquireLockWithLongIdFail() throws Exception { String uniqSuffix = "_long_lock_id"; String lockID = randomAlphaOfLengthBetween(513, 1000); CountDownLatch latch = new CountDownLatch(1); - LockService lockService = new LockServiceImpl(client(), this.clusterService); + LockService lockService = new LockServiceImpl(client(), this.clusterService, sdkClient, false); final JobExecutionContext context = new JobExecutionContext( Instant.now(), new JobDocVersion(0, 0, 0), @@ -217,7 +223,7 @@ public void testLockReleasedAndAcquired() throws Exception { String uniqSuffix = "_lock_release+acquire"; String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); - LockService lockService = new LockServiceImpl(client(), this.clusterService); + LockService lockService = new LockServiceImpl(client(), this.clusterService, sdkClient, false); final JobExecutionContext context = new JobExecutionContext( Instant.now(), new JobDocVersion(0, 0, 0), @@ -249,7 +255,7 @@ public void testLockExpired() throws Exception { String uniqSuffix = "_lock_expire"; String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); - LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService); + LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService, sdkClient, false); // Set lock time in the past. lockService.setTime(Instant.now().minus(Duration.ofSeconds(LOCK_DURATION_SECONDS + LOCK_DURATION_SECONDS))); final JobExecutionContext context = new JobExecutionContext( @@ -283,7 +289,7 @@ public void testLockExpired() throws Exception { public void testDeleteLockWithOutIndexCreation() throws Exception { CountDownLatch latch = new CountDownLatch(1); - LockService lockService = new LockServiceImpl(client(), this.clusterService); + LockService lockService = new LockServiceImpl(client(), this.clusterService, sdkClient, false); lockService.deleteLock("NonExistingLockId", ActionListener.wrap(deleted -> { assertTrue("Failed to delete lock.", deleted); latch.countDown(); @@ -293,7 +299,7 @@ public void testDeleteLockWithOutIndexCreation() throws Exception { public void testDeleteNonExistingLock() throws Exception { CountDownLatch latch = new CountDownLatch(1); - LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService); + LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService, sdkClient, false); lockService.createLockIndex(ActionListener.wrap(created -> { if (created) { lockService.deleteLock("NonExistingLockId", ActionListener.wrap(deleted -> { @@ -315,7 +321,7 @@ public void testMultiThreadCreateLock() throws Exception { String uniqSuffix = "_multi_thread_create"; String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); - final LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService); + final LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService, sdkClient, false); final JobExecutionContext context = new JobExecutionContext( Instant.now(), new JobDocVersion(0, 0, 0), @@ -378,7 +384,7 @@ public void testMultiThreadAcquireLock() throws Exception { String uniqSuffix = "_multi_thread_acquire"; String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); - final LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService); + final LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService, sdkClient, false); final JobExecutionContext context = new JobExecutionContext( Instant.now(), new JobDocVersion(0, 0, 0), @@ -445,7 +451,7 @@ public void testRenewLock() throws Exception { String uniqSuffix = "_lock_renew"; String lockID = randomAlphaOfLengthBetween(6, 15); CountDownLatch latch = new CountDownLatch(1); - LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService); + LockServiceImpl lockService = new LockServiceImpl(client(), this.clusterService, sdkClient, false); final JobExecutionContext context = new JobExecutionContext( Instant.now(), new JobDocVersion(0, 0, 0),