Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
61b6a6a
add job scheduler
ajleong623 Aug 12, 2025
ade1192
add job scheduler plugin
ajleong623 Aug 12, 2025
50319e4
fixed pairwise error
ajleong623 Aug 13, 2025
b554ea6
added actions for scheduling and deleting jobs, validations, and new …
ajleong623 Aug 19, 2025
8309d93
added initial draft of technical design
ajleong623 Aug 20, 2025
4b5340a
made changes based on small suggestions
ajleong623 Aug 21, 2025
7f6352d
Apply suggestions from code review
ajleong623 Aug 21, 2025
b73746a
made changes based on small suggestions
ajleong623 Aug 21, 2025
fd62b14
Revert "Apply suggestions from code review"
ajleong623 Aug 21, 2025
d3d053a
reapply changes from suggestion
ajleong623 Aug 21, 2025
d338bf4
add new persistent index and modified request url
ajleong623 Aug 27, 2025
bb3426d
still need to add integration tests
ajleong623 Aug 31, 2025
03725fa
finished all integration tests
ajleong623 Sep 1, 2025
efdc0c6
update gradle build file
ajleong623 Sep 1, 2025
95581cb
update design document
ajleong623 Sep 1, 2025
a547b68
update build file
ajleong623 Sep 1, 2025
fde4c04
update build.gradle
ajleong623 Sep 2, 2025
8a1efcb
yamlRestTest dependencies installed
ajleong623 Sep 2, 2025
827acdd
Merge branch 'opensearch-project:main' into job-scheduler
ajleong623 Sep 2, 2025
51de07a
add changelog line
ajleong623 Sep 2, 2025
5d12f02
worked on some suggestions
ajleong623 Sep 4, 2025
7aa0b44
add futures and timeout
ajleong623 Sep 4, 2025
be09cd9
update as described in comment
ajleong623 Sep 5, 2025
c8740c6
refactored experiment runner and updated scheduledRunId
ajleong623 Sep 6, 2025
a8b5bf7
marked new apis as experimental, updated documentation, and added mor…
ajleong623 Sep 7, 2025
52955a2
fix forbidden apis
ajleong623 Sep 8, 2025
56c1911
fix failing tests
ajleong623 Sep 8, 2025
df91219
scheduled experiment concurrency with timeout and cleanup is now ready
ajleong623 Sep 9, 2025
66b5489
added tests for concurrency, timeout mechanism, and also async timeou…
ajleong623 Sep 16, 2025
ed511ac
add more comments and documentations
ajleong623 Sep 17, 2025
031b97b
cleaned up deleted job scheduled
ajleong623 Sep 19, 2025
f19c375
added scheduled parameter to experiment and cleanup scheduled experim…
ajleong623 Sep 20, 2025
9941bd7
help fix forbidden apis
ajleong623 Sep 20, 2025
7aa1c32
retry tests
ajleong623 Sep 23, 2025
e2cc6c1
Merge branch 'opensearch-project:main' into job-scheduler
ajleong623 Sep 23, 2025
1a404f0
Couple of text tweaks...
epugh Sep 23, 2025
fc9ab7f
increase timeout to one hour for production
ajleong623 Sep 23, 2025
7ef0e60
Merge branch 'job-scheduler' of https://github.com/ajleong623/search-…
ajleong623 Sep 23, 2025
eaabfd1
fix timeout test
ajleong623 Sep 23, 2025
3e514e4
update action names
ajleong623 Sep 24, 2025
24852eb
reenable neural search
ajleong623 Sep 24, 2025
dbf5f73
update scheduled run id value
ajleong623 Oct 7, 2025
bfc13cc
reenable ml plugin
ajleong623 Oct 7, 2025
1a27b3d
cleanup unused constants
ajleong623 Oct 7, 2025
7cf94bf
Update src/main/java/org/opensearch/searchrelevance/dao/ScheduledJobs…
ajleong623 Oct 14, 2025
5082b5d
update comments
ajleong623 Oct 16, 2025
5a3fd52
add more detailed logs and handle concurrent updates to scheduled fut…
ajleong623 Oct 20, 2025
1555f26
do not block calling thread of runJob in SearchRelevanceJobRunner
ajleong623 Oct 20, 2025
b60acda
Merge branch 'opensearch-project:main' into job-scheduler
ajleong623 Oct 27, 2025
f26fc1e
finish testing and make adjustments based on comments
ajleong623 Oct 29, 2025
1f87a18
pass tests
ajleong623 Oct 29, 2025
ed61140
pass tests
ajleong623 Oct 29, 2025
81dbc13
clarify future map comments
ajleong623 Oct 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ opensearchplugin {
classname "${packagePath}.${pathToPlugin}.${pluginClassName}"
licenseFile rootProject.file('LICENSE.txt')
noticeFile rootProject.file('NOTICE.txt')
extendedPlugins = ['opensearch-job-scheduler']
}

dependencyLicenses.enabled = false
Expand All @@ -195,6 +196,8 @@ dependencies {
implementation "org.opensearch:opensearch:${opensearch_version}"
api "org.opensearch:opensearch:${opensearch_version}"
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
compileOnly "org.opensearch:opensearch-job-scheduler:${opensearch_build}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the dependency on the non-SPI jar for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

implementation group: 'com.google.guava', name: 'guava', version:'33.4.8-jre'
compileOnly group: 'commons-lang', name: 'commons-lang', version: '2.6'
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.18.0'
Expand Down Expand Up @@ -230,6 +233,7 @@ dependencies {
zipArchive group: 'org.opensearch.plugin', name:'neural-search', version: "${opensearch_build}"
zipArchive group: 'org.opensearch.plugin', name:'opensearch-ubi', version: "${opensearch_build}"
opensearchPlugin "org.opensearch.plugin:opensearch-security:${opensearch_build}@zip"
opensearchPlugin "org.opensearch.plugin:opensearch-job-scheduler:${opensearch_build}@zip"

configurations.all {
resolutionStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ private PluginConstants() {}
public static final String JUDGMENTS_URL = SEARCH_RELEVANCE_BASE_URI + "/judgments";
/** The URI for this plugin's search configurations rest actions */
public static final String SEARCH_CONFIGURATIONS_URL = SEARCH_RELEVANCE_BASE_URI + "/search_configurations";
/** The URI for this plugin's search configurations rest actions */
public static final String SCHEDULED_EXPERIMENT_URL = SEARCH_RELEVANCE_BASE_URI + "/scheduled_experiment";
/** The URI for initializing the UBI indices */
public static final String INITIALIZE_URL = "/_plugins/ubi/initialize";

Expand Down Expand Up @@ -53,6 +55,8 @@ private PluginConstants() {}
public static final String JUDGMENT_CACHE_INDEX_MAPPING = "mappings/judgment_cache.json";
public static final String EXPERIMENT_VARIANT_INDEX = "search-relevance-experiment-variant";
public static final String EXPERIMENT_VARIANT_INDEX_MAPPING = "mappings/experiment_variant.json";
public static final String SCHEDULED_JOBS_INDEX = ".scheduled-jobs";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why to use system index here ? does it store sensitive data ? or is it expected not to be visible to dashboards in the future ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. I think originally, I did reuse a lot of the logic from the job-scheduler plugin, and the way the index was defined in the plugin was with a systems index, but we should change it here since there is an api call to return all scheduled jobs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO the decision to use system index is not based on whether it stores sensitive data or not. Its whether it needs write protections in place to keep regular users from corrupting the index. It certainly may be paranoia as regular users shouldn't directly write to a system index, but its good practice to have a mechanism in place to enforce it.

public static final String SCHEDULED_JOBS_INDEX_MAPPING = "mappings/scheduled_job.json";

/**
* UBI
Expand Down Expand Up @@ -80,8 +84,10 @@ private PluginConstants() {}
public static final String SEARCH_PIPELINE = "searchPipeline";
public static final String SIZE = "size";
public static final String QUERYSET_ID = "querySetId";
public static final String EXPERIMENT_ID = "experiment_id";
public static final String SEARCH_CONFIGURATION_LIST = "searchConfigurationList";
public static final String JUDGMENT_LIST = "judgmentList";
public static final String CRON_EXPRESSION = "cron_expression";

public static final String JUDGMENT_RATINGS = "judgmentRatings";
public static final String CONTEXT_FIELDS = "contextFields";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.searchrelevance.dao;

import static org.opensearch.searchrelevance.indices.SearchRelevanceIndices.SCHEDULED_JOBS;

import java.io.IOException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.StepListener;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.searchrelevance.exception.SearchRelevanceException;
import org.opensearch.searchrelevance.indices.SearchRelevanceIndicesManager;
import org.opensearch.searchrelevance.model.ScheduledJob;

public class ScheduledJobsDao {
private static final Logger LOGGER = LogManager.getLogger(ScheduledJobsDao.class);
private final SearchRelevanceIndicesManager searchRelevanceIndicesManager;

public ScheduledJobsDao(SearchRelevanceIndicesManager searchRelevanceIndicesManager) {
this.searchRelevanceIndicesManager = searchRelevanceIndicesManager;
}

/**
* Create scheduled jobs index if not exists
* @param stepListener - step lister for async operation
*/
public void createIndexIfAbsent(final StepListener<Void> stepListener) {
searchRelevanceIndicesManager.createIndexIfAbsent(SCHEDULED_JOBS, stepListener);
}

/**
* Stores scheduled job to in the system index
* @param scheduledJob - Scheduled job content to be stored
* @param listener - action lister for async operation
*/
public void putScheduledJob(final ScheduledJob scheduledJob, final ActionListener listener) {
if (scheduledJob == null) {
listener.onFailure(new SearchRelevanceException("Scheduled job cannot be null", RestStatus.BAD_REQUEST));
return;
}
try {
searchRelevanceIndicesManager.putDoc(
scheduledJob.getId(),
scheduledJob.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS),
SCHEDULED_JOBS,
listener
);
} catch (IOException e) {
throw new SearchRelevanceException("Failed to store scheduled job", e, RestStatus.INTERNAL_SERVER_ERROR);
}
}

public void updateScheduledJob(final ScheduledJob scheduledJob, final ActionListener listener) {
if (scheduledJob == null) {
listener.onFailure(new SearchRelevanceException("Scheduled job cannot be null", RestStatus.BAD_REQUEST));
return;
}
try {
searchRelevanceIndicesManager.updateDoc(
scheduledJob.getId(),
scheduledJob.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS),
SCHEDULED_JOBS,
listener
);
} catch (IOException e) {
throw new SearchRelevanceException("Failed to store scheduled job", e, RestStatus.INTERNAL_SERVER_ERROR);
}
}

/**
* Delete scheduled job by scheduledJobId
* @param scheduledJobId - id to be deleted
* @param listener - action lister for async operation
*/
public void deleteScheduledJob(final String scheduledJobId, final ActionListener<DeleteResponse> listener) {
searchRelevanceIndicesManager.deleteDocByDocId(scheduledJobId, SCHEDULED_JOBS, listener);
}

/**
* Get scheduled job by scheduledJobId
* @param scheduledJobId - id to be deleted
* @param listener - action lister for async operation
*/
public SearchResponse getScheduledJob(String scheduledJobId, ActionListener<SearchResponse> listener) {
if (scheduledJobId == null || scheduledJobId.isEmpty()) {
listener.onFailure(new SearchRelevanceException("jobId must not be null or empty", RestStatus.BAD_REQUEST));
return null;
}
return searchRelevanceIndicesManager.getDocByDocId(scheduledJobId, SCHEDULED_JOBS, listener);
}

/**
* List scheduled jobs by source builder
* @param sourceBuilder - source builder to be searched
* @param listener - action lister for async operation
*/
public SearchResponse listScheduledJob(SearchSourceBuilder sourceBuilder, ActionListener<SearchResponse> listener) {
// Apply default values if not set
if (sourceBuilder == null) {
sourceBuilder = new SearchSourceBuilder();
}

// Ensure we have a query
if (sourceBuilder.query() == null) {
sourceBuilder.query(QueryBuilders.matchAllQuery());
}

return searchRelevanceIndicesManager.listDocsBySearchRequest(sourceBuilder, SCHEDULED_JOBS, listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static org.opensearch.searchrelevance.common.PluginConstants.JUDGMENT_INDEX_MAPPING;
import static org.opensearch.searchrelevance.common.PluginConstants.QUERY_SET_INDEX;
import static org.opensearch.searchrelevance.common.PluginConstants.QUERY_SET_INDEX_MAPPING;
import static org.opensearch.searchrelevance.common.PluginConstants.SCHEDULED_JOBS_INDEX;
import static org.opensearch.searchrelevance.common.PluginConstants.SCHEDULED_JOBS_INDEX_MAPPING;
import static org.opensearch.searchrelevance.common.PluginConstants.SEARCH_CONFIGURATION_INDEX;
import static org.opensearch.searchrelevance.common.PluginConstants.SEARCH_CONFIGURATION_INDEX_MAPPING;
import static org.opensearch.searchrelevance.indices.SearchRelevanceIndicesManager.getIndexMappings;
Expand Down Expand Up @@ -66,7 +68,12 @@ public enum SearchRelevanceIndices {
/**
* Experiment Variant Index
*/
EXPERIMENT_VARIANT(EXPERIMENT_VARIANT_INDEX, EXPERIMENT_VARIANT_INDEX_MAPPING, false);
EXPERIMENT_VARIANT(EXPERIMENT_VARIANT_INDEX, EXPERIMENT_VARIANT_INDEX_MAPPING, false),

/**
* Scheduled Jobs Index
*/
SCHEDULED_JOBS(SCHEDULED_JOBS_INDEX, SCHEDULED_JOBS_INDEX_MAPPING, false);

private final String indexName;
private final String mapping;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.searchrelevance.model;

import java.io.IOException;
import java.time.Instant;

import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.jobscheduler.spi.schedule.Schedule;

import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public class ScheduledJob implements ToXContentObject {
public static final String ID = "id";
public static final String NAME_FIELD = "name";
public static final String ENABLED_FILED = "enabled";
public static final String LAST_UPDATE_TIME_FIELD = "last_update_time";
public static final String LAST_UPDATE_TIME_FIELD_READABLE = "last_update_time_field";
public static final String SCHEDULE_FIELD = "schedule";
public static final String ENABLED_TIME_FILED = "enabled_time";
public static final String ENABLED_TIME_FILED_READABLE = "enabled_time_field";
public static final String INDEX_NAME_FIELD = "index_name_to_watch";
public static final String LOCK_DURATION_SECONDS = "lock_duration_seconds";
public static final String JITTER = "jitter";
public static final String EXPERIMENT_ID = "experiment_id";

private final String id;
private final String jobName;
private final Instant lastUpdateTime;
private final Instant enabledTime;
private final boolean isEnabled;
private final Schedule schedule;
private final String indexToWatch;
private final Long lockDurationSeconds;
private final Double jitter;
private final String experimentId;

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ID, this.id)
.field(NAME_FIELD, this.jobName)
.field(ENABLED_FILED, this.isEnabled)
.field(SCHEDULE_FIELD, this.schedule)
.field(INDEX_NAME_FIELD, this.indexToWatch);
if (this.enabledTime != null) {
builder.timeField(ENABLED_TIME_FILED, ENABLED_TIME_FILED_READABLE, this.enabledTime.toEpochMilli());
}
if (this.lastUpdateTime != null) {
builder.timeField(LAST_UPDATE_TIME_FIELD, LAST_UPDATE_TIME_FIELD_READABLE, this.lastUpdateTime.toEpochMilli());
}
if (this.lockDurationSeconds != null) {
builder.field(LOCK_DURATION_SECONDS, this.lockDurationSeconds);
}
if (this.jitter != null) {
builder.field(JITTER, this.jitter);
}
if (this.experimentId != null) {
builder.field(EXPERIMENT_ID, this.experimentId);
}
builder.endObject();
return builder;
}
}
Loading
Loading