Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
4665d1b
Support defining IntervalSchedule in seconds (#796)
cwperks Jul 11, 2025
5423a25
Add test that disables watcher job and verifies that it stops running…
cwperks Jul 16, 2025
5fd4335
Removing lock from the GetJobInfo Transport action and reformatting J…
Jeremydupras Jul 23, 2025
bddf6ce
Rest API to list all locks with option to get a specific lock (#802)
Jeremydupras Jul 30, 2025
a8c3c1e
Add release notes for 3.2.0 (#805)
opensearch-ci-bot Aug 6, 2025
dc7ec08
Improve Health Status of lock Index in Single Node Clusters (#807)
cwperks Aug 7, 2025
426e863
adding descheduled jobs the the Jobs API call (#806)
Jeremydupras Aug 7, 2025
780144d
Aggregate codecov across root + sub projects for cumulative report (#…
cwperks Aug 7, 2025
3f2c2d1
adding descheduled jobs
Aug 6, 2025
34bc09f
adding job history service
Aug 8, 2025
fbe99c2
adding settings
Aug 11, 2025
dca29fb
changing to Integer
Aug 11, 2025
d5e2292
adding tests, status fix
Aug 12, 2025
c7e60e8
changing history service to be integrated with lock service
Aug 13, 2025
b11c20a
reverting lockServiceIT
Aug 13, 2025
80350b7
spotlessapply and codecov
Aug 13, 2025
3f3f2a3
setting history Service default to false
Aug 13, 2025
159655e
fising test
Aug 13, 2025
7cdb73f
spi spotlessapply
Aug 13, 2025
9e1da54
codecov
Aug 13, 2025
3290b5a
addressing comments
Aug 14, 2025
14fad45
changing logger and threadpool (#812)
Jeremydupras Aug 11, 2025
1d660f8
rerun tests
Aug 14, 2025
48a13d8
rerun tests
Aug 14, 2025
9e9f156
commit
Aug 14, 2025
22a261f
updating tests
Aug 15, 2025
9450551
Increment version to 3.3.0-SNAPSHOT (#815)
opensearch-trigger-bot[bot] Aug 15, 2025
b6e8360
changes
Aug 14, 2025
26fbe60
changing default value
Aug 14, 2025
af5d13d
Trigger Tests
Aug 14, 2025
5f44c12
making changes
Aug 19, 2025
026f021
fixing tests and addressing comments
Aug 20, 2025
c0901ac
spotlessapply
Aug 20, 2025
c413031
removing job status from Job execution status
Aug 20, 2025
8ba572b
Merge branch 'main' into historyAPI-v2
Jeremydupras Aug 20, 2025
7c1e3f8
adding comma
Aug 21, 2025
53226c2
mergin main Merge branch 'main' into historyAPI-v2
Aug 21, 2025
d434666
removing empty line
Aug 21, 2025
d24bb63
adding tests, changing settings call, adding security setting
Aug 21, 2025
554a976
Merge branch 'opensearch-project:main' into historyAPI-v2
Jeremydupras Aug 21, 2025
f3a7a0e
spotlessapply
Aug 21, 2025
2e28028
adding additional test
Aug 22, 2025
91a7f9c
Merge branch 'main' into historyAPI-v2
cwperks Sep 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class SampleJobRunnerRestIT extends SampleExtensionIntegTestCase {
public static final String LOCK_INFO_URI = "/_plugins/_job_scheduler/api/locks";
public static final String SCHEDULER_INFO_URI = "/_plugins/_job_scheduler/api/jobs?by_node";
public static final String SCHEDULER_INFO_URI_CLUSTER = "/_plugins/_job_scheduler/api/jobs";
public static final String HISTORY_INFO_URI = "/_plugins/_job_scheduler/api/history";

public void testJobCreateWithCorrectParams() throws IOException {
SampleJobParameter jobParameter = new SampleJobParameter();
Expand Down Expand Up @@ -211,6 +212,94 @@ public void testRunThenListJobs() throws Exception {
deleteWatcherJob(jobId);
}

public void testJobHistoryService() throws Exception {
String index = createTestIndex();
SampleJobParameter jobParameter = new SampleJobParameter();
jobParameter.setJobName("sample-job-lock-test-it");
jobParameter.setIndexToWatch(index);
// ensures that the next job tries to run even before the previous job finished & released its lock. Also look at
// SampleJobRunner.runTaskForLockIntegrationTests
jobParameter.setSchedule(new IntervalSchedule(Instant.now(), 5, ChronoUnit.SECONDS));
jobParameter.setLockDurationSeconds(10L);

// Creates a new watcher job.
String jobId = OpenSearchRestTestCase.randomAlphaOfLength(10);
createWatcherJob(jobId, jobParameter);

waitUntilLockIsAcquiredAndReleased(jobId);

Response response = makeRequest(client(), "GET", HISTORY_INFO_URI, Map.of(), null);
Map<String, Object> responseJson = parseResponse(response);

Assert.assertTrue("Response should contain total_history", responseJson.containsKey("total_history"));
Assert.assertTrue("Response should contain history", responseJson.containsKey("history"));

Integer totalHistory = (Integer) responseJson.get("total_history");
Assert.assertTrue("Total history should be greater than 0", totalHistory > 0);

Map<String, Object> history = (Map<String, Object>) responseJson.get("history");
Assert.assertFalse("History should not be empty", history.isEmpty());

for (Map.Entry<String, Object> entry : history.entrySet()) {
Map<String, Object> historyRecord = (Map<String, Object>) entry.getValue();
Assert.assertTrue("History record should contain job_index_name", historyRecord.containsKey("job_index_name"));
Assert.assertTrue("History record should contain job_id", historyRecord.containsKey("job_id"));
Assert.assertTrue("History record should contain start_time", historyRecord.containsKey("start_time"));
Assert.assertTrue("History record should contain completion_status", historyRecord.containsKey("completion_status"));
Assert.assertTrue("History record should contain end_time", historyRecord.containsKey("end_time"));

Assert.assertEquals("Job ID should match", jobId, historyRecord.get("job_id"));
Assert.assertEquals("Completion status should be 0", 0, historyRecord.get("completion_status"));
}

deleteWatcherJob(jobId);
}

public void testJobHistoryServiceById() throws Exception {
String index = createTestIndex();
SampleJobParameter jobParameter = new SampleJobParameter();
jobParameter.setJobName("sample-job-lock-test-it");
jobParameter.setIndexToWatch(index);
// ensures that the next job tries to run even before the previous job finished & released its lock. Also look at
// SampleJobRunner.runTaskForLockIntegrationTests
jobParameter.setSchedule(new IntervalSchedule(Instant.now(), 5, ChronoUnit.SECONDS));
jobParameter.setLockDurationSeconds(10L);

// Creates a new watcher job.
String jobId = OpenSearchRestTestCase.randomAlphaOfLength(10);
createWatcherJob(jobId, jobParameter);

waitUntilLockIsAcquiredAndReleased(jobId);

String historyIndexById = HISTORY_INFO_URI + "/.scheduler_sample_extension-" + jobId;

Response response = makeRequest(client(), "GET", historyIndexById, Map.of(), null);
Map<String, Object> responseJson = parseResponse(response);

Assert.assertTrue("Response should contain total_history", responseJson.containsKey("total_history"));
Assert.assertTrue("Response should contain history", responseJson.containsKey("history"));

Integer totalHistory = (Integer) responseJson.get("total_history");
Assert.assertTrue("Total history should be greater than 0", totalHistory > 0);

Map<String, Object> history = (Map<String, Object>) responseJson.get("history");
Assert.assertFalse("History should not be empty", history.isEmpty());

for (Map.Entry<String, Object> entry : history.entrySet()) {
Map<String, Object> historyRecord = (Map<String, Object>) entry.getValue();
Assert.assertTrue("History record should contain job_index_name", historyRecord.containsKey("job_index_name"));
Assert.assertTrue("History record should contain job_id", historyRecord.containsKey("job_id"));
Assert.assertTrue("History record should contain start_time", historyRecord.containsKey("start_time"));
Assert.assertTrue("History record should contain completion_status", historyRecord.containsKey("completion_status"));
Assert.assertTrue("History record should contain end_time", historyRecord.containsKey("end_time"));

Assert.assertEquals("Job ID should match", jobId, historyRecord.get("job_id"));
Assert.assertEquals("Completion status should be 0", 0, historyRecord.get("completion_status"));
}

deleteWatcherJob(jobId);
}

public void testAcquiredLockPreventExecOfTasks() throws Exception {
String index = createTestIndex();
SampleJobParameter jobParameter = new SampleJobParameter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import org.opensearch.jobscheduler.rest.action.RestGetLockAction;
import org.opensearch.jobscheduler.rest.action.RestGetScheduledInfoAction;
import org.opensearch.jobscheduler.rest.action.RestReleaseLockAction;
import org.opensearch.jobscheduler.rest.action.RestGetHistoryAction;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.jobscheduler.transport.PluginClient;
import org.opensearch.jobscheduler.transport.action.GetAllLocksAction;
import org.opensearch.jobscheduler.transport.action.GetScheduledInfoAction;
import org.opensearch.jobscheduler.transport.action.TransportGetAllLocksAction;
import org.opensearch.jobscheduler.transport.action.TransportGetScheduledInfoAction;
import org.opensearch.jobscheduler.transport.action.TransportGetHistoryAction;
import org.opensearch.jobscheduler.transport.action.GetHistoryAction;
import org.opensearch.jobscheduler.scheduler.JobScheduler;
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
Expand Down Expand Up @@ -89,6 +92,7 @@ public class JobSchedulerPlugin extends Plugin implements ActionPlugin, Extensib
private PluginClient pluginClient;

private JobDetailsService jobDetailsService;
private ClusterService clusterService;

public JobSchedulerPlugin() {
this.indicesToListen = new HashSet<>();
Expand All @@ -112,7 +116,7 @@ public Collection<Module> createGuiceModules() {
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return List.of(
new SystemIndexDescriptor(LockServiceImpl.LOCK_INDEX_NAME, "Stores lock documents used for plugin job execution"),
new SystemIndexDescriptor(JobHistoryService.JOB_HISTORY_INDEX_NAME, "Stores history documents used for plugin job execution")
new SystemIndexDescriptor(JobHistoryService.JOB_HISTORY_INDEX_NAME, "Stores job execution history")
);
}

Expand All @@ -130,7 +134,7 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
Supplier<Boolean> statusHistoryEnabled = () -> JobSchedulerSettings.STATUS_HISTORY.get(environment.settings());
Supplier<Boolean> statusHistoryEnabled = () -> clusterService.getClusterSettings().get(JobSchedulerSettings.STATUS_HISTORY);
this.pluginClient = new PluginClient(client);
this.historyService = new JobHistoryService(pluginClient, clusterService);
this.lockService = new LockServiceImpl(pluginClient, clusterService, historyService, statusHistoryEnabled);
Expand Down Expand Up @@ -272,20 +276,24 @@ public List getRestHandlers(
RestReleaseLockAction restReleaseLockAction = new RestReleaseLockAction(lockService);
RestGetScheduledInfoAction restGetScheduledInfoAction = new RestGetScheduledInfoAction();
RestGetLocksAction restGetAllLocksAction = new RestGetLocksAction();
RestGetHistoryAction restGetHistoryAction = new RestGetHistoryAction();
return List.of(
restGetJobDetailsAction,
restGetLockAction,
restReleaseLockAction,
restGetScheduledInfoAction,
restGetAllLocksAction
restGetAllLocksAction,
restGetHistoryAction
);
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> actions = new ArrayList<>(2);
List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> actions = new ArrayList<>(3);
actions.add(new ActionHandler<>(GetScheduledInfoAction.INSTANCE, TransportGetScheduledInfoAction.class));
actions.add(new ActionHandler<>(GetAllLocksAction.INSTANCE, TransportGetAllLocksAction.class));
actions.add(new ActionHandler<>(GetHistoryAction.INSTANCE, TransportGetHistoryAction.class));

return actions;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.rest.action;

import org.opensearch.transport.client.node.NodeClient;
import org.opensearch.jobscheduler.JobSchedulerPlugin;
import org.opensearch.jobscheduler.transport.action.GetHistoryAction;
import org.opensearch.jobscheduler.transport.request.GetHistoryRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

import java.util.List;

import static org.opensearch.rest.RestRequest.Method.GET;

/**
* REST handler for getting job history
*/
public class RestGetHistoryAction extends BaseRestHandler {

@Override
public String getName() {
return "get_history_action";
}

@Override
public List<Route> routes() {
return List.of(
new Route(GET, JobSchedulerPlugin.JS_BASE_URI + "/api/history"),
new Route(GET, JobSchedulerPlugin.JS_BASE_URI + "/api/history/{history_id}")
);
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
String history_id = request.param("history_id");
GetHistoryRequest getHistoryRequest = new GetHistoryRequest(history_id);
return channel -> client.execute(GetHistoryAction.INSTANCE, getHistoryRequest, new RestToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.transport.action;

import org.opensearch.action.ActionType;
import org.opensearch.jobscheduler.transport.response.GetHistoryResponse;

public class GetHistoryAction extends ActionType<GetHistoryResponse> {
public static final String NAME = "cluster:admin/opensearch/jobscheduler/history";
public static final GetHistoryAction INSTANCE = new GetHistoryAction();

private GetHistoryAction() {
super(NAME, GetHistoryResponse::new);
}
}
Loading
Loading