From df759726b624151c92b537eb17b6dcadfd9863fe Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Fri, 17 Oct 2025 10:34:39 -0400 Subject: [PATCH 1/2] Support CRON expressions for scheduling Signed-off-by: Craig Perkins --- .../threatIntel/model/TIFJobParameter.java | 15 +++++++------ .../threatIntel/ThreatIntelTestCase.java | 22 +++++++++++++------ 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/model/TIFJobParameter.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/model/TIFJobParameter.java index 6e74b3b5a..5b6d4f50c 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/model/TIFJobParameter.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/model/TIFJobParameter.java @@ -19,6 +19,7 @@ import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.jobscheduler.spi.schedule.Schedule; import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobRequest; import org.opensearch.securityanalytics.threatIntel.common.TIFJobState; @@ -103,7 +104,7 @@ public class TIFJobParameter implements Writeable, ScheduledJobParameter { * @param schedule Schedule that system uses * @return Schedule that system uses */ - private IntervalSchedule schedule; + private Schedule schedule; /** @@ -134,7 +135,7 @@ public static TIFJobParameter parseFromParser(XContentParser xcp, String id, Lon Boolean isEnabled = null; TIFJobState state = null; Instant enabledTime = null; - IntervalSchedule schedule = null; + Schedule schedule = null; List indices = new ArrayList<>(); UpdateStats updateStats = null; @@ -163,7 +164,7 @@ public static TIFJobParameter parseFromParser(XContentParser xcp, String id, Lon isEnabled = xcp.booleanValue(); break; case SCHEDULE_FIELD: - schedule = (IntervalSchedule) ScheduleParser.parse(xcp); + schedule = ScheduleParser.parse(xcp); break; case STATE_FIELD: state = toState(xcp.text()); @@ -293,7 +294,7 @@ public TIFJobParameter() { } public TIFJobParameter(final String name, final Instant lastUpdateTime, final Instant enabledTime, final Boolean isEnabled, - final IntervalSchedule schedule, final TIFJobState state, + final Schedule schedule, final TIFJobState state, final List indices, final UpdateStats updateStats) { this.name = name; this.lastUpdateTime = lastUpdateTime; @@ -305,7 +306,7 @@ public TIFJobParameter(final String name, final Instant lastUpdateTime, final In this.updateStats = updateStats; } - public TIFJobParameter(final String name, final IntervalSchedule schedule) { + public TIFJobParameter(final String name, final Schedule schedule) { this( name, Instant.now().truncatedTo(ChronoUnit.MILLIS), @@ -398,7 +399,7 @@ public Instant getEnabledTime() { } @Override - public IntervalSchedule getSchedule() { + public Schedule getSchedule() { return this.schedule; } @@ -435,7 +436,7 @@ public void disable() { isEnabled = false; } - public void setSchedule(IntervalSchedule schedule) { + public void setSchedule(Schedule schedule) { this.schedule = schedule; } diff --git a/src/test/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelTestCase.java b/src/test/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelTestCase.java index 9e0474617..08d1a5216 100644 --- a/src/test/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelTestCase.java +++ b/src/test/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelTestCase.java @@ -26,6 +26,8 @@ import org.opensearch.ingest.IngestService; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.jobscheduler.spi.schedule.CronSchedule; +import java.time.ZoneId; import org.opensearch.jobscheduler.spi.utils.LockService; import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; import org.opensearch.securityanalytics.threatIntel.common.TIFJobState; @@ -156,13 +158,19 @@ protected TIFJobParameter randomTifJobParameter(final Instant updateStartTime) { Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); TIFJobParameter tifJobParameter = new TIFJobParameter(); tifJobParameter.setName(TestHelpers.randomLowerCaseString()); - tifJobParameter.setSchedule( - new IntervalSchedule( - updateStartTime.truncatedTo(ChronoUnit.MILLIS), - 1, - ChronoUnit.DAYS - ) - ); + if (Randomness.get().nextBoolean()) { + tifJobParameter.setSchedule( + new IntervalSchedule( + updateStartTime.truncatedTo(ChronoUnit.MILLIS), + 1, + ChronoUnit.DAYS + ) + ); + } else { + tifJobParameter.setSchedule( + new CronSchedule("0 0 0 * * ?", ZoneId.systemDefault()) + ); + } tifJobParameter.setState(randomState()); tifJobParameter.setIndices(Arrays.asList(TestHelpers.randomLowerCaseString(), TestHelpers.randomLowerCaseString())); tifJobParameter.getUpdateStats().setLastSkippedAt(now); From 5dde7800a5d8330edb6eaa8b553338f02094a97a Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Fri, 17 Oct 2025 11:57:01 -0400 Subject: [PATCH 2/2] Update more areas Signed-off-by: Craig Perkins --- .../threatIntel/model/SATIFSourceConfig.java | 27 +++++++++++++++---- .../model/SATIFSourceConfigDto.java | 27 +++++++++++++++---- .../threatIntel/model/TIFJobParameter.java | 24 +---------------- .../sacommons/TIFSourceConfig.java | 1 - .../sacommons/TIFSourceConfigDto.java | 1 - 5 files changed, 45 insertions(+), 35 deletions(-) diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/model/SATIFSourceConfig.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/model/SATIFSourceConfig.java index 605474512..3a3ea19fb 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/model/SATIFSourceConfig.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/model/SATIFSourceConfig.java @@ -19,6 +19,7 @@ import org.opensearch.core.xcontent.XContentParser; import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.schedule.CronSchedule; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; import org.opensearch.jobscheduler.spi.schedule.Schedule; import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; @@ -140,7 +141,7 @@ public SATIFSourceConfig(StreamInput sin) throws IOException { Source.readFrom(sin), // source sin.readOptionalInstant(), // enabled time sin.readInstant(), // last update time - sin.readBoolean() ? new IntervalSchedule(sin) : null, // schedule + readSchedule(sin), // schedule sin.readEnum(TIFJobState.class), // state sin.readEnum(RefreshType.class), // refresh type sin.readOptionalInstant(), // last refreshed time @@ -174,10 +175,7 @@ public void writeTo(final StreamOutput out) throws IOException { source.writeTo(out); out.writeOptionalInstant(enabledTime); out.writeInstant(lastUpdateTime); - out.writeBoolean(schedule != null); - if (schedule != null) { - schedule.writeTo(out); - } + writeSchedule(out, schedule); out.writeEnum(state); out.writeEnum(refreshType); out.writeOptionalInstant(lastRefreshedTime); @@ -200,6 +198,25 @@ public void writeTo(final StreamOutput out) throws IOException { } } + private static Schedule readSchedule(StreamInput sin) throws IOException { + boolean scheduleExists = sin.readBoolean(); + String scheduleType = sin.readString(); + if (!scheduleExists) { + return null; + } + return scheduleType.equals("interval") ? new IntervalSchedule(sin) : new CronSchedule(sin); + } + + private static void writeSchedule(StreamOutput out, Schedule schedule) throws IOException { + out.writeBoolean(schedule != null); + if (schedule != null) { + out.writeString(schedule instanceof IntervalSchedule ? "interval" : "cron"); + schedule.writeTo(out); + } else { + out.writeString(null); + } + } + @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject() diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/model/SATIFSourceConfigDto.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/model/SATIFSourceConfigDto.java index 2fbf8d517..f238f9c02 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/model/SATIFSourceConfigDto.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/model/SATIFSourceConfigDto.java @@ -21,6 +21,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.jobscheduler.spi.schedule.CronSchedule; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; import org.opensearch.jobscheduler.spi.schedule.Schedule; import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; @@ -146,6 +147,25 @@ public SATIFSourceConfigDto(String id, Long version, String name, String format, this.enabledForScan = enabledForScan; } + private static Schedule readSchedule(StreamInput sin) throws IOException { + boolean scheduleExists = sin.readBoolean(); + String scheduleType = sin.readString(); + if (!scheduleExists) { + return null; + } + return scheduleType.equals("interval") ? new IntervalSchedule(sin) : new CronSchedule(sin); + } + + private static void writeSchedule(StreamOutput out, Schedule schedule) throws IOException { + out.writeBoolean(schedule != null); + if (schedule != null) { + out.writeString(schedule instanceof IntervalSchedule ? "interval" : "cron"); + schedule.writeTo(out); + } else { + out.writeString(null); + } + } + public SATIFSourceConfigDto(StreamInput sin) throws IOException { this( sin.readString(), // id @@ -159,7 +179,7 @@ public SATIFSourceConfigDto(StreamInput sin) throws IOException { Source.readFrom(sin), // source sin.readOptionalInstant(), // enabled time sin.readInstant(), // last update time - sin.readBoolean() ? new IntervalSchedule(sin) : null, // schedule + readSchedule(sin), // schedule sin.readEnum(TIFJobState.class), // state sin.readEnum(RefreshType.class), // refresh type sin.readOptionalInstant(), // last refreshed time @@ -191,10 +211,7 @@ public void writeTo(final StreamOutput out) throws IOException { source.writeTo(out); out.writeOptionalInstant(enabledTime); out.writeInstant(lastUpdateTime); - out.writeBoolean(schedule != null); - if (schedule != null) { - schedule.writeTo(out); - } + writeSchedule(out, schedule); out.writeEnum(state); out.writeEnum(refreshType); out.writeOptionalInstant(lastRefreshedTime); diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/model/TIFJobParameter.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/model/TIFJobParameter.java index 5b6d4f50c..85c4f41a4 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/model/TIFJobParameter.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/model/TIFJobParameter.java @@ -35,7 +35,7 @@ import static org.opensearch.common.time.DateUtils.toInstant; -public class TIFJobParameter implements Writeable, ScheduledJobParameter { +public class TIFJobParameter implements ScheduledJobParameter { /** * Prefix of indices having threatIntel data */ @@ -319,28 +319,6 @@ public TIFJobParameter(final String name, final Schedule schedule) { ); } - public TIFJobParameter(final StreamInput in) throws IOException { - name = in.readString(); - lastUpdateTime = toInstant(in.readVLong()); - enabledTime = toInstant(in.readOptionalVLong()); - isEnabled = in.readBoolean(); - schedule = new IntervalSchedule(in); - state = TIFJobState.valueOf(in.readString()); - indices = in.readStringList(); - updateStats = new UpdateStats(in); - } - - public void writeTo(final StreamOutput out) throws IOException { - out.writeString(name); - out.writeVLong(lastUpdateTime.toEpochMilli()); - out.writeOptionalVLong(enabledTime == null ? null : enabledTime.toEpochMilli()); - out.writeBoolean(isEnabled); - schedule.writeTo(out); - out.writeString(state.name()); - out.writeStringCollection(indices); - updateStats.writeTo(out); - } - @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject(); diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/sacommons/TIFSourceConfig.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/sacommons/TIFSourceConfig.java index dae00034a..f79131da9 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/sacommons/TIFSourceConfig.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/sacommons/TIFSourceConfig.java @@ -1,7 +1,6 @@ package org.opensearch.securityanalytics.threatIntel.sacommons; import org.opensearch.commons.authuser.User; -import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; import org.opensearch.jobscheduler.spi.schedule.Schedule; import org.opensearch.securityanalytics.threatIntel.common.SourceConfigType; import org.opensearch.securityanalytics.threatIntel.common.TIFJobState; diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/sacommons/TIFSourceConfigDto.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/sacommons/TIFSourceConfigDto.java index 776b0c1b4..a7958d42f 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/sacommons/TIFSourceConfigDto.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/sacommons/TIFSourceConfigDto.java @@ -1,7 +1,6 @@ package org.opensearch.securityanalytics.threatIntel.sacommons; import org.opensearch.commons.authuser.User; -import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; import org.opensearch.jobscheduler.spi.schedule.Schedule; import org.opensearch.securityanalytics.threatIntel.common.SourceConfigType; import org.opensearch.securityanalytics.threatIntel.common.TIFJobState;