diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0b4f8b0137..9285f96ff3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -37,7 +37,7 @@ jobs: name: maven build strategy: matrix: - java-version: [ 11, 17, 21 ] + java-version: [ 17, 21 ] steps: - uses: actions/checkout@v4 - name: Set up JDK ${{ matrix.java-version }} @@ -76,7 +76,7 @@ jobs: strategy: matrix: http-client: [ "okhttp", "jdk", "jetty", "vertx" ] - java-version: [ "11", "17", "21" ] + java-version: [ "17", "21" ] uses: ./.github/workflows/e2e.yaml with: java-version: ${{ matrix.java-version }} @@ -89,7 +89,7 @@ jobs: strategy: matrix: http-client: [ "okhttp" ] - java-version: [ "11", "17"] + java-version: [ "17"] flink-version: - "v2_0" - "v1_20" @@ -142,7 +142,7 @@ jobs: uses: ./.github/workflows/e2e.yaml with: - java-version: 11 + java-version: 17 flink-version: ${{ matrix.flink-version }} test: ${{ matrix.test }} namespace: "flink" @@ -211,7 +211,7 @@ jobs: test: test_snapshot.sh uses: ./.github/workflows/e2e.yaml with: - java-version: 11 + java-version: 17 flink-version: ${{ matrix.flink-version }} test: ${{ matrix.test }} mode: ${{ matrix.mode }} diff --git a/.github/workflows/docker_push.yml b/.github/workflows/docker_push.yml index bf4d92a886..fb6fc2236f 100644 --- a/.github/workflows/docker_push.yml +++ b/.github/workflows/docker_push.yml @@ -45,7 +45,7 @@ jobs: image: tonistiigi/binfmt:qemu-v7.0.0 platforms: all - - name: Set up Docker Buildx + - name: Set up Docker Build uses: docker/setup-buildx-action@v3 - name: Log in to the Container registry diff --git a/.github/workflows/publish_snapshot.yml b/.github/workflows/publish_snapshot.yml index 9a6c67603a..ea29187555 100644 --- a/.github/workflows/publish_snapshot.yml +++ b/.github/workflows/publish_snapshot.yml @@ -34,7 +34,7 @@ jobs: - name: Set up JDK 11 uses: actions/setup-java@v4 with: - java-version: '11' + java-version: '17' distribution: 'temurin' - name: Cache local Maven repository uses: actions/cache@v4 diff --git a/Dockerfile b/Dockerfile index b9528af4f3..257bf549ba 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ # Build -ARG JAVA_VERSION=11 +ARG JAVA_VERSION=17 FROM maven:3.8.8-eclipse-temurin-${JAVA_VERSION} AS build ARG SKIP_TESTS=true ARG HTTP_CLIENT=okhttp diff --git a/docs/content.zh/docs/development/guide.md b/docs/content.zh/docs/development/guide.md index c8cfd163f6..5edf352904 100644 --- a/docs/content.zh/docs/development/guide.md +++ b/docs/content.zh/docs/development/guide.md @@ -36,7 +36,7 @@ In order to build the operator you need to [clone the git repository]({{< github git clone {{< github_repo >}} ``` -To build from the command line, it is necessary to have **Maven 3** and a **Java Development Kit** (JDK) installed. Please note that Flink Kubernetes Operator requires **Java 11**. +To build from the command line, it is necessary to have **Maven 3** and a **Java Development Kit** (JDK) installed. Please note that Flink Kubernetes Operator requires **Java 17**. To build the project, you can use the following command: diff --git a/docs/content/docs/development/guide.md b/docs/content/docs/development/guide.md index c8cfd163f6..5edf352904 100644 --- a/docs/content/docs/development/guide.md +++ b/docs/content/docs/development/guide.md @@ -36,7 +36,7 @@ In order to build the operator you need to [clone the git repository]({{< github git clone {{< github_repo >}} ``` -To build from the command line, it is necessary to have **Maven 3** and a **Java Development Kit** (JDK) installed. Please note that Flink Kubernetes Operator requires **Java 11**. +To build from the command line, it is necessary to have **Maven 3** and a **Java Development Kit** (JDK) installed. Please note that Flink Kubernetes Operator requires **Java 17**. To build the project, you can use the following command: diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java index 8dce0d89f2..3a44ef540a 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java @@ -97,7 +97,13 @@ protected static void checkObjectCompatibility( // This claims field was removed in Kubernetes 1.28 as it was mistakenly // added in the first place. For more context please refer to // https://github.com/kubernetes/api/commit/8b14183 - && !fieldPath.contains(".volumeClaimTemplate.spec.resources.claims")) { + && !fieldPath.contains(".volumeClaimTemplate.spec.resources.claims") + && !fieldPath.contains( + ".spec.taskManager.podTemplate.spec.resourceClaims.items.source") + && !fieldPath.contains( + ".spec.jobManager.podTemplate.spec.resourceClaims.items.source") + && !fieldPath.contains( + ".spec.podTemplate.spec.resourceClaims.items.source")) { err(fieldPath + " has been removed"); } } else { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index 4bd2836f7a..06c361aea3 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -66,6 +66,7 @@ import javax.annotation.Nullable; +import java.time.Duration; import java.util.Collection; import java.util.HashSet; import java.util.Set; @@ -150,14 +151,15 @@ private void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) { overrider.withMetrics(new OperatorJosdkMetrics(metricGroup, configManager)); } - overrider.withTerminationTimeoutSeconds( - (int) + overrider.withReconciliationTerminationTimeout( + Duration.ofSeconds( conf.get(KubernetesOperatorConfigOptions.OPERATOR_TERMINATION_TIMEOUT) - .toSeconds()); + .toSeconds())); overrider.withStopOnInformerErrorDuringStartup( conf.get(KubernetesOperatorConfigOptions.OPERATOR_STOP_ON_INFORMER_ERROR)); + overrider.withUseSSAToPatchPrimaryResource(false); var leaderElectionConf = operatorConf.getLeaderElectionConfiguration(); if (leaderElectionConf != null) { overrider.withLeaderElectionConfiguration(leaderElectionConf); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java index f4203fb7be..393957b0ac 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java @@ -496,13 +496,13 @@ public static String operatorConfigKey(String key) { .withDescription( "Whether informer errors should stop operator startup. If false, the startup will ignore recoverable errors, caused for example by RBAC issues and will retry periodically."); + public static final int DEFAULT_TERMINATION_TIMEOUT_SECONDS = 10; + @Documentation.Section(SECTION_ADVANCED) public static final ConfigOption OPERATOR_TERMINATION_TIMEOUT = operatorConfig("termination.timeout") .durationType() - .defaultValue( - Duration.ofSeconds( - ConfigurationService.DEFAULT_TERMINATION_TIMEOUT_SECONDS)) + .defaultValue(Duration.ofSeconds(DEFAULT_TERMINATION_TIMEOUT_SECONDS)) .withDescription( "Operator shutdown timeout before reconciliation threads are killed."); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 51235b8ebe..e5d418b81f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -43,10 +43,8 @@ import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.source.EventSource; @@ -55,17 +53,13 @@ import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; /** Controller that runs the main reconcile loop for Flink deployments. */ @ControllerConfiguration public class FlinkDeploymentController - implements Reconciler, - ErrorStatusHandler, - EventSourceInitializer, - Cleaner { + implements Reconciler, Cleaner { private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class); private final Set validators; @@ -185,9 +179,9 @@ private void triggerErrorEvent( } @Override - public Map prepareEventSources( + public List> prepareEventSources( EventSourceContext context) { - List eventSources = new ArrayList<>(); + List> eventSources = new ArrayList<>(); eventSources.add(EventSourceUtils.getSessionJobInformerEventSource(context)); eventSources.add(EventSourceUtils.getDeploymentInformerEventSource(context)); @@ -199,7 +193,7 @@ public Map prepareEventSources( "Could not initialize informer for snapshots as the CRD has not been installed!"); } - return EventSourceInitializer.nameEventSources(eventSources.toArray(EventSource[]::new)); + return eventSources; } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java index 7454864fe7..a7f2106b06 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java @@ -40,10 +40,8 @@ import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import org.slf4j.Logger; @@ -51,7 +49,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; @@ -59,8 +56,6 @@ @ControllerConfiguration() public class FlinkSessionJobController implements io.javaoperatorsdk.operator.api.reconciler.Reconciler, - ErrorStatusHandler, - EventSourceInitializer, Cleaner { private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class); @@ -179,9 +174,9 @@ public ErrorStatusUpdateControl updateErrorStatus( } @Override - public Map prepareEventSources( + public List> prepareEventSources( EventSourceContext context) { - List eventSources = new ArrayList<>(); + List> eventSources = new ArrayList<>(); eventSources.add(EventSourceUtils.getFlinkDeploymentInformerEventSource(context)); if (KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class)) { @@ -192,7 +187,7 @@ public Map prepareEventSources( "Could not initialize informer for snapshots as the CRD has not been installed!"); } - return EventSourceInitializer.nameEventSources(eventSources.toArray(EventSource[]::new)); + return eventSources; } private boolean validateSessionJob(FlinkResourceContext ctx) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java index 1a516c3d9f..8c36dc0efd 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java @@ -34,10 +34,8 @@ import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.source.EventSource; @@ -47,7 +45,7 @@ import java.time.Duration; import java.util.HashMap; -import java.util.Map; +import java.util.List; import java.util.Objects; import java.util.Set; @@ -55,10 +53,7 @@ @RequiredArgsConstructor @ControllerConfiguration public class FlinkStateSnapshotController - implements Reconciler, - ErrorStatusHandler, - EventSourceInitializer, - Cleaner { + implements Reconciler, Cleaner { private static final Logger LOG = LoggerFactory.getLogger(FlinkStateSnapshotController.class); @@ -154,10 +149,9 @@ public ErrorStatusUpdateControl updateErrorStatus( } @Override - public Map prepareEventSources( + public List> prepareEventSources( EventSourceContext context) { - return EventSourceInitializer.nameEventSources( - EventSourceUtils.getFlinkStateSnapshotInformerEventSources(context)); + return List.of(EventSourceUtils.getFlinkStateSnapshotInformerEventSources(context)); } /** @@ -176,9 +170,9 @@ private UpdateControl getUpdateControl(FlinkStateSnapshotCon var statusChanged = resourceStatusChanged(ctx); if (labelsChanged && statusChanged) { - updateControl = UpdateControl.updateResourceAndPatchStatus(resource); + updateControl = UpdateControl.patchResourceAndStatus(resource); } else if (labelsChanged) { - updateControl = UpdateControl.updateResource(resource); + updateControl = UpdateControl.patchResource(resource); } else if (statusChanged) { updateControl = UpdateControl.patchStatus(resource); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java index 5350e3cd76..66c191d6ee 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java @@ -28,6 +28,7 @@ import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.SystemClock; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.api.monitoring.Metrics; import io.javaoperatorsdk.operator.api.reconciler.Constants; @@ -108,7 +109,8 @@ public void cleanupDoneFor(ResourceID resourceID, Map metadata) @Override public void reconcileCustomResource( - ResourceID resourceID, RetryInfo retryInfoNullable, Map metadata) { + HasMetadata resource, RetryInfo retryInfoNullable, Map metadata) { + var resourceID = ResourceID.fromResource(resource); counter(getResourceMg(resourceID, metadata), RECONCILIATION).inc(); if (retryInfoNullable != null) { @@ -117,14 +119,22 @@ public void reconcileCustomResource( } @Override - public void finishedReconciliation(ResourceID resourceID, Map metadata) { - counter(getResourceMg(resourceID, metadata), RECONCILIATION, "finished").inc(); + public void finishedReconciliation(HasMetadata resource, Map metadata) { + counter( + getResourceMg(ResourceID.fromResource(resource), metadata), + RECONCILIATION, + "finished") + .inc(); } @Override public void failedReconciliation( - ResourceID resourceID, Exception exception, Map metadata) { - counter(getResourceMg(resourceID, metadata), RECONCILIATION, "failed").inc(); + HasMetadata resource, Exception exception, Map metadata) { + counter( + getResourceMg(ResourceID.fromResource(resource), metadata), + RECONCILIATION, + "failed") + .inc(); } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java index 24ecb462d0..da544e9e94 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java @@ -28,15 +28,15 @@ import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController; import org.apache.flink.kubernetes.utils.Constants; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.apps.Deployment; -import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper; import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; -import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; import java.util.Collections; import java.util.List; @@ -63,7 +63,8 @@ public class EventSourceUtils { var labelSelector = String.format("%s in (%s)", CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, labelFilters); var configuration = - InformerConfiguration.from(FlinkStateSnapshot.class, context) + InformerEventSourceConfiguration.from( + FlinkStateSnapshot.class, context.getPrimaryResourceClass()) .withLabelSelector(labelSelector) .withSecondaryToPrimaryMapper( snapshot -> { @@ -76,8 +77,8 @@ public class EventSourceUtils { snapshot.getSpec().getJobReference().getName(), snapshot.getMetadata().getNamespace())); }) - .withNamespacesInheritedFromController(context) - .followNamespaceChanges(true) + .withNamespacesInheritedFromController() + .withFollowControllerNamespacesChanges(true) .build(); return new InformerEventSource<>(configuration, context); } @@ -92,11 +93,11 @@ public static InformerEventSource getDeploymentInfo .collect(Collectors.joining(",")); var configuration = - InformerConfiguration.from(Deployment.class, context) + InformerEventSourceConfiguration.from(Deployment.class, FlinkDeployment.class) .withLabelSelector(labelSelector) - .withSecondaryToPrimaryMapper(Mappers.fromLabel(Constants.LABEL_APP_KEY)) - .withNamespacesInheritedFromController(context) - .followNamespaceChanges(true) + .withSecondaryToPrimaryMapper(fromLabel(Constants.LABEL_APP_KEY)) + .withNamespacesInheritedFromController() + .withFollowControllerNamespacesChanges(true) .build(); return new InformerEventSource<>(configuration, context); @@ -114,8 +115,8 @@ public static InformerEventSource getDeploymentInfo flinkDeployment.getMetadata().getName(), flinkDeployment.getMetadata().getNamespace()))); - InformerConfiguration configuration = - InformerConfiguration.from(FlinkSessionJob.class, context) + var configuration = + InformerEventSourceConfiguration.from(FlinkSessionJob.class, FlinkSessionJob.class) .withSecondaryToPrimaryMapper( sessionJob -> context @@ -132,8 +133,8 @@ public static InformerEventSource getDeploymentInfo .stream() .map(ResourceID::fromResource) .collect(Collectors.toSet())) - .withNamespacesInheritedFromController(context) - .followNamespaceChanges(true) + .withNamespacesInheritedFromController() + .withFollowControllerNamespacesChanges(true) .build(); return new InformerEventSource<>(configuration, context); @@ -150,8 +151,8 @@ public static InformerEventSource getDeploymentInfo sessionJob.getSpec().getDeploymentName(), sessionJob.getMetadata().getNamespace()))); - InformerConfiguration configuration = - InformerConfiguration.from(FlinkDeployment.class, context) + var configuration = + InformerEventSourceConfiguration.from(FlinkDeployment.class, FlinkSessionJob.class) .withSecondaryToPrimaryMapper( flinkDeployment -> context @@ -179,8 +180,8 @@ public static InformerEventSource getDeploymentInfo sessionJob .getMetadata() .getNamespace()))) - .withNamespacesInheritedFromController(context) - .followNamespaceChanges(true) + .withNamespacesInheritedFromController() + .withFollowControllerNamespacesChanges(true) .build(); return new InformerEventSource<>(configuration, context); } @@ -201,8 +202,9 @@ public static EventSource[] getFlinkStateSnapshotInformerEventSources( savepoint.getMetadata().getNamespace())); }); - InformerConfiguration configurationFlinkSessionJob = - InformerConfiguration.from(FlinkSessionJob.class, context) + var configurationFlinkSessionJob = + InformerEventSourceConfiguration.from( + FlinkSessionJob.class, FlinkStateSnapshot.class) .withSecondaryToPrimaryMapper(getSnapshotPrimaryMapper(context)) .withPrimaryToSecondaryMapper( (PrimaryToSecondaryMapper) @@ -218,14 +220,15 @@ public static EventSource[] getFlinkStateSnapshotInformerEventSources( .getSnapshotJobReferenceResourceId( snapshot)); }) - .withNamespacesInheritedFromController(context) - .followNamespaceChanges(true) + .withNamespacesInheritedFromController() + .withFollowControllerNamespacesChanges(true) .build(); var flinkSessionJobEventSource = new InformerEventSource<>(configurationFlinkSessionJob, context); - InformerConfiguration configurationFlinkDeployment = - InformerConfiguration.from(FlinkDeployment.class, context) + var configurationFlinkDeployment = + InformerEventSourceConfiguration.from( + FlinkDeployment.class, FlinkStateSnapshot.class) .withSecondaryToPrimaryMapper(getSnapshotPrimaryMapper(context)) .withPrimaryToSecondaryMapper( (PrimaryToSecondaryMapper) @@ -258,8 +261,8 @@ public static EventSource[] getFlinkStateSnapshotInformerEventSources( .getSnapshotJobReferenceResourceId( snapshot)); }) - .withNamespacesInheritedFromController(context) - .followNamespaceChanges(true) + .withNamespacesInheritedFromController() + .withFollowControllerNamespacesChanges(true) .build(); var flinkDeploymentEventSource = new InformerEventSource<>(configurationFlinkDeployment, context); @@ -283,6 +286,26 @@ SecondaryToPrimaryMapper getSnapshotPrimaryMapper( .collect(Collectors.toSet()); } + public static SecondaryToPrimaryMapper fromLabel(String nameKey) { + return resource -> { + final var metadata = resource.getMetadata(); + if (metadata == null) { + return Collections.emptySet(); + } else { + final var map = metadata.getLabels(); + if (map == null) { + return Collections.emptySet(); + } + var name = map.get(nameKey); + if (name == null) { + return Collections.emptySet(); + } + var namespace = resource.getMetadata().getNamespace(); + return Set.of(new ResourceID(name, namespace)); + } + }; + } + private static String indexKey(String name, String namespace) { return name + "#" + namespace; } diff --git a/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE b/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE index b0589175ab..7e51f91131 100644 --- a/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE +++ b/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE @@ -12,7 +12,7 @@ This project bundles the following dependencies under the Apache Software Licens - com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:jar:2.15.0 - com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.15.0 - com.google.code.findbugs:jsr305:jar:1.3.9 -- com.squareup.okhttp3:logging-interceptor:jar:3.12.12 +- com.squareup.okhttp3:logging-interceptor:jar:4.12.0 - com.squareup.okhttp3:okhttp:jar:4.12.0 - com.squareup.okio:okio-jvm:jar:3.6.0 - com.squareup.okio:okio:jar:3.6.0 @@ -22,34 +22,34 @@ This project bundles the following dependencies under the Apache Software Licens - commons-cli:commons-cli:jar:1.5.0 - commons-collections:commons-collections:jar:3.2.2 - commons-io:commons-io:jar:2.17.0 -- io.fabric8:kubernetes-client-api:jar:6.13.2 -- io.fabric8:kubernetes-client:jar:6.13.2 -- io.fabric8:kubernetes-httpclient-okhttp:jar:6.13.2 -- io.fabric8:kubernetes-model-admissionregistration:jar:6.13.2 -- io.fabric8:kubernetes-model-apiextensions:jar:6.13.2 -- io.fabric8:kubernetes-model-apps:jar:6.13.2 -- io.fabric8:kubernetes-model-autoscaling:jar:6.13.2 -- io.fabric8:kubernetes-model-batch:jar:6.13.2 -- io.fabric8:kubernetes-model-certificates:jar:6.13.2 -- io.fabric8:kubernetes-model-common:jar:6.13.2 -- io.fabric8:kubernetes-model-coordination:jar:6.13.2 -- io.fabric8:kubernetes-model-core:jar:6.13.2 -- io.fabric8:kubernetes-model-discovery:jar:6.13.2 -- io.fabric8:kubernetes-model-events:jar:6.13.2 -- io.fabric8:kubernetes-model-extensions:jar:6.13.2 -- io.fabric8:kubernetes-model-flowcontrol:jar:6.13.2 -- io.fabric8:kubernetes-model-gatewayapi:jar:6.13.2 -- io.fabric8:kubernetes-model-metrics:jar:6.13.2 -- io.fabric8:kubernetes-model-networking:jar:6.13.2 -- io.fabric8:kubernetes-model-node:jar:6.13.2 -- io.fabric8:kubernetes-model-policy:jar:6.13.2 -- io.fabric8:kubernetes-model-rbac:jar:6.13.2 -- io.fabric8:kubernetes-model-resource:jar:6.13.2 -- io.fabric8:kubernetes-model-scheduling:jar:6.13.2 -- io.fabric8:kubernetes-model-storageclass:jar:6.13.2 -- io.fabric8:zjsonpatch:jar:0.3.0 -- io.javaoperatorsdk:operator-framework-core:jar:4.9.4 -- io.javaoperatorsdk:operator-framework:jar:4.9.4 +- io.fabric8:kubernetes-client-api:jar:7.3.0 +- io.fabric8:kubernetes-client:jar:7.3.0 +- io.fabric8:kubernetes-httpclient-okhttp:jar:7.3.0 +- io.fabric8:kubernetes-model-admissionregistration:jar:7.3.0 +- io.fabric8:kubernetes-model-apiextensions:jar:7.3.0 +- io.fabric8:kubernetes-model-apps:jar:7.3.0 +- io.fabric8:kubernetes-model-autoscaling:jar:7.3.0 +- io.fabric8:kubernetes-model-batch:jar:7.3.0 +- io.fabric8:kubernetes-model-certificates:jar:7.3.0 +- io.fabric8:kubernetes-model-common:jar:7.3.0 +- io.fabric8:kubernetes-model-coordination:jar:7.3.0 +- io.fabric8:kubernetes-model-core:jar:7.3.0 +- io.fabric8:kubernetes-model-discovery:jar:7.3.0 +- io.fabric8:kubernetes-model-events:jar:7.3.0 +- io.fabric8:kubernetes-model-extensions:jar:7.3.0 +- io.fabric8:kubernetes-model-flowcontrol:jar:7.3.0 +- io.fabric8:kubernetes-model-gatewayapi:jar:7.3.0 +- io.fabric8:kubernetes-model-metrics:jar:7.3.0 +- io.fabric8:kubernetes-model-networking:jar:7.3.0 +- io.fabric8:kubernetes-model-node:jar:7.3.0 +- io.fabric8:kubernetes-model-policy:jar:7.3.0 +- io.fabric8:kubernetes-model-rbac:jar:7.3.0 +- io.fabric8:kubernetes-model-resource:jar:7.3.0 +- io.fabric8:kubernetes-model-scheduling:jar:7.3.0 +- io.fabric8:kubernetes-model-storageclass:jar:7.3.0 +- io.fabric8:zjsonpatch:jar:7.3.0 +- io.javaoperatorsdk:operator-framework-core:jar:5.1.0 +- io.javaoperatorsdk:operator-framework:jar:5.1.0 - org.apache.commons:commons-compress:jar:1.26.0 - org.apache.commons:commons-lang3:jar:3.16.0 - org.apache.commons:commons-math3:jar:3.6.1 @@ -72,6 +72,8 @@ This project bundles the following dependencies under the Apache Software Licens - org.xerial.snappy:snappy-java:jar:1.1.10.4 - org.yaml:snakeyaml:jar:2.0 - tools.profiler:async-profiler:jar:2.9 +- io.github.java-diff-utils:java-diff-utils:4.15 +- io.fabric8:kubernetes-httpclient-jdk:7.3.0 This project bundles the following dependencies under the BSD License. See bundled license files for details. diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java index b60551a92a..423b2af37e 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java @@ -81,7 +81,7 @@ public void testConfigurationPassedToJOSDK() { var labelSelectors = testOperator.registeredControllers.stream() .map(RegisteredController::getConfiguration) - .map(ControllerConfiguration::getLabelSelector); + .map(c -> c.getInformerConfig().getLabelSelector()); labelSelectors.forEach(selector -> Assertions.assertEquals(testSelector, selector)); Assertions.assertFalse(configService.stopOnInformerErrorDuringStartup()); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java index e3a3ae25b3..b961953a53 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -47,16 +47,15 @@ import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.mockwebserver.http.Headers; +import io.fabric8.mockwebserver.http.RecordedRequest; import io.fabric8.mockwebserver.utils.ResponseProvider; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.IndexedResourceCache; -import io.javaoperatorsdk.operator.api.reconciler.ResourceDiscriminator; import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; -import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedDependentResourceContext; +import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedWorkflowAndDependentResourceContext; import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever; -import okhttp3.Headers; -import okhttp3.mockwebserver.RecordedRequest; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.params.provider.Arguments; @@ -505,19 +504,14 @@ public Optional getSecondaryResource(Class aClass, String s) { return Optional.empty(); } - @Override - public Optional getSecondaryResource( - Class aClass, ResourceDiscriminator resourceDiscriminator) { - return Optional.empty(); - } - @Override public ControllerConfiguration getControllerConfiguration() { return null; } @Override - public ManagedDependentResourceContext managedDependentResourceContext() { + public ManagedWorkflowAndDependentResourceContext + managedWorkflowAndDependentResourceContext() { return null; } @@ -536,9 +530,19 @@ public ExecutorService getWorkflowExecutorService() { throw new UnsupportedOperationException("Not implemented"); } + @Override + public T getPrimaryResource() { + return null; + } + @Override public IndexedResourceCache getPrimaryCache() { return null; } + + @Override + public boolean isNextReconciliationImminent() { + return false; + } } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 2384fb4da1..21ba67ffa4 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -130,7 +130,7 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception org.apache.flink.api.common.JobStatus.RUNNING, appCluster.getStatus().getJobStatus().getState()); assertEquals(7, testController.getInternalStatusUpdateCount()); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); FlinkDeploymentReconciliationStatus reconciliationStatus = appCluster.getStatus().getReconciliationStatus(); @@ -264,7 +264,7 @@ public void verifyFailedDeployment() throws Exception { appCluster, TestUtils.createContextWithFailedJobManagerDeployment(kubernetesClient)); submittedEventValidatingResponseProvider.assertValidated(); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( configManager.getOperatorConfiguration().getReconcileInterval().toMillis()), @@ -287,7 +287,7 @@ public void verifyFailedDeployment() throws Exception { assertEquals( JobManagerDeploymentStatus.ERROR, appCluster.getStatus().getJobManagerDeploymentStatus()); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( ReconciliationUtils.rescheduleAfter( JobManagerDeploymentStatus.ERROR, @@ -563,7 +563,7 @@ public void verifyReconcileWithBadConfig() throws Exception { // reconcile() finishes. appCluster.getSpec().getFlinkConfiguration().put(RestOptions.PORT.key(), "8088"); updateControl = testController.reconcile(appCluster, context); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( JobManagerDeploymentStatus.DEPLOYING, appCluster.getStatus().getJobManagerDeploymentStatus()); @@ -577,7 +577,7 @@ public void verifyReconcileWithBadConfig() throws Exception { .getStatus() .getError() .contains("JobManager replicas should not be configured less than one.")); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( JobManagerDeploymentStatus.DEPLOYED_NOT_READY, appCluster.getStatus().getJobManagerDeploymentStatus()); @@ -603,14 +603,14 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception UpdateControl updateControl; updateControl = testController.reconcile(appCluster, context); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( JobManagerDeploymentStatus.DEPLOYING, appCluster.getStatus().getJobManagerDeploymentStatus()); updateControl = testController.reconcile(appCluster, context); JobStatus jobStatus = appCluster.getStatus().getJobStatus(); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( JobManagerDeploymentStatus.DEPLOYED_NOT_READY, appCluster.getStatus().getJobManagerDeploymentStatus()); @@ -621,7 +621,7 @@ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception appCluster.getSpec().setJob(null); // Validation fails and JobObserver should still be used updateControl = testController.reconcile(appCluster, context); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( JobManagerDeploymentStatus.READY, appCluster.getStatus().getJobManagerDeploymentStatus()); @@ -647,14 +647,14 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except UpdateControl updateControl; updateControl = testController.reconcile(appCluster, context); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( JobManagerDeploymentStatus.DEPLOYING, appCluster.getStatus().getJobManagerDeploymentStatus()); updateControl = testController.reconcile(appCluster, context); JobStatus jobStatus = appCluster.getStatus().getJobStatus(); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( JobManagerDeploymentStatus.DEPLOYED_NOT_READY, appCluster.getStatus().getJobManagerDeploymentStatus()); @@ -665,7 +665,7 @@ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Except appCluster.getSpec().setJob(TestUtils.buildSessionJob().getSpec().getJob()); // Validation fails and JobObserver should still be used updateControl = testController.reconcile(appCluster, context); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( JobManagerDeploymentStatus.READY, appCluster.getStatus().getJobManagerDeploymentStatus()); @@ -1105,7 +1105,7 @@ private void verifyReconcileInitialSuspendedDeployment(FlinkDeployment appCluste appCluster.getStatus().getJobManagerDeploymentStatus()); assertNull(appCluster.getStatus().getJobStatus().getState()); assertEquals(1, testController.getInternalStatusUpdateCount()); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( configManager.getOperatorConfiguration().getReconcileInterval().toMillis()), @@ -1129,7 +1129,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E org.apache.flink.api.common.JobStatus.RECONCILING, appCluster.getStatus().getJobStatus().getState()); assertEquals(4, testController.getInternalStatusUpdateCount()); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( configManager @@ -1152,7 +1152,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E org.apache.flink.api.common.JobStatus.RECONCILING, appCluster.getStatus().getJobStatus().getState()); assertEquals(5, testController.getInternalStatusUpdateCount()); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( configManager.getOperatorConfiguration().getRestApiReadyDelay().toMillis()), @@ -1166,7 +1166,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E org.apache.flink.api.common.JobStatus.RUNNING, appCluster.getStatus().getJobStatus().getState()); assertEquals(6, testController.getInternalStatusUpdateCount()); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( configManager.getOperatorConfiguration().getReconcileInterval().toMillis()), @@ -1181,7 +1181,7 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E org.apache.flink.api.common.JobStatus.RUNNING, appCluster.getStatus().getJobStatus().getState()); assertEquals(6, testController.getInternalStatusUpdateCount()); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( configManager.getOperatorConfiguration().getReconcileInterval().toMillis()), diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java index f0489cd21a..09ff806e8c 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java @@ -125,7 +125,7 @@ public void verifyBasicReconcileLoop() throws Exception { assertEquals(RUNNING, sessionJob.getStatus().getJobStatus().getState()); assertEquals(6, testController.getInternalStatusUpdateCount()); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); FlinkSessionJobReconciliationStatus reconciliationStatus = sessionJob.getStatus().getReconciliationStatus(); @@ -521,7 +521,7 @@ public void verifyReconcileWithBadConfig() throws Exception { .getFlinkConfiguration() .put(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER.key(), "changed"); updateControl = testController.reconcile(sessionJob, context); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals(RECONCILING, sessionJob.getStatus().getJobStatus().getState()); // Check when the bad config is applied, observe() will change the cluster state correctly @@ -533,7 +533,7 @@ public void verifyReconcileWithBadConfig() throws Exception { .getStatus() .getError() .contains("Job parallelism must be larger than 0")); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals(RUNNING, sessionJob.getStatus().getJobStatus().getState()); // Make sure we do validation before getting effective config in reconcile(). @@ -713,7 +713,7 @@ private void verifyReconcileInitialSuspendedDeployment(FlinkSessionJob sessionJo assertEquals(JobState.SUSPENDED, suspendedSessionJob.getSpec().getJob().getState()); assertNull(suspendedSessionJob.getStatus().getJobStatus().getState()); assertEquals(1, testController.getInternalStatusUpdateCount()); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( configManager.getOperatorConfiguration().getReconcileInterval().toMillis()), @@ -734,7 +734,7 @@ private void verifyNormalBasicReconcileLoop(FlinkSessionJob sessionJob) throws E // Reconciling assertEquals(RECONCILING, sessionJob.getStatus().getJobStatus().getState()); assertEquals(4, testController.getInternalStatusUpdateCount()); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( configManager.getOperatorConfiguration().getReconcileInterval().toMillis()), @@ -751,7 +751,7 @@ private void verifyNormalBasicReconcileLoop(FlinkSessionJob sessionJob) throws E updateControl = testController.reconcile(sessionJob, context); assertEquals(RUNNING, sessionJob.getStatus().getJobStatus().getState()); assertEquals(5, testController.getInternalStatusUpdateCount()); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( configManager.getOperatorConfiguration().getReconcileInterval().toMillis()), @@ -761,7 +761,7 @@ private void verifyNormalBasicReconcileLoop(FlinkSessionJob sessionJob) throws E updateControl = testController.reconcile(sessionJob, context); assertEquals(RUNNING, sessionJob.getStatus().getJobStatus().getState()); assertEquals(5, testController.getInternalStatusUpdateCount()); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchStatus()); assertEquals( Optional.of( configManager.getOperatorConfiguration().getReconcileInterval().toMillis()), diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java index 98c98a0e31..cfe6947343 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java @@ -233,7 +233,7 @@ private void assertLabels( private void assertUpdateControl( UpdateControl actual, boolean updateResource, boolean patchStatus) { - assertThat(actual.isUpdateResource()).isEqualTo(updateResource); + assertThat(actual.isPatchResource()).isEqualTo(updateResource); assertThat(actual.isPatchStatus()).isEqualTo(patchStatus); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java index 09885b965a..5eca939457 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java @@ -43,10 +43,8 @@ import io.javaoperatorsdk.operator.api.reconciler.Cleaner; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -56,16 +54,14 @@ import java.time.Duration; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Queue; import java.util.function.BiConsumer; /** A wrapper around {@link FlinkDeploymentController} used by unit tests. */ public class TestingFlinkDeploymentController - implements Reconciler, - ErrorStatusHandler, - EventSourceInitializer, - Cleaner { + implements Reconciler, Cleaner { @Getter private ReconcilerFactory reconcilerFactory; private FlinkDeploymentController flinkDeploymentController; @@ -165,7 +161,7 @@ public ErrorStatusUpdateControl updateErrorStatus( } @Override - public Map prepareEventSources( + public List> prepareEventSources( EventSourceContext eventSourceContext) { throw new UnsupportedOperationException(); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java index 2b200de1de..7ba5cd48dc 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java @@ -42,13 +42,9 @@ import io.javaoperatorsdk.operator.api.reconciler.Cleaner; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.EventSource; import lombok.Getter; import java.util.HashMap; @@ -59,8 +55,6 @@ /** A wrapper around {@link FlinkSessionJobController} used by unit tests. */ public class TestingFlinkSessionJobController implements io.javaoperatorsdk.operator.api.reconciler.Reconciler, - ErrorStatusHandler, - EventSourceInitializer, Cleaner { @Getter private CanaryResourceManager canaryResourceManager; @@ -154,12 +148,6 @@ public DeleteControl cleanup( return flinkSessionJobController.cleanup(cloned, context); } - @Override - public Map prepareEventSources( - EventSourceContext eventSourceContext) { - return null; - } - public Queue events() { return flinkResourceEventCollector.events; } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/HealthProbeTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/HealthProbeTest.java index 891d759260..6ce4f16e27 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/HealthProbeTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/HealthProbeTest.java @@ -31,7 +31,6 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.javaoperatorsdk.operator.Operator; import io.javaoperatorsdk.operator.RuntimeInfo; -import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; import io.javaoperatorsdk.operator.health.InformerHealthIndicator; import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator; import io.javaoperatorsdk.operator.health.Status; @@ -258,16 +257,6 @@ public String getTargetNamespace() { } })); - return new InformerWrappingEventSourceHealthIndicator() { - @Override - public Map informerHealthIndicators() { - return informers; - } - - @Override - public ResourceConfiguration getInformerConfiguration() { - return null; - } - }; + return () -> informers; } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java index 4f4697d1f7..0e2e45f1fe 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java @@ -24,6 +24,7 @@ import org.apache.flink.kubernetes.operator.exception.ReconciliationException; import org.apache.flink.metrics.Histogram; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.monitoring.Metrics; import io.javaoperatorsdk.operator.api.reconciler.Constants; import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; @@ -43,6 +44,15 @@ public class OperatorJosdkMetricsTest { private static final ResourceID resourceId = new ResourceID("testname", "testns"); + private static final HasMetadata resource = testResource(resourceId); + + private static HasMetadata testResource(ResourceID resourceId) { + var flinkDeployment = new FlinkDeployment(); + flinkDeployment.getMetadata().setName(resourceId.getName()); + flinkDeployment.getMetadata().setNamespace(resourceId.getNamespace().orElseThrow()); + return flinkDeployment; + } + private static final String controllerName = FlinkDeploymentController.class.getSimpleName(); private static final Map metadata = Map.of(Constants.RESOURCE_GVK_KEY, GroupVersionKind.gvkFor(FlinkDeployment.class)); @@ -110,20 +120,20 @@ public Object execute() throws Exception { @Test public void testMetrics() { - operatorMetrics.failedReconciliation(resourceId, null, metadata); + operatorMetrics.failedReconciliation(resource, null, metadata); assertEquals(1, listener.size()); assertEquals(1, getCount("Reconciliation.failed")); - operatorMetrics.failedReconciliation(resourceId, null, metadata); - operatorMetrics.failedReconciliation(resourceId, null, metadata); + operatorMetrics.failedReconciliation(resource, null, metadata); + operatorMetrics.failedReconciliation(resource, null, metadata); assertEquals(1, listener.size()); assertEquals(3, getCount("Reconciliation.failed")); - operatorMetrics.reconcileCustomResource(resourceId, null, metadata); + operatorMetrics.reconcileCustomResource(resource, null, metadata); assertEquals(2, listener.size()); assertEquals(1, getCount("Reconciliation")); operatorMetrics.reconcileCustomResource( - resourceId, + resource, new RetryInfo() { @Override public int getAttemptCount() { @@ -150,7 +160,7 @@ public boolean isLastAttempt() { assertEquals(6, listener.size()); assertEquals(1, getCount("Reconciliation.cleanup")); - operatorMetrics.finishedReconciliation(resourceId, metadata); + operatorMetrics.finishedReconciliation(resource, metadata); assertEquals(7, listener.size()); assertEquals(1, getCount("Reconciliation.finished")); @@ -160,7 +170,8 @@ public boolean isLastAttempt() { 2, listener.getGauge(listener.getMetricId("JOSDK", "mymap", "size")).get().getValue()); - operatorMetrics.reconcileCustomResource(new ResourceID("other", "otherns"), null, metadata); + operatorMetrics.reconcileCustomResource( + testResource(new ResourceID("other", "otherns")), null, metadata); assertEquals(9, listener.size()); assertEquals( 1, diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java index 2957a75816..f5afefa164 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java @@ -52,15 +52,15 @@ public void testRescheduleUpgradeImmediately() { UpdateControl updateControl = ReconciliationUtils.toUpdateControl(operatorConfiguration, current, previous, true); - assertFalse(updateControl.isUpdateResource()); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchResource()); + assertFalse(updateControl.isPatchResource()); assertEquals(0, updateControl.getScheduleDelay().get()); updateControl = ReconciliationUtils.toUpdateControl(operatorConfiguration, current, current, true); - assertFalse(updateControl.isUpdateResource()); - assertFalse(updateControl.isUpdateStatus()); + assertFalse(updateControl.isPatchResource()); + assertFalse(updateControl.isPatchStatus()); assertNotEquals(0, updateControl.getScheduleDelay().get()); } diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml index f498ae2431..292ecf3d4c 100644 --- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml @@ -709,6 +709,8 @@ spec: x-kubernetes-int-or-string: true type: object type: object + stopSignal: + type: string type: object livenessProbe: properties: @@ -867,6 +869,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -1261,6 +1265,8 @@ spec: x-kubernetes-int-or-string: true type: object type: object + stopSignal: + type: string type: object livenessProbe: properties: @@ -1419,6 +1425,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -1819,6 +1827,8 @@ spec: x-kubernetes-int-or-string: true type: object type: object + stopSignal: + type: string type: object livenessProbe: properties: @@ -1977,6 +1987,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -2200,15 +2212,38 @@ spec: properties: name: type: string - source: + resourceClaimName: + type: string + resourceClaimTemplateName: + type: string + type: object + type: array + resources: + properties: + claims: + items: properties: - resourceClaimName: + name: type: string - resourceClaimTemplateName: + request: type: string type: object - type: object - type: array + type: array + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + x-kubernetes-int-or-string: true + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + x-kubernetes-int-or-string: true + type: object + type: object restartPolicy: type: string runtimeClassName: @@ -2241,6 +2276,8 @@ spec: type: boolean runAsUser: type: integer + seLinuxChangePolicy: + type: string seLinuxOptions: properties: level: @@ -2263,6 +2300,8 @@ spec: items: type: integer type: array + supplementalGroupsPolicy: + type: string sysctls: items: properties: @@ -2732,6 +2771,13 @@ spec: type: type: string type: object + image: + properties: + pullPolicy: + type: string + reference: + type: string + type: object iscsi: properties: chapAuthDiscovery: @@ -3039,6 +3085,8 @@ spec: type: string message: type: string + observedGeneration: + type: integer reason: type: string status: @@ -3057,6 +3105,22 @@ spec: - type: string x-kubernetes-int-or-string: true type: object + allocatedResourcesStatus: + items: + properties: + name: + type: string + resources: + items: + properties: + health: + type: string + resourceID: + type: string + type: object + type: array + type: object + type: array containerID: type: string image: @@ -3106,6 +3170,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -3159,6 +3225,22 @@ spec: type: string type: object type: object + stopSignal: + type: string + user: + properties: + linux: + properties: + gid: + type: integer + supplementalGroups: + items: + type: integer + type: array + uid: + type: integer + type: object + type: object volumeMounts: items: properties: @@ -3184,6 +3266,22 @@ spec: - type: string x-kubernetes-int-or-string: true type: object + allocatedResourcesStatus: + items: + properties: + name: + type: string + resources: + items: + properties: + health: + type: string + resourceID: + type: string + type: object + type: array + type: object + type: array containerID: type: string image: @@ -3233,6 +3331,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -3286,6 +3386,22 @@ spec: type: string type: object type: object + stopSignal: + type: string + user: + properties: + linux: + properties: + gid: + type: integer + supplementalGroups: + items: + type: integer + type: array + uid: + type: integer + type: object + type: object volumeMounts: items: properties: @@ -3320,6 +3436,22 @@ spec: - type: string x-kubernetes-int-or-string: true type: object + allocatedResourcesStatus: + items: + properties: + name: + type: string + resources: + items: + properties: + health: + type: string + resourceID: + type: string + type: object + type: array + type: object + type: array containerID: type: string image: @@ -3369,6 +3501,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -3422,6 +3556,22 @@ spec: type: string type: object type: object + stopSignal: + type: string + user: + properties: + linux: + properties: + gid: + type: integer + supplementalGroups: + items: + type: integer + type: array + uid: + type: integer + type: object + type: object volumeMounts: items: properties: @@ -3441,6 +3591,8 @@ spec: type: string nominatedNodeName: type: string + observedGeneration: + type: integer phase: type: string podIP: @@ -4086,6 +4238,8 @@ spec: x-kubernetes-int-or-string: true type: object type: object + stopSignal: + type: string type: object livenessProbe: properties: @@ -4244,6 +4398,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -4638,6 +4794,8 @@ spec: x-kubernetes-int-or-string: true type: object type: object + stopSignal: + type: string type: object livenessProbe: properties: @@ -4796,6 +4954,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -5196,6 +5356,8 @@ spec: x-kubernetes-int-or-string: true type: object type: object + stopSignal: + type: string type: object livenessProbe: properties: @@ -5354,6 +5516,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -5577,15 +5741,38 @@ spec: properties: name: type: string - source: + resourceClaimName: + type: string + resourceClaimTemplateName: + type: string + type: object + type: array + resources: + properties: + claims: + items: properties: - resourceClaimName: + name: type: string - resourceClaimTemplateName: + request: type: string type: object - type: object - type: array + type: array + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + x-kubernetes-int-or-string: true + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + x-kubernetes-int-or-string: true + type: object + type: object restartPolicy: type: string runtimeClassName: @@ -5618,6 +5805,8 @@ spec: type: boolean runAsUser: type: integer + seLinuxChangePolicy: + type: string seLinuxOptions: properties: level: @@ -5640,6 +5829,8 @@ spec: items: type: integer type: array + supplementalGroupsPolicy: + type: string sysctls: items: properties: @@ -6109,6 +6300,13 @@ spec: type: type: string type: object + image: + properties: + pullPolicy: + type: string + reference: + type: string + type: object iscsi: properties: chapAuthDiscovery: @@ -6416,6 +6614,8 @@ spec: type: string message: type: string + observedGeneration: + type: integer reason: type: string status: @@ -6434,6 +6634,22 @@ spec: - type: string x-kubernetes-int-or-string: true type: object + allocatedResourcesStatus: + items: + properties: + name: + type: string + resources: + items: + properties: + health: + type: string + resourceID: + type: string + type: object + type: array + type: object + type: array containerID: type: string image: @@ -6483,6 +6699,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -6536,6 +6754,22 @@ spec: type: string type: object type: object + stopSignal: + type: string + user: + properties: + linux: + properties: + gid: + type: integer + supplementalGroups: + items: + type: integer + type: array + uid: + type: integer + type: object + type: object volumeMounts: items: properties: @@ -6561,6 +6795,22 @@ spec: - type: string x-kubernetes-int-or-string: true type: object + allocatedResourcesStatus: + items: + properties: + name: + type: string + resources: + items: + properties: + health: + type: string + resourceID: + type: string + type: object + type: array + type: object + type: array containerID: type: string image: @@ -6610,6 +6860,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -6663,6 +6915,22 @@ spec: type: string type: object type: object + stopSignal: + type: string + user: + properties: + linux: + properties: + gid: + type: integer + supplementalGroups: + items: + type: integer + type: array + uid: + type: integer + type: object + type: object volumeMounts: items: properties: @@ -6697,6 +6965,22 @@ spec: - type: string x-kubernetes-int-or-string: true type: object + allocatedResourcesStatus: + items: + properties: + name: + type: string + resources: + items: + properties: + health: + type: string + resourceID: + type: string + type: object + type: array + type: object + type: array containerID: type: string image: @@ -6746,6 +7030,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -6799,6 +7085,22 @@ spec: type: string type: object type: object + stopSignal: + type: string + user: + properties: + linux: + properties: + gid: + type: integer + supplementalGroups: + items: + type: integer + type: array + uid: + type: integer + type: object + type: object volumeMounts: items: properties: @@ -6818,6 +7120,8 @@ spec: type: string nominatedNodeName: type: string + observedGeneration: + type: integer phase: type: string podIP: @@ -7448,6 +7752,8 @@ spec: x-kubernetes-int-or-string: true type: object type: object + stopSignal: + type: string type: object livenessProbe: properties: @@ -7606,6 +7912,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -8000,6 +8308,8 @@ spec: x-kubernetes-int-or-string: true type: object type: object + stopSignal: + type: string type: object livenessProbe: properties: @@ -8158,6 +8468,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -8558,6 +8870,8 @@ spec: x-kubernetes-int-or-string: true type: object type: object + stopSignal: + type: string type: object livenessProbe: properties: @@ -8716,6 +9030,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -8939,15 +9255,38 @@ spec: properties: name: type: string - source: + resourceClaimName: + type: string + resourceClaimTemplateName: + type: string + type: object + type: array + resources: + properties: + claims: + items: properties: - resourceClaimName: + name: type: string - resourceClaimTemplateName: + request: type: string type: object - type: object - type: array + type: array + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + x-kubernetes-int-or-string: true + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + x-kubernetes-int-or-string: true + type: object + type: object restartPolicy: type: string runtimeClassName: @@ -8980,6 +9319,8 @@ spec: type: boolean runAsUser: type: integer + seLinuxChangePolicy: + type: string seLinuxOptions: properties: level: @@ -9002,6 +9343,8 @@ spec: items: type: integer type: array + supplementalGroupsPolicy: + type: string sysctls: items: properties: @@ -9471,6 +9814,13 @@ spec: type: type: string type: object + image: + properties: + pullPolicy: + type: string + reference: + type: string + type: object iscsi: properties: chapAuthDiscovery: @@ -9778,6 +10128,8 @@ spec: type: string message: type: string + observedGeneration: + type: integer reason: type: string status: @@ -9796,6 +10148,22 @@ spec: - type: string x-kubernetes-int-or-string: true type: object + allocatedResourcesStatus: + items: + properties: + name: + type: string + resources: + items: + properties: + health: + type: string + resourceID: + type: string + type: object + type: array + type: object + type: array containerID: type: string image: @@ -9845,6 +10213,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -9898,6 +10268,22 @@ spec: type: string type: object type: object + stopSignal: + type: string + user: + properties: + linux: + properties: + gid: + type: integer + supplementalGroups: + items: + type: integer + type: array + uid: + type: integer + type: object + type: object volumeMounts: items: properties: @@ -9923,6 +10309,22 @@ spec: - type: string x-kubernetes-int-or-string: true type: object + allocatedResourcesStatus: + items: + properties: + name: + type: string + resources: + items: + properties: + health: + type: string + resourceID: + type: string + type: object + type: array + type: object + type: array containerID: type: string image: @@ -9972,6 +10374,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -10025,6 +10429,22 @@ spec: type: string type: object type: object + stopSignal: + type: string + user: + properties: + linux: + properties: + gid: + type: integer + supplementalGroups: + items: + type: integer + type: array + uid: + type: integer + type: object + type: object volumeMounts: items: properties: @@ -10059,6 +10479,22 @@ spec: - type: string x-kubernetes-int-or-string: true type: object + allocatedResourcesStatus: + items: + properties: + name: + type: string + resources: + items: + properties: + health: + type: string + resourceID: + type: string + type: object + type: array + type: object + type: array containerID: type: string image: @@ -10108,6 +10544,8 @@ spec: properties: name: type: string + request: + type: string type: object type: array limits: @@ -10161,6 +10599,22 @@ spec: type: string type: object type: object + stopSignal: + type: string + user: + properties: + linux: + properties: + gid: + type: integer + supplementalGroups: + items: + type: integer + type: array + uid: + type: integer + type: object + type: object volumeMounts: items: properties: @@ -10180,6 +10634,8 @@ spec: type: string nominatedNodeName: type: string + observedGeneration: + type: integer phase: type: string podIP: diff --git a/pom.xml b/pom.xml index 463c0b457d..b2a3ead0d0 100644 --- a/pom.xml +++ b/pom.xml @@ -66,8 +66,8 @@ under the License. - 11 - 11 + 17 + 17 3.3.0 3.0.0-M4 3.0.0-M5 @@ -75,10 +75,10 @@ under the License. 3.3.2 5.0.0 - 4.9.4 + 5.1.1 1.1.1 - 6.13.2 + 7.3.0 1.18.30 3.12.0 @@ -138,6 +138,10 @@ under the License. kubernetes-client ${fabric8.version} + + io.fabric8 + kubernetes-httpclient-vertx + com.squareup.okhttp3 *