Skip to content

Commit a215e02

Browse files
committed
construct dummy request for doc level monitor fanout request on serde failure
Signed-off-by: Surya Sashank Nistala <[email protected]>
1 parent c9c0747 commit a215e02

File tree

2 files changed

+152
-12
lines changed

2 files changed

+152
-12
lines changed

src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt

+91-12
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,24 @@ package org.opensearch.commons.alerting.action
77

88
import org.opensearch.action.ActionRequest
99
import org.opensearch.action.ActionRequestValidationException
10+
import org.opensearch.commons.alerting.model.DataSources
1011
import org.opensearch.commons.alerting.model.IndexExecutionContext
12+
import org.opensearch.commons.alerting.model.IntervalSchedule
1113
import org.opensearch.commons.alerting.model.Monitor
14+
import org.opensearch.commons.alerting.model.Monitor.Companion.NO_VERSION
1215
import org.opensearch.commons.alerting.model.MonitorMetadata
1316
import org.opensearch.commons.alerting.model.WorkflowRunContext
17+
import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
1418
import org.opensearch.core.common.io.stream.StreamInput
1519
import org.opensearch.core.common.io.stream.StreamOutput
1620
import org.opensearch.core.index.shard.ShardId
1721
import org.opensearch.core.xcontent.ToXContent
1822
import org.opensearch.core.xcontent.ToXContentObject
1923
import org.opensearch.core.xcontent.XContentBuilder
24+
import org.opensearch.index.seqno.SequenceNumbers
2025
import java.io.IOException
26+
import java.time.Instant
27+
import java.time.temporal.ChronoUnit
2128

2229
class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
2330
val monitor: Monitor
@@ -29,6 +36,80 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
2936
val concreteIndicesSeenSoFar: List<String>
3037
val workflowRunContext: WorkflowRunContext?
3138

39+
companion object {
40+
private fun safeReadMonitor(sin: StreamInput): Monitor =
41+
try {
42+
Monitor.readFrom(sin)!!
43+
} catch (e: Exception) {
44+
Monitor(
45+
"failed_serde", NO_VERSION, "failed_serde", true,
46+
IntervalSchedule(1, ChronoUnit.MINUTES), Instant.now(), Instant.now(), "",
47+
null, NO_SCHEMA_VERSION, emptyList(), emptyList(), emptyMap(),
48+
DataSources(), false, false, "failed"
49+
)
50+
}
51+
52+
private fun safeReadBoolean(sin: StreamInput): Boolean =
53+
try {
54+
sin.readBoolean()
55+
} catch (e: Exception) {
56+
false
57+
}
58+
59+
private fun safeReadMonitorMetadata(sin: StreamInput): MonitorMetadata =
60+
try {
61+
MonitorMetadata.readFrom(sin)
62+
} catch (e: Exception) {
63+
MonitorMetadata(
64+
"failed_serde",
65+
SequenceNumbers.UNASSIGNED_SEQ_NO,
66+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
67+
"failed_serde",
68+
emptyList(),
69+
emptyMap(),
70+
mutableMapOf()
71+
)
72+
}
73+
74+
private fun safeReadString(sin: StreamInput): String =
75+
try {
76+
sin.readString()
77+
} catch (e: Exception) {
78+
""
79+
}
80+
81+
private fun safeReadShardIds(sin: StreamInput): List<ShardId> =
82+
try {
83+
sin.readList(::ShardId)
84+
} catch (e: Exception) {
85+
listOf(ShardId("failed_serde", "failed_serde", 999999))
86+
}
87+
88+
private fun safeReadStringList(sin: StreamInput): List<String> =
89+
try {
90+
sin.readStringList()
91+
} catch (e: Exception) {
92+
emptyList()
93+
}
94+
95+
private fun safeReadWorkflowRunContext(sin: StreamInput): WorkflowRunContext? =
96+
try {
97+
if (sin.readBoolean()) WorkflowRunContext(sin) else null
98+
} catch (e: Exception) {
99+
null
100+
}
101+
102+
private fun safeReadIndexExecutionContext(sin: StreamInput): IndexExecutionContext =
103+
try {
104+
IndexExecutionContext(sin)
105+
} catch (e: Exception) {
106+
IndexExecutionContext(
107+
emptyList(), mutableMapOf(), mutableMapOf(), "failed_serde", "failed_serde",
108+
emptyList(), emptyList(), emptyList(), emptyList(), emptyList()
109+
)
110+
}
111+
}
112+
32113
constructor(
33114
monitor: Monitor,
34115
dryRun: Boolean,
@@ -52,16 +133,14 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
52133

53134
@Throws(IOException::class)
54135
constructor(sin: StreamInput) : this(
55-
monitor = Monitor.readFrom(sin)!!,
56-
dryRun = sin.readBoolean(),
57-
monitorMetadata = MonitorMetadata.readFrom(sin),
58-
executionId = sin.readString(),
59-
shardIds = sin.readList(::ShardId),
60-
concreteIndicesSeenSoFar = sin.readStringList(),
61-
workflowRunContext = if (sin.readBoolean()) {
62-
WorkflowRunContext(sin)
63-
} else { null },
64-
indexExecutionContext = IndexExecutionContext(sin)
136+
monitor = safeReadMonitor(sin),
137+
dryRun = safeReadBoolean(sin),
138+
monitorMetadata = safeReadMonitorMetadata(sin),
139+
executionId = safeReadString(sin),
140+
shardIds = safeReadShardIds(sin),
141+
concreteIndicesSeenSoFar = safeReadStringList(sin),
142+
workflowRunContext = safeReadWorkflowRunContext(sin),
143+
indexExecutionContext = safeReadIndexExecutionContext(sin)
65144
)
66145

67146
@Throws(IOException::class)
@@ -88,14 +167,14 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
88167

89168
@Throws(IOException::class)
90169
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
91-
builder.startObject()
170+
return builder.startObject()
92171
.field("monitor", monitor)
93172
.field("dry_run", dryRun)
94173
.field("execution_id", executionId)
95174
.field("index_execution_context", indexExecutionContext)
96175
.field("shard_ids", shardIds)
97176
.field("concrete_indices", concreteIndicesSeenSoFar)
98177
.field("workflow_run_context", workflowRunContext)
99-
return builder.endObject()
178+
.endObject()
100179
}
101180
}

src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt

+61
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.commons.alerting.action
77

88
import org.junit.Assert.assertEquals
9+
import org.junit.Assert.assertTrue
910
import org.junit.jupiter.api.Test
1011
import org.opensearch.common.io.stream.BytesStreamOutput
1112
import org.opensearch.commons.alerting.model.ActionExecutionTime
@@ -151,4 +152,64 @@ class DocLevelMonitorFanOutRequestTests {
151152
assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
152153
assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
153154
}
155+
156+
@Test
157+
fun `test serde failure returning dummy object instead of exception`() {
158+
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
159+
val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery))
160+
161+
val trigger = randomDocumentLevelTrigger(condition = Script("return true"))
162+
val monitor = randomDocumentLevelMonitor(
163+
inputs = listOf(docLevelInput),
164+
triggers = listOf(trigger),
165+
enabled = true,
166+
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
167+
)
168+
val monitorMetadata = MonitorMetadata(
169+
"test",
170+
SequenceNumbers.UNASSIGNED_SEQ_NO,
171+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
172+
Monitor.NO_ID,
173+
listOf(ActionExecutionTime("", Instant.now())),
174+
mutableMapOf("index" to mutableMapOf("1" to "1")),
175+
mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001")
176+
)
177+
val indexExecutionContext = IndexExecutionContext(
178+
listOf(docQuery),
179+
mutableMapOf("index" to mutableMapOf("1" to "1")),
180+
mutableMapOf("index" to mutableMapOf("1" to "1")),
181+
"test-index",
182+
"test-index",
183+
listOf("test-index"),
184+
listOf("test-index"),
185+
listOf("test-field"),
186+
listOf("1", "2")
187+
)
188+
val workflowRunContext = WorkflowRunContext(
189+
Workflow.NO_ID,
190+
Workflow.NO_ID,
191+
Monitor.NO_ID,
192+
mutableMapOf("index" to listOf("1")),
193+
true,
194+
listOf("finding1")
195+
)
196+
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
197+
monitor,
198+
false,
199+
monitorMetadata,
200+
UUID.randomUUID().toString(),
201+
indexExecutionContext,
202+
listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)),
203+
listOf("test-index"),
204+
workflowRunContext
205+
)
206+
val out = BytesStreamOutput()
207+
monitor.writeTo(out)
208+
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
209+
val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin)
210+
assertTrue(newDocLevelMonitorFanOutRequest.shardIds.isNotEmpty())
211+
assertTrue(newDocLevelMonitorFanOutRequest.shardIds.size == 1)
212+
assertTrue(newDocLevelMonitorFanOutRequest.shardIds[0].indexName == "failed_serde")
213+
assertTrue(newDocLevelMonitorFanOutRequest.shardIds[0].id == 999999)
214+
}
154215
}

0 commit comments

Comments
 (0)