Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.schedule.CronSchedule;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.jobscheduler.spi.schedule.Schedule;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
Expand Down Expand Up @@ -140,7 +141,7 @@ public SATIFSourceConfig(StreamInput sin) throws IOException {
Source.readFrom(sin), // source
sin.readOptionalInstant(), // enabled time
sin.readInstant(), // last update time
sin.readBoolean() ? new IntervalSchedule(sin) : null, // schedule
readSchedule(sin), // schedule
sin.readEnum(TIFJobState.class), // state
sin.readEnum(RefreshType.class), // refresh type
sin.readOptionalInstant(), // last refreshed time
Expand Down Expand Up @@ -174,10 +175,7 @@ public void writeTo(final StreamOutput out) throws IOException {
source.writeTo(out);
out.writeOptionalInstant(enabledTime);
out.writeInstant(lastUpdateTime);
out.writeBoolean(schedule != null);
if (schedule != null) {
schedule.writeTo(out);
}
writeSchedule(out, schedule);
out.writeEnum(state);
out.writeEnum(refreshType);
out.writeOptionalInstant(lastRefreshedTime);
Expand All @@ -200,6 +198,25 @@ public void writeTo(final StreamOutput out) throws IOException {
}
}

private static Schedule readSchedule(StreamInput sin) throws IOException {
boolean scheduleExists = sin.readBoolean();
String scheduleType = sin.readString();
if (!scheduleExists) {
return null;
}
return scheduleType.equals("interval") ? new IntervalSchedule(sin) : new CronSchedule(sin);
}

private static void writeSchedule(StreamOutput out, Schedule schedule) throws IOException {
out.writeBoolean(schedule != null);
if (schedule != null) {
out.writeString(schedule instanceof IntervalSchedule ? "interval" : "cron");
schedule.writeTo(out);
} else {
out.writeString(null);
}
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.jobscheduler.spi.schedule.CronSchedule;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.jobscheduler.spi.schedule.Schedule;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
Expand Down Expand Up @@ -146,6 +147,25 @@ public SATIFSourceConfigDto(String id, Long version, String name, String format,
this.enabledForScan = enabledForScan;
}

private static Schedule readSchedule(StreamInput sin) throws IOException {
boolean scheduleExists = sin.readBoolean();
String scheduleType = sin.readString();
if (!scheduleExists) {
return null;
}
return scheduleType.equals("interval") ? new IntervalSchedule(sin) : new CronSchedule(sin);
}

private static void writeSchedule(StreamOutput out, Schedule schedule) throws IOException {
out.writeBoolean(schedule != null);
if (schedule != null) {
out.writeString(schedule instanceof IntervalSchedule ? "interval" : "cron");
schedule.writeTo(out);
} else {
out.writeString(null);
}
}

public SATIFSourceConfigDto(StreamInput sin) throws IOException {
this(
sin.readString(), // id
Expand All @@ -159,7 +179,7 @@ public SATIFSourceConfigDto(StreamInput sin) throws IOException {
Source.readFrom(sin), // source
sin.readOptionalInstant(), // enabled time
sin.readInstant(), // last update time
sin.readBoolean() ? new IntervalSchedule(sin) : null, // schedule
readSchedule(sin), // schedule
sin.readEnum(TIFJobState.class), // state
sin.readEnum(RefreshType.class), // refresh type
sin.readOptionalInstant(), // last refreshed time
Expand Down Expand Up @@ -191,10 +211,7 @@ public void writeTo(final StreamOutput out) throws IOException {
source.writeTo(out);
out.writeOptionalInstant(enabledTime);
out.writeInstant(lastUpdateTime);
out.writeBoolean(schedule != null);
if (schedule != null) {
schedule.writeTo(out);
}
writeSchedule(out, schedule);
out.writeEnum(state);
out.writeEnum(refreshType);
out.writeOptionalInstant(lastRefreshedTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.jobscheduler.spi.schedule.Schedule;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobRequest;
import org.opensearch.securityanalytics.threatIntel.common.TIFJobState;
Expand All @@ -34,7 +35,7 @@

import static org.opensearch.common.time.DateUtils.toInstant;

public class TIFJobParameter implements Writeable, ScheduledJobParameter {
public class TIFJobParameter implements ScheduledJobParameter {
/**
* Prefix of indices having threatIntel data
*/
Expand Down Expand Up @@ -103,7 +104,7 @@ public class TIFJobParameter implements Writeable, ScheduledJobParameter {
* @param schedule Schedule that system uses
* @return Schedule that system uses
*/
private IntervalSchedule schedule;
private Schedule schedule;


/**
Expand Down Expand Up @@ -134,7 +135,7 @@ public static TIFJobParameter parseFromParser(XContentParser xcp, String id, Lon
Boolean isEnabled = null;
TIFJobState state = null;
Instant enabledTime = null;
IntervalSchedule schedule = null;
Schedule schedule = null;
List<String> indices = new ArrayList<>();
UpdateStats updateStats = null;

Expand Down Expand Up @@ -163,7 +164,7 @@ public static TIFJobParameter parseFromParser(XContentParser xcp, String id, Lon
isEnabled = xcp.booleanValue();
break;
case SCHEDULE_FIELD:
schedule = (IntervalSchedule) ScheduleParser.parse(xcp);
schedule = ScheduleParser.parse(xcp);
break;
case STATE_FIELD:
state = toState(xcp.text());
Expand Down Expand Up @@ -293,7 +294,7 @@ public TIFJobParameter() {
}

public TIFJobParameter(final String name, final Instant lastUpdateTime, final Instant enabledTime, final Boolean isEnabled,
final IntervalSchedule schedule, final TIFJobState state,
final Schedule schedule, final TIFJobState state,
final List<String> indices, final UpdateStats updateStats) {
this.name = name;
this.lastUpdateTime = lastUpdateTime;
Expand All @@ -305,7 +306,7 @@ public TIFJobParameter(final String name, final Instant lastUpdateTime, final In
this.updateStats = updateStats;
}

public TIFJobParameter(final String name, final IntervalSchedule schedule) {
public TIFJobParameter(final String name, final Schedule schedule) {
this(
name,
Instant.now().truncatedTo(ChronoUnit.MILLIS),
Expand All @@ -318,28 +319,6 @@ public TIFJobParameter(final String name, final IntervalSchedule schedule) {
);
}

public TIFJobParameter(final StreamInput in) throws IOException {
name = in.readString();
lastUpdateTime = toInstant(in.readVLong());
enabledTime = toInstant(in.readOptionalVLong());
isEnabled = in.readBoolean();
schedule = new IntervalSchedule(in);
state = TIFJobState.valueOf(in.readString());
indices = in.readStringList();
updateStats = new UpdateStats(in);
}

public void writeTo(final StreamOutput out) throws IOException {
out.writeString(name);
out.writeVLong(lastUpdateTime.toEpochMilli());
out.writeOptionalVLong(enabledTime == null ? null : enabledTime.toEpochMilli());
out.writeBoolean(isEnabled);
schedule.writeTo(out);
out.writeString(state.name());
out.writeStringCollection(indices);
updateStats.writeTo(out);
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -398,7 +377,7 @@ public Instant getEnabledTime() {
}

@Override
public IntervalSchedule getSchedule() {
public Schedule getSchedule() {
return this.schedule;
}

Expand Down Expand Up @@ -435,7 +414,7 @@ public void disable() {
isEnabled = false;
}

public void setSchedule(IntervalSchedule schedule) {
public void setSchedule(Schedule schedule) {
this.schedule = schedule;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.opensearch.securityanalytics.threatIntel.sacommons;

import org.opensearch.commons.authuser.User;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.jobscheduler.spi.schedule.Schedule;
import org.opensearch.securityanalytics.threatIntel.common.SourceConfigType;
import org.opensearch.securityanalytics.threatIntel.common.TIFJobState;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.opensearch.securityanalytics.threatIntel.sacommons;

import org.opensearch.commons.authuser.User;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.jobscheduler.spi.schedule.Schedule;
import org.opensearch.securityanalytics.threatIntel.common.SourceConfigType;
import org.opensearch.securityanalytics.threatIntel.common.TIFJobState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.opensearch.ingest.IngestService;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.jobscheduler.spi.schedule.CronSchedule;
import java.time.ZoneId;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.threatIntel.common.TIFJobState;
Expand Down Expand Up @@ -156,13 +158,19 @@ protected TIFJobParameter randomTifJobParameter(final Instant updateStartTime) {
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
TIFJobParameter tifJobParameter = new TIFJobParameter();
tifJobParameter.setName(TestHelpers.randomLowerCaseString());
tifJobParameter.setSchedule(
new IntervalSchedule(
updateStartTime.truncatedTo(ChronoUnit.MILLIS),
1,
ChronoUnit.DAYS
)
);
if (Randomness.get().nextBoolean()) {
tifJobParameter.setSchedule(
new IntervalSchedule(
updateStartTime.truncatedTo(ChronoUnit.MILLIS),
1,
ChronoUnit.DAYS
)
);
} else {
tifJobParameter.setSchedule(
new CronSchedule("0 0 0 * * ?", ZoneId.systemDefault())
);
}
tifJobParameter.setState(randomState());
tifJobParameter.setIndices(Arrays.asList(TestHelpers.randomLowerCaseString(), TestHelpers.randomLowerCaseString()));
tifJobParameter.getUpdateStats().setLastSkippedAt(now);
Expand Down
Loading