diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala
index 7f321ee92c..22dbf38ebe 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala
@@ -201,4 +201,5 @@ object ConfigKeys {
 
   val KEY_FLINK_TM_PROCESS_MEMORY = "taskmanager.memory.process.size"
 
+  val STREAMPARK_INGRESS_MODE = "streampark.ingress.mode"
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index b50747ce0a..9ad6788fd6 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -180,6 +180,7 @@ public Map<String, String> getFlinkConfig() throws JsonProcessingException {
   @JsonIgnore
   public Map<String, Object> getProperties() {
     Map<String, Object> propertyMap = new HashMap<>();
+    propertyMap.put(ConfigKeys.KEY_KERBEROS_SERVICE_ACCOUNT(), this.getServiceAccount());
     Map<String, String> dynamicPropertyMap =
         FlinkConfigurationUtils.extractDynamicPropertiesAsJava(this.getDynamicProperties());
     propertyMap.putAll(this.getOptionMap());
@@ -188,6 +189,7 @@ public Map<String, Object> getProperties() {
     if (resolveOrder != null) {
       propertyMap.put(CoreOptions.CLASSLOADER_RESOLVE_ORDER.key(), resolveOrder.getName());
     }
+
     return propertyMap;
   }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index 7b13727a93..124d9ac382 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -166,10 +166,8 @@ public String getVersionOfLast() {
 
   @JsonIgnore
   public Properties getFlinkConfig() {
-    String flinkYamlString = DeflaterUtils.unzipString(flinkConf);
     Properties flinkConfig = new Properties();
-    Map<String, String> config = FlinkConfigurationUtils.loadLegacyFlinkConf(flinkYamlString);
-    flinkConfig.putAll(config);
+    flinkConfig.putAll(convertFlinkYamlAsMap());
     return flinkConfig;
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
index d79911d4cf..81e1a3e5df 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
@@ -79,7 +79,6 @@
 import org.apache.streampark.flink.client.bean.SubmitResponse;
 import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
 import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
-import org.apache.streampark.flink.kubernetes.ingress.IngressController;
 import org.apache.streampark.flink.kubernetes.model.TrackId;
 import org.apache.streampark.flink.packer.pipeline.BuildResult;
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
@@ -90,7 +89,6 @@
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -526,7 +524,8 @@ private void processForSuccess(
       flinkApplication.setJobId(response.jobId());
     }
 
-    if (FlinkDeployMode.isYarnMode(flinkApplication.getDeployMode())) {
+    if (FlinkDeployMode.isYarnMode(flinkApplication.getDeployMode())
+        || FlinkDeployMode.isKubernetesMode(flinkApplication.getDeployMode())) {
       flinkApplication.setClusterId(response.clusterId());
       applicationLog.setClusterId(response.clusterId());
     }
@@ -555,23 +554,6 @@ private void processForSuccess(
   private void processForK8sApp(FlinkApplication application, ApplicationLog applicationLog) {
     application.setRelease(ReleaseStateEnum.DONE.get());
     k8SFlinkTrackMonitor.doWatching(k8sWatcherWrapper.toTrackId(application));
-    if (!FlinkDeployMode.isKubernetesApplicationMode(application.getDeployMode())) {
-      return;
-    }
-    String domainName = settingService.getIngressModeDefault();
-    if (StringUtils.isNotBlank(domainName)) {
-      try {
-        IngressController.configureIngress(
-            domainName, application.getClusterId(), application.getK8sNamespace());
-      } catch (KubernetesClientException e) {
-        log.info("Failed to create ingress, stack info:{}", e.getMessage());
-        applicationLog.setException(e.getMessage());
-        applicationLog.setSuccess(false);
-        applicationLogService.save(applicationLog);
-        application.setState(FlinkAppStateEnum.FAILED.getValue());
-        application.setOptionState(OptionStateEnum.NONE.getValue());
-      }
-    }
   }
 
   private void processForException(
@@ -776,6 +758,10 @@ private Map<String, Object> getProperties(
       }
     } else if (FlinkDeployMode.isKubernetesMode(application.getDeployModeEnum())) {
       properties.put(ConfigKeys.KEY_K8S_IMAGE_PULL_POLICY(), "Always");
+      properties.putAll(application.getHotParamsMap());
+      if (StringUtils.isNotBlank(settingService.getIngressModeDefault())) {
+        properties.putIfAbsent(ConfigKeys.STREAMPARK_INGRESS_MODE(), settingService.getIngressModeDefault());
+      }
     }
 
     if (FlinkDeployMode.isKubernetesApplicationMode(application.getDeployMode())) {
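Note: ingress creation no longer happens in the console service (the IngressController block above is removed). The ingress domain now travels to the submit path inside the effective Flink properties, keyed by the new `streampark.ingress.mode` entry. A minimal sketch of the client-side lookup; `resolveIngressDomain` is an illustrative helper name, not part of this patch:

    import org.apache.flink.configuration.Configuration
    import org.apache.streampark.common.conf.ConfigKeys

    // Read the ingress domain the console injected into the Flink config;
    // an absent or blank value means "do not create an ingress".
    def resolveIngressDomain(flinkConfig: Configuration): Option[String] =
      Option(flinkConfig.getString(ConfigKeys.STREAMPARK_INGRESS_MODE, ""))
        .map(_.trim)
        .filter(_.nonEmpty)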
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 4942aa8b61..33b74837a7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -17,6 +17,7 @@
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.conf.ConfigKeys;
 import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.FlinkDeployMode;
 import org.apache.streampark.common.util.YarnUtils;
@@ -29,6 +30,7 @@
 import org.apache.streampark.console.core.mapper.FlinkClusterMapper;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
+import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.YarnQueueService;
 import org.apache.streampark.console.core.service.application.FlinkApplicationInfoService;
 import org.apache.streampark.console.core.watcher.FlinkClusterWatcher;
@@ -60,6 +62,7 @@
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -90,6 +93,9 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, FlinkCluster>
 
+  @Autowired
+  private SettingService settingService;
+
@@ ... @@
+    Map<String, Object> properties = flinkCluster.getProperties();
+    if (FlinkDeployMode.isKubernetesMode(flinkCluster.getDeployMode())) {
+      String ingressMode = settingService.getIngressModeDefault();
+      if (StringUtils.isNotBlank(ingressMode)) {
+        properties.putIfAbsent(ConfigKeys.STREAMPARK_INGRESS_MODE(), ingressMode);
+      }
+    }
     DeployRequest deployRequest =
         new DeployRequest(
             flinkEnvService.getById(flinkCluster.getVersionId()).getFlinkVersion(),
             flinkCluster.getFlinkDeployModeEnum(),
-            flinkCluster.getProperties(),
+            properties,
             flinkCluster.getClusterId(),
             flinkCluster.getId(),
             getKubernetesDeployDesc(flinkCluster, "start"));
     log.info("Deploy cluster request {}", deployRequest);
     Future<DeployResponse> future = executorService.submit(() -> FlinkClient.deploy(deployRequest));
-    return future.get(60, TimeUnit.SECONDS);
+    return future.get(5, TimeUnit.MINUTES);
   }
 
   private void checkActiveIfNeeded(FlinkCluster flinkCluster) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
index 8abab9c0f3..ba527a4432 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
@@ -20,7 +20,6 @@
 import org.apache.streampark.common.conf.CommonConfig;
 import org.apache.streampark.common.conf.InternalConfigHolder;
 import org.apache.streampark.common.enums.ClusterState;
-import org.apache.streampark.common.enums.FlinkDeployMode;
 import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.HttpClientUtils;
 import org.apache.streampark.common.util.YarnUtils;
@@ -57,7 +56,9 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
-/** This implementation is currently used for tracing Cluster on yarn,remote,K8s mode */
+/**
+ * This implementation is currently used for tracing Cluster on yarn,remote,K8s mode
+ */
 @Slf4j
 @Component
 public class FlinkClusterWatcher {
@@ -82,7 +83,9 @@ public class FlinkClusterWatcher {
   // Track interval every 30 seconds
   private static final Duration WATCHER_INTERVAL = Duration.ofSeconds(30);
 
-  /** Watcher cluster lists */
+  /**
+   * Watcher cluster lists
+   */
   private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new ConcurrentHashMap<>(8);
 
   private static final Cache<Long, ClusterState> FAILED_STATES =
@@ -90,16 +93,17 @@ public class FlinkClusterWatcher {
 
   private boolean immediateWatch = false;
 
-  /** Initialize cluster cache */
+  /**
+   * Initialize cluster cache
+   */
   @PostConstruct
   private void init() {
     WATCHER_CLUSTERS.clear();
     List<FlinkCluster> flinkClusters =
         flinkClusterService.list(
             new LambdaQueryWrapper<FlinkCluster>()
-                .eq(FlinkCluster::getClusterState, ClusterState.RUNNING.getState())
-                // excluding flink clusters on kubernetes
-                .notIn(FlinkCluster::getDeployMode, FlinkDeployMode.getKubernetesMode()));
+                .in(FlinkCluster::getClusterState,
+                    ClusterState.RUNNING.getState(), ClusterState.STARTING.getState()));
     flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster));
   }
@@ -123,6 +127,9 @@ private void start() {
               alert(flinkCluster, state);
               break;
             default:
+              if (!flinkCluster.getClusterState().equals(state)) {
+                flinkClusterService.updateClusterState(flinkCluster.getId(), state);
+              }
               break;
           }
         }));
@@ -196,6 +203,7 @@ private ClusterState httpYarnSessionClusterState(FlinkCluster flinkCluster) {
   private ClusterState httpClusterState(FlinkCluster flinkCluster) {
     switch (flinkCluster.getFlinkDeployModeEnum()) {
       case REMOTE:
+      case KUBERNETES_NATIVE_SESSION:
         return httpRemoteClusterState(flinkCluster);
       case YARN_SESSION:
         return httpYarnSessionClusterState(flinkCluster);
@@ -225,6 +233,11 @@ private ClusterState getStateFromFlinkRestApi(FlinkCluster flinkCluster) {
       JacksonUtils.read(res, Overview.class);
       return ClusterState.RUNNING;
     } catch (Exception ignored) {
+      // When the cluster is just starting up and the HTTP API is not ready yet, wait for up to 5 minutes.
+      if (flinkCluster.getClusterState().equals(ClusterState.STARTING.getState())
+          && (System.currentTimeMillis() - flinkCluster.getStartTime().getTime()) <= 5 * 60 * 1000) {
+        return ClusterState.STARTING;
+      }
       log.error("cluster id:{} get state from flink api failed", flinkCluster.getId());
     }
     return ClusterState.LOST;
@@ -263,14 +276,15 @@ private ClusterState getStateFromYarnRestApi(FlinkCluster flinkCluster) {
    * @param flinkCluster
    */
   public static void addWatching(FlinkCluster flinkCluster) {
-    if (!FlinkDeployMode.isKubernetesMode(flinkCluster.getFlinkDeployModeEnum())
-        && !WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
+    if (!WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
       log.info("add the cluster with id:{} to watcher cluster cache", flinkCluster.getId());
       WATCHER_CLUSTERS.put(flinkCluster.getId(), flinkCluster);
     }
   }
 
-  /** @param flinkCluster */
+  /**
+   * @param flinkCluster
+   */
   public static void unWatching(FlinkCluster flinkCluster) {
     if (WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
       log.info("remove the cluster with id:{} from watcher cluster cache", flinkCluster.getId());
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/cluster/View.vue b/streampark-console/streampark-console-webapp/src/views/flink/cluster/View.vue
index b46a5132fe..d52e6f767c 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/cluster/View.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/cluster/View.vue
@@ -196,7 +196,8 @@
             target="_blank"
             v-if="
               record.deployMode === DeployMode.STANDALONE ||
-              record.deployMode === DeployMode.YARN_SESSION
+              record.deployMode === DeployMode.YARN_SESSION ||
+              record.deployMode === DeployMode.KUBERNETES_SESSION
             "
           >
             {{ record.address }}
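Note: the watcher now also tracks Kubernetes session clusters and reports a freshly launched cluster as STARTING rather than LOST while its REST API warms up. The grace window above, isolated as a sketch (names are illustrative; the patch inlines this logic in getStateFromFlinkRestApi):

    // A STARTING cluster is only demoted to LOST once it has been
    // unreachable for more than five minutes after its start time.
    val startupGraceMillis: Long = 5 * 60 * 1000L

    def withinStartupGrace(startTimeMillis: Long): Boolean =
      System.currentTimeMillis() - startTimeMillis <= startupGraceMillis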
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
index 1791e5d820..a9070b5318 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
@@ -17,9 +17,11 @@
 
 package org.apache.streampark.flink.client.impl
 
+import org.apache.streampark.common.conf.ConfigKeys
 import org.apache.streampark.common.enums.FlinkDeployMode
 import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
 import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
 import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
 
 import com.google.common.collect.Lists
@@ -41,8 +43,10 @@
     // require parameters
     require(
       StringUtils.isNotBlank(submitRequest.clusterId),
-      s"[flink-submit] submit flink job failed, clusterId is null, mode=${flinkConfig
-        .get(DeploymentOptions.TARGET)}")
+      s"[flink-submit] submit flink job failed, clusterId is null, mode=${
+          flinkConfig
+            .get(DeploymentOptions.TARGET)
+        }")
 
     // check the last building result
     submitRequest.checkBuildResult()
@@ -69,11 +73,18 @@
       .getClusterClient
 
     val clusterId = clusterClient.getClusterId
+
+    var webInterfaceURL = clusterClient.getWebInterfaceURL
+    val ingressDomain = flinkConfig.getString(ConfigKeys.STREAMPARK_INGRESS_MODE, "")
+    if (StringUtils.isNotBlank(ingressDomain)) {
+      webInterfaceURL = IngressController.configureIngress(ingressDomain, clusterId, submitRequest.kubernetesNamespace, flinkConfig)
+    }
+
     val result = SubmitResponse(
       clusterId,
       flinkConfig.toMap,
       submitRequest.jobId,
-      clusterClient.getWebInterfaceURL)
+      webInterfaceURL)
 
     logInfo(s"[flink-submit] flink job has been submitted. ${flinkConfIdentifierInfo(flinkConfig)}")
     closeSubmit(submitRequest, clusterDescriptor, clusterClient)
@@ -90,6 +101,7 @@
       (jobId, client) => {
         val resp = super.cancelJob(cancelRequest, jobId, client)
         client.shutDownCluster()
+        IngressController.deleteIngress(cancelRequest.clusterId, cancelRequest.kubernetesNamespace, flinkConf)
         CancelResponse(resp)
       })
   }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 3c37075359..e60a8991fb 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.client.impl
 
+import org.apache.streampark.common.conf.ConfigKeys
 import org.apache.streampark.common.enums.FlinkDeployMode
 import org.apache.streampark.common.util.{Logger, Utils}
 import org.apache.streampark.common.util.Implicits._
@@ -26,6 +27,7 @@
 import org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper
 import org.apache.streampark.flink.core.FlinkKubernetesClient
 import org.apache.streampark.flink.kubernetes.KubernetesRetriever
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sDeployMode
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
 import org.apache.streampark.flink.kubernetes.model.ClusterKey
 
 import org.apache.commons.lang3.StringUtils
@@ -150,7 +152,17 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
           .deploySessionCluster(kubernetesClusterDescriptor._2)
           .getClusterClient
       }
-      DeployResponse(address = client.getWebInterfaceURL, clusterId = client.getClusterId)
+      var webInterfaceURL = client.getWebInterfaceURL
+      val ingressDomain = flinkConfig.getString(ConfigKeys.STREAMPARK_INGRESS_MODE, "")
+      if (StringUtils.isNotBlank(ingressDomain)) {
+        webInterfaceURL = IngressController.configureIngress(
+          ingressDomain,
+          deployRequest.clusterId,
+          deployRequest.k8sParam.kubernetesNamespace,
+          flinkConfig)
+        logInfo(s"create ingress with url: $webInterfaceURL with cluster: ${client.getClusterId}")
+      }
+      DeployResponse(address = webInterfaceURL, clusterId = client.getClusterId)
     } catch {
       case e: Exception => DeployResponse(error = e)
     } finally {
@@ -164,13 +176,13 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
       val flinkConfig = getFlinkK8sConfig(shutDownRequest)
       kubeClient = FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
       val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
-
       val stopAndCleanupState =
         shutDownRequest.clusterId != null && kubeClientWrapper
           .getService(shutDownRequest.clusterId)
           .isPresent
       if (stopAndCleanupState) {
         kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
+        IngressController.deleteIngress(shutDownRequest.clusterId, shutDownRequest.k8sParam.kubernetesNamespace, flinkConfig)
         ShutDownResponse(shutDownRequest.clusterId)
       } else {
         null
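Note: both K8s clients now follow the same URL-selection shape: keep the ClusterClient's own web interface URL unless an ingress domain is configured, in which case the URL of the (re)created ingress wins. A condensed, illustrative sketch of that shared shape:

    import org.apache.commons.lang3.StringUtils

    // Prefer the ingress URL when a domain is configured; otherwise fall
    // back to the cluster client's own web interface URL.
    def selectWebInterfaceUrl(ingressDomain: String, defaultUrl: String)(createIngress: => String): String =
      if (StringUtils.isNotBlank(ingressDomain)) createIngress else defaultUrl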
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 28d106ec22..16c2ba75f0 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -20,6 +20,7 @@
 package org.apache.streampark.flink.client.`trait`
 
 import org.apache.streampark.common.enums.{FlinkDeployMode, FlinkK8sRestExposedType}
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.kubernetes.PodTemplateTool
+import org.apache.streampark.flink.kubernetes.ingress.IngressClusterDescriptor
 import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
 
 import org.apache.commons.lang3.StringUtils
@@ -155,15 +156,13 @@
   def getK8sClusterDescriptorAndSpecification(
       flinkConfig: Configuration): (KubernetesClusterDescriptor, ClusterSpecification) = {
     val clientFactory = new KubernetesClusterClientFactory()
-    val clusterDescriptor = clientFactory.createClusterDescriptor(flinkConfig)
-    val clusterSpecification =
-      clientFactory.getClusterSpecification(flinkConfig)
+    val clusterDescriptor = IngressClusterDescriptor.createClusterDescriptor(flinkConfig)
+    val clusterSpecification = clientFactory.getClusterSpecification(flinkConfig)
     (clusterDescriptor, clusterSpecification)
   }
 
   def getK8sClusterDescriptor(flinkConfig: Configuration): KubernetesClusterDescriptor = {
-    val clientFactory = new KubernetesClusterClientFactory()
-    clientFactory.createClusterDescriptor(flinkConfig)
+    IngressClusterDescriptor.createClusterDescriptor(flinkConfig)
   }
 
   protected def flinkConfIdentifierInfo(@Nonnull conf: Configuration): String =
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index d78517581e..4127e7dfcf 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -160,7 +160,7 @@
       .newFinkClusterClient(clusterKey.clusterId, clusterKey.namespace, clusterKey.executeMode)
       .getOrElse(return None)
     val url =
-      IngressController.getIngressUrlAddress(clusterKey.namespace, clusterKey.clusterId, client)
+      IngressController.getIngressUrlAddress(clusterKey.namespace, clusterKey.clusterId, client.getFlinkConfiguration).getOrElse(client.getWebInterfaceURL)
     logger.info(s"retrieve flink jobManager rest url: $url")
     Some(url)
   }
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressClusterDescriptor.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressClusterDescriptor.scala
new file mode 100644
index 0000000000..56e7fb252a
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressClusterDescriptor.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.ingress
+
+import org.apache.streampark.common.util.Implicits.AutoCloseImplicits
+
+import org.apache.flink.client.deployment.ClusterRetrieveException
+import org.apache.flink.client.program.ClusterClientProvider
+import org.apache.flink.client.program.rest.RestClusterClient
+import org.apache.flink.configuration.{Configuration, JobManagerOptions, RestOptions}
+import org.apache.flink.kubernetes.KubernetesClusterDescriptor
+import org.apache.flink.kubernetes.artifact.{DefaultKubernetesArtifactUploader, KubernetesArtifactUploader}
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices
+import org.apache.flink.runtime.rpc.{AddressResolution, FatalErrorHandler}
+import org.apache.flink.util.Preconditions.checkNotNull
+import org.slf4j.LoggerFactory
+
+import java.net.URL
+
+class IngressClusterDescriptor(
+    flinkConfig: Configuration,
+    clientFactory: FlinkKubeClientFactory,
+    artifactUploader: KubernetesArtifactUploader)
+  extends KubernetesClusterDescriptor(flinkConfig, clientFactory, artifactUploader) {
+
+  val LOG = LoggerFactory.getLogger(classOf[KubernetesClusterDescriptor])
+
+  val clusterId =
+    checkNotNull(flinkConfig.get(KubernetesConfigOptions.CLUSTER_ID), "ClusterId must be specified!")
+  val namespace = flinkConfig.get(KubernetesConfigOptions.NAMESPACE)
+
+  private def createClusterClientProvider(clusterId: String): ClusterClientProvider[String] = {
+    () => {
+      val configuration = new Configuration(flinkConfig)
+      val client = FlinkKubeClientFactory.getInstance.fromConfiguration(configuration, "client")
+      client.using(client => {
+        val restEndpoint = client.getRestEndpoint(clusterId)
+        if (restEndpoint.isPresent) {
+          configuration.set(RestOptions.ADDRESS, restEndpoint.get.getAddress)
+          configuration.set[Integer](RestOptions.PORT, restEndpoint.get.getPort)
+        } else {
+          throw new RuntimeException(
+            new ClusterRetrieveException(s"Could not get the rest endpoint of $clusterId"))
+        }
+      })
+      var webMonitorAddress = getWebMonitorAddress(configuration)
+      val ingressURL = IngressController.getIngressUrlAddress(namespace, clusterId, flinkConfig)
+      if (ingressURL.isDefined) {
+        val webInterfaceURL = new URL(ingressURL.get)
+        webMonitorAddress =
+          webInterfaceURL.getProtocol + "://" + webInterfaceURL.getHost + ":80" + webInterfaceURL.getPath
+        configuration.set(JobManagerOptions.ADDRESS, webInterfaceURL.getHost)
+        configuration.set[Integer](JobManagerOptions.PORT, 80)
+        configuration.set(RestOptions.PATH, webInterfaceURL.getPath)
+      }
+      new RestClusterClient[String](
+        configuration,
+        clusterId,
+        (effectiveConfiguration: Configuration, fatalErrorHandler: FatalErrorHandler) =>
+          new StandaloneClientHAServices(webMonitorAddress))
+    }
+  }
+
+  @throws[Exception]
+  private def getWebMonitorAddress(configuration: Configuration) = {
+    var resolution = AddressResolution.TRY_ADDRESS_RESOLUTION
+    val serviceType = configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE)
+    if (serviceType.isClusterIP) {
+      resolution = AddressResolution.NO_ADDRESS_RESOLUTION
+      LOG.warn(
+        s"Please note that Flink client operations(e.g. cancel, list, stop," +
+          " savepoint, etc.) won't work from outside the Kubernetes cluster" +
+          s" since '${KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key}' has been set to $serviceType.")
+    }
+    HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, resolution)
+  }
+
+  override def retrieve(clusterId: String): ClusterClientProvider[String] = {
+    val clusterClientProvider = createClusterClientProvider(clusterId)
+    clusterClientProvider.getClusterClient.using(clusterClient =>
+      LOG.info(
+        s"Retrieve flink cluster $clusterId successfully, JobManager Web Interface: ${clusterClient.getWebInterfaceURL}"))
+    clusterClientProvider
+  }
+}
+
+object IngressClusterDescriptor {
+  def createClusterDescriptor(configuration: Configuration): IngressClusterDescriptor = {
+    checkNotNull(configuration.get(KubernetesConfigOptions.CLUSTER_ID), "ClusterId must be specified!")
+    new IngressClusterDescriptor(
+      configuration,
+      FlinkKubeClientFactory.getInstance(),
+      new DefaultKubernetesArtifactUploader())
+  }
+}
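Note: when an ingress exists, IngressClusterDescriptor rewires the RestClusterClient to the ingress host and hardcodes port 80, so client-side operations (cancel, savepoint, and so on) flow through the ingress controller instead of the cluster-internal rest service. The address composition above, restated as a sketch; it assumes a plain-HTTP ingress listening on port 80, exactly like the `":80"` literal in the code:

    import java.net.URL

    // "http://flink.example.com/my-cluster" -> "http://flink.example.com:80/my-cluster"
    def toWebMonitorAddress(ingressUrl: String): String = {
      val u = new URL(ingressUrl)
      s"${u.getProtocol}://${u.getHost}:80${u.getPath}"
    }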
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
index a39880ff41..484c27abe1 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.kubernetes.ingress
 import org.apache.streampark.common.util.Implicits._
 import org.apache.streampark.common.util.Logger
 
-import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.DefaultKubernetesClient
 
 object IngressController extends Logger {
@@ -39,15 +39,19 @@
     }
   }
 
-  def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
-    ingressStrategy.configureIngress(domainName, clusterId, nameSpace)
+  def configureIngress(domainName: String, clusterId: String, nameSpace: String, flinkConfig: Configuration): String = {
+    ingressStrategy.configureIngress(domainName, clusterId, nameSpace, flinkConfig)
+  }
+
+  def deleteIngress(clusterId: String, nameSpace: String, flinkConfig: Configuration): Unit = {
+    ingressStrategy.deleteIngress(clusterId, nameSpace, flinkConfig)
   }
 
   def getIngressUrlAddress(
       nameSpace: String,
       clusterId: String,
-      clusterClient: ClusterClient[_]): String = {
-    ingressStrategy.getIngressUrl(nameSpace, clusterId, clusterClient)
+      flinkConfig: Configuration): Option[String] = {
+    ingressStrategy.getIngressUrl(nameSpace, clusterId, flinkConfig)
   }
 
   def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: String): String = {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index 425ef1e6de..573698a49f 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -20,9 +20,9 @@
 package org.apache.streampark.flink.kubernetes.ingress
 
 import org.apache.streampark.common.conf.{ConfigKeys, InternalConfigHolder, K8sFlinkConfig}
 import org.apache.streampark.common.util.FileUtils
 
-import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder}
-import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClient
 
 import java.io.File
 
@@ -30,12 +30,13 @@
 trait IngressStrategy {
 
   val REST_SERVICE_IDENTIFICATION = "rest"
 
-  lazy val ingressClass: String =
-    InternalConfigHolder.get[String](K8sFlinkConfig.ingressClass)
+  lazy val ingressClass: String = InternalConfigHolder.get[String](K8sFlinkConfig.ingressClass)
 
-  def getIngressUrl(nameSpace: String, clusterId: String, clusterClient: ClusterClient[_]): String
+  def getIngressUrl(nameSpace: String, clusterId: String, flinkConfig: Configuration): Option[String]
 
-  def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit
+  def configureIngress(domainName: String, clusterId: String, nameSpace: String, flinkConfig: Configuration): String
+
+  def deleteIngress(clusterId: String, nameSpace: String, flinkConfig: Configuration): Unit
 
   def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: String): String = {
     val workspaceDir = new File(buildWorkspace)
@@ -69,7 +70,7 @@
   def getOwnerReference(
       nameSpace: String,
       clusterId: String,
-      client: DefaultKubernetesClient): OwnerReference = {
+      client: KubernetesClient): OwnerReference = {
 
     val deployment = client
       .apps()
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
index d4984949d7..f82e3889ef 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -19,9 +19,10 @@
 
 package org.apache.streampark.flink.kubernetes.ingress
 
 import org.apache.streampark.common.util.Implicits._
 
-import org.apache.flink.client.program.ClusterClient
-import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder
-import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressBuilder}
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClient
 
 import scala.util.{Failure, Success, Try}
 
@@ -30,8 +31,9 @@ class IngressStrategyV1 extends IngressStrategy {
   override def getIngressUrl(
       nameSpace: String,
       clusterId: String,
-      clusterClient: ClusterClient[_]): String = {
-    new DefaultKubernetesClient().using(client =>
+      flinkConfig: Configuration): Option[String] = {
+    val kubernetesClient = FlinkKubeClientFactory.getInstance.createFabric8ioKubernetesClient(flinkConfig)
+    kubernetesClient.using(client =>
       Try {
         Option(
           Try(
@@ -42,15 +44,8 @@
             .get())
             .getOrElse(null)) match {
           case Some(ingress) =>
-            Option(ingress)
-              .map(ingress => ingress.getSpec.getRules.head)
-              .map(rule => rule.getHost -> rule.getHttp.getPaths.head.getPath)
-              .map { case (host, path) =>
-                val newPath = Option(path).filter(_.nonEmpty).map(_.replaceAll("\\/+$", "")).getOrElse("")
-                s"http://$host$newPath"
-              }
-              .getOrElse(clusterClient.using(_.getWebInterfaceURL))
-          case None => clusterClient.using(_.getWebInterfaceURL)
+            extractIngressURL(ingress)
+          case None => Option.empty[String]
         }
       } match {
         case Success(value) => value
@@ -60,7 +55,7 @@
   private[this] def touchIngressBackendRestPort(
-      client: DefaultKubernetesClient,
+      client: KubernetesClient,
       clusterId: String,
       nameSpace: String): Int = {
     var ports = client.services
@@ -74,8 +69,9 @@
     ports.map(servicePort => servicePort.getTargetPort.getIntVal).head
   }
 
-  override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
-    new DefaultKubernetesClient().using(client => {
+  override def configureIngress(domainName: String, clusterId: String, nameSpace: String, flinkConfig: Configuration): String = {
+    val kubernetesClient = FlinkKubeClientFactory.getInstance.createFabric8ioKubernetesClient(flinkConfig)
+    kubernetesClient.using(client => {
       val ownerReference = getOwnerReference(nameSpace, clusterId, client)
       val ingressBackendRestServicePort = touchIngressBackendRestPort(client, clusterId, nameSpace)
@@ -119,7 +115,33 @@
           .endRule()
           .endSpec()
           .build()
-      client.network.v1.ingresses().inNamespace(nameSpace).create(ingress)
+      client.network.v1.ingresses().inNamespace(nameSpace).createOrReplace(ingress)
+      extractIngressURL(ingress).get
     })
   }
+
+  private def extractIngressURL(ingress: Ingress): Option[String] = {
+    Option(ingress)
+      .map(ingress => ingress.getSpec.getRules.head)
+      .map(rule => rule.getHost -> rule.getHttp.getPaths.head.getPath)
+      .map { case (host, path) =>
+        val newPath = Option(path).filter(_.nonEmpty).map(_.replaceAll("\\/+$", "")).getOrElse("")
+        s"http://$host$newPath"
+      }
+  }
+
+  override def deleteIngress(clusterId: String, nameSpace: String, flinkConfig: Configuration): Unit = {
+    val kubernetesClient = FlinkKubeClientFactory.getInstance.createFabric8ioKubernetesClient(flinkConfig)
+    kubernetesClient.using(client =>
+      Try {
+        client.network.v1
+          .ingresses()
+          .inNamespace(nameSpace)
+          .withName(clusterId)
+          .delete()
+      } match {
+        case Success(value) => value
+        case Failure(e) =>
+          throw new RuntimeException(s"[StreamPark] delete ingress error: $e")
+      })
+  }
 }
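Note: both strategy implementations now share one contract: getIngressUrl returns an Option instead of silently falling back to the ClusterClient, configureIngress is idempotent (createOrReplace) and returns the resulting URL, and deleteIngress tears the ingress down by cluster name. An illustrative call against the reworked IngressController facade, with assumed namespace and cluster id:

    import org.apache.flink.configuration.Configuration
    import org.apache.streampark.flink.kubernetes.ingress.IngressController

    val flinkConfig = new Configuration()
    val url: Option[String] =
      IngressController.getIngressUrlAddress("default", "my-flink-cluster", flinkConfig)
    println(url.getOrElse("no ingress found; fall back to the service URL"))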
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
index 28bd547558..5d7bdb3dfc 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -20,10 +20,10 @@
 package org.apache.streampark.flink.kubernetes.ingress
 
 import org.apache.streampark.common.util.Implicits._
 
 import org.apache.commons.lang3.StringUtils
-import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory
 import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.IntOrString
-import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder
-import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.networking.v1beta1.{Ingress, IngressBuilder}
 
 import scala.util.{Failure, Success, Try}
 
@@ -32,8 +32,9 @@ class IngressStrategyV1beta1 extends IngressStrategy {
   override def getIngressUrl(
       nameSpace: String,
       clusterId: String,
-      clusterClient: ClusterClient[_]): String = {
-    new DefaultKubernetesClient().using(client => {
+      flinkConfig: Configuration): Option[String] = {
+    val kubernetesClient = FlinkKubeClientFactory.getInstance.createFabric8ioKubernetesClient(flinkConfig)
+    kubernetesClient.using(client => {
       Try {
         Option(
           Try(
@@ -43,12 +44,8 @@
             .get)
             .getOrElse(null)) match {
           case Some(ingress) =>
-            Option(ingress)
-              .map(ingress => ingress.getSpec.getRules.head)
-              .map(rule => rule.getHost -> rule.getHttp.getPaths.head.getPath)
-              .map { case (host, path) => s"http://$host$path" }
-              .getOrElse(clusterClient.using(_.getWebInterfaceURL))
-          case None => clusterClient.using(_.getWebInterfaceURL)
+            extractIngressURL(ingress)
+          case None => Option.empty[String]
         }
       } match {
         case Success(value) => value
@@ -69,8 +66,9 @@
     }
   }
 
-  override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
-    new DefaultKubernetesClient().using(client => {
+  override def configureIngress(domainName: String, clusterId: String, nameSpace: String, flinkConfig: Configuration): String = {
+    val kubernetesClient = FlinkKubeClientFactory.getInstance.createFabric8ioKubernetesClient(flinkConfig)
+    kubernetesClient.using(client => {
       val ownerReference = getOwnerReference(nameSpace, clusterId, client)
       val ingress = new IngressBuilder()
         .withNewMetadata()
@@ -101,8 +99,30 @@
         .endRule()
         .endSpec()
         .build()
+      client.network.ingress.inNamespace(nameSpace).createOrReplace(ingress)
+      extractIngressURL(ingress).get
+    })
+  }
 
-      client.network.ingress.inNamespace(nameSpace).create(ingress)
+  private def extractIngressURL(ingress: Ingress): Option[String] = {
+    Option(ingress)
+      .map(ingress => ingress.getSpec.getRules.head)
+      .map(rule => rule.getHost -> rule.getHttp.getPaths.head.getPath)
+      .map { case (host, path) => s"http://$host$path" }
+  }
+
+  override def deleteIngress(clusterId: String, nameSpace: String, flinkConfig: Configuration): Unit = {
+    val kubernetesClient = FlinkKubeClientFactory.getInstance.createFabric8ioKubernetesClient(flinkConfig)
+    kubernetesClient.using(client => {
+      Try {
+        client.network.v1beta1.ingresses
+          .inNamespace(nameSpace)
+          .withName(clusterId)
+          .delete()
+      } match {
+        case Success(value) => value
+        case Failure(e) =>
+          throw new RuntimeException(s"[StreamPark] delete ingress error: $e")
+      }
     })
   }
 }
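Note: taken together, the ingress lifecycle under this patch is create-or-update on deploy/submit, resolve on tracking, delete on cancel/shutdown, all keyed by cluster id and namespace. An end-to-end sketch against the patched IngressController API; the domain, namespace, and cluster id are assumed values:

    import org.apache.flink.configuration.Configuration
    import org.apache.streampark.flink.kubernetes.ingress.IngressController

    val conf = new Configuration()
    val namespace = "default"
    val clusterId = "my-flink-cluster"

    // deploy/submit: create or update the ingress and adopt its URL as the web interface
    val webUrl: String = IngressController.configureIngress("flink.example.com", clusterId, namespace, conf)

    // shutdown/cancel: remove the ingress alongside the cluster
    IngressController.deleteIngress(clusterId, namespace, conf)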