Skip to content

Upgrade to JOSDK 5.1 #982

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
name: maven build
strategy:
matrix:
java-version: [ 11, 17, 21 ]
java-version: [ 17, 21 ]
steps:
- uses: actions/checkout@v4
- name: Set up JDK ${{ matrix.java-version }}
Expand Down Expand Up @@ -76,7 +76,7 @@ jobs:
strategy:
matrix:
http-client: [ "okhttp", "jdk", "jetty", "vertx" ]
java-version: [ "11", "17", "21" ]
java-version: [ "17", "21" ]
uses: ./.github/workflows/e2e.yaml
with:
java-version: ${{ matrix.java-version }}
Expand All @@ -89,7 +89,7 @@ jobs:
strategy:
matrix:
http-client: [ "okhttp" ]
java-version: [ "11", "17"]
java-version: [ "17"]
flink-version:
- "v2_0"
- "v1_20"
Expand Down Expand Up @@ -142,7 +142,7 @@ jobs:

uses: ./.github/workflows/e2e.yaml
with:
java-version: 11
java-version: 17
flink-version: ${{ matrix.flink-version }}
test: ${{ matrix.test }}
namespace: "flink"
Expand Down Expand Up @@ -211,7 +211,7 @@ jobs:
test: test_snapshot.sh
uses: ./.github/workflows/e2e.yaml
with:
java-version: 11
java-version: 17
flink-version: ${{ matrix.flink-version }}
test: ${{ matrix.test }}
mode: ${{ matrix.mode }}
2 changes: 1 addition & 1 deletion .github/workflows/publish_snapshot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
- name: Set up JDK 11
uses: actions/setup-java@v4
with:
java-version: '11'
java-version: '17'
distribution: 'temurin'
- name: Cache local Maven repository
uses: actions/cache@v4
Expand Down
2 changes: 1 addition & 1 deletion docs/content.zh/docs/development/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ In order to build the operator you need to [clone the git repository]({{< github
git clone {{< github_repo >}}
```

To build from the command line, it is necessary to have **Maven 3** and a **Java Development Kit** (JDK) installed. Please note that Flink Kubernetes Operator requires **Java 11**.
To build from the command line, it is necessary to have **Maven 3** and a **Java Development Kit** (JDK) installed. Please note that Flink Kubernetes Operator requires **Java 17**.

To build the project, you can use the following command:

Expand Down
2 changes: 1 addition & 1 deletion docs/content/docs/development/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ In order to build the operator you need to [clone the git repository]({{< github
git clone {{< github_repo >}}
```

To build from the command line, it is necessary to have **Maven 3** and a **Java Development Kit** (JDK) installed. Please note that Flink Kubernetes Operator requires **Java 11**.
To build from the command line, it is necessary to have **Maven 3** and a **Java Development Kit** (JDK) installed. Please note that Flink Kubernetes Operator requires **Java 17**.

To build the project, you can use the following command:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,13 @@ protected static void checkObjectCompatibility(
// This claims field was removed in Kubernetes 1.28 as it was mistakenly
// added in the first place. For more context please refer to
// https://github.com/kubernetes/api/commit/8b14183
&& !fieldPath.contains(".volumeClaimTemplate.spec.resources.claims")) {
&& !fieldPath.contains(".volumeClaimTemplate.spec.resources.claims")
&& !fieldPath.contains(
".spec.taskManager.podTemplate.spec.resourceClaims.items.source")
&& !fieldPath.contains(
".spec.jobManager.podTemplate.spec.resourceClaims.items.source")
&& !fieldPath.contains(
".spec.podTemplate.spec.resourceClaims.items.source")) {
err(fieldPath + " has been removed");
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
Expand Down Expand Up @@ -150,14 +151,15 @@ private void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) {
overrider.withMetrics(new OperatorJosdkMetrics(metricGroup, configManager));
}

overrider.withTerminationTimeoutSeconds(
(int)
overrider.withReconciliationTerminationTimeout(
Duration.ofSeconds(
conf.get(KubernetesOperatorConfigOptions.OPERATOR_TERMINATION_TIMEOUT)
.toSeconds());
.toSeconds()));

overrider.withStopOnInformerErrorDuringStartup(
conf.get(KubernetesOperatorConfigOptions.OPERATOR_STOP_ON_INFORMER_ERROR));

overrider.withUseSSAToPatchPrimaryResource(false);
var leaderElectionConf = operatorConf.getLeaderElectionConfiguration();
if (leaderElectionConf != null) {
overrider.withLeaderElectionConfiguration(leaderElectionConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,13 +496,13 @@ public static String operatorConfigKey(String key) {
.withDescription(
"Whether informer errors should stop operator startup. If false, the startup will ignore recoverable errors, caused for example by RBAC issues and will retry periodically.");

public static final int DEFAULT_TERMINATION_TIMEOUT_SECONDS = 10;

@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Duration> OPERATOR_TERMINATION_TIMEOUT =
operatorConfig("termination.timeout")
.durationType()
.defaultValue(
Duration.ofSeconds(
ConfigurationService.DEFAULT_TERMINATION_TIMEOUT_SECONDS))
.defaultValue(Duration.ofSeconds(DEFAULT_TERMINATION_TIMEOUT_SECONDS))
.withDescription(
"Operator shutdown timeout before reconciliation threads are killed.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
Expand All @@ -55,17 +53,13 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Controller that runs the main reconcile loop for Flink deployments. */
@ControllerConfiguration
public class FlinkDeploymentController
implements Reconciler<FlinkDeployment>,
ErrorStatusHandler<FlinkDeployment>,
EventSourceInitializer<FlinkDeployment>,
Cleaner<FlinkDeployment> {
implements Reconciler<FlinkDeployment>, Cleaner<FlinkDeployment> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class);

private final Set<FlinkResourceValidator> validators;
Expand Down Expand Up @@ -185,9 +179,9 @@ private void triggerErrorEvent(
}

@Override
public Map<String, EventSource> prepareEventSources(
public List<EventSource<?, FlinkDeployment>> prepareEventSources(
EventSourceContext<FlinkDeployment> context) {
List<EventSource> eventSources = new ArrayList<>();
List<EventSource<?, FlinkDeployment>> eventSources = new ArrayList<>();
eventSources.add(EventSourceUtils.getSessionJobInformerEventSource(context));
eventSources.add(EventSourceUtils.getDeploymentInformerEventSource(context));

Expand All @@ -199,7 +193,7 @@ public Map<String, EventSource> prepareEventSources(
"Could not initialize informer for snapshots as the CRD has not been installed!");
}

return EventSourceInitializer.nameEventSources(eventSources.toArray(EventSource[]::new));
return eventSources;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,22 @@
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}. */
@ControllerConfiguration()
public class FlinkSessionJobController
implements io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
ErrorStatusHandler<FlinkSessionJob>,
EventSourceInitializer<FlinkSessionJob>,
Cleaner<FlinkSessionJob> {

private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class);
Expand Down Expand Up @@ -179,9 +174,9 @@ public ErrorStatusUpdateControl<FlinkSessionJob> updateErrorStatus(
}

@Override
public Map<String, EventSource> prepareEventSources(
public List<EventSource<?, FlinkSessionJob>> prepareEventSources(
EventSourceContext<FlinkSessionJob> context) {
List<EventSource> eventSources = new ArrayList<>();
List<EventSource<?, FlinkSessionJob>> eventSources = new ArrayList<>();
eventSources.add(EventSourceUtils.getFlinkDeploymentInformerEventSource(context));

if (KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class)) {
Expand All @@ -192,7 +187,7 @@ public Map<String, EventSource> prepareEventSources(
"Could not initialize informer for snapshots as the CRD has not been installed!");
}

return EventSourceInitializer.nameEventSources(eventSources.toArray(EventSource[]::new));
return eventSources;
}

private boolean validateSessionJob(FlinkResourceContext<FlinkSessionJob> ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
Expand All @@ -47,18 +45,15 @@

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/** Controller that runs the main reconcile loop for {@link FlinkStateSnapshot}. */
@RequiredArgsConstructor
@ControllerConfiguration
public class FlinkStateSnapshotController
implements Reconciler<FlinkStateSnapshot>,
ErrorStatusHandler<FlinkStateSnapshot>,
EventSourceInitializer<FlinkStateSnapshot>,
Cleaner<FlinkStateSnapshot> {
implements Reconciler<FlinkStateSnapshot>, Cleaner<FlinkStateSnapshot> {

private static final Logger LOG = LoggerFactory.getLogger(FlinkStateSnapshotController.class);

Expand Down Expand Up @@ -154,10 +149,9 @@ public ErrorStatusUpdateControl<FlinkStateSnapshot> updateErrorStatus(
}

@Override
public Map<String, EventSource> prepareEventSources(
public List<EventSource<?, FlinkStateSnapshot>> prepareEventSources(
EventSourceContext<FlinkStateSnapshot> context) {
return EventSourceInitializer.nameEventSources(
EventSourceUtils.getFlinkStateSnapshotInformerEventSources(context));
return List.of(EventSourceUtils.getFlinkStateSnapshotInformerEventSources(context));
}

/**
Expand All @@ -176,9 +170,9 @@ private UpdateControl<FlinkStateSnapshot> getUpdateControl(FlinkStateSnapshotCon
var statusChanged = resourceStatusChanged(ctx);

if (labelsChanged && statusChanged) {
updateControl = UpdateControl.updateResourceAndPatchStatus(resource);
updateControl = UpdateControl.patchResourceAndStatus(resource);
} else if (labelsChanged) {
updateControl = UpdateControl.updateResource(resource);
updateControl = UpdateControl.patchResource(resource);
} else if (statusChanged) {
updateControl = UpdateControl.patchStatus(resource);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.api.reconciler.Constants;
Expand Down Expand Up @@ -108,7 +109,8 @@ public void cleanupDoneFor(ResourceID resourceID, Map<String, Object> metadata)

@Override
public void reconcileCustomResource(
ResourceID resourceID, RetryInfo retryInfoNullable, Map<String, Object> metadata) {
HasMetadata resource, RetryInfo retryInfoNullable, Map<String, Object> metadata) {
var resourceID = ResourceID.fromResource(resource);
counter(getResourceMg(resourceID, metadata), RECONCILIATION).inc();

if (retryInfoNullable != null) {
Expand All @@ -117,14 +119,22 @@ public void reconcileCustomResource(
}

@Override
public void finishedReconciliation(ResourceID resourceID, Map<String, Object> metadata) {
counter(getResourceMg(resourceID, metadata), RECONCILIATION, "finished").inc();
public void finishedReconciliation(HasMetadata resource, Map<String, Object> metadata) {
counter(
getResourceMg(ResourceID.fromResource(resource), metadata),
RECONCILIATION,
"finished")
.inc();
}

@Override
public void failedReconciliation(
ResourceID resourceID, Exception exception, Map<String, Object> metadata) {
counter(getResourceMg(resourceID, metadata), RECONCILIATION, "failed").inc();
HasMetadata resource, Exception exception, Map<String, Object> metadata) {
counter(
getResourceMg(ResourceID.fromResource(resource), metadata),
RECONCILIATION,
"failed")
.inc();
}

@Override
Expand Down
Loading