diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt index fe5cfe29..21ddfffb 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt @@ -7,17 +7,25 @@ package org.opensearch.commons.alerting.action import org.opensearch.action.ActionRequest import org.opensearch.action.ActionRequestValidationException +import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.IndexExecutionContext +import org.opensearch.commons.alerting.model.IntervalSchedule import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.Monitor.Companion.NO_VERSION import org.opensearch.commons.alerting.model.MonitorMetadata import org.opensearch.commons.alerting.model.WorkflowRunContext +import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.index.shard.ShardId import org.opensearch.core.xcontent.ToXContent import org.opensearch.core.xcontent.ToXContentObject import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.index.seqno.SequenceNumbers import java.io.IOException +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.UUID class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { val monitor: Monitor @@ -37,7 +45,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { indexExecutionContext: IndexExecutionContext?, shardIds: List, concreteIndicesSeenSoFar: List, - workflowRunContext: WorkflowRunContext? + workflowRunContext: WorkflowRunContext?, ) : super() { this.monitor = monitor this.dryRun = dryRun @@ -52,16 +60,75 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { @Throws(IOException::class) constructor(sin: StreamInput) : this( - monitor = Monitor.readFrom(sin)!!, - dryRun = sin.readBoolean(), - monitorMetadata = MonitorMetadata.readFrom(sin), - executionId = sin.readString(), - shardIds = sin.readList(::ShardId), - concreteIndicesSeenSoFar = sin.readStringList(), - workflowRunContext = if (sin.readBoolean()) { - WorkflowRunContext(sin) - } else { null }, - indexExecutionContext = IndexExecutionContext(sin) + monitor = try { + Monitor.readFrom(sin)!! + } catch (e: Exception) { + Monitor( + "failed_serde", + NO_VERSION, + "failed_serde", + true, + IntervalSchedule(1, ChronoUnit.MINUTES), + Instant.now(), + Instant.now(), + "", + null, + NO_SCHEMA_VERSION, + emptyList(), + emptyList(), + emptyMap(), + DataSources(), + false, + false, + "failed" + ) + }, + dryRun = try { + sin.readBoolean() + } catch (e: Exception) { + false + }, + monitorMetadata = try { + MonitorMetadata.readFrom(sin) + } catch (e: Exception) { + MonitorMetadata( + "failed_serde", SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + "failed_serde", emptyList(), emptyMap(), + mutableMapOf() + ) + }, + executionId = try { + sin.readString() + } catch (e: Exception) { + "" + }, + shardIds = try { + sin.readList(::ShardId) + } catch (e: Exception) { + listOf(ShardId("failed_serde", "failed_serde",999999))// to circumvent an isEmpty() check in constructor + }, + concreteIndicesSeenSoFar = try { + sin.readStringList() + } catch (e: Exception) { + emptyList() + }, + workflowRunContext = try { + if (sin.readBoolean()) { + WorkflowRunContext(sin) + } else { + null + } + } catch (e: Exception) { + null + }, + indexExecutionContext = try { + IndexExecutionContext(sin) + } catch (e: Exception) { + IndexExecutionContext( + emptyList(), mutableMapOf(), mutableMapOf(), "failed_serde", "failed_serde", emptyList(), emptyList(), emptyList(), + emptyList(), emptyList() + ) + } ) @Throws(IOException::class) diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt index 1ef82f18..5320351a 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt @@ -6,6 +6,7 @@ package org.opensearch.commons.alerting.action import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue import org.junit.jupiter.api.Test import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.commons.alerting.model.ActionExecutionTime @@ -151,4 +152,64 @@ class DocLevelMonitorFanOutRequestTests { assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds) assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext) } + + @Test + fun `test serde failure returning dummy object instead of exception`() { + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = Script("return true")) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + enabled = true, + schedule = IntervalSchedule(1, ChronoUnit.MINUTES) + ) + val monitorMetadata = MonitorMetadata( + "test", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + Monitor.NO_ID, + listOf(ActionExecutionTime("", Instant.now())), + mutableMapOf("index" to mutableMapOf("1" to "1")), + mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001") + ) + val indexExecutionContext = IndexExecutionContext( + listOf(docQuery), + mutableMapOf("index" to mutableMapOf("1" to "1")), + mutableMapOf("index" to mutableMapOf("1" to "1")), + "test-index", + "test-index", + listOf("test-index"), + listOf("test-index"), + listOf("test-field"), + listOf("1", "2") + ) + val workflowRunContext = WorkflowRunContext( + Workflow.NO_ID, + Workflow.NO_ID, + Monitor.NO_ID, + mutableMapOf("index" to listOf("1")), + true, + listOf("finding1") + ) + val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest( + monitor, + false, + monitorMetadata, + UUID.randomUUID().toString(), + indexExecutionContext, + listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)), + listOf("test-index"), + workflowRunContext + ) + val out = BytesStreamOutput() + monitor.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin) + assertTrue(newDocLevelMonitorFanOutRequest.shardIds.isNotEmpty()) + assertTrue(newDocLevelMonitorFanOutRequest.shardIds.size == 1) + assertTrue(newDocLevelMonitorFanOutRequest.shardIds[0].indexName == "failed_serde") + assertTrue(newDocLevelMonitorFanOutRequest.shardIds[0].id == 999999) + } }