Commit d928226

[FLINK-36162] Remove flinkStateSnapshotReference and namespace from FlinkStateSnapshot jobReference
1 parent 2ee9f20 commit d928226

34 files changed: 149 additions and 613 deletions

docs/content/docs/concepts/controller-flow.md

Lines changed: 1 addition & 1 deletion
@@ -98,7 +98,7 @@ It’s very important to understand that the Observer phase records a point-in-t
 The `AbstractFlinkResourceReconciler` encapsulates the core reconciliation flow for all Flink resource types. Let’s take a look at the high level flow before we go into specifics for session, application and session job resources.
 
 1. Check if the resource is ready for reconciliation or if there are any pending operations that should not be interrupted (manual savepoints for example)
-2. If this is the first deployment attempt for the resource, we simply deploy it. It’s important to note here that this is the only deploy operation where we use the `flinkStateSnapshotReference` provided in the spec.
+2. If this is the first deployment attempt for the resource, we simply deploy it. It’s important to note here that this is the only deploy operation where we use the `initialSavepointPath` provided in the spec.
 3. Next we determine if the desired spec changed and the type of change: `IGNORE, SCALE, UPGRADE`. Only for scale and upgrade type changes do we need to execute further reconciliation logic.
 4. If we have upgrade/scale spec changes we execute the upgrade logic specific for the resource type
 5. If we did not receive any spec change we still have to ensure that the currently deployed resources are fully reconciled:

docs/content/docs/custom-resource/job-management.md

Lines changed: 4 additions & 5 deletions
@@ -247,17 +247,16 @@ Users have two options to restore a job from a target savepoint / checkpoint
 
 ### Redeploy using the savepointRedeployNonce
 
-It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource from a target savepoint by using the combination of `savepointRedeployNonce` and `flinkStateSnapshotReference` in the job spec:
+It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource from a target savepoint by using the combination of `savepointRedeployNonce` and `initialSavepointPath` in the job spec:
 
 ```yaml
 job:
-  flinkStateSnapshotReference:
-    path: file://redeploy-target-savepoint
+  initialSavepointPath: file://redeploy-target-savepoint
   # If not set previously, set to 1, otherwise increment, e.g. 2
   savepointRedeployNonce: 1
 ```
 
-When changing the `savepointRedeployNonce` the operator will redeploy the job to the savepoint defined in the `flinkStateSnapshotReference`. The savepoint path must not be empty.
+When changing the `savepointRedeployNonce` the operator will redeploy the job to the savepoint defined in the `initialSavepointPath`. The savepoint path must not be empty.
 
 {{< hint warning >}}
 Rollbacks are not supported after redeployments.
@@ -271,7 +270,7 @@ However, this also means that savepoint history is lost and the operator won't c
 1. Locate the latest checkpoint/savepoint metafile in your configured checkpoint/savepoint directory.
 2. Delete the `FlinkDeployment` resource for your application
 3. Check that you have the current savepoint, and that your `FlinkDeployment` is deleted completely
-4. Modify your `FlinkDeployment` JobSpec and set `flinkStateSnapshotReference.path` to your last checkpoint location
+4. Modify your `FlinkDeployment` JobSpec and set `initialSavepointPath` to your last checkpoint location
 5. Recreate the deployment
 
 These steps ensure that the operator will start completely fresh from the user defined savepoint path and can hopefully fully recover.
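
As a rough sketch of the redeploy flow documented in this file, the spec change can be expressed as a JSON merge patch. The helper name `build_redeploy_patch`, the resource name `my-deployment`, and the nonce value are illustrative assumptions. The `kubectl patch flinkdep ... --type merge` form mirrors the e2e test elsewhere in this commit. The helper does no JSON escaping, so it only suits simple paths.

```sh
#!/usr/bin/env bash

# Hypothetical helper (not part of the operator): build a JSON merge patch that
# sets initialSavepointPath and a new savepointRedeployNonce on the job spec.
build_redeploy_patch() {
  local savepoint_path="$1"
  local nonce="$2"
  # The docs state that the savepoint path must not be empty.
  if [ -z "$savepoint_path" ]; then
    echo "savepoint path must not be empty" >&2
    return 1
  fi
  printf '{"spec":{"job":{"initialSavepointPath":"%s","savepointRedeployNonce":%s}}}' \
    "$savepoint_path" "$nonce"
}

# Build and print a patch; nothing is sent to a cluster here.
patch=$(build_redeploy_patch "file://redeploy-target-savepoint" 2)
echo "$patch"

# To apply it (assumed resource name), something like:
# kubectl patch flinkdep my-deployment --type merge --patch "$patch"
```

Keeping the patch construction separate from the `kubectl` call makes it possible to inspect the change before applying it.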

docs/content/docs/custom-resource/reference.md

Lines changed: 2 additions & 15 deletions
@@ -90,17 +90,6 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
 | flinkConfiguration | java.util.Map<java.lang.String,java.lang.String> | Flink configuration overrides for the Flink deployment or Flink session job. |
 | deploymentName | java.lang.String | The name of the target session cluster deployment. |
 
-### FlinkStateSnapshotReference
-**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference
-
-**Description**: Reference for a FlinkStateSnapshot.
-
-| Parameter | Type | Docs |
-| ----------| ---- | ---- |
-| namespace | java.lang.String | Namespace of the snapshot resource. |
-| name | java.lang.String | Name of the snapshot resource. |
-| path | java.lang.String | If a path is given, all other fields will be ignored, and this will be used as the initial savepoint path. |
-
 ### FlinkStateSnapshotSpec
 **Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec
 
@@ -172,7 +161,6 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
 | ----------| ---- | ---- |
 | kind | org.apache.flink.kubernetes.operator.api.spec.JobKind | Kind of the Flink resource, FlinkDeployment or FlinkSessionJob. |
 | name | java.lang.String | Name of the Flink resource. |
-| namespace | java.lang.String | Namespace of the Flink resource. If empty, the operator will use the namespace of the snapshot. |
 
 ### JobSpec
 **Class**: org.apache.flink.kubernetes.operator.api.spec.JobSpec
@@ -188,11 +176,10 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
 | state | org.apache.flink.kubernetes.operator.api.spec.JobState | Desired state for the job. |
 | savepointTriggerNonce | java.lang.Long | Nonce used to manually trigger savepoint for the running job. In order to trigger a savepoint, change the number to a different non-null value. |
 | initialSavepointPath | java.lang.String | Savepoint path used by the job the first time it is deployed or during savepoint redeployments (triggered by changing the savepointRedeployNonce). |
-| flinkStateSnapshotReference | org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference | Snapshot reference used by the job the first time it is deployed or during savepoint redeployments (triggered by changing the savepointRedeployNonce). |
 | checkpointTriggerNonce | java.lang.Long | Nonce used to manually trigger checkpoint for the running job. In order to trigger a checkpoint, change the number to a different non-null value. |
 | upgradeMode | org.apache.flink.kubernetes.operator.api.spec.UpgradeMode | Upgrade mode of the Flink job. |
 | allowNonRestoredState | java.lang.Boolean | Allow checkpoint state that cannot be mapped to any job vertex in tasks. |
-| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath or the path/FlinkStateSnapshot reference in flinkStateSnapshotReference. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. |
+| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. |
 
 ### JobState
 **Class**: org.apache.flink.kubernetes.operator.api.spec.JobState
@@ -418,7 +405,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
 | state | java.lang.String | Last observed state of the job. |
 | startTime | java.lang.String | Start time of the job. |
 | updateTime | java.lang.String | Update time of the job. |
-| upgradeSnapshotReference | org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference | |
+| upgradeSavepointPath | java.lang.String | |
 | savepointInfo | org.apache.flink.kubernetes.operator.api.status.SavepointInfo | Information about pending and last savepoint for the job. |
 | checkpointInfo | org.apache.flink.kubernetes.operator.api.status.CheckpointInfo | Information about pending and last checkpoint for the job. |

docs/content/docs/custom-resource/snapshots.md

Lines changed: 5 additions & 5 deletions
@@ -38,7 +38,7 @@ If you set this to false, the operator will keep using the deprecated status fie
 To create a savepoint or checkpoint, exactly one of the spec fields `savepoint` or `checkpoint` must present.
 Furthermore, in case of a savepoint you can signal to the operator that the savepoint already exists using the `alreadyExists` field, and the operator will mark it as a successful snapshot in the next reconciliation phase.
 
-You can also instruct the Operator to start a new FlinkDeployment/FlinkSessionJob from an existing snapshot CR by using `flinkStateSnapshotReference` in the job spec.
+You can also instruct the Operator to start a new FlinkDeployment/FlinkSessionJob from an existing snapshot by using `initialSavepointPath` in the job spec.
 
 ## Examples
 
@@ -78,11 +78,11 @@ spec:
 
 ### Start job from existing snapshot
 
+To start a job from an existing snapshot, you need to extract the path then use:
+
 ```yaml
 job:
-  flinkStateSnapshotReference:
-    namespace: flink # not required if it's in the same namespace
-    name: example-savepoint
+  initialSavepointPath: [savepoint_path]
 ```
 
 {{< hint warning >}}
@@ -131,7 +131,7 @@ This feature is not available for checkpoints.
 ## Triggering snapshots
 
 Upgrade savepoints are triggered automatically by the system during the upgrade process as we have seen in the previous sections.
-In this case, the savepoint path will also be recorded in the `upgradeSnapshotReference` job status field, which the operator will use when restarting the job.
+In this case, the savepoint path will also be recorded in the `upgradeSavepointPath` job status field, which the operator will use when restarting the job.
 
 For backup, job forking and other purposes savepoint and checkpoints can be triggered manually or periodically by the operator, however generally speaking these will not be used during upgrades and are not required for the correct operation.
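
The "extract the path" step in the updated docs could look roughly like the sketch below. It assumes `kubectl` and `yq` are on the PATH and reuses the `flinksnp/<name>` and `.status.path` lookup from the e2e test in this commit. The helper names `snapshot_path` and `job_fragment` are hypothetical.

```sh
#!/usr/bin/env bash

# Hypothetical helper: read the savepoint path recorded on a FlinkStateSnapshot
# resource. It needs a cluster, so it is only defined here and not called.
snapshot_path() {
  kubectl get "flinksnp/$1" -o yaml | yq '.status.path'
}

# Hypothetical helper: print the job spec fragment for a given savepoint path.
job_fragment() {
  printf 'job:\n  initialSavepointPath: %s\n' "$1"
}

# Example with a literal path so the sketch runs without a cluster.
job_fragment "file:///flink-data/savepoints/savepoint-45305c-d867504446e0"

# Against a cluster (assumed snapshot name), the two steps would chain as:
# job_fragment "$(snapshot_path example-savepoint)"
```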

docs/content/docs/operations/upgrade.md

Lines changed: 3 additions & 4 deletions
@@ -148,20 +148,19 @@ Here is a reference example of upgrading a `basic-checkpoint-ha-example` deploym
 ```
 5. Restore the job:
 
-Deploy the previously deleted job using this [FlinkDeployemnt](https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml) with `v1beta1` and explicitly set the `job.flinkStateSnapshotReference.path` to the savepoint location obtained from the step 1.
+Deploy the previously deleted job using this [FlinkDeployemnt](https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml) with `v1beta1` and explicitly set the `job.initialSavepointPath` to the savepoint location obtained from the step 1.
 
 ```
 spec:
   ...
   job:
-    flinkStateSnapshotReference:
-      path: /flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata
+    initialSavepointPath: /flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata
     upgradeMode: savepoint
   ...
 ```
 Alternatively, we may use this command to edit and deploy the manifest:
 ```sh
-wget -qO - https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml| yq w - "spec.job.flinkStateSnapshotReference.path" "/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata"| kubectl apply -f -
+wget -qO - https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml| yq w - "spec.job.initialSavepointPath" "/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata"| kubectl apply -f -
 ```
 Finally, verify that `deploy/basic-checkpoint-ha-example` log has:
 ```

e2e-tests/test_snapshot.sh

Lines changed: 4 additions & 6 deletions
@@ -56,10 +56,10 @@ wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIME
 kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job":{"state": "suspended"}}}'
 wait_for_status $APPLICATION_IDENTIFIER '.status.lifecycleState' "SUSPENDED" ${TIMEOUT} || exit 1
 
-location=$(kubectl get $APPLICATION_IDENTIFIER -o yaml | yq '.status.jobStatus.upgradeSnapshotReference.path')
+location=$(kubectl get $APPLICATION_IDENTIFIER -o yaml | yq '.status.jobStatus.upgradeSavepointPath')
 if [ "$location" == "" ]; then echo "Legacy savepoint location was empty"; exit 1; fi
-echo "Removing upgradeSnapshotReference and setting lastSavepoint"
-kubectl patch flinkdep ${CLUSTER_ID} --type=merge --subresource status --patch '{"status":{"jobStatus":{"upgradeSnapshotReference":null,"savepointInfo":{"lastSavepoint":{"timeStamp": 0, "location": "'$location'", "triggerNonce": 0}}}}}'
+echo "Removing upgradeSavepointPath and setting lastSavepoint"
+kubectl patch flinkdep ${CLUSTER_ID} --type=merge --subresource status --patch '{"status":{"jobStatus":{"upgradeSavepointPath":null,"savepointInfo":{"lastSavepoint":{"timeStamp": 0, "location": "'$location'", "triggerNonce": 0}}}}}'
 
 # Delete operator Pod to clear CR state cache
 kubectl delete pod -n $(get_operator_pod_namespace) $(get_operator_pod_name)
@@ -151,13 +151,11 @@ wait_for_status $APPLICATION_IDENTIFIER '.status.lifecycleState' "SUSPENDED" ${T
 echo "Waiting for upgrade savepoint..."
 snapshot=$(wait_for_snapshot $CLUSTER_ID "savepoint" "upgrade" ${TIMEOUT})
 if [ "$snapshot" == "" ]; then echo "Could not find snapshot"; exit 1; fi
-echo "Found upgrade snapshot: $snapshot"
-wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.upgradeSnapshotReference.name' "$snapshot" ${TIMEOUT} || exit 1
 
 location=$(kubectl get flinksnp/$snapshot -o yaml | yq '.status.path')
 if [ "$location" == "" ]; then echo "Upgrade savepoint location was empty"; exit 1; fi
 
-
+wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.upgradeSavepointPath' "$location" ${TIMEOUT} || exit 1
 
 echo "Restarting deployment..."
 kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": {"state": "running" } } }'
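
A possible hardening of the emptiness checks in this script, offered as a sketch only: some `yq` versions print the literal string `null` for a field that is absent, which a comparison against `""` would not catch. The helper name `is_unset` is hypothetical and is not part of the test utilities.

```sh
#!/usr/bin/env bash

# Hypothetical helper: succeed when a value read via yq is effectively missing,
# i.e. either the empty string or the literal string "null".
is_unset() {
  case "$1" in
    ""|null) return 0 ;;
    *) return 1 ;;
  esac
}

# Example with a literal value so the sketch runs without a cluster.
location="/flink-data/savepoints/savepoint-000000-aec3dd08e76d"
if is_unset "$location"; then
  echo "Upgrade savepoint location was empty"
else
  echo "Using savepoint location: $location"
fi
```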

examples/snapshot/job-from-savepoint.yaml

Lines changed: 1 addition & 3 deletions
@@ -64,6 +64,4 @@ spec:
     jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
     parallelism: 2
     upgradeMode: savepoint
-    flinkStateSnapshotReference:
-      name: example-savepoint
-      namespace: flink
+    initialSavepointPath: file:///flink-data/savepoints/savepoint-45305c-d867504446e0

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkStateSnapshotReference.java

Lines changed: 0 additions & 58 deletions
This file was deleted.

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobReference.java

Lines changed: 1 addition & 8 deletions
@@ -44,16 +44,9 @@ public class JobReference {
     /** Name of the Flink resource. */
     private String name;
 
-    /**
-     * Namespace of the Flink resource. If empty, the operator will use the namespace of the
-     * snapshot.
-     */
-    private String namespace;
-
     public static JobReference fromFlinkResource(AbstractFlinkResource<?, ?> flinkResource) {
         var result = new JobReference();
         result.setName(flinkResource.getMetadata().getName());
-        result.setNamespace(flinkResource.getMetadata().getNamespace());
 
         if (flinkResource instanceof FlinkDeployment) {
             result.setKind(JobKind.FLINK_DEPLOYMENT);
@@ -71,6 +64,6 @@ public String toString() {
         } else if (kind == JobKind.FLINK_SESSION_JOB) {
             kindString = CrdConstants.KIND_SESSION_JOB;
         }
-        return String.format("%s/%s (%s)", namespace, name, kindString);
+        return String.format("%s (%s)", name, kindString);
     }
 }

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java

Lines changed: 2 additions & 11 deletions
@@ -72,16 +72,8 @@ public class JobSpec implements Diffable<JobSpec> {
      * redeployments (triggered by changing the savepointRedeployNonce).
      */
     @SpecDiff(DiffType.IGNORE)
-    @Deprecated
     private String initialSavepointPath;
 
-    /**
-     * Snapshot reference used by the job the first time it is deployed or during savepoint
-     * redeployments (triggered by changing the savepointRedeployNonce).
-     */
-    @SpecDiff(DiffType.IGNORE)
-    private FlinkStateSnapshotReference flinkStateSnapshotReference;
-
     /**
      * Nonce used to manually trigger checkpoint for the running job. In order to trigger a
      * checkpoint, change the number to a different non-null value.
@@ -100,9 +92,8 @@ public class JobSpec implements Diffable<JobSpec> {
 
     /**
      * Nonce used to trigger a full redeployment of the job from the savepoint path specified in
-     * initialSavepointPath or the path/FlinkStateSnapshot reference in flinkStateSnapshotReference.
-     * In order to trigger redeployment, change the number to a different non-null value. Rollback
-     * is not possible after redeployment.
+     * initialSavepointPath. In order to trigger redeployment, change the number to a different
+     * non-null value. Rollback is not possible after redeployment.
      */
     @SpecDiff(value = DiffType.SAVEPOINT_REDEPLOY, onNullIgnore = true)
     private Long savepointRedeployNonce;

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java

Lines changed: 1 addition & 2 deletions
@@ -18,7 +18,6 @@
 package org.apache.flink.kubernetes.operator.api.status;
 
 import org.apache.flink.annotation.Experimental;
-import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import io.fabric8.crd.generator.annotation.PrinterColumn;
@@ -51,7 +50,7 @@ public class JobStatus {
     /** Update time of the job. */
     private String updateTime;
 
-    private FlinkStateSnapshotReference upgradeSnapshotReference;
+    private String upgradeSavepointPath;
 
     /** Information about pending and last savepoint for the job. */
     @Deprecated private SavepointInfo savepointInfo = new SavepointInfo();

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java

Lines changed: 1 addition & 5 deletions
@@ -23,7 +23,6 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
-import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
 import org.apache.flink.kubernetes.operator.api.status.Checkpoint;
 import org.apache.flink.kubernetes.operator.api.status.CheckpointInfo;
 import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
@@ -454,10 +453,7 @@ private void observeLatestCheckpoint(
             flinkService
                     .getLastCheckpoint(JobID.fromHexString(jobID), observeConfig)
                     .ifPresent(
-                            snapshot ->
-                                    jobStatus.setUpgradeSnapshotReference(
-                                            FlinkStateSnapshotReference.fromPath(
-                                                    snapshot.getLocation())));
+                            snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()));
         } catch (Exception e) {
             LOG.error("Could not observe latest checkpoint information.", e);
             throw new ReconciliationException(e);
