Skip to content

Commit ad54a59

Browse files
Tarun-kishoreTarun Kishore
andcommitted
Support for no_alias and min_state_age in ISM TransitionsFeature/ism enhancement (#1440)
Co-authored-by: Tarun Kishore <[email protected]>
1 parent 933c487 commit ad54a59

File tree

7 files changed

+505
-36
lines changed

7 files changed

+505
-36
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ The current supported transition conditions are:
5050
* Index size
5151
* Index age
5252
* Cron expression
53+
* Alias presence
54+
* ISM state age
5355

5456
## Contributing
5557

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Transition.kt

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.indexmanagement.indexstatemanagement.model
77

8+
import org.opensearch.Version
89
import org.opensearch.common.unit.TimeValue
910
import org.opensearch.core.common.io.stream.StreamInput
1011
import org.opensearch.core.common.io.stream.StreamOutput
@@ -80,10 +81,12 @@ data class Conditions(
8081
val size: ByteSizeValue? = null,
8182
val cron: CronSchedule? = null,
8283
val rolloverAge: TimeValue? = null,
84+
val noAlias: Boolean? = null,
85+
val minStateAge: TimeValue? = null,
8386
) : ToXContentObject,
8487
Writeable {
8588
init {
86-
val conditionsList = listOf(indexAge, docCount, size, cron, rolloverAge)
89+
val conditionsList = listOf(indexAge, docCount, size, cron, rolloverAge, noAlias, minStateAge)
8790
require(conditionsList.filterNotNull().size == 1) { "Cannot provide more than one Transition condition" }
8891

8992
// Validate doc count condition
@@ -100,6 +103,8 @@ data class Conditions(
100103
if (size != null) builder.field(MIN_SIZE_FIELD, size.stringRep)
101104
if (cron != null) builder.field(CRON_FIELD, cron)
102105
if (rolloverAge != null) builder.field(MIN_ROLLOVER_AGE_FIELD, rolloverAge.stringRep)
106+
if (noAlias != null) builder.field(NO_ALIAS_FIELD, noAlias)
107+
if (minStateAge != null) builder.field(MIN_STATE_AGE_FIELD, minStateAge.stringRep)
103108
return builder.endObject()
104109
}
105110

@@ -110,6 +115,8 @@ data class Conditions(
110115
size = sin.readOptionalWriteable(::ByteSizeValue),
111116
cron = sin.readOptionalWriteable(::CronSchedule),
112117
rolloverAge = sin.readOptionalTimeValue(),
118+
noAlias = if (sin.version.onOrAfter(Version.V_3_2_0)) sin.readOptionalBoolean() else null,
119+
minStateAge = if (sin.version.onOrAfter(Version.V_3_2_0)) sin.readOptionalTimeValue() else null,
113120
)
114121

115122
@Throws(IOException::class)
@@ -119,6 +126,10 @@ data class Conditions(
119126
out.writeOptionalWriteable(size)
120127
out.writeOptionalWriteable(cron)
121128
out.writeOptionalTimeValue(rolloverAge)
129+
if (out.version.onOrAfter(Version.V_3_2_0)) {
130+
out.writeOptionalBoolean(noAlias)
131+
out.writeOptionalTimeValue(minStateAge)
132+
}
122133
}
123134

124135
companion object {
@@ -127,6 +138,8 @@ data class Conditions(
127138
const val MIN_SIZE_FIELD = "min_size"
128139
const val CRON_FIELD = "cron"
129140
const val MIN_ROLLOVER_AGE_FIELD = "min_rollover_age"
141+
const val NO_ALIAS_FIELD = "no_alias"
142+
const val MIN_STATE_AGE_FIELD = "min_state_age"
130143

131144
@JvmStatic
132145
@Throws(IOException::class)
@@ -136,6 +149,8 @@ data class Conditions(
136149
var size: ByteSizeValue? = null
137150
var cron: CronSchedule? = null
138151
var rolloverAge: TimeValue? = null
152+
var noAlias: Boolean? = null
153+
var minStateAge: TimeValue? = null
139154

140155
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
141156
while (xcp.nextToken() != Token.END_OBJECT) {
@@ -148,11 +163,13 @@ data class Conditions(
148163
MIN_SIZE_FIELD -> size = ByteSizeValue.parseBytesSizeValue(xcp.text(), MIN_SIZE_FIELD)
149164
CRON_FIELD -> cron = ScheduleParser.parse(xcp) as? CronSchedule
150165
MIN_ROLLOVER_AGE_FIELD -> rolloverAge = TimeValue.parseTimeValue(xcp.text(), MIN_ROLLOVER_AGE_FIELD)
166+
NO_ALIAS_FIELD -> noAlias = xcp.booleanValue()
167+
MIN_STATE_AGE_FIELD -> minStateAge = TimeValue.parseTimeValue(xcp.text(), MIN_STATE_AGE_FIELD)
151168
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in Conditions.")
152169
}
153170
}
154171

155-
return Conditions(indexAge, docCount, size, cron, rolloverAge)
172+
return Conditions(indexAge, docCount, size, cron, rolloverAge, noAlias, minStateAge)
156173
}
157174
}
158175
}

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transition/AttemptTransitionStep.kt

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider
1616
import org.opensearch.indexmanagement.indexstatemanagement.action.TransitionsAction
1717
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getOldestRolloverTime
1818
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
19+
import org.opensearch.indexmanagement.indexstatemanagement.util.TransitionConditionContext
1920
import org.opensearch.indexmanagement.indexstatemanagement.util.evaluateConditions
2021
import org.opensearch.indexmanagement.indexstatemanagement.util.hasStatsConditions
2122
import org.opensearch.indexmanagement.opensearchapi.getUsefulCauseString
@@ -100,9 +101,22 @@ class AttemptTransitionStep(private val action: TransitionsAction) : Step(name)
100101
}
101102

102103
// Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true
104+
val indexAliasesCount = indexMetadata?.aliases?.size ?: 0
105+
val stateStartTime = context.metadata.stateMetaData?.startTime
106+
val stateStartInstant = stateStartTime?.let { Instant.ofEpochMilli(it) }
103107
stateName =
104108
transitions.find {
105-
it.evaluateConditions(indexCreationDateInstant, numDocs, indexSize, stepStartTime, rolloverDate)
109+
it.evaluateConditions(
110+
TransitionConditionContext(
111+
indexCreationDate = indexCreationDateInstant,
112+
numDocs = numDocs,
113+
indexSize = indexSize,
114+
transitionStartTime = stepStartTime,
115+
rolloverDate = rolloverDate,
116+
indexAliasesCount = indexAliasesCount,
117+
stateStartTime = stateStartInstant,
118+
),
119+
)
106120
}?.stateName
107121
val message: String
108122
val stateName = stateName // shadowed on purpose to prevent var from changing

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt

Lines changed: 63 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteAction
3434
import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction
3535
import org.opensearch.indexmanagement.indexstatemanagement.action.TransitionsAction
3636
import org.opensearch.indexmanagement.indexstatemanagement.model.ChangePolicy
37+
import org.opensearch.indexmanagement.indexstatemanagement.model.Conditions
3738
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
3839
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
3940
import org.opensearch.indexmanagement.indexstatemanagement.model.State
@@ -182,47 +183,83 @@ fun getSweptManagedIndexSearchRequest(scroll: Boolean = false, size: Int = Manag
182183
return req
183184
}
184185

185-
@Suppress("ReturnCount", "ComplexCondition")
186+
@Suppress("ReturnCount", "ComplexCondition", "LongParameterList")
187+
data class TransitionConditionContext(
188+
val indexCreationDate: Instant,
189+
val numDocs: Long?,
190+
val indexSize: ByteSizeValue?,
191+
val transitionStartTime: Instant,
192+
val rolloverDate: Instant?,
193+
val indexAliasesCount: Int? = null,
194+
val stateStartTime: Instant? = null,
195+
)
196+
197+
@Suppress("ReturnCount")
186198
fun Transition.evaluateConditions(
187-
indexCreationDate: Instant,
188-
numDocs: Long?,
189-
indexSize: ByteSizeValue?,
190-
transitionStartTime: Instant,
191-
rolloverDate: Instant?,
199+
context: TransitionConditionContext,
192200
): Boolean {
193-
// If there are no conditions, treat as always true
194-
if (this.conditions == null) return true
201+
val conditions = this.conditions ?: return true
202+
if (checkDocCount(conditions, context)) return true
203+
if (checkIndexAge(conditions, context)) return true
204+
if (checkSize(conditions, context)) return true
205+
if (checkCron(conditions, context)) return true
206+
if (checkRolloverAge(conditions, context)) return true
207+
if (checkNoAlias(conditions, context)) return true
208+
if (checkMinStateAge(conditions, context)) return true
209+
return false
210+
}
195211

196-
if (this.conditions.docCount != null && numDocs != null) {
197-
return this.conditions.docCount <= numDocs
198-
}
212+
private fun checkDocCount(conditions: Conditions, context: TransitionConditionContext): Boolean =
213+
conditions.docCount != null &&
214+
context.numDocs != null &&
215+
conditions.docCount <= context.numDocs
199216

200-
if (this.conditions.indexAge != null) {
201-
val indexCreationDateMilli = indexCreationDate.toEpochMilli()
202-
if (indexCreationDateMilli == -1L) return false // transitions cannot currently be ORd like rollover, so we must return here
217+
@Suppress("ReturnCount")
218+
private fun checkIndexAge(conditions: Conditions, context: TransitionConditionContext): Boolean {
219+
if (conditions.indexAge != null) {
220+
val indexCreationDateMilli = context.indexCreationDate.toEpochMilli()
221+
if (indexCreationDateMilli == -1L) return false
203222
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli
204-
return this.conditions.indexAge.millis <= elapsedTime
223+
return conditions.indexAge.millis <= elapsedTime
205224
}
225+
return false
226+
}
206227

207-
if (this.conditions.size != null && indexSize != null) {
208-
return this.conditions.size <= indexSize
209-
}
228+
private fun checkSize(conditions: Conditions, context: TransitionConditionContext): Boolean =
229+
conditions.size != null &&
230+
context.indexSize != null &&
231+
conditions.size <= context.indexSize
210232

211-
if (this.conditions.cron != null) {
212-
// If a cron pattern matches the time between the start of "attempt_transition" to now then we consider it meeting the condition
213-
return this.conditions.cron.getNextExecutionTime(transitionStartTime) <= Instant.now()
233+
private fun checkCron(conditions: Conditions, context: TransitionConditionContext): Boolean {
234+
if (conditions.cron != null) {
235+
return conditions.cron.getNextExecutionTime(context.transitionStartTime) <= Instant.now()
214236
}
237+
return false
238+
}
215239

216-
if (this.conditions.rolloverAge != null) {
217-
val rolloverDateMilli = rolloverDate?.toEpochMilli() ?: return false
240+
@Suppress("ReturnCount")
241+
private fun checkRolloverAge(conditions: Conditions, context: TransitionConditionContext): Boolean {
242+
if (conditions.rolloverAge != null) {
243+
val rolloverDateMilli = context.rolloverDate?.toEpochMilli() ?: return false
218244
val elapsedTime = Instant.now().toEpochMilli() - rolloverDateMilli
219-
return this.conditions.rolloverAge.millis <= elapsedTime
245+
return conditions.rolloverAge.millis <= elapsedTime
220246
}
221-
222-
// We should never reach this
223247
return false
224248
}
225249

250+
private fun checkNoAlias(conditions: Conditions, context: TransitionConditionContext): Boolean =
251+
conditions.noAlias != null &&
252+
context.indexAliasesCount != null &&
253+
(
254+
(conditions.noAlias && context.indexAliasesCount == 0) ||
255+
(!conditions.noAlias && context.indexAliasesCount > 0)
256+
)
257+
258+
private fun checkMinStateAge(conditions: Conditions, context: TransitionConditionContext): Boolean =
259+
conditions.minStateAge != null &&
260+
context.stateStartTime != null &&
261+
(System.currentTimeMillis() - context.stateStartTime.toEpochMilli() >= conditions.minStateAge.millis)
262+
226263
fun Transition.hasStatsConditions(): Boolean = this.conditions?.docCount != null || this.conditions?.size != null
227264

228265
@Suppress("ReturnCount", "ComplexCondition")

src/test/kotlin/org/opensearch/indexmanagement/bwc/ISMBackwardsCompatibilityIT.kt

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,16 @@ import org.opensearch.indexmanagement.IndexManagementIndices.Companion.HISTORY_W
1111
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
1212
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
1313
import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction
14+
import org.opensearch.indexmanagement.indexstatemanagement.model.Conditions
15+
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
16+
import org.opensearch.indexmanagement.indexstatemanagement.model.State
17+
import org.opensearch.indexmanagement.indexstatemanagement.model.Transition
18+
import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification
1419
import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep
20+
import org.opensearch.indexmanagement.indexstatemanagement.step.transition.AttemptTransitionStep
1521
import org.opensearch.indexmanagement.waitFor
22+
import java.time.Instant
23+
import java.time.temporal.ChronoUnit
1624
import java.util.Locale
1725

1826
class ISMBackwardsCompatibilityIT : IndexStateManagementRestTestCase() {
@@ -130,6 +138,62 @@ class ISMBackwardsCompatibilityIT : IndexStateManagementRestTestCase() {
130138
}
131139
}
132140

141+
@Throws(Exception::class)
142+
@Suppress("UNCHECKED_CAST")
143+
fun `test existing transition conditions backwards compatibility`() {
144+
val indexNameBase = "${testIndexName}_existing_conditions"
145+
val index1 = "$indexNameBase-1"
146+
val index2 = "$indexNameBase-2"
147+
val policyID = "${testIndexName}_doc_count_policy"
148+
149+
val uri = getPluginUri()
150+
val responseMap = getAsMap(uri)["nodes"] as Map<String, Map<String, Any>>
151+
for (response in responseMap.values) {
152+
val plugins = response["plugins"] as List<Map<String, Any>>
153+
val pluginNames = plugins.map { plugin -> plugin["name"] }.toSet()
154+
when (CLUSTER_TYPE) {
155+
ClusterType.OLD -> {
156+
assertTrue(pluginNames.contains("opendistro-index-management") || pluginNames.contains("opensearch-index-management"))
157+
158+
createDocCountTransitionPolicy(policyID)
159+
160+
createIndex(index1, policyID)
161+
createIndex(index2, policyID)
162+
163+
// Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy
164+
updateManagedIndexConfigStartTime(getExistingManagedIndexConfig(index1))
165+
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(index1).policyID) }
166+
167+
// Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy
168+
updateManagedIndexConfigStartTime(getExistingManagedIndexConfig(index2))
169+
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(index2).policyID) }
170+
171+
verifyPendingTransition(index1)
172+
verifyPendingTransition(index2)
173+
}
174+
ClusterType.MIXED -> {
175+
assertTrue(pluginNames.contains("opensearch-index-management"))
176+
177+
verifyPendingTransition(index1)
178+
verifyPendingTransition(index2)
179+
}
180+
ClusterType.UPGRADED -> {
181+
assertTrue(pluginNames.contains("opensearch-index-management"))
182+
183+
verifyPendingTransition(index1)
184+
insertSampleData(index = index1, docCount = 6, delay = 0)
185+
verifySuccessfulTransition(index1)
186+
187+
insertSampleData(index = index2, docCount = 6, delay = 0)
188+
verifySuccessfulTransition(index2)
189+
190+
deleteIndex("$indexNameBase*")
191+
}
192+
}
193+
break
194+
}
195+
}
196+
133197
private fun createRolloverPolicy(policyID: String) {
134198
val policy =
135199
"""
@@ -209,4 +273,50 @@ class ISMBackwardsCompatibilityIT : IndexStateManagementRestTestCase() {
209273
}
210274
Assert.assertTrue("New rollover index does not exist.", indexExists(newIndex))
211275
}
276+
277+
private fun createDocCountTransitionPolicy(policyID: String) {
278+
val secondStateName = "second"
279+
val states = listOf(
280+
State("first", listOf(), listOf(Transition(secondStateName, Conditions(docCount = 5L)))),
281+
State(secondStateName, listOf(), listOf()),
282+
)
283+
284+
val policy = Policy(
285+
id = policyID,
286+
description = "BWC test policy with doc count transition",
287+
schemaVersion = 5L,
288+
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
289+
errorNotification = randomErrorNotification(),
290+
defaultState = states[0].name,
291+
states = states,
292+
)
293+
294+
createPolicy(policy, policyID)
295+
}
296+
297+
private fun verifyPendingTransition(index: String) {
298+
val managedIndexConfig = getExistingManagedIndexConfig(index)
299+
// Need to speed up to second execution where it will trigger the first execution of transition evaluation
300+
updateManagedIndexConfigStartTime(managedIndexConfig)
301+
waitFor {
302+
assertEquals(
303+
"Index transitioned before it met the condition.",
304+
AttemptTransitionStep.getEvaluatingMessage(index),
305+
getExplainManagedIndexMetaData(index).info?.get("message"),
306+
)
307+
}
308+
}
309+
310+
private fun verifySuccessfulTransition(index: String) {
311+
val managedIndexConfig = getExistingManagedIndexConfig(index)
312+
// Need to speed up to second execution where it will trigger the transition evaluation
313+
updateManagedIndexConfigStartTime(managedIndexConfig)
314+
waitFor {
315+
assertEquals(
316+
"Index did not transition successfully",
317+
AttemptTransitionStep.getSuccessMessage(index, "second"),
318+
getExplainManagedIndexMetaData(index).info?.get("message"),
319+
)
320+
}
321+
}
212322
}

0 commit comments

Comments
 (0)