Skip to content

Commit 1fef233

Browse files
committed
construct dummy request for doc level monitor fanout request on serde failure
Signed-off-by: Surya Sashank Nistala <[email protected]>
1 parent c9c0747 commit 1fef233

File tree

2 files changed

+139
-11
lines changed

2 files changed

+139
-11
lines changed

src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt

+78-11
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,25 @@ package org.opensearch.commons.alerting.action
77

88
import org.opensearch.action.ActionRequest
99
import org.opensearch.action.ActionRequestValidationException
10+
import org.opensearch.commons.alerting.model.DataSources
1011
import org.opensearch.commons.alerting.model.IndexExecutionContext
12+
import org.opensearch.commons.alerting.model.IntervalSchedule
1113
import org.opensearch.commons.alerting.model.Monitor
14+
import org.opensearch.commons.alerting.model.Monitor.Companion.NO_VERSION
1215
import org.opensearch.commons.alerting.model.MonitorMetadata
1316
import org.opensearch.commons.alerting.model.WorkflowRunContext
17+
import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
1418
import org.opensearch.core.common.io.stream.StreamInput
1519
import org.opensearch.core.common.io.stream.StreamOutput
1620
import org.opensearch.core.index.shard.ShardId
1721
import org.opensearch.core.xcontent.ToXContent
1822
import org.opensearch.core.xcontent.ToXContentObject
1923
import org.opensearch.core.xcontent.XContentBuilder
24+
import org.opensearch.index.seqno.SequenceNumbers
2025
import java.io.IOException
26+
import java.time.Instant
27+
import java.time.temporal.ChronoUnit
28+
import java.util.UUID
2129

2230
class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
2331
val monitor: Monitor
@@ -37,7 +45,7 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
3745
indexExecutionContext: IndexExecutionContext?,
3846
shardIds: List<ShardId>,
3947
concreteIndicesSeenSoFar: List<String>,
40-
workflowRunContext: WorkflowRunContext?
48+
workflowRunContext: WorkflowRunContext?,
4149
) : super() {
4250
this.monitor = monitor
4351
this.dryRun = dryRun
@@ -52,16 +60,75 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
5260

5361
@Throws(IOException::class)
5462
constructor(sin: StreamInput) : this(
55-
monitor = Monitor.readFrom(sin)!!,
56-
dryRun = sin.readBoolean(),
57-
monitorMetadata = MonitorMetadata.readFrom(sin),
58-
executionId = sin.readString(),
59-
shardIds = sin.readList(::ShardId),
60-
concreteIndicesSeenSoFar = sin.readStringList(),
61-
workflowRunContext = if (sin.readBoolean()) {
62-
WorkflowRunContext(sin)
63-
} else { null },
64-
indexExecutionContext = IndexExecutionContext(sin)
63+
monitor = try {
64+
Monitor.readFrom(sin)!!
65+
} catch (e: Exception) {
66+
Monitor(
67+
"failed_serde",
68+
NO_VERSION,
69+
"failed_serde",
70+
true,
71+
IntervalSchedule(1, ChronoUnit.MINUTES),
72+
Instant.now(),
73+
Instant.now(),
74+
"",
75+
null,
76+
NO_SCHEMA_VERSION,
77+
emptyList(),
78+
emptyList(),
79+
emptyMap(),
80+
DataSources(),
81+
false,
82+
false,
83+
"failed"
84+
)
85+
},
86+
dryRun = try {
87+
sin.readBoolean()
88+
} catch (e: Exception) {
89+
false
90+
},
91+
monitorMetadata = try {
92+
MonitorMetadata.readFrom(sin)
93+
} catch (e: Exception) {
94+
MonitorMetadata(
95+
"failed_serde", SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
96+
"failed_serde", emptyList(), emptyMap(),
97+
mutableMapOf()
98+
)
99+
},
100+
executionId = try {
101+
sin.readString()
102+
} catch (e: Exception) {
103+
""
104+
},
105+
shardIds = try {
106+
sin.readList(::ShardId)
107+
} catch (e: Exception) {
108+
listOf(ShardId("failed_serde", "failed_serde",999999))// to circumvent an isEmpty() check in constructor
109+
},
110+
concreteIndicesSeenSoFar = try {
111+
sin.readStringList()
112+
} catch (e: Exception) {
113+
emptyList()
114+
},
115+
workflowRunContext = try {
116+
if (sin.readBoolean()) {
117+
WorkflowRunContext(sin)
118+
} else {
119+
null
120+
}
121+
} catch (e: Exception) {
122+
null
123+
},
124+
indexExecutionContext = try {
125+
IndexExecutionContext(sin)
126+
} catch (e: Exception) {
127+
IndexExecutionContext(
128+
emptyList(), mutableMapOf(), mutableMapOf(), "failed_serde", "failed_serde", emptyList(), emptyList(), emptyList(),
129+
emptyList(), emptyList()
130+
)
131+
}
65132
)
66133

67134
@Throws(IOException::class)

src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt

+61
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.commons.alerting.action
77

88
import org.junit.Assert.assertEquals
9+
import org.junit.Assert.assertTrue
910
import org.junit.jupiter.api.Test
1011
import org.opensearch.common.io.stream.BytesStreamOutput
1112
import org.opensearch.commons.alerting.model.ActionExecutionTime
@@ -151,4 +152,64 @@ class DocLevelMonitorFanOutRequestTests {
151152
assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
152153
assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
153154
}
155+
156+
@Test
157+
fun `test serde failure returning dummy object instead of exception`() {
158+
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
159+
val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery))
160+
161+
val trigger = randomDocumentLevelTrigger(condition = Script("return true"))
162+
val monitor = randomDocumentLevelMonitor(
163+
inputs = listOf(docLevelInput),
164+
triggers = listOf(trigger),
165+
enabled = true,
166+
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
167+
)
168+
val monitorMetadata = MonitorMetadata(
169+
"test",
170+
SequenceNumbers.UNASSIGNED_SEQ_NO,
171+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
172+
Monitor.NO_ID,
173+
listOf(ActionExecutionTime("", Instant.now())),
174+
mutableMapOf("index" to mutableMapOf("1" to "1")),
175+
mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001")
176+
)
177+
val indexExecutionContext = IndexExecutionContext(
178+
listOf(docQuery),
179+
mutableMapOf("index" to mutableMapOf("1" to "1")),
180+
mutableMapOf("index" to mutableMapOf("1" to "1")),
181+
"test-index",
182+
"test-index",
183+
listOf("test-index"),
184+
listOf("test-index"),
185+
listOf("test-field"),
186+
listOf("1", "2")
187+
)
188+
val workflowRunContext = WorkflowRunContext(
189+
Workflow.NO_ID,
190+
Workflow.NO_ID,
191+
Monitor.NO_ID,
192+
mutableMapOf("index" to listOf("1")),
193+
true,
194+
listOf("finding1")
195+
)
196+
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
197+
monitor,
198+
false,
199+
monitorMetadata,
200+
UUID.randomUUID().toString(),
201+
indexExecutionContext,
202+
listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)),
203+
listOf("test-index"),
204+
workflowRunContext
205+
)
206+
val out = BytesStreamOutput()
207+
monitor.writeTo(out)
208+
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
209+
val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin)
210+
assertTrue(newDocLevelMonitorFanOutRequest.shardIds.isNotEmpty())
211+
assertTrue(newDocLevelMonitorFanOutRequest.shardIds.size == 1)
212+
assertTrue(newDocLevelMonitorFanOutRequest.shardIds[0].indexName == "failed_serde")
213+
assertTrue(newDocLevelMonitorFanOutRequest.shardIds[0].id == 999999)
214+
}
154215
}

0 commit comments

Comments
 (0)