Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Construct dummy request for doc level monitor fanout request on serde failure #806

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,25 @@ package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.IndexExecutionContext
import org.opensearch.commons.alerting.model.IntervalSchedule
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.Monitor.Companion.NO_VERSION
import org.opensearch.commons.alerting.model.MonitorMetadata
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.index.shard.ShardId
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.index.seqno.SequenceNumbers
import java.io.IOException
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.UUID

class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
val monitor: Monitor
Expand All @@ -37,7 +45,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
indexExecutionContext: IndexExecutionContext?,
shardIds: List<ShardId>,
concreteIndicesSeenSoFar: List<String>,
workflowRunContext: WorkflowRunContext?
workflowRunContext: WorkflowRunContext?,
) : super() {
this.monitor = monitor
this.dryRun = dryRun
Expand All @@ -52,16 +60,75 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
monitor = Monitor.readFrom(sin)!!,
dryRun = sin.readBoolean(),
monitorMetadata = MonitorMetadata.readFrom(sin),
executionId = sin.readString(),
shardIds = sin.readList(::ShardId),
concreteIndicesSeenSoFar = sin.readStringList(),
workflowRunContext = if (sin.readBoolean()) {
WorkflowRunContext(sin)
} else { null },
indexExecutionContext = IndexExecutionContext(sin)
monitor = try {
Monitor.readFrom(sin)!!
} catch (e: Exception) {
Monitor(
"failed_serde",
NO_VERSION,
"failed_serde",
true,
IntervalSchedule(1, ChronoUnit.MINUTES),
Instant.now(),
Instant.now(),
"",
null,
NO_SCHEMA_VERSION,
emptyList(),
emptyList(),
emptyMap(),
DataSources(),
false,
false,
"failed"
)
},
dryRun = try {
sin.readBoolean()
} catch (e: Exception) {
false
},
monitorMetadata = try {
MonitorMetadata.readFrom(sin)
} catch (e: Exception) {
MonitorMetadata(
"failed_serde", SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
"failed_serde", emptyList(), emptyMap(),
mutableMapOf()
)
},
executionId = try {
sin.readString()
} catch (e: Exception) {
""
},
shardIds = try {
sin.readList(::ShardId)
} catch (e: Exception) {
listOf(ShardId("failed_serde", "failed_serde",999999))// to circumvent an isEmpty() check in constructor
},
concreteIndicesSeenSoFar = try {
sin.readStringList()
} catch (e: Exception) {
emptyList()
},
workflowRunContext = try {
if (sin.readBoolean()) {
WorkflowRunContext(sin)
} else {
null
}
} catch (e: Exception) {
null
},
indexExecutionContext = try {
IndexExecutionContext(sin)
} catch (e: Exception) {
IndexExecutionContext(
emptyList(), mutableMapOf(), mutableMapOf(), "failed_serde", "failed_serde", emptyList(), emptyList(), emptyList(),
emptyList(), emptyList()
)
}
)

@Throws(IOException::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.commons.alerting.action

import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.jupiter.api.Test
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.commons.alerting.model.ActionExecutionTime
Expand Down Expand Up @@ -151,4 +152,64 @@ class DocLevelMonitorFanOutRequestTests {
assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
}

@Test
fun `test serde failure returning dummy object instead of exception`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = Script("return true"))
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
enabled = true,
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
)
val monitorMetadata = MonitorMetadata(
"test",
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
Monitor.NO_ID,
listOf(ActionExecutionTime("", Instant.now())),
mutableMapOf("index" to mutableMapOf("1" to "1")),
mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001")
)
val indexExecutionContext = IndexExecutionContext(
listOf(docQuery),
mutableMapOf("index" to mutableMapOf("1" to "1")),
mutableMapOf("index" to mutableMapOf("1" to "1")),
"test-index",
"test-index",
listOf("test-index"),
listOf("test-index"),
listOf("test-field"),
listOf("1", "2")
)
val workflowRunContext = WorkflowRunContext(
Workflow.NO_ID,
Workflow.NO_ID,
Monitor.NO_ID,
mutableMapOf("index" to listOf("1")),
true,
listOf("finding1")
)
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
monitor,
false,
monitorMetadata,
UUID.randomUUID().toString(),
indexExecutionContext,
listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)),
listOf("test-index"),
workflowRunContext
)
val out = BytesStreamOutput()
monitor.writeTo(out)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin)
assertTrue(newDocLevelMonitorFanOutRequest.shardIds.isNotEmpty())
assertTrue(newDocLevelMonitorFanOutRequest.shardIds.size == 1)
assertTrue(newDocLevelMonitorFanOutRequest.shardIds[0].indexName == "failed_serde")
assertTrue(newDocLevelMonitorFanOutRequest.shardIds[0].id == 999999)
}
}
Loading