diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 164c9bbd3d..d7929d9d2a 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -140,6 +140,12 @@
Duration |
The timeout for the observer to wait the flink rest client to return. |
+
+ kubernetes.operator.flink.client.io.threads |
+ 60 |
+ Integer |
+ The maximum number of io threads used by the flink rest client. |
+
kubernetes.operator.health.canary.resource.timeout |
1 min |
diff --git a/docs/layouts/shortcodes/generated/system_section.html b/docs/layouts/shortcodes/generated/system_section.html
index ec30301189..2c2da96a93 100644
--- a/docs/layouts/shortcodes/generated/system_section.html
+++ b/docs/layouts/shortcodes/generated/system_section.html
@@ -50,6 +50,12 @@
Duration |
The timeout for the observer to wait the flink rest client to return. |
+
+ kubernetes.operator.flink.client.io.threads |
+ 60 |
+ Integer |
+ The maximum number of io threads used by the flink rest client. |
+
kubernetes.operator.leader-election.enabled |
false |
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index ad044242af..9d71987983 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -53,6 +53,7 @@ public class FlinkOperatorConfiguration {
Duration progressCheckInterval;
Duration restApiReadyDelay;
Duration flinkClientTimeout;
+ int flinkClientIOThreads;
String flinkServiceHostOverride;
Set watchedNamespaces;
boolean dynamicNamespacesEnabled;
@@ -97,6 +98,10 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
Duration flinkClientTimeout =
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_FLINK_CLIENT_TIMEOUT);
+ int flinkClientIOThreads =
+ operatorConfig.getInteger(
+ KubernetesOperatorConfigOptions.OPERATOR_FLINK_CLIENT_IO_THREADS);
+
Duration flinkCancelJobTimeout =
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_FLINK_CLIENT_CANCEL_TIMEOUT);
@@ -201,6 +206,7 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
progressCheckInterval,
restApiReadyDelay,
flinkClientTimeout,
+ flinkClientIOThreads,
flinkServiceHostOverride,
watchedNamespaces,
dynamicNamespacesEnabled,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index f4203fb7be..04d082da7d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -115,6 +115,14 @@ public static String operatorConfigKey(String key) {
.withDescription(
"The timeout for the observer to wait the flink rest client to return.");
+ @Documentation.Section(SECTION_SYSTEM)
+ public static final ConfigOption OPERATOR_FLINK_CLIENT_IO_THREADS =
+ operatorConfig("flink.client.io.threads")
+ .intType()
+ .defaultValue(60)
+ .withDescription(
+ "The maximum number of io threads used by the flink rest client.");
+
@Documentation.Section(SECTION_SYSTEM)
public static final ConfigOption OPERATOR_FLINK_CLIENT_CANCEL_TIMEOUT =
operatorConfig("flink.client.cancel.timeout")
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 733ebd3ae2..9870d8cef5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -24,6 +24,8 @@
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.utils.JobStatusUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
+import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
@@ -55,6 +57,7 @@
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServicesFactory;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
@@ -105,6 +108,7 @@
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import io.fabric8.kubernetes.api.model.DeletionPropagation;
@@ -125,6 +129,7 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
@@ -171,17 +176,20 @@ public abstract class AbstractFlinkService implements FlinkService {
protected final ExecutorService executorService;
protected final FlinkOperatorConfiguration operatorConfig;
protected final ArtifactManager artifactManager;
+ private final EventLoopGroup flinkClientSharedEventLoopGroup;
private static final String EMPTY_JAR = createEmptyJar();
public AbstractFlinkService(
KubernetesClient kubernetesClient,
ArtifactManager artifactManager,
ExecutorService executorService,
- FlinkOperatorConfiguration operatorConfig) {
+ FlinkOperatorConfiguration operatorConfig,
+ EventLoopGroup flinkClientEventLoopGroup) {
this.kubernetesClient = kubernetesClient;
this.artifactManager = artifactManager;
this.executorService = executorService;
this.operatorConfig = operatorConfig;
+ this.flinkClientSharedEventLoopGroup = flinkClientEventLoopGroup;
}
protected abstract PodList getJmPodList(String namespace, String clusterId);
@@ -838,11 +846,34 @@ public RestClusterClient getClusterClient(Configuration conf) throws Exc
operatorConfig.getFlinkServiceHostOverride(),
ExternalServiceDecorator.getNamespacedExternalServiceName(
clusterId, namespace));
+
final String restServerAddress = String.format("http://%s:%s", host, port);
- return new RestClusterClient<>(
- operatorRestConf,
- clusterId,
- (c, e) -> new StandaloneClientHAServices(restServerAddress));
+ RestClient restClient =
+ new RestClientProxy(
+ operatorRestConf,
+ executorService,
+ host,
+ port,
+ flinkClientSharedEventLoopGroup);
+
+ Constructor> clusterClientConstructor =
+ RestClusterClient.class.getDeclaredConstructor(
+ Configuration.class,
+ RestClient.class,
+ Object.class,
+ WaitStrategy.class,
+ ClientHighAvailabilityServicesFactory.class);
+
+ clusterClientConstructor.setAccessible(true);
+
+ return (RestClusterClient)
+ clusterClientConstructor.newInstance(
+ operatorRestConf,
+ restClient,
+ clusterId,
+ new ExponentialWaitStrategy(10L, 2000L),
+ (ClientHighAvailabilityServicesFactory)
+ (c, e) -> new StandaloneClientHAServices(restServerAddress));
}
@VisibleForTesting
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
index d9d3688b20..885f1a8a58 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
@@ -36,6 +36,8 @@
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
@@ -60,6 +62,7 @@ public class FlinkResourceContextFactory {
protected final Map, ResourceID>, KubernetesResourceMetricGroup>
resourceMetricGroups = new ConcurrentHashMap<>();
+ private final EventLoopGroup flinkClientEventLoopGroup;
public FlinkResourceContextFactory(
FlinkConfigManager configManager,
@@ -73,6 +76,10 @@ public FlinkResourceContextFactory(
Executors.newFixedThreadPool(
configManager.getOperatorConfiguration().getReconcilerMaxParallelism(),
new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
+ this.flinkClientEventLoopGroup =
+ new NioEventLoopGroup(
+ configManager.getOperatorConfiguration().getFlinkClientIOThreads(),
+ new ExecutorThreadFactory("flink-rest-client-netty-shared"));
}
public FlinkStateSnapshotContext getFlinkStateSnapshotContext(
@@ -123,13 +130,15 @@ protected FlinkService getFlinkService(FlinkResourceContext> ctx) {
artifactManager,
clientExecutorService,
ctx.getOperatorConfig(),
- eventRecorder);
+ eventRecorder,
+ flinkClientEventLoopGroup);
case STANDALONE:
return new StandaloneFlinkService(
ctx.getKubernetesClient(),
artifactManager,
clientExecutorService,
- ctx.getOperatorConfig());
+ ctx.getOperatorConfig(),
+ flinkClientEventLoopGroup);
default:
throw new UnsupportedOperationException(
String.format("Unsupported deployment mode: %s", deploymentMode));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index a5a63f8b4f..64ed3f4d99 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -51,6 +51,8 @@
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
+
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.apps.Deployment;
@@ -89,7 +91,28 @@ public NativeFlinkService(
ExecutorService executorService,
FlinkOperatorConfiguration operatorConfig,
EventRecorder eventRecorder) {
- super(kubernetesClient, artifactManager, executorService, operatorConfig);
+ this(
+ kubernetesClient,
+ artifactManager,
+ executorService,
+ operatorConfig,
+ eventRecorder,
+ null);
+ }
+
+ public NativeFlinkService(
+ KubernetesClient kubernetesClient,
+ ArtifactManager artifactManager,
+ ExecutorService executorService,
+ FlinkOperatorConfiguration operatorConfig,
+ EventRecorder eventRecorder,
+ EventLoopGroup flinkClientEventLoopGroup) {
+ super(
+ kubernetesClient,
+ artifactManager,
+ executorService,
+ operatorConfig,
+ flinkClientEventLoopGroup);
this.eventRecorder = eventRecorder;
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/RestClientProxy.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/RestClientProxy.java
new file mode 100644
index 0000000000..562e48aee8
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/RestClientProxy.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+/** Proxy for the {@link RestClient}. */
+public class RestClientProxy extends RestClient {
+ private static final Logger LOG = LoggerFactory.getLogger(RestClientProxy.class);
+
+ private final boolean useSharedEventLoopGroup;
+ private Field groupField;
+ private Bootstrap bootstrap;
+ private CompletableFuture terminationFuture;
+
+ public RestClientProxy(
+ Configuration configuration,
+ ExecutorService executor,
+ String host,
+ int port,
+ EventLoopGroup sharedGroup)
+ throws ConfigurationException, NoSuchFieldException, IllegalAccessException {
+ super(configuration, executor, host, port);
+
+ if (sharedGroup != null) {
+ Preconditions.checkArgument(
+ !sharedGroup.isShuttingDown() && !sharedGroup.isShutdown(),
+ "provided eventLoopGroup is shut/shutting down");
+
+ // get private field
+ Field bootstrapField = RestClient.class.getDeclaredField("bootstrap");
+ Field terminationFutureField = RestClient.class.getDeclaredField("terminationFuture");
+ Field groupField = AbstractBootstrap.class.getDeclaredField("group");
+
+ bootstrapField.setAccessible(true);
+ terminationFutureField.setAccessible(true);
+ groupField.setAccessible(true);
+
+ this.terminationFuture = (CompletableFuture) terminationFutureField.get(this);
+ this.bootstrap = (Bootstrap) bootstrapField.get(this);
+ this.groupField = groupField;
+
+ // close previous group
+ bootstrap.config().group().shutdown();
+ // setup share group
+ groupField.set(bootstrap, sharedGroup);
+
+ useSharedEventLoopGroup = true;
+ } else {
+ useSharedEventLoopGroup = false;
+ }
+ }
+
+ @Override
+ public CompletableFuture closeAsync() {
+ if (useSharedEventLoopGroup) {
+ this.shutdownInternal();
+ }
+
+ return super.closeAsync();
+ }
+
+ @Override
+ public void shutdown(Time timeout) {
+ if (useSharedEventLoopGroup) {
+ this.shutdownInternal();
+ }
+ super.shutdown(timeout);
+ }
+
+ private void shutdownInternal() {
+ try {
+ // replace bootstrap's group to null to avoid shutdown shared group
+ groupField.set(bootstrap, null);
+ terminationFuture.complete(null);
+ } catch (IllegalAccessException e) {
+ LOG.error("Failed to setup rest client event group .", e);
+ }
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index 7c483f7f42..bbfd901aab 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -39,6 +39,8 @@
import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
+
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -61,7 +63,21 @@ public StandaloneFlinkService(
ArtifactManager artifactManager,
ExecutorService executorService,
FlinkOperatorConfiguration operatorConfig) {
- super(kubernetesClient, artifactManager, executorService, operatorConfig);
+ this(kubernetesClient, artifactManager, executorService, operatorConfig, null);
+ }
+
+ public StandaloneFlinkService(
+ KubernetesClient kubernetesClient,
+ ArtifactManager artifactManager,
+ ExecutorService executorService,
+ FlinkOperatorConfiguration operatorConfig,
+ EventLoopGroup flinkClientEventLoopGroup) {
+ super(
+ kubernetesClient,
+ artifactManager,
+ executorService,
+ operatorConfig,
+ flinkClientEventLoopGroup);
}
@Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index fa893e7c49..2d0d0ffd68 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -70,8 +70,10 @@
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.Executors;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import io.fabric8.kubernetes.api.model.DeletionPropagation;
@@ -169,7 +171,9 @@ public TestingFlinkService(KubernetesClient kubernetesClient) {
kubernetesClient,
null,
Executors.newDirectExecutorService(),
- FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
+ FlinkOperatorConfiguration.fromConfiguration(new Configuration()),
+ new NioEventLoopGroup(
+ 4, new ExecutorThreadFactory("flink-rest-client-netty-shared")));
}
public Context getContext() {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index a57d60a8a3..75fadaeafe 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -92,10 +92,12 @@
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.TriFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import io.fabric8.kubernetes.api.model.DeletionPropagation;
@@ -1334,7 +1336,10 @@ class TestingService extends AbstractFlinkService {
client,
AbstractFlinkServiceTest.this.artifactManager,
AbstractFlinkServiceTest.this.executorService,
- AbstractFlinkServiceTest.this.operatorConfig);
+ AbstractFlinkServiceTest.this.operatorConfig,
+ new NioEventLoopGroup(
+ AbstractFlinkServiceTest.this.operatorConfig.getFlinkClientIOThreads(),
+ new ExecutorThreadFactory("flink-rest-client-netty-shared")));
this.clusterClient = clusterClient;
this.restClient = restClient;
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/RestClientProxyTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/RestClientProxyTest.java
new file mode 100644
index 0000000000..ada290ba48
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/RestClientProxyTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * @link RestClientProxy unit tests
+ */
+public class RestClientProxyTest {
+
+ private final Configuration configuration = new Configuration();
+
+ private final FlinkConfigManager configManager = new FlinkConfigManager(configuration);
+ private ExecutorService executorService;
+
+ private EventLoopGroup flinkClientEventLoopGroup;
+
+ @BeforeEach
+ public void setup() {
+ configuration.set(KubernetesConfigOptions.CLUSTER_ID, TestUtils.TEST_DEPLOYMENT_NAME);
+ configuration.set(KubernetesConfigOptions.NAMESPACE, TestUtils.TEST_NAMESPACE);
+ configuration.set(FLINK_VERSION, FlinkVersion.v1_18);
+
+ executorService = Executors.newDirectExecutorService();
+
+ flinkClientEventLoopGroup =
+ new NioEventLoopGroup(
+ configManager.getOperatorConfiguration().getFlinkClientIOThreads(),
+ new ExecutorThreadFactory("flink-rest-client-netty-shared"));
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ RestClientProxy restClientProxy =
+ new RestClientProxy(
+ configuration, executorService, "localhost", -1, flinkClientEventLoopGroup);
+
+ restClientProxy.close();
+ assertFalse(flinkClientEventLoopGroup.isShutdown());
+ }
+
+ @Test
+ public void testShutdown() throws Exception {
+ RestClientProxy restClientProxy =
+ new RestClientProxy(
+ configuration, executorService, "localhost", -1, flinkClientEventLoopGroup);
+
+ restClientProxy.shutdown(Time.seconds(5));
+ assertFalse(flinkClientEventLoopGroup.isShutdown());
+ }
+}