Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Implemented cross-cluster monitor support #584 #586

Merged
merged 4 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 37 additions & 9 deletions src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ data class Alert(
val aggregationResultBucket: AggregationResultBucket? = null,
val executionId: String? = null,
val associatedAlertIds: List<String>,
val clusters: List<String>? = null,
) : Writeable, ToXContent {

init {
Expand All @@ -61,6 +62,7 @@ data class Alert(
chainedAlertTrigger: ChainedAlertTrigger,
workflow: Workflow,
associatedAlertIds: List<String>,
clusters: List<String>? = null
) : this(
monitorId = NO_ID,
monitorName = "",
Expand All @@ -82,7 +84,8 @@ data class Alert(
executionId = executionId,
workflowId = workflow.id,
workflowName = workflow.name,
associatedAlertIds = associatedAlertIds
associatedAlertIds = associatedAlertIds,
clusters = clusters
)

constructor(
Expand All @@ -97,6 +100,7 @@ data class Alert(
schemaVersion: Int = NO_SCHEMA_VERSION,
executionId: String? = null,
workflowId: String? = null,
clusters: List<String>? = null
) : this(
monitorId = monitor.id,
monitorName = monitor.name,
Expand All @@ -118,7 +122,8 @@ data class Alert(
executionId = executionId,
workflowId = workflowId ?: "",
workflowName = "",
associatedAlertIds = emptyList()
associatedAlertIds = emptyList(),
clusters = clusters
)

constructor(
Expand All @@ -134,6 +139,7 @@ data class Alert(
findingIds: List<String> = emptyList(),
executionId: String? = null,
workflowId: String? = null,
clusters: List<String>? = null
) : this(
monitorId = monitor.id,
monitorName = monitor.name,
Expand All @@ -155,7 +161,8 @@ data class Alert(
executionId = executionId,
workflowId = workflowId ?: "",
workflowName = "",
associatedAlertIds = emptyList()
associatedAlertIds = emptyList(),
clusters = clusters
)

constructor(
Expand All @@ -172,6 +179,7 @@ data class Alert(
findingIds: List<String> = emptyList(),
executionId: String? = null,
workflowId: String? = null,
clusters: List<String>? = null
) : this(
monitorId = monitor.id,
monitorName = monitor.name,
Expand All @@ -193,7 +201,8 @@ data class Alert(
executionId = executionId,
workflowId = workflowId ?: "",
workflowName = "",
associatedAlertIds = emptyList()
associatedAlertIds = emptyList(),
clusters = clusters
)

constructor(
Expand All @@ -211,6 +220,7 @@ data class Alert(
schemaVersion: Int = NO_SCHEMA_VERSION,
executionId: String? = null,
workflowId: String? = null,
clusters: List<String>? = null
) : this(
id = id,
monitorId = monitor.id,
Expand All @@ -233,7 +243,8 @@ data class Alert(
executionId = executionId,
workflowId = workflowId ?: "",
workflowName = "",
associatedAlertIds = emptyList()
associatedAlertIds = emptyList(),
clusters = clusters
)

constructor(
Expand All @@ -248,6 +259,7 @@ data class Alert(
schemaVersion: Int = NO_SCHEMA_VERSION,
workflowId: String? = null,
executionId: String?,
clusters: List<String>? = null
) : this(
id = id,
monitorId = monitor.id,
Expand All @@ -270,7 +282,8 @@ data class Alert(
relatedDocIds = listOf(),
workflowId = workflowId ?: "",
executionId = executionId,
associatedAlertIds = emptyList()
associatedAlertIds = emptyList(),
clusters = clusters
)

enum class State {
Expand Down Expand Up @@ -311,7 +324,8 @@ data class Alert(
actionExecutionResults = sin.readList(::ActionExecutionResult),
aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null,
executionId = sin.readOptionalString(),
associatedAlertIds = sin.readStringList()
associatedAlertIds = sin.readStringList(),
clusters = sin.readOptionalStringList()
)

fun isAcknowledged(): Boolean = (state == State.ACKNOWLEDGED)
Expand Down Expand Up @@ -349,6 +363,7 @@ data class Alert(
}
out.writeOptionalString(executionId)
out.writeStringCollection(associatedAlertIds)
out.writeOptionalStringArray(clusters?.toTypedArray())
}

companion object {
Expand Down Expand Up @@ -379,6 +394,7 @@ data class Alert(
const val ASSOCIATED_ALERT_IDS_FIELD = "associated_alert_ids"
const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS
const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH
const val CLUSTERS_FIELD = "clusters"
const val NO_ID = ""
const val NO_VERSION = Versions.NOT_FOUND

Expand Down Expand Up @@ -409,6 +425,7 @@ data class Alert(
val actionExecutionResults: MutableList<ActionExecutionResult> = mutableListOf()
var aggAlertBucket: AggregationResultBucket? = null
val associatedAlertIds = mutableListOf<String>()
val clusters = mutableListOf<String>()
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
Expand Down Expand Up @@ -475,6 +492,12 @@ data class Alert(
AggregationResultBucket.parse(xcp)
}
}
CLUSTERS_FIELD -> {
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
clusters.add(xcp.text())
}
}
}
}

Expand Down Expand Up @@ -503,7 +526,8 @@ data class Alert(
executionId = executionId,
workflowId = workflowId,
workflowName = workflowName,
associatedAlertIds = associatedAlertIds
associatedAlertIds = associatedAlertIds,
clusters = if (clusters.size > 0) clusters else null
)
}

Expand Down Expand Up @@ -553,6 +577,9 @@ data class Alert(
.optionalTimeField(END_TIME_FIELD, endTime)
.optionalTimeField(ACKNOWLEDGED_TIME_FIELD, acknowledgedTime)
aggregationResultBucket?.innerXContent(builder)

if (!clusters.isNullOrEmpty()) builder.field(CLUSTERS_FIELD, clusters.toTypedArray())

builder.endObject()
return builder
}
Expand All @@ -576,7 +603,8 @@ data class Alert(
BUCKET_KEYS to aggregationResultBucket?.bucketKeys?.joinToString(","),
PARENTS_BUCKET_PATH to aggregationResultBucket?.parentBucketPath,
FINDING_IDS to findingIds.joinToString(","),
RELATED_DOC_IDS to relatedDocIds.joinToString(",")
RELATED_DOC_IDS to relatedDocIds.joinToString(","),
CLUSTERS_FIELD to clusters?.joinToString(",")
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import java.io.IOException
import java.net.URI
import java.net.URISyntaxException

val ILLEGAL_PATH_PARAMETER_CHARACTERS = arrayOf(':', '"', '+', '\\', '|', '?', '#', '>', '<', ' ')

Expand All @@ -22,7 +23,8 @@ val ILLEGAL_PATH_PARAMETER_CHARACTERS = arrayOf(':', '"', '+', '\\', '|', '?', '
data class ClusterMetricsInput(
var path: String,
var pathParams: String = "",
var url: String
var url: String,
var clusters: List<String> = listOf()
) : Input {
val clusterMetricType: ClusterMetricType
val constructedUri: URI
Expand All @@ -43,11 +45,10 @@ data class ClusterMetricsInput(
"Invalid URI constructed from the path and path_params inputs, or the url input."
}

if (url.isNotEmpty() && validateFieldsNotEmpty()) {
if (url.isNotEmpty() && validateFieldsNotEmpty())
require(constructedUri == constructUrlFromInputs()) {
"The provided URL and URI fields form different URLs."
}
}

require(constructedUri.host.lowercase() == SUPPORTED_HOST) {
"Only host '$SUPPORTED_HOST' is supported."
Expand All @@ -74,6 +75,7 @@ data class ClusterMetricsInput(
.field(PATH_FIELD, path)
.field(PATH_PARAMS_FIELD, pathParams)
.field(URL_FIELD, url)
.field(CLUSTERS_FIELD, clusters)
.endObject()
.endObject()
}
Expand All @@ -87,6 +89,7 @@ data class ClusterMetricsInput(
out.writeString(path)
out.writeString(pathParams)
out.writeString(url)
out.writeStringArray(clusters.toTypedArray())
}

companion object {
Expand All @@ -99,18 +102,19 @@ data class ClusterMetricsInput(
const val PATH_PARAMS_FIELD = "path_params"
const val URL_FIELD = "url"
const val URI_FIELD = "uri"
const val CLUSTERS_FIELD = "clusters"

val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(Input::class.java, ParseField(URI_FIELD), CheckedFunction { parseInner(it) })

/**
* This parse function uses [XContentParser] to parse JSON input and store corresponding fields to create a [ClusterMetricsInput] object
*/
@JvmStatic
@Throws(IOException::class)
@JvmStatic @Throws(IOException::class)
fun parseInner(xcp: XContentParser): ClusterMetricsInput {
var path = ""
var pathParams = ""
var url = ""
val clusters = mutableListOf<String>()

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)

Expand All @@ -121,9 +125,17 @@ data class ClusterMetricsInput(
PATH_FIELD -> path = xcp.text()
PATH_PARAMS_FIELD -> pathParams = xcp.text()
URL_FIELD -> url = xcp.text()
CLUSTERS_FIELD -> {
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_ARRAY,
xcp.currentToken(),
xcp
)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) clusters.add(xcp.text())
}
}
}
return ClusterMetricsInput(path, pathParams, url)
return ClusterMetricsInput(path, pathParams, url, clusters)
}
}

Expand Down Expand Up @@ -163,20 +175,17 @@ data class ClusterMetricsInput(
if (pathParams.isNotEmpty()) {
pathParams = pathParams.trim('/')
ILLEGAL_PATH_PARAMETER_CHARACTERS.forEach { character ->
if (pathParams.contains(character)) {
if (pathParams.contains(character))
throw IllegalArgumentException(
"The provided path parameters contain invalid characters or spaces. Please omit: " + "${ILLEGAL_PATH_PARAMETER_CHARACTERS.joinToString(" ")}"
"The provided path parameters contain invalid characters or spaces. Please omit: " + ILLEGAL_PATH_PARAMETER_CHARACTERS.joinToString(" ")
)
}
}
}

if (apiType.requiresPathParams && pathParams.isEmpty()) {
if (apiType.requiresPathParams && pathParams.isEmpty())
throw IllegalArgumentException("The API requires path parameters.")
}
if (!apiType.supportsPathParams && pathParams.isNotEmpty()) {
if (!apiType.supportsPathParams && pathParams.isNotEmpty())
throw IllegalArgumentException("The API does not use path parameters.")
}

return pathParams
}
Expand All @@ -192,13 +201,11 @@ data class ClusterMetricsInput(
ClusterMetricType.values()
.filter { option -> option != ClusterMetricType.BLANK }
.forEach { option ->
if (uriPath.startsWith(option.prependPath) || uriPath.startsWith(option.defaultPath)) {
if (uriPath.startsWith(option.prependPath) || uriPath.startsWith(option.defaultPath))
apiType = option
}
}
if (apiType.isBlank()) {
if (apiType.isBlank())
throw IllegalArgumentException("The API could not be determined from the provided URI.")
}
return apiType
}

Expand All @@ -207,28 +214,36 @@ data class ClusterMetricsInput(
* @return The constructed [URI].
*/
private fun constructUrlFromInputs(): URI {
val uriBuilder = URIBuilder()
.setScheme(SUPPORTED_SCHEME)
.setHost(SUPPORTED_HOST)
.setPort(SUPPORTED_PORT)
.setPath(path + pathParams)
return uriBuilder.build()
/**
* this try-catch block is required due to a httpcomponents 5.1.x library issue
* it auto encodes path params in the url.
*/
return try {
val formattedPath = if (path.startsWith("/") || path.isBlank()) path else "/$path"
val formattedPathParams = if (pathParams.startsWith("/") || pathParams.isBlank()) pathParams else "/$pathParams"
val uriBuilder = URIBuilder("$SUPPORTED_SCHEME://$SUPPORTED_HOST:$SUPPORTED_PORT$formattedPath$formattedPathParams")
uriBuilder.build()
} catch (ex: URISyntaxException) {
val uriBuilder = URIBuilder()
.setScheme(SUPPORTED_SCHEME)
.setHost(SUPPORTED_HOST)
.setPort(SUPPORTED_PORT)
.setPath(path + pathParams)
uriBuilder.build()
}
}

/**
* If [url] field is empty, populates it with [constructedUri].
* If [path] and [pathParams] are empty, populates them with values from [url].
*/
private fun parseEmptyFields() {
if (pathParams.isEmpty()) {
if (pathParams.isEmpty())
pathParams = this.parsePathParams()
}
if (path.isEmpty()) {
if (path.isEmpty())
path = if (pathParams.isEmpty()) clusterMetricType.defaultPath else clusterMetricType.prependPath
}
if (url.isEmpty()) {
if (url.isEmpty())
url = constructedUri.toString()
}
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/test/kotlin/org/opensearch/commons/alerting/AlertTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class AlertTests {
assertEquals(templateArgs[Alert.START_TIME_FIELD], alert.startTime.toEpochMilli(), "Template args start time does not")
assertEquals(templateArgs[Alert.LAST_NOTIFICATION_TIME_FIELD], null, "Template args last notification time does not match")
assertEquals(templateArgs[Alert.SEVERITY_FIELD], alert.severity, "Template args severity does not match")
assertEquals(templateArgs[Alert.CLUSTERS_FIELD], alert.clusters?.joinToString(","), "Template args clusters does not match")
}

@Test
Expand All @@ -40,6 +41,7 @@ class AlertTests {
assertEquals(templateArgs[Alert.START_TIME_FIELD], alert.startTime.toEpochMilli(), "Template args start time does not")
assertEquals(templateArgs[Alert.LAST_NOTIFICATION_TIME_FIELD], null, "Template args last notification time does not match")
assertEquals(templateArgs[Alert.SEVERITY_FIELD], alert.severity, "Template args severity does not match")
assertEquals(templateArgs[Alert.CLUSTERS_FIELD], alert.clusters?.joinToString(","), "Template args clusters does not match")
assertEquals(
templateArgs[Alert.BUCKET_KEYS],
alert.aggregationResultBucket?.bucketKeys?.joinToString(","),
Expand Down
Loading