Skip to content

Commit 1a4416b

Browse files
Add support in SM Plugin to delete snapshots created manually (#1452)
Co-authored-by: bowenlan-amzn <[email protected]>
1 parent f67a308 commit 1a4416b

27 files changed

+1574
-128
lines changed

src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunner.kt

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -103,17 +103,20 @@ object SMRunner :
103103

104104
// creation, deletion workflow have to be executed sequentially,
105105
// because they are sharing the same metadata document.
106-
SMStateMachine(client, job, metadata, settings, threadPool, indicesManager)
106+
val stateMachine = SMStateMachine(client, job, metadata, settings, threadPool, indicesManager)
107107
.handlePolicyChange()
108-
.currentState(metadata.creation.currentState)
109-
.next(creationTransitions)
110-
.apply {
111-
val deleteMetadata = metadata.deletion
112-
if (deleteMetadata != null) {
113-
this.currentState(deleteMetadata.currentState)
114-
.next(deletionTransitions)
115-
}
116-
}
108+
109+
// Execute creation workflow if it exists
110+
if (metadata.creation != null) {
111+
stateMachine.currentState(metadata.creation.currentState)
112+
.next(creationTransitions)
113+
}
114+
115+
// Execute deletion workflow if it exists
116+
if (metadata.deletion != null) {
117+
stateMachine.currentState(metadata.deletion.currentState)
118+
.next(deletionTransitions)
119+
}
117120
} finally {
118121
if (!releaseLockForScheduledJob(context, lock)) {
119122
log.error("Could not release lock [${lock.lockId}] for ${job.id}.")
@@ -155,13 +158,14 @@ object SMRunner :
155158
id = smPolicyNameToMetadataDocId(smDocIdToPolicyName(job.id)),
156159
policySeqNo = job.seqNo,
157160
policyPrimaryTerm = job.primaryTerm,
158-
creation =
159-
SMMetadata.WorkflowMetadata(
160-
SMState.CREATION_START,
161-
SMMetadata.Trigger(
162-
time = job.creation.schedule.getNextExecutionTime(now),
163-
),
164-
),
161+
creation = job.creation?.let {
162+
SMMetadata.WorkflowMetadata(
163+
SMState.CREATION_START,
164+
SMMetadata.Trigger(
165+
time = job.creation.schedule.getNextExecutionTime(now),
166+
),
167+
)
168+
},
165169
deletion =
166170
job.deletion?.let {
167171
SMMetadata.WorkflowMetadata(

src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import org.opensearch.action.get.GetRequest
1717
import org.opensearch.action.get.GetResponse
1818
import org.opensearch.action.index.IndexRequest
1919
import org.opensearch.action.index.IndexResponse
20+
import org.opensearch.common.regex.Regex
2021
import org.opensearch.common.time.DateFormatter
2122
import org.opensearch.common.time.DateFormatters
2223
import org.opensearch.common.unit.TimeValue
@@ -240,7 +241,7 @@ suspend fun Client.getSnapshots(
240241
snapshotMissingMsg: String?,
241242
exceptionMsg: String,
242243
): GetSnapshotsResult {
243-
val snapshots =
244+
var snapshots =
244245
try {
245246
getSnapshots(
246247
name,
@@ -257,7 +258,19 @@ suspend fun Client.getSnapshots(
257258
cause = ex,
258259
)
259260
return GetSnapshotsResult(true, emptyList(), metadataBuilder)
260-
}.filterBySMPolicyInSnapshotMetadata(job.policyName)
261+
}
262+
263+
// Parse CSV patterns from name and implement pattern-based filtering
264+
val patterns = name.split(",").map { it.trim() }
265+
val policyNamePattern = "${job.policyName}*"
266+
// Filter other snapshots by policy metadata
267+
val otherPatternSnapshots = snapshots.filter { snapshot ->
268+
patterns.any { pattern ->
269+
pattern != policyNamePattern && Regex.simpleMatch(pattern, snapshot.snapshotId().name)
270+
}
271+
}
272+
val policySnapshots = snapshots.filterBySMPolicyInSnapshotMetadata(job.policyName)
273+
snapshots = policySnapshots + otherPatternSnapshots
261274

262275
return GetSnapshotsResult(false, snapshots, metadataBuilder)
263276
}

src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ class SMStateMachine(
162162
val retry =
163163
when (result.workflowType) {
164164
WorkflowType.CREATION -> {
165-
metadata.creation.retry
165+
metadata.creation?.retry
166166
}
167167
WorkflowType.DELETION -> {
168168
metadata.deletion?.retry
@@ -249,7 +249,11 @@ class SMStateMachine(
249249
val metadataToSave =
250250
SMMetadata.Builder(metadata)
251251
.setSeqNoPrimaryTerm(job.seqNo, job.primaryTerm)
252-
.setNextCreationTime(job.creation.schedule.getNextExecutionTime(now))
252+
253+
val creation = job.creation
254+
creation?.let {
255+
metadataToSave.setNextCreationTime(creation.schedule.getNextExecutionTime(now))
256+
}
253257

254258
val deletion = job.deletion
255259
deletion?.let {

src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreatingState.kt

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,14 @@ object CreatingState : State {
3838
SMMetadata.Builder(metadata)
3939
.workflow(WorkflowType.CREATION)
4040

41-
var snapshotName: String? = metadata.creation.started?.first()
41+
if (job.creation == null) {
42+
log.warn("Policy creation config becomes null before trying to create snapshot. Reset.")
43+
return SMResult.Fail(
44+
metadataBuilder.resetCreation(), WorkflowType.CREATION, true,
45+
)
46+
}
47+
48+
var snapshotName: String? = metadata.creation?.started?.first()
4249

4350
// Check if there's already a snapshot created by SM in current execution period.
4451
// So that this State can be executed idempotent.
@@ -54,8 +61,8 @@ object CreatingState : State {
5461
}
5562
val getSnapshots = getSnapshotsResult.snapshots
5663

57-
val latestExecutionStartTime = job.creation.schedule.getPeriodStartingAt(null).v1()
58-
snapshotName = checkCreatedSnapshots(latestExecutionStartTime, getSnapshots)
64+
val latestExecutionStartTime = job.creation?.schedule?.getPeriodStartingAt(null)?.v1()
65+
snapshotName = latestExecutionStartTime?.let { checkCreatedSnapshots(it, getSnapshots) }
5966
if (snapshotName != null) {
6067
log.info("Already created snapshot [$snapshotName] during this execution period starting at $latestExecutionStartTime.")
6168
metadataBuilder.setLatestExecution(

src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationConditionMetState.kt

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ import org.opensearch.indexmanagement.snapshotmanagement.engine.states.State
1111
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.WorkflowType
1212
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
1313
import org.opensearch.indexmanagement.snapshotmanagement.tryUpdatingNextExecutionTime
14+
import java.time.Instant.now
1415

1516
object CreationConditionMetState : State {
1617
override val continuous = true
1718

19+
@Suppress("ReturnCount")
1820
override suspend fun execute(context: SMStateMachine): SMResult {
1921
val job = context.job
2022
val metadata = context.metadata
@@ -24,15 +26,35 @@ object CreationConditionMetState : State {
2426
SMMetadata.Builder(metadata)
2527
.workflow(WorkflowType.CREATION)
2628

27-
val nextCreationTime = metadata.creation.trigger.time
28-
val updateNextTimeResult =
29-
tryUpdatingNextExecutionTime(
30-
metadataBuilder, nextCreationTime, job.creation.schedule, WorkflowType.CREATION, log,
29+
if (job.creation == null) {
30+
log.warn("Policy creation config becomes null before trying to create snapshot. Reset.")
31+
return SMResult.Fail(
32+
metadataBuilder.resetCreation(), WorkflowType.CREATION, true,
3133
)
32-
if (!updateNextTimeResult.updated) {
33-
return SMResult.Stay(metadataBuilder)
3434
}
35-
metadataBuilder = updateNextTimeResult.metadataBuilder
35+
36+
// if job.creation != null, then metadata.creation.trigger.time should already be
37+
// initialized or handled in handlePolicyChange before executing this state.
38+
val nextCreationTime = if (metadata.creation == null) {
39+
val nextTime = job.creation.schedule.getNextExecutionTime(now())
40+
nextTime?.let { metadataBuilder.setNextCreationTime(it) }
41+
nextTime
42+
} else {
43+
metadata.creation.trigger.time
44+
}
45+
46+
nextCreationTime?.let { creationTime ->
47+
job.creation.schedule.let { schedule ->
48+
val updateNextTimeResult =
49+
tryUpdatingNextExecutionTime(
50+
metadataBuilder, creationTime, schedule, WorkflowType.CREATION, log,
51+
)
52+
if (!updateNextTimeResult.updated) {
53+
return SMResult.Stay(metadataBuilder)
54+
}
55+
metadataBuilder = updateNextTimeResult.metadataBuilder
56+
}
57+
}
3658

3759
return SMResult.Next(metadataBuilder)
3860
}

src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/creation/CreationFinishedState.kt

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.time.Instant.now
2121
object CreationFinishedState : State {
2222
override val continuous = true
2323

24-
@Suppress("ReturnCount", "LongMethod", "NestedBlockDepth")
24+
@Suppress("ReturnCount", "LongMethod", "NestedBlockDepth", "CyclomaticComplexMethod")
2525
override suspend fun execute(context: SMStateMachine): SMResult {
2626
val client = context.client
2727
val job = context.job
@@ -32,8 +32,8 @@ object CreationFinishedState : State {
3232
SMMetadata.Builder(metadata)
3333
.workflow(WorkflowType.CREATION)
3434

35-
metadata.creation.started?.first()?.let { snapshotName ->
36-
if (metadata.creation.latestExecution == null) {
35+
metadata.creation?.started?.first()?.let { snapshotName ->
36+
if (metadata.creation?.latestExecution == null) {
3737
// This should not happen
3838
log.error("latest_execution is null while checking if snapshot [$snapshotName] creation has finished. Reset.")
3939
metadataBuilder.resetWorkflow()
@@ -75,8 +75,8 @@ object CreationFinishedState : State {
7575
job.notificationConfig?.sendCreationNotification(client, job.policyName, creationMessage, job.user, log)
7676
}
7777
SnapshotState.IN_PROGRESS -> {
78-
job.creation.timeLimit?.let { timeLimit ->
79-
if (timeLimit.isExceed(metadata.creation.latestExecution.startTime)) {
78+
job.creation?.timeLimit?.let { timeLimit ->
79+
if (timeLimit.isExceed(metadata.creation?.latestExecution?.startTime)) {
8080
return timeLimitExceeded(
8181
timeLimit, metadataBuilder, WorkflowType.CREATION, log,
8282
)
@@ -98,18 +98,22 @@ object CreationFinishedState : State {
9898

9999
// if now is after next creation time, update nextCreationTime to next execution schedule
100100
// TODO may want to notify user that we skipped the execution because snapshot creation time is longer than execution schedule
101-
val result =
102-
tryUpdatingNextExecutionTime(
103-
metadataBuilder, metadata.creation.trigger.time, job.creation.schedule,
104-
WorkflowType.CREATION, log,
105-
)
106-
if (result.updated) {
107-
metadataBuilder = result.metadataBuilder
101+
metadata.creation?.trigger?.time?.let { triggerTime ->
102+
job.creation?.schedule?.let { schedule ->
103+
val result =
104+
tryUpdatingNextExecutionTime(
105+
metadataBuilder, triggerTime, schedule,
106+
WorkflowType.CREATION, log,
107+
)
108+
if (result.updated) {
109+
metadataBuilder = result.metadataBuilder
110+
}
111+
}
108112
}
109113
}
110114

111115
val metadataToSave = metadataBuilder.build()
112-
if (metadataToSave.creation.started != null) {
116+
if (metadataToSave.creation?.started != null) {
113117
return SMResult.Stay(metadataBuilder)
114118
}
115119
return SMResult.Next(metadataBuilder)

src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletingState.kt

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,21 +49,28 @@ object DeletingState : State {
4949

5050
val snapshotsToDelete: List<String>
5151

52+
var snapshotPattern = job.policyName + "*"
53+
54+
if (job.deletion?.snapshotPattern != null) {
55+
snapshotPattern += "," + job.deletion.snapshotPattern
56+
}
5257
val getSnapshotsRes =
5358
client.getSnapshots(
54-
job, job.policyName + "*", metadataBuilder, log,
59+
job, snapshotPattern, metadataBuilder, log,
5560
getSnapshotsMissingMessage(),
5661
getSnapshotsErrorMessage(),
5762
)
5863
metadataBuilder = getSnapshotsRes.metadataBuilder
5964
if (getSnapshotsRes.failed) {
6065
return SMResult.Fail(metadataBuilder, WorkflowType.DELETION)
6166
}
62-
val getSnapshots = getSnapshotsRes.snapshots
67+
val snapshots = getSnapshotsRes.snapshots
68+
.distinctBy { it.snapshotId().name }
69+
.filter { it.state() != SnapshotState.IN_PROGRESS }
6370

6471
snapshotsToDelete =
6572
filterByDeleteCondition(
66-
getSnapshots.filter { it.state() != SnapshotState.IN_PROGRESS },
73+
snapshots,
6774
job.deletion.condition, log,
6875
)
6976
if (snapshotsToDelete.isNotEmpty()) {

src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/deletion/DeletionFinishedState.kt

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,24 @@ object DeletionFinishedState : State {
3838
return@let
3939
}
4040

41+
var snapshotPattern = job.policyName + "*"
42+
43+
if (job.deletion?.snapshotPattern != null) {
44+
snapshotPattern += "," + job.deletion.snapshotPattern
45+
}
4146
val getSnapshotsRes =
4247
client.getSnapshots(
43-
job, "${job.policyName}*", metadataBuilder, log,
48+
job, snapshotPattern, metadataBuilder, log,
4449
getSnapshotMissingMessageInDeletionWorkflow(),
4550
getSnapshotExceptionInDeletionWorkflow(snapshotsStartedDeletion),
4651
)
4752
metadataBuilder = getSnapshotsRes.metadataBuilder
4853
if (getSnapshotsRes.failed) {
4954
return SMResult.Fail(metadataBuilder, WorkflowType.DELETION)
5055
}
51-
val getSnapshots = getSnapshotsRes.snapshots
56+
val snapshots = getSnapshotsRes.snapshots.distinctBy { it.snapshotId().name }
5257

53-
val existingSnapshotsNameSet = getSnapshots.map { it.snapshotId().name }.toSet()
58+
val existingSnapshotsNameSet = snapshots.map { it.snapshotId().name }.toSet()
5459
val remainingSnapshotsName = existingSnapshotsNameSet intersect snapshotsStartedDeletion.toSet()
5560
if (remainingSnapshotsName.isEmpty()) {
5661
val deletionMessage = "Snapshot(s) $snapshotsStartedDeletion deletion has finished."

0 commit comments

Comments
 (0)