@@ -201,4 +201,5 @@ object ConfigKeys {

val KEY_FLINK_TM_PROCESS_MEMORY = "taskmanager.memory.process.size"

val STREAMPARK_INGRESS_MODE = "streampark.ingress.mode"
}
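Note on the new key: `streampark.ingress.mode` travels as a plain string entry in the Flink `Configuration`, and the Kubernetes clients later in this diff read it back with an empty-string default. A minimal sketch of that round trip (the key name comes from this diff; everything else is illustrative):

```java
import org.apache.flink.configuration.Configuration;

public class ReadIngressMode {
  public static void main(String[] args) {
    Configuration flinkConfig = new Configuration();
    // written by the console when a platform-wide ingress domain is configured
    flinkConfig.setString("streampark.ingress.mode", "flink.example.com");
    // read back by the Kubernetes client; blank means "no ingress, use the raw URL"
    String ingressDomain = flinkConfig.getString("streampark.ingress.mode", "");
    System.out.println(ingressDomain.isEmpty() ? "no ingress" : ingressDomain);
  }
}
```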
@@ -180,6 +180,7 @@ public Map<String, String> getFlinkConfig() throws JsonProcessingException {
@JsonIgnore
public Map<String, Object> getProperties() {
Map<String, Object> propertyMap = new HashMap<>();
propertyMap.put(ConfigKeys.KEY_KERBEROS_SERVICE_ACCOUNT(), this.getServiceAccount());
Map<String, String> dynamicPropertyMap =
FlinkConfigurationUtils.extractDynamicPropertiesAsJava(this.getDynamicProperties());
propertyMap.putAll(this.getOptionMap());
@@ -188,6 +189,7 @@ public Map<String, Object> getProperties() {
if (resolveOrder != null) {
propertyMap.put(CoreOptions.CLASSLOADER_RESOLVE_ORDER.key(), resolveOrder.getName());
}

return propertyMap;
}

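Worth noting: the service account is `put` before the option and dynamic-property maps are merged, so a value supplied through either map overrides the application-level default, since later `putAll` writes win in a `HashMap`. A minimal sketch of that precedence (key name illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class MergeOrder {
  public static void main(String[] args) {
    Map<String, Object> propertyMap = new HashMap<>();
    propertyMap.put("kubernetes.service-account", "app-default");
    // merged afterwards, so it overwrites the earlier put
    Map<String, String> dynamicPropertyMap =
        Map.of("kubernetes.service-account", "from-dynamic-properties");
    propertyMap.putAll(dynamicPropertyMap);
    System.out.println(propertyMap.get("kubernetes.service-account")); // from-dynamic-properties
  }
}
```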
@@ -166,10 +166,8 @@ public String getVersionOfLast() {

@JsonIgnore
public Properties getFlinkConfig() {
String flinkYamlString = DeflaterUtils.unzipString(flinkConf);
Properties flinkConfig = new Properties();
Map<String, String> config = FlinkConfigurationUtils.loadLegacyFlinkConf(flinkYamlString);
flinkConfig.putAll(config);
flinkConfig.putAll(convertFlinkYamlAsMap());
return flinkConfig;
}

@@ -79,7 +79,6 @@
import org.apache.streampark.flink.client.bean.SubmitResponse;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
import org.apache.streampark.flink.kubernetes.ingress.IngressController;
import org.apache.streampark.flink.kubernetes.model.TrackId;
import org.apache.streampark.flink.packer.pipeline.BuildResult;
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
@@ -90,7 +89,6 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -526,7 +524,8 @@ private void processForSuccess(
flinkApplication.setJobId(response.jobId());
}

if (FlinkDeployMode.isYarnMode(flinkApplication.getDeployMode())) {
if (FlinkDeployMode.isYarnMode(flinkApplication.getDeployMode())
|| FlinkDeployMode.isKubernetesMode(flinkApplication.getDeployMode())) {
flinkApplication.setClusterId(response.clusterId());
applicationLog.setClusterId(response.clusterId());
}
@@ -555,23 +554,6 @@ private void processForSuccess(
private void processForK8sApp(FlinkApplication application, ApplicationLog applicationLog) {
application.setRelease(ReleaseStateEnum.DONE.get());
k8SFlinkTrackMonitor.doWatching(k8sWatcherWrapper.toTrackId(application));
if (!FlinkDeployMode.isKubernetesApplicationMode(application.getDeployMode())) {
return;
}
String domainName = settingService.getIngressModeDefault();
if (StringUtils.isNotBlank(domainName)) {
try {
IngressController.configureIngress(
domainName, application.getClusterId(), application.getK8sNamespace());
} catch (KubernetesClientException e) {
log.info("Failed to create ingress, stack info:{}", e.getMessage());
applicationLog.setException(e.getMessage());
applicationLog.setSuccess(false);
applicationLogService.save(applicationLog);
application.setState(FlinkAppStateEnum.FAILED.getValue());
application.setOptionState(OptionStateEnum.NONE.getValue());
}
}
}

private void processForException(
@@ -776,6 +758,10 @@ private Map<String, Object> getProperties(
}
} else if (FlinkDeployMode.isKubernetesMode(application.getDeployModeEnum())) {
properties.put(ConfigKeys.KEY_K8S_IMAGE_PULL_POLICY(), "Always");
properties.putAll(application.getHotParamsMap());
if (StringUtils.isNotBlank(settingService.getIngressModeDefault())) {
properties.putIfAbsent(ConfigKeys.STREAMPARK_INGRESS_MODE(), settingService.getIngressModeDefault());
}
}

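The `putIfAbsent` here is deliberate: `hotParamsMap` is merged just above, so if a per-application `streampark.ingress.mode` arrived that way it survives, and the platform-wide default from `SettingService` only fills the gap. A minimal sketch of that precedence (domain values are placeholders):

```java
import java.util.HashMap;
import java.util.Map;

public class IngressDefaultPrecedence {
  public static void main(String[] args) {
    Map<String, Object> properties = new HashMap<>();
    // value merged earlier, e.g. from the application's hot parameters
    properties.put("streampark.ingress.mode", "per-job.example.com");
    // platform default applied afterwards: only fills a missing key
    properties.putIfAbsent("streampark.ingress.mode", "platform-default.example.com");
    System.out.println(properties.get("streampark.ingress.mode")); // per-job.example.com
  }
}
```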
if (FlinkDeployMode.isKubernetesApplicationMode(application.getDeployMode())) {
@@ -17,6 +17,7 @@

package org.apache.streampark.console.core.service.impl;

import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkDeployMode;
import org.apache.streampark.common.util.YarnUtils;
@@ -29,6 +30,7 @@
import org.apache.streampark.console.core.mapper.FlinkClusterMapper;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.YarnQueueService;
import org.apache.streampark.console.core.service.application.FlinkApplicationInfoService;
import org.apache.streampark.console.core.watcher.FlinkClusterWatcher;
@@ -60,6 +62,7 @@
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -90,6 +93,9 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
@Autowired
private YarnQueueService yarnQueueService;

@Autowired
private SettingService settingService;

@Autowired
private FlinkClusterWatcher flinkClusterWatcher;

@@ -179,9 +185,10 @@ public void start(FlinkCluster cluster) {
flinkCluster.setJobManagerUrl(deployResponse.address());
} else {
flinkCluster.setAddress(deployResponse.address());
flinkCluster.setJobManagerUrl(deployResponse.address());
}
flinkCluster.setClusterId(deployResponse.clusterId());
flinkCluster.setClusterState(ClusterState.RUNNING.getState());
flinkCluster.setClusterState(ClusterState.STARTING.getState());
flinkCluster.setException(null);
flinkCluster.setEndTime(null);
updateById(flinkCluster);
@@ -421,16 +428,24 @@ private ShutDownResponse shutdownInternal(FlinkCluster flinkCluster,
}

private DeployResponse deployInternal(FlinkCluster flinkCluster) throws InterruptedException, ExecutionException, TimeoutException {

Map<String, Object> properties = flinkCluster.getProperties();
if (FlinkDeployMode.isKubernetesMode(flinkCluster.getDeployMode())) {
String ingressMode = settingService.getIngressModeDefault();
if (StringUtils.isNotBlank(ingressMode)) {
properties.putIfAbsent(ConfigKeys.STREAMPARK_INGRESS_MODE(), ingressMode);
}
}
DeployRequest deployRequest = new DeployRequest(
flinkEnvService.getById(flinkCluster.getVersionId()).getFlinkVersion(),
flinkCluster.getFlinkDeployModeEnum(),
flinkCluster.getProperties(),
properties,
flinkCluster.getClusterId(),
flinkCluster.getId(),
getKubernetesDeployDesc(flinkCluster, "start"));
log.info("Deploy cluster request {}", deployRequest);
Future<DeployResponse> future = executorService.submit(() -> FlinkClient.deploy(deployRequest));
return future.get(60, TimeUnit.SECONDS);
return future.get(5, TimeUnit.MINUTES);
}
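The wait is raised from 60 seconds to 5 minutes because a first Kubernetes session deployment routinely needs more than a minute (image pull, pod scheduling) before `FlinkClient.deploy` returns. A self-contained sketch of the same submit-and-wait pattern, with the deploy call stubbed out:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DeployWithTimeout {
  public static void main(String[] args) throws Exception {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    Future<String> future = executorService.submit(() -> {
      Thread.sleep(2_000); // stand-in for FlinkClient.deploy(deployRequest)
      return "deployed";
    });
    try {
      System.out.println(future.get(5, TimeUnit.MINUTES));
    } catch (TimeoutException e) {
      future.cancel(true); // don't leave the deploy task running after giving up
      throw e;
    } finally {
      executorService.shutdown();
    }
  }
}
```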

private void checkActiveIfNeeded(FlinkCluster flinkCluster) {
@@ -20,7 +20,6 @@
import org.apache.streampark.common.conf.CommonConfig;
import org.apache.streampark.common.conf.InternalConfigHolder;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkDeployMode;
import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.YarnUtils;
@@ -57,7 +56,9 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/** This implementation is currently used for tracing Cluster on yarn,remote,K8s mode */
/**
* This implementation is currently used for tracing Cluster on yarn,remote,K8s mode
*/
@Slf4j
@Component
public class FlinkClusterWatcher {
@@ -82,24 +83,27 @@ public class FlinkClusterWatcher {
// Track interval every 30 seconds
private static final Duration WATCHER_INTERVAL = Duration.ofSeconds(30);

/** Watcher cluster lists */
/**
* Watcher cluster lists
*/
private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new ConcurrentHashMap<>(8);

private static final Cache<Long, ClusterState> FAILED_STATES =
Caffeine.newBuilder().expireAfterWrite(WATCHER_INTERVAL).build();

private boolean immediateWatch = false;

/** Initialize cluster cache */
/**
* Initialize cluster cache
*/
@PostConstruct
private void init() {
WATCHER_CLUSTERS.clear();
List<FlinkCluster> flinkClusters =
flinkClusterService.list(
new LambdaQueryWrapper<FlinkCluster>()
.eq(FlinkCluster::getClusterState, ClusterState.RUNNING.getState())
// excluding flink clusters on kubernetes
.notIn(FlinkCluster::getDeployMode, FlinkDeployMode.getKubernetesMode()));
.in(FlinkCluster::getClusterState,
ClusterState.RUNNING.getState(), ClusterState.STARTING.getState()));
flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster));
}

@@ -123,6 +127,9 @@ private void start() {
alert(flinkCluster, state);
break;
default:
if (!flinkCluster.getClusterState().equals(state)) {
flinkClusterService.updateClusterState(flinkCluster.getId(), state);
}
break;
}
}));
@@ -196,6 +203,7 @@ private ClusterState httpYarnSessionClusterState(FlinkCluster flinkCluster) {
private ClusterState httpClusterState(FlinkCluster flinkCluster) {
switch (flinkCluster.getFlinkDeployModeEnum()) {
case REMOTE:
case KUBERNETES_NATIVE_SESSION:
return httpRemoteClusterState(flinkCluster);
case YARN_SESSION:
return httpYarnSessionClusterState(flinkCluster);
@@ -225,6 +233,11 @@ private ClusterState getStateFromFlinkRestApi(FlinkCluster flinkCluster) {
JacksonUtils.read(res, Overview.class);
return ClusterState.RUNNING;
} catch (Exception ignored) {
// When the cluster is just starting up and the Http API is not ready, wait for 5 minutes.
if (flinkCluster.getClusterState().equals(ClusterState.STARTING.getState())
&& (System.currentTimeMillis() - flinkCluster.getStartTime().getTime()) <= 5 * 60 * 1000) {
return ClusterState.STARTING;
}
log.error("cluster id:{} get state from flink api failed", flinkCluster.getId());
}
return ClusterState.LOST;
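The new early-return gives a cluster in STARTING state a 5-minute window (matching the raised deploy timeout) during which a failed REST probe does not demote it to LOST. A self-contained sketch of that decision:

```java
public class StartupGraceCheck {
  enum ClusterState { STARTING, RUNNING, LOST }

  static final long GRACE_MS = 5 * 60 * 1000L;

  // called when the Flink REST overview probe has just failed
  static ClusterState onProbeFailure(ClusterState current, long startTimeMillis) {
    if (current == ClusterState.STARTING
        && System.currentTimeMillis() - startTimeMillis <= GRACE_MS) {
      return ClusterState.STARTING; // still booting: keep polling
    }
    return ClusterState.LOST; // grace window exhausted
  }

  public static void main(String[] args) {
    long thirtySecondsAgo = System.currentTimeMillis() - 30_000;
    System.out.println(onProbeFailure(ClusterState.STARTING, thirtySecondsAgo)); // STARTING
    long tenMinutesAgo = System.currentTimeMillis() - 10 * 60 * 1000;
    System.out.println(onProbeFailure(ClusterState.STARTING, tenMinutesAgo)); // LOST
  }
}
```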
@@ -263,14 +276,15 @@ private ClusterState getStateFromYarnRestApi(FlinkCluster flinkCluster) {
* @param flinkCluster
*/
public static void addWatching(FlinkCluster flinkCluster) {
if (!FlinkDeployMode.isKubernetesMode(flinkCluster.getFlinkDeployModeEnum())
&& !WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
if (!WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
log.info("add the cluster with id:{} to watcher cluster cache", flinkCluster.getId());
WATCHER_CLUSTERS.put(flinkCluster.getId(), flinkCluster);
}
}

/** @param flinkCluster */
/**
* @param flinkCluster
*/
public static void unWatching(FlinkCluster flinkCluster) {
if (WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) {
log.info("remove the cluster with id:{} from watcher cluster cache", flinkCluster.getId());
@@ -196,7 +196,8 @@
target="_blank"
v-if="
record.deployMode === DeployMode.STANDALONE ||
record.deployMode === DeployMode.YARN_SESSION
record.deployMode === DeployMode.YARN_SESSION ||
record.deployMode === DeployMode.KUBERNETES_SESSION
"
>
{{ record.address }}
@@ -17,9 +17,11 @@

package org.apache.streampark.flink.client.impl

import org.apache.streampark.common.conf.ConfigKeys
import org.apache.streampark.common.enums.FlinkDeployMode
import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.kubernetes.ingress.IngressController
import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse

import com.google.common.collect.Lists
@@ -41,8 +43,10 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
// require parameters
require(
StringUtils.isNotBlank(submitRequest.clusterId),
s"[flink-submit] submit flink job failed, clusterId is null, mode=${flinkConfig
.get(DeploymentOptions.TARGET)}")
s"[flink-submit] submit flink job failed, clusterId is null, mode=${
flinkConfig
.get(DeploymentOptions.TARGET)
}")

// check the last building result
submitRequest.checkBuildResult()
@@ -69,11 +73,18 @@
.getClusterClient

val clusterId = clusterClient.getClusterId

var webInterfaceURL = clusterClient.getWebInterfaceURL
val ingressDomain = flinkConfig.getString(ConfigKeys.STREAMPARK_INGRESS_MODE, "")
if (StringUtils.isNotBlank(ingressDomain)) {
webInterfaceURL = IngressController.configureIngress(ingressDomain, clusterId, submitRequest.kubernetesNamespace, flinkConfig)
}

val result = SubmitResponse(
clusterId,
flinkConfig.toMap,
submitRequest.jobId,
clusterClient.getWebInterfaceURL)
webInterfaceURL)
logInfo(s"[flink-submit] flink job has been submitted. ${flinkConfIdentifierInfo(flinkConfig)}")

closeSubmit(submitRequest, clusterDescriptor, clusterClient)
@@ -90,6 +101,7 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
(jobId, client) => {
val resp = super.cancelJob(cancelRequest, jobId, client)
client.shutDownCluster()
IngressController.deleteIngress(cancelRequest.clusterId, cancelRequest.kubernetesNamespace, flinkConf)
CancelResponse(resp)
})
}
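Together with the submit path above, the ingress now has a symmetric lifecycle inside the client: created once the cluster is up, deleted when the job is cancelled. A sketch of that pairing with illustrative stubs (the real `IngressController` lives in streampark-flink-kubernetes; its signatures and the URL shape shown here are assumptions, not the actual implementation):

```java
public class IngressLifecycle {
  // stub: the real configureIngress creates a Kubernetes Ingress resource
  static String configureIngress(String domain, String clusterId, String namespace) {
    return "https://" + domain + "/" + namespace + "/" + clusterId + "/"; // assumed URL shape
  }

  // stub: the real deleteIngress removes that resource with the cluster
  static void deleteIngress(String clusterId, String namespace) {
    System.out.println("ingress removed for " + namespace + "/" + clusterId);
  }

  public static void main(String[] args) {
    // on submit: expose the JobManager UI through the ingress domain
    String webInterfaceURL = configureIngress("flink.example.com", "my-app", "default");
    System.out.println("submit returns " + webInterfaceURL);
    // on cancel: tear the ingress down together with the cluster
    deleteIngress("my-app", "default");
  }
}
```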
@@ -17,6 +17,7 @@

package org.apache.streampark.flink.client.impl

import org.apache.streampark.common.conf.ConfigKeys
import org.apache.streampark.common.enums.FlinkDeployMode
import org.apache.streampark.common.util.{Logger, Utils}
import org.apache.streampark.common.util.Implicits._
@@ -26,6 +27,7 @@ import org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper
import org.apache.streampark.flink.core.FlinkKubernetesClient
import org.apache.streampark.flink.kubernetes.KubernetesRetriever
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sDeployMode
import org.apache.streampark.flink.kubernetes.ingress.IngressController
import org.apache.streampark.flink.kubernetes.model.ClusterKey

import org.apache.commons.lang3.StringUtils
@@ -150,7 +152,17 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
.deploySessionCluster(kubernetesClusterDescriptor._2)
.getClusterClient
}
DeployResponse(address = client.getWebInterfaceURL, clusterId = client.getClusterId)
var webInterfaceURL = client.getWebInterfaceURL
val ingressDomain = flinkConfig.getString(ConfigKeys.STREAMPARK_INGRESS_MODE, "")
if (StringUtils.isNotBlank(ingressDomain)) {
webInterfaceURL = IngressController.configureIngress(
ingressDomain,
deployRequest.clusterId,
deployRequest.k8sParam.kubernetesNamespace,
flinkConfig)
logInfo(s"create ingress with url: $webInterfaceURL with cluster: ${client.getClusterId}")
}
DeployResponse(address = webInterfaceURL, clusterId = client.getClusterId)
} catch {
case e: Exception => DeployResponse(error = e)
} finally {
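Returning the ingress URL as `DeployResponse.address` is what makes a Kubernetes session cluster watchable: `FlinkClusterWatcher` now routes KUBERNETES_NATIVE_SESSION through the same HTTP probe as remote clusters, polling the Flink REST `/overview` endpoint at that address. A minimal sketch of such a probe (the endpoint is from the Flink REST API; the client code and address are illustrative):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class OverviewProbe {
  public static void main(String[] args) throws Exception {
    String address = "https://flink.example.com/default/my-session"; // DeployResponse.address
    HttpRequest request = HttpRequest.newBuilder(URI.create(address + "/overview")).build();
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    // a parseable 200 response maps to ClusterState.RUNNING in the watcher
    System.out.println(response.statusCode() == 200 ? "RUNNING" : "not ready");
  }
}
```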
@@ -164,13 +176,13 @@
val flinkConfig = getFlinkK8sConfig(shutDownRequest)
kubeClient = FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)

val stopAndCleanupState =
shutDownRequest.clusterId != null && kubeClientWrapper
.getService(shutDownRequest.clusterId)
.isPresent
if (stopAndCleanupState) {
kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
IngressController.deleteIngress(shutDownRequest.clusterId, shutDownRequest.k8sParam.kubernetesNamespace, flinkConfig)
ShutDownResponse(shutDownRequest.clusterId)
} else {
null