Skip to content

Commit 41c9fcf

Browse files
authored
support list of monitor ids in Chained Monitor Findings (#514) (#515)
support list of monitor ids in Chained Monitor Findings Signed-off-by: Surya Sashank Nistala <[email protected]>
1 parent e6783b6 commit 41c9fcf

File tree

6 files changed

+156
-25
lines changed

6 files changed

+156
-25
lines changed

src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -105,19 +105,41 @@ class IndexWorkflowRequest : ActionRequest {
105105
val monitorIdOrderMap: Map<String, Int> = delegates.associate { it.monitorId to it.order }
106106
delegates.forEach {
107107
if (it.chainedMonitorFindings != null) {
108-
if (monitorIdOrderMap.containsKey(it.chainedMonitorFindings.monitorId) == false) {
109-
validationException = ValidateActions.addValidationError(
110-
"Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} doesn't exist in sequence",
111-
validationException
112-
)
113-
// Break the flow because next check will generate the NPE
114-
return validationException
115-
}
116-
if (it.order <= monitorIdOrderMap[it.chainedMonitorFindings.monitorId]!!) {
117-
validationException = ValidateActions.addValidationError(
118-
"Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} should be executed before monitor ${it.monitorId}",
119-
validationException
120-
)
108+
109+
if (it.chainedMonitorFindings.monitorId != null) {
110+
if (monitorIdOrderMap.containsKey(it.chainedMonitorFindings.monitorId) == false) {
111+
validationException = ValidateActions.addValidationError(
112+
"Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} doesn't exist in sequence",
113+
validationException
114+
)
115+
// Break the flow because next check will generate the NPE
116+
return validationException
117+
}
118+
if (it.order <= monitorIdOrderMap[it.chainedMonitorFindings.monitorId]!!) {
119+
validationException = ValidateActions.addValidationError(
120+
"Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} should be executed before monitor ${it.monitorId}",
121+
validationException
122+
)
123+
}
124+
} else {
125+
for (monitorId in it.chainedMonitorFindings.monitorIds) {
126+
if (!monitorIdOrderMap.containsKey(monitorId)) {
127+
validationException = ValidateActions.addValidationError(
128+
"Chained Findings Monitor $monitorId doesn't exist in sequence",
129+
validationException
130+
)
131+
return validationException
132+
} else {
133+
val order = monitorIdOrderMap.get(monitorId)!!
134+
if (order >= it.order) {
135+
return ValidateActions.addValidationError(
136+
"Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} should be executed before monitor ${it.monitorId}. " +
137+
"Order of monitor being chained [$order] should be smaller than order of monitor using findings as source data [${it.order}] in sequence",
138+
validationException
139+
)
140+
}
141+
}
142+
}
121143
}
122144
}
123145
}

src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,63 +9,89 @@ import org.opensearch.core.xcontent.XContentBuilder
99
import org.opensearch.core.xcontent.XContentParser
1010
import org.opensearch.core.xcontent.XContentParserUtils
1111
import java.io.IOException
12+
import java.util.Collections
1213

1314
/**
14-
* Context passed in delegate monitor to filter data queried by a monitor based on the findings of the given monitor id.
15+
* Context passed in delegate monitor to filter data matched by a list of monitors based on the findings of the given monitor ids.
1516
*/
1617
// TODO - Remove the class and move the monitorId to Delegate (as a chainedMonitorId property) if this class won't be updated by adding new properties
1718
data class ChainedMonitorFindings(
18-
val monitorId: String
19+
val monitorId: String? = null,
20+
val monitorIds: List<String> = emptyList(), // if monitorId field is non-null it would be given precendence for BWC
1921
) : BaseModel {
2022

2123
init {
22-
validateId(monitorId)
24+
require(!(monitorId.isNullOrBlank() && monitorIds.isEmpty())) {
25+
"at least one of fields, 'monitorIds' and 'monitorId' should be provided"
26+
}
27+
if (monitorId != null && monitorId.isBlank()) {
28+
validateId(monitorId)
29+
} else {
30+
monitorIds.forEach { validateId(it) }
31+
}
2332
}
2433

2534
@Throws(IOException::class)
2635
constructor(sin: StreamInput) : this(
27-
sin.readString(), // monitorId
36+
sin.readOptionalString(), // monitorId
37+
Collections.unmodifiableList(sin.readStringList())
2838
)
2939

40+
@Suppress("UNCHECKED_CAST")
3041
fun asTemplateArg(): Map<String, Any> {
3142
return mapOf(
3243
MONITOR_ID_FIELD to monitorId,
33-
)
44+
MONITOR_IDS_FIELD to monitorIds
45+
) as Map<String, Any>
3446
}
3547

3648
@Throws(IOException::class)
3749
override fun writeTo(out: StreamOutput) {
38-
out.writeString(monitorId)
50+
out.writeOptionalString(monitorId)
51+
out.writeStringCollection(monitorIds)
3952
}
4053

4154
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
4255
builder.startObject()
4356
.field(MONITOR_ID_FIELD, monitorId)
57+
.field(MONITOR_IDS_FIELD, monitorIds)
4458
.endObject()
4559
return builder
4660
}
4761

4862
companion object {
4963
const val MONITOR_ID_FIELD = "monitor_id"
64+
const val MONITOR_IDS_FIELD = "monitor_ids"
5065

5166
@JvmStatic
5267
@Throws(IOException::class)
5368
fun parse(xcp: XContentParser): ChainedMonitorFindings {
54-
lateinit var monitorId: String
55-
69+
var monitorId: String? = null
70+
val monitorIds = mutableListOf<String>()
5671
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
5772
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
5873
val fieldName = xcp.currentName()
5974
xcp.nextToken()
6075

6176
when (fieldName) {
6277
MONITOR_ID_FIELD -> {
63-
monitorId = xcp.text()
64-
validateId(monitorId)
78+
if (!xcp.currentToken().equals(XContentParser.Token.VALUE_NULL))
79+
monitorId = xcp.text()
80+
}
81+
82+
MONITOR_IDS_FIELD -> {
83+
XContentParserUtils.ensureExpectedToken(
84+
XContentParser.Token.START_ARRAY,
85+
xcp.currentToken(),
86+
xcp
87+
)
88+
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
89+
monitorIds.add(xcp.text())
90+
}
6591
}
6692
}
6793
}
68-
return ChainedMonitorFindings(monitorId)
94+
return ChainedMonitorFindings(monitorId, monitorIds)
6995
}
7096

7197
@JvmStatic

src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,16 @@ fun randomClusterMetricsInput(
405405
return ClusterMetricsInput(path, pathParams, url)
406406
}
407407

408+
fun ChainedMonitorFindings.toJsonString(): String {
409+
val builder = XContentFactory.jsonBuilder()
410+
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
411+
}
412+
413+
fun Workflow.toJsonString(): String {
414+
val builder = XContentFactory.jsonBuilder()
415+
return this.toXContentWithUser(builder, ToXContent.EMPTY_PARAMS).string()
416+
}
417+
408418
fun Monitor.toJsonString(): String {
409419
val builder = XContentFactory.jsonBuilder()
410420
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()

src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import org.opensearch.search.SearchModule
2121
import java.lang.Exception
2222
import java.lang.IllegalArgumentException
2323
import java.util.UUID
24+
import kotlin.test.assertNotNull
25+
import kotlin.test.assertNull
26+
import kotlin.test.assertTrue
2427

2528
class IndexWorkflowRequestTests {
2629

@@ -196,6 +199,21 @@ class IndexWorkflowRequestTests {
196199
delegates = listOf(
197200
Delegate(1, "monitor-1")
198201
)
202+
203+
// Chained finding list of monitors valid
204+
delegates = listOf(
205+
Delegate(1, "monitor-1"),
206+
Delegate(2, "monitor-2"),
207+
Delegate(3, "monitor-3", ChainedMonitorFindings(null, listOf("monitor-1", "monitor-2"))),
208+
209+
)
210+
val req7 = IndexWorkflowRequest(
211+
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
212+
randomWorkflowWithDelegates(
213+
input = listOf(CompositeInput(Sequence(delegates = delegates)))
214+
)
215+
)
216+
assertNull(req7.validate())
199217
try {
200218
IndexWorkflowRequest(
201219
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
@@ -207,5 +225,21 @@ class IndexWorkflowRequestTests {
207225
Assert.assertTrue(ex is IllegalArgumentException)
208226
Assert.assertTrue(ex.message!!.contains("Workflows can only have 1 search input."))
209227
}
228+
229+
// Chained finding list of monitors invalid order and old field null
230+
delegates = listOf(
231+
Delegate(1, "monitor-1"),
232+
Delegate(3, "monitor-2"),
233+
Delegate(2, "monitor-3", ChainedMonitorFindings(null, listOf("monitor-1", "monitor-2"))),
234+
235+
)
236+
val req8 = IndexWorkflowRequest(
237+
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
238+
randomWorkflowWithDelegates(
239+
input = listOf(CompositeInput(Sequence(delegates = delegates)))
240+
)
241+
)
242+
assertNotNull(req8.validate())
243+
assertTrue(req8.validate()!!.message!!.contains("should be executed before monitor"))
210244
}
211245
}

src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,25 @@ class CompositeInputTests {
7070
}
7171

7272
@Test
73-
fun `test create Chained Findings with illegal monitorId value`() {
73+
fun `test create Chained Findings with illegal monitorId value and empty monitorIds list`() {
7474
try {
7575
ChainedMonitorFindings("")
7676
Assertions.fail("Expecting an illegal argument exception")
77+
} catch (e: IllegalArgumentException) {
78+
e.message?.let {
79+
Assertions.assertTrue(
80+
it.contains("at least one of fields, 'monitorIds' and 'monitorId' should be provided")
81+
82+
)
83+
}
84+
}
85+
}
86+
87+
@Test
88+
fun `test create Chained Findings with null monitorId value and monitorIds list with blank monitorIds`() {
89+
try {
90+
ChainedMonitorFindings("", listOf("", ""))
91+
Assertions.fail("Expecting an illegal argument exception")
7792
} catch (e: IllegalArgumentException) {
7893
e.message?.let {
7994
Assertions.assertTrue(

src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,30 @@ class XContentTests {
264264
Assertions.assertNull(parsedMonitor.user)
265265
}
266266

267+
@Test
268+
fun `test workflow parsing`() {
269+
val workflow = randomWorkflow(monitorIds = listOf("1", "2", "3"))
270+
val monitorString = workflow.toJsonString()
271+
val parsedWorkflow = Workflow.parse(parser(monitorString))
272+
Assertions.assertEquals(workflow, parsedWorkflow, "Round tripping workflow failed")
273+
}
274+
275+
@Test
276+
fun `test chainedMonitorFindings parsing`() {
277+
val cmf1 = ChainedMonitorFindings(monitorId = "m1")
278+
val cmf1String = cmf1.toJsonString()
279+
Assertions.assertEquals(
280+
ChainedMonitorFindings.parse(parser(cmf1String)), cmf1,
281+
"Round tripping chained monitor findings failed"
282+
)
283+
val cmf2 = ChainedMonitorFindings(monitorIds = listOf("m1", "m2"))
284+
val cmf2String = cmf2.toJsonString()
285+
Assertions.assertEquals(
286+
ChainedMonitorFindings.parse(parser(cmf2String)), cmf2,
287+
"Round tripping chained monitor findings failed"
288+
)
289+
}
290+
267291
@Test
268292
fun `test old monitor format parsing`() {
269293
val monitorString = """

0 commit comments

Comments
 (0)