diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 8d4ae1e3cb6..ee09e45c762 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -212,7 +212,7 @@ jobs: ${{ runner.os }}-zeppelin- - name: install environment run: | - ./mvnw install -DskipTests -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.12,jdbc,shell -am -Pweb-classic -Pflink-117 ${MAVEN_ARGS} + ./mvnw install -DskipTests -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.12,jdbc,shell -am -Pweb-classic -Pflink-120 ${MAVEN_ARGS} ./mvnw package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS} - name: Setup conda environment with python 3.9 and R uses: conda-incubator/setup-miniconda@v3 @@ -239,7 +239,7 @@ jobs: fail-fast: false matrix: python: [ 3.9 ] - flink: [116, 117] + flink: [116, 117, 120] include: # Flink 1.15 supports Python 3.6, 3.7, and 3.8 # https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/installation/ diff --git a/flink/README.md b/flink/README.md index 3b120bf3140..c319e61868d 100644 --- a/flink/README.md +++ b/flink/README.md @@ -13,6 +13,7 @@ Currently, it has the following modules clustered into two groups: * flink1.15-shims * flink1.16-shims * flink1.17-shims +* flink1.20-shims * flink-scala-2.12 diff --git a/flink/flink-scala-2.12/pom.xml b/flink/flink-scala-2.12/pom.xml index 0a1d26e277c..c8095f105be 100644 --- a/flink/flink-scala-2.12/pom.xml +++ b/flink/flink-scala-2.12/pom.xml @@ -33,7 +33,7 @@ - ${flink1.17.version} + ${flink1.20.version} 2.12.7 2.12 ${flink.scala.version} @@ -71,6 +71,12 @@ ${project.version} + + org.apache.zeppelin + flink1.20-shims + ${project.version} + + org.apache.zeppelin zeppelin-python @@ -1300,5 +1306,39 @@ + + flink-120 + + ${flink1.20.version} + 2.12.7 + 2.12 + + + + org.apache.flink + flink-runtime + ${flink.version} + provided + + + org.apache.flink + flink-table-planner_${flink.scala.binary.version} + ${flink.version} + provided + + + org.apache.flink + flink-sql-client + ${flink.version} + provided + + + org.apache.flink + flink-python + ${flink.version} + provided + + + diff --git a/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index c2ceb60360c..88d2fcf0b3b 100644 --- a/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -18,25 +18,18 @@ package org.apache.zeppelin.flink -import java.io.{File, IOException} -import java.net.{URL, URLClassLoader} -import java.nio.file.Files -import java.util.Properties -import java.util.concurrent.TimeUnit -import java.util.jar.JarFile - import org.apache.commons.lang3.StringUtils import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.flink.api.common.JobExecutionResult +import org.apache.flink.api.java.{ExecutionEnvironmentFactory, ExecutionEnvironment => JExecutionEnvironment} import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.client.program.ClusterClient import org.apache.flink.configuration._ import org.apache.flink.core.execution.{JobClient, JobListener} -import org.apache.flink.streaming.api.TimeCharacteristic -import 
org.apache.flink.streaming.api.environment.{StreamExecutionEnvironmentFactory, StreamExecutionEnvironment => JStreamExecutionEnvironment} -import org.apache.flink.api.java.{ExecutionEnvironmentFactory, ExecutionEnvironment => JExecutionEnvironment} import org.apache.flink.runtime.jobgraph.SavepointConfigOptions import org.apache.flink.runtime.util.EnvironmentInformation +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironmentFactory, StreamExecutionEnvironment => JStreamExecutionEnvironment} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableEnvironment} @@ -46,19 +39,24 @@ import org.apache.flink.table.module.hive.HiveModule import org.apache.flink.yarn.cli.FlinkYarnSessionCli import org.apache.zeppelin.conf.ZeppelinConfiguration import org.apache.zeppelin.dep.DependencyResolver -import org.apache.zeppelin.flink.internal.FlinkShell -import org.apache.zeppelin.flink.internal.FlinkShell._ import org.apache.zeppelin.flink.internal.FlinkILoop +import org.apache.zeppelin.flink.internal.FlinkShell._ import org.apache.zeppelin.interpreter.Interpreter.FormType import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion import org.apache.zeppelin.interpreter.util.InterpreterOutputStream import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterHookRegistry, InterpreterResult} import org.slf4j.{Logger, LoggerFactory} +import java.io.{File, IOException} +import java.net.{URL, URLClassLoader} +import java.nio.file.Files +import java.util.Properties +import java.util.concurrent.TimeUnit +import java.util.jar.JarFile import scala.collection.JavaConversions import scala.collection.JavaConverters._ import scala.tools.nsc.Settings -import scala.tools.nsc.interpreter.{Completion, IMain, IR, JPrintWriter, Results, SimpleReader} +import scala.tools.nsc.interpreter.{Completion, IMain, IR, JPrintWriter, SimpleReader} /** * It instantiate flink scala shell and create env, senv, btenv, stenv. 
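// For orientation: the Flink 1.20 branch introduced in the hunks below replaces the
// Blink-specific TableEnvFactory path with the unified EnvironmentSettings /
// TableEnvironment / bridge entry points. A minimal, self-contained Scala sketch of that
// pattern follows; the object name and the config key are illustrative only, while the
// API calls mirror the ones the patch itself uses.
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object UnifiedTableEnvSketch {
  def main(args: Array[String]): Unit = {
    // Batch side: a single unified TableEnvironment, no Blink/old-planner split.
    val btenv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inBatchMode().build())

    // Streaming side: the Scala bridge wraps an existing StreamExecutionEnvironment.
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val stenv = StreamTableEnvironment.create(
      senv, EnvironmentSettings.newInstance().inStreamingMode().build())

    // Configuration is applied after creation rather than passed through a TableConfig ctor.
    stenv.getConfig.getConfiguration.setString("table.exec.resource.default-parallelism", "1")

    println(btenv.listCatalogs().mkString(", "))
  }
}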
@@ -211,8 +209,8 @@ abstract class FlinkScalaInterpreter(val properties: Properties, Some(ensureYarnConfig(config) .copy(name = Some(appName)))) - val slotNum = Integer.parseInt(properties.getProperty("taskmanager.numberOfTaskSlots", - properties.getProperty("flink.tm.slot", "1"))) + val slotNum = Integer.parseInt(properties.getProperty("taskmanager.numberOfTaskSlots", + properties.getProperty("flink.tm.slot", "1"))) config = config.copy(yarnConfig = Some(ensureYarnConfig(config) .copy(slots = Some(slotNum)))) @@ -426,39 +424,103 @@ abstract class FlinkScalaInterpreter(val properties: Properties, } private def createTableEnvs(): Unit = { - val originalClassLoader = Thread.currentThread().getContextClassLoader - try { - Thread.currentThread().setContextClassLoader(getFlinkScalaShellLoader) - val tableConfig = new TableConfig - tableConfig.getConfiguration.addAll(configuration) - - this.tblEnvFactory = new TableEnvFactory(this.flinkVersion, this.flinkShims, - this.benv, this.senv, tableConfig, this.userJars.map(new URL(_)).asJava) - - // blink planner - val btEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder() - .asInstanceOf[EnvironmentSettings.Builder] - .inBatchMode() - .build() - this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting, getFlinkScalaShellLoader); - flinkILoop.bind("btenv", btenv.getClass().getCanonicalName(), btenv, List("@transient")) - this.java_btenv = this.btenv - - val stEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder() - .asInstanceOf[EnvironmentSettings.Builder] - .inStreamingMode() - .build() - this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting, getFlinkScalaShellLoader) - flinkILoop.bind("stenv", stenv.getClass().getCanonicalName(), stenv, List("@transient")) - this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting, getFlinkScalaShellLoader) - - if (!flinkVersion.isAfterFlink114()) { - // flink planner is not supported after flink 1.14 - this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment() - this.java_btenv_2 = tblEnvFactory.createJavaFlinkBatchTableEnvironment() + if (flinkVersion.getMajorVersion == 1 && flinkVersion.getMinorVersion <= 17) { + val originalClassLoader = Thread.currentThread().getContextClassLoader + try { + LOGGER.warn("Executing Table envs for 117") + Thread.currentThread().setContextClassLoader(getFlinkScalaShellLoader) + val tableConfig = new TableConfig + tableConfig.getConfiguration.addAll(configuration) + + this.tblEnvFactory = new TableEnvFactory(this.flinkVersion, this.flinkShims, + this.benv, this.senv, tableConfig, this.userJars.map(new URL(_)).asJava) + + // blink planner + val btEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder() + .asInstanceOf[EnvironmentSettings.Builder] + .inBatchMode() + .build() + this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting, getFlinkScalaShellLoader); + flinkILoop.bind("btenv", btenv.getClass().getCanonicalName(), btenv, List("@transient")) + this.java_btenv = this.btenv + + val stEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder() + .asInstanceOf[EnvironmentSettings.Builder] + .inStreamingMode() + .build() + this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting, getFlinkScalaShellLoader) + flinkILoop.bind("stenv", stenv.getClass().getCanonicalName(), stenv, List("@transient")) + this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting, getFlinkScalaShellLoader) + + if 
(!flinkVersion.isAfterFlink114()) { + // flink planner is not supported after flink 1.14 + this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment() + this.java_btenv_2 = tblEnvFactory.createJavaFlinkBatchTableEnvironment() + } + } finally { + Thread.currentThread().setContextClassLoader(originalClassLoader) + } + } + else { + val originalCl = Thread.currentThread().getContextClassLoader + try { + val shellCl = getFlinkScalaShellLoader + Thread.currentThread().setContextClassLoader(shellCl) + // Create settings (no Blink planner in 1.20; this is the unified planner) + val btSettings = EnvironmentSettings.newInstance() + .inBatchMode() + .build() + + val stSettings = EnvironmentSettings.newInstance() + .inStreamingMode() + .build() + + // --- Batch TableEnvironment (unified, not Blink/Java/Scala-specific) --- + // NOTE: TableConfig ctor is deprecated/removed; configure AFTER creation. + val btEnv = org.apache.flink.table.api.TableEnvironment.create(btSettings) + btEnv.getConfig.getConfiguration.addAll(configuration) + this.btenv = btEnv + this.java_btenv = btEnv // single impl for both Java/Scala APIs + flinkILoop.bind("btenv", + classOf[org.apache.flink.table.api.TableEnvironment].getCanonicalName, + btEnv, + List("@transient")) + + // --- Streaming TableEnvironment (Scala & Java bridges) --- + // Scala bridge + val stEnv = + org.apache.flink.table.api.bridge.scala.StreamTableEnvironment.create(this.senv, stSettings) + stEnv.getConfig.getConfiguration.addAll(configuration) + this.stenv = stEnv + flinkILoop.bind("stenv", + classOf[org.apache.flink.table.api.bridge.scala.StreamTableEnvironment].getCanonicalName, + stEnv, + List("@transient")) + + + // Java StreamExecutionEnvironment from the Scala one + val jStreamEnv: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment = + this.senv.getJavaEnv // in Scala this accessor is available as `javaEnv` + + // Java bridge + this.java_stenv = + org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(jStreamEnv, stSettings) + + + + // --- (Optional) If you previously injected user jars via a custom loader, + // consider also setting PipelineOptions.JARS so jobs carry them when submitted: + // import org.apache.flink.configuration.PipelineOptions + // configuration.set(PipelineOptions.JARS, this.userJars.asJava) + // If you do this, set it BEFORE creating the envs so it takes effect. + + // No Flink planner (pre-1.14) fallback in 1.20+ + this.btenv_2 = null + this.java_btenv_2 = null + + } finally { + Thread.currentThread().setContextClassLoader(originalCl) } - } finally { - Thread.currentThread().setContextClassLoader(originalClassLoader) } } @@ -519,7 +581,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties, // -6 because of .class var className = je.getName.substring(0, je.getName.length - 6) className = className.replace('/', '.') - if (udfPackages.isEmpty || udfPackages.exists( p => className.startsWith(p))) { + if (udfPackages.isEmpty || udfPackages.exists(p => className.startsWith(p))) { val c = cl.loadClass(className) val udf = c.newInstance() if (udf.isInstanceOf[ScalarFunction]) { @@ -539,7 +601,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties, } } } catch { - case e : Throwable => + case e: Throwable => LOGGER.info("Fail to inspect udf class: " + je.getName, e) } } @@ -592,19 +654,61 @@ abstract class FlinkScalaInterpreter(val properties: Properties, * This is just a workaround to make table api work in multiple threads. 
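* In the Flink 1.20 code path added below, the stream planner is recreated through the unified StreamTableEnvironment.create bridges (Scala and Java) instead of going through TableEnvFactory and the Blink planner settings.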
*/ def createPlannerAgain(): Unit = { - val originalClassLoader = Thread.currentThread().getContextClassLoader - try { - Thread.currentThread().setContextClassLoader(getFlinkScalaShellLoader) - val stEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder() - .asInstanceOf[EnvironmentSettings.Builder] - .inStreamingMode() - .build() - this.tblEnvFactory.createStreamPlanner(stEnvSetting) - } finally { - Thread.currentThread().setContextClassLoader(originalClassLoader) + if (flinkVersion.getMajorVersion == 1 && flinkVersion.getMinorVersion <= 17) { + val originalClassLoader = Thread.currentThread().getContextClassLoader + try { + Thread.currentThread().setContextClassLoader(getFlinkScalaShellLoader) + val stEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder() + .asInstanceOf[EnvironmentSettings.Builder] + .inStreamingMode() + .build() + this.tblEnvFactory.createStreamPlanner(stEnvSetting) + } finally { + Thread.currentThread().setContextClassLoader(originalClassLoader) + } + } else { + val originalCl = Thread.currentThread().getContextClassLoader + try { + val shellCl = getFlinkScalaShellLoader + Thread.currentThread().setContextClassLoader(shellCl) + + val stSettings = org.apache.flink.table.api.EnvironmentSettings + .newInstance() + .inStreamingMode() + .build() + + // Recreate Scala StreamTableEnvironment + val newScalaStEnv = + org.apache.flink.table.api.bridge.scala.StreamTableEnvironment.create(this.senv, stSettings) + newScalaStEnv.getConfig.getConfiguration.addAll(configuration) + this.stenv = newScalaStEnv + flinkILoop.bind( + "stenv", + classOf[org.apache.flink.table.api.bridge.scala.StreamTableEnvironment].getCanonicalName, + newScalaStEnv, + List("@transient") + ) + + // Recreate Java StreamTableEnvironment (from the Scala env’s Java backing env) + val jEnv: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment = this.senv.getJavaEnv + val newJavaStEnv = + org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(jEnv, stSettings) + newJavaStEnv.getConfig.getConfiguration.addAll(configuration) + this.java_stenv = newJavaStEnv + + // If you also need a (batch) TableEnvironment, rebuild it similarly: + // val btSettings = EnvironmentSettings.newInstance().inBatchMode().build() + // this.btenv = org.apache.flink.table.api.TableEnvironment.create(btSettings) + // this.btenv.getConfig.getConfiguration.addAll(configuration) + // this.java_btenv = this.btenv + + } finally { + Thread.currentThread().setContextClassLoader(originalCl) + } } } + def interpret(code: String, context: InterpreterContext): InterpreterResult = { val originalStdOut = System.out val originalStdErr = System.err; @@ -671,7 +775,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties, // flink 1.12 use copied version of configuration, so in order to update configuration we have to // get the internal configuration of StreamExecutionEnvironment. 
val internalConfiguration = getConfigurationOfStreamExecutionEnv() - if (!StringUtils.isBlank(savepointPath) && resumeFromSavepoint){ + if (!StringUtils.isBlank(savepointPath) && resumeFromSavepoint) { LOGGER.info("Resume job from savepoint , savepointPath = {}", savepointPath) internalConfiguration.setString(SavepointConfigOptions.SAVEPOINT_PATH.key(), savepointPath); return @@ -913,7 +1017,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties, } } - def getUserJars:java.util.List[String] = JavaConversions.seqAsJavaList(userJars) + def getUserJars: java.util.List[String] = JavaConversions.seqAsJavaList(userJars) def completion(buf: String, cursor: Int, context: InterpreterContext): java.util.List[InterpreterCompletion] diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java index 11de5bd3b7e..ae54a794ea0 100644 --- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java @@ -69,7 +69,10 @@ private static FlinkShims loadShims(FlinkVersion flinkVersion, } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 17) { LOGGER.info("Initializing shims for Flink 1.17"); flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink117Shims"); - } else { + } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 20) { + LOGGER.info("Initializing shims for Flink 1.20"); + flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink120Shims"); + }else { throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet"); } diff --git a/flink/flink1.20-shims/pom.xml b/flink/flink1.20-shims/pom.xml new file mode 100644 index 00000000000..15db84812d8 --- /dev/null +++ b/flink/flink1.20-shims/pom.xml @@ -0,0 +1,207 @@ + + + + + + flink-parent + org.apache.zeppelin + 0.13.0-SNAPSHOT + ../pom.xml + + + 4.0.0 + org.apache.zeppelin + flink1.20-shims + 0.13.0-SNAPSHOT + jar + Zeppelin: Flink1.20 Shims + + + ${flink1.20.version} + 2.12 + + + + + + org.apache.zeppelin + flink-shims + ${project.version} + + + + org.apache.flink + flink-core + ${flink.version} + provided + + + + org.apache.flink + flink-clients + ${flink.version} + provided + + + + org.apache.flink + flink-runtime + ${flink.version} + provided + + + + org.apache.flink + flink-table-api-scala_${flink.scala.binary.version} + ${flink.version} + provided + + + + org.apache.flink + flink-table-api-scala-bridge_${flink.scala.binary.version} + ${flink.version} + provided + + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + + + + org.apache.flink + flink-scala_${flink.scala.binary.version} + ${flink.version} + provided + + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + + org.apache.flink + flink-streaming-scala_${flink.scala.binary.version} + ${flink.version} + provided + + + + org.apache.flink + flink-java + ${flink.version} + provided + + + + org.apache.flink + flink-table-planner_${flink.scala.binary.version} + ${flink.version} + provided + + + + org.apache.flink + flink-python + ${flink.version} + provided + + + + org.apache.flink + flink-sql-client + ${flink.version} + provided + + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + eclipse-add-source + + add-source + + + + scala-compile-first + process-resources + + compile + + + + scala-test-compile-first + process-test-resources + + 
testCompile + + + + + ${flink.scala.version} + + -unchecked + -deprecation + -feature + -nobootcp + + + -Xms1024m + -Xmx1024m + -XX:MaxMetaspaceSize=${MaxMetaspace} + + + -source + ${java.version} + -target + ${java.version} + -Xlint:all,-serial,-path,-options + + + + + + maven-resources-plugin + + + copy-interpreter-setting + none + + true + + + + + + + + diff --git a/flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/Flink120Shims.java b/flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/Flink120Shims.java new file mode 100644 index 00000000000..b307fefea0f --- /dev/null +++ b/flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/Flink120Shims.java @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.InetAddress; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.ZoneId; +import java.util.List; +import java.util.Properties; +import java.util.StringJoiner; +import java.util.stream.Stream; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.compress.utils.Lists; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.cli.CustomCommandLine; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; +import org.apache.flink.table.api.*; +import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.catalog.*; +import org.apache.flink.table.client.resource.ClientResourceManager; +import org.apache.flink.table.client.util.ClientClassloaderUtil; +import org.apache.flink.table.client.util.ClientWrapperClassLoader; +import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.ExecutorFactory; +import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.factories.CatalogStoreFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.PlannerFactoryUtil; +import 
org.apache.flink.table.factories.TableFactoryUtil; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.functions.TableAggregateFunction; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.module.ModuleManager; +import org.apache.flink.table.resource.ResourceManager; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.FlinkException; +import org.apache.zeppelin.flink.shims120.CollectStreamTableSink; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Shims for flink 1.20 + */ +public class Flink120Shims extends FlinkShims { + + private static final Logger LOGGER = LoggerFactory.getLogger(Flink120Shims.class); + + private Flink120SqlInterpreter batchSqlInterpreter; + private Flink120SqlInterpreter streamSqlInterpreter; + + public Flink120Shims(FlinkVersion flinkVersion, Properties properties) { + super(flinkVersion, properties); + } + + public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) { + this.batchSqlInterpreter = new Flink120SqlInterpreter(flinkSqlContext, true); + } + + public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) { + this.streamSqlInterpreter = new Flink120SqlInterpreter(flinkSqlContext, false); + } + + @Override + public Object createResourceManager(List jars, Object tableConfig) { + Configuration configuration = ((TableConfig) tableConfig).getConfiguration().clone(); + ClientWrapperClassLoader userClassLoader = + new ClientWrapperClassLoader( + ClientClassloaderUtil.buildUserClassLoader( + jars, + Thread.currentThread().getContextClassLoader(), + new Configuration(configuration)), + configuration); + return new ClientResourceManager(configuration, userClassLoader); + } + + @Override + public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List jars) { + ResourceManager resourceManager = (ResourceManager) createResourceManager(jars, tableConfig); + return new FunctionCatalog((TableConfig) tableConfig, resourceManager, (CatalogManager) catalogManager, (ModuleManager) moduleManager); + } + + @Override + public void disableSysoutLogging(Object batchConfig, Object streamConfig) { + // do nothing + } + + @Override + public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj, + Object senvObj, + Object tableConfigObj, + Object moduleManagerObj, + Object functionCatalogObj, + Object catalogManagerObj, + List jars, + ClassLoader classLoader) { + EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; + StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; + TableConfig tableConfig = (TableConfig) tableConfigObj; + ModuleManager moduleManager = (ModuleManager) moduleManagerObj; + FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; + CatalogManager catalogManager = (CatalogManager) catalogManagerObj; + ImmutablePair pair = createPlannerAndExecutor( + classLoader, environmentSettings, senv, + tableConfig, moduleManager, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; + + ResourceManager resourceManager = 
(ResourceManager) createResourceManager(jars, tableConfig); + + return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(catalogManager, + moduleManager, resourceManager, + functionCatalog, tableConfig, new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(senv), + planner, executor, environmentSettings.isStreamingMode()); + } + + @Override + public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj, + Object senvObj, + Object tableConfigObj, + Object moduleManagerObj, + Object functionCatalogObj, + Object catalogManagerObj, + List jars, + ClassLoader classLoader) { + EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; + StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; + TableConfig tableConfig = (TableConfig) tableConfigObj; + ModuleManager moduleManager = (ModuleManager) moduleManagerObj; + FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; + CatalogManager catalogManager = (CatalogManager) catalogManagerObj; + ImmutablePair pair = createPlannerAndExecutor( + classLoader, environmentSettings, senv, + tableConfig, moduleManager, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; + + ResourceManager resourceManager = (ResourceManager) createResourceManager(jars, tableConfig); + + return new StreamTableEnvironmentImpl(catalogManager, moduleManager, resourceManager, + functionCatalog, tableConfig, senv, planner, executor, environmentSettings.isStreamingMode()); + } + + @Override + public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) { + return new StreamExecutionEnvironmentFactory() { + @Override + public StreamExecutionEnvironment createExecutionEnvironment(Configuration configuration) { + return (StreamExecutionEnvironment) streamExecutionEnvironment; + } + }; + } + + @Override + public Object createCatalogManager(Object config) { + final TableConfig tableConfig = TableConfig.getDefault(); + ClassLoader userClassLoader=Thread.currentThread().getContextClassLoader(); + + final CatalogStoreFactory catalogStoreFactory = + TableFactoryUtil.findAndCreateCatalogStoreFactory( + tableConfig.getConfiguration(), userClassLoader); + final CatalogStore catalogStore = + catalogStoreFactory.createCatalogStore(); + + return CatalogManager.newBuilder() + .classLoader(Thread.currentThread().getContextClassLoader()) + .config((ReadableConfig) config) + .defaultCatalog( + "default_catalog", + new GenericInMemoryCatalog( + "default_catalog", + "default_database")) + . 
catalogStoreHolder( + CatalogStoreHolder.newBuilder() + .classloader(Thread.currentThread().getContextClassLoader()) + .config(tableConfig) + .catalogStore(catalogStore) + .factory(catalogStoreFactory) + .build()) + .build(); + } + + @Override + public String getPyFlinkPythonPath(Properties properties) throws IOException { + String mode = properties.getProperty("flink.execution.mode"); + + final Path pyDir; + if ("yarn-application".equalsIgnoreCase(mode)) { + // for yarn-application mode, FLINK_HOME is the container working directory + Path flinkHome = Paths.get(".").toAbsolutePath().normalize(); + pyDir = Paths.get(flinkHome.toString(), "lib", "python"); + } else { + String flinkHome = System.getenv("FLINK_HOME"); + if (flinkHome != null && !flinkHome.isBlank()) { + pyDir = Paths.get(flinkHome,"opt","python"); + } else { + throw new IOException("No FLINK_HOME is specified"); + } + } + return getPyFlinkPythonPath(pyDir); + } + + private String getPyFlinkPythonPath(Path pyFlinkFolder) throws IOException { + LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder.toAbsolutePath()); + + if (!Files.exists(pyFlinkFolder) || !Files.isDirectory(pyFlinkFolder)) { + throw new IOException(String.format( + "PyFlink folder %s does not exist or is not a folder", + pyFlinkFolder.toAbsolutePath() + )); + } + + StringJoiner joiner = new StringJoiner(java.io.File.pathSeparator); + try (Stream entries = Files.list(pyFlinkFolder)) { + entries.forEach(p -> { + LOGGER.info("Adding extracted file {} to PYTHONPATH", p.toAbsolutePath()); + joiner.add(p.toAbsolutePath().toString()); + }); + } + + return joiner.toString(); + } + + @Override + public Object getCollectStreamTableSink(InetAddress targetAddress, int targetPort, Object serializer) { + return new CollectStreamTableSink(targetAddress, targetPort, (TypeSerializer>) serializer); + } + + @Override + public List collectToList(Object table) throws Exception { + return Lists.newArrayList(((Table) table).execute().collect()); + } + + @Override + public boolean rowEquals(Object row1, Object row2) { + Row r1 = (Row) row1; + Row r2 = (Row) row2; + r1.setKind(RowKind.INSERT); + r2.setKind(RowKind.INSERT); + return r1.equals(r2); + } + + @Override + public Object fromDataSet(Object btenv, Object ds) { + throw new RuntimeException("Conversion from DataSet is not supported in Flink 1.20"); + } + + @Override + public Object toDataSet(Object btenv, Object table) { + throw new RuntimeException("Conversion to DataSet is not supported in Flink 1.20"); + } + + @Override + public void registerTableSink(Object stenv, String tableName, Object collectTableSink) { + ((org.apache.flink.table.api.internal.TableEnvironmentInternal) stenv) + .registerTableSinkInternal(tableName, (TableSink) collectTableSink); + } + + @Override + public void registerScalarFunction(Object btenv, String name, Object scalarFunction) { + ((StreamTableEnvironmentImpl) (btenv)).createTemporarySystemFunction(name, (ScalarFunction) scalarFunction); + } + + @Override + public void registerTableFunction(Object btenv, String name, Object tableFunction) { + ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableFunction) tableFunction); + } + + @Override + public void registerAggregateFunction(Object btenv, String name, Object aggregateFunction) { + ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (AggregateFunction) aggregateFunction); + } + + @Override + public void registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction) { + 
((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableAggregateFunction) tableAggregateFunction); + } + + /** + * Flink 1.11 bind CatalogManager with parser which make blink and flink could not share the same CatalogManager. + * This is a workaround which always reset CatalogTableSchemaResolver before running any flink code. + * + * @param catalogManager + * @param parserObject + * @param environmentSetting + */ + @Override + public void setCatalogManagerSchemaResolver(Object catalogManager, + Object parserObject, + Object environmentSetting) { + + } + + @Override + public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) { + CustomCommandLine customCommandLine = ((CliFrontend) cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine); + try { + ((Configuration) effectiveConfig).addAll(customCommandLine.toConfiguration((CommandLine) commandLine)); + return effectiveConfig; + } catch (FlinkException e) { + throw new RuntimeException("Fail to call addAll", e); + } + } + + @Override + public void setBatchRuntimeMode(Object tableConfig) { + ((TableConfig) tableConfig).getConfiguration() + .set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); + } + + @Override + public void setOldPlanner(Object tableConfig) { + + } + + @Override + public String[] rowToString(Object row, Object table, Object tableConfig) { + final String zone = ((TableConfig) tableConfig).getConfiguration() + .get(TableConfigOptions.LOCAL_TIME_ZONE); + ZoneId zoneId = TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone) + ? ZoneId.systemDefault() + : ZoneId.of(zone); + + ResolvedSchema resolvedSchema = ((Table) table).getResolvedSchema(); + return PrintUtils.rowToString((Row) row, resolvedSchema, zoneId); + } + + @Override + public boolean isTimeIndicatorType(Object type) { + return type instanceof TimeIndicatorTypeInfo; + } + + private Object lookupExecutor(ClassLoader classLoader, + Object settings, + Object sEnv) { + try { + final ExecutorFactory executorFactory = + FactoryUtil.discoverFactory( + classLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER); + final Method createMethod = + executorFactory + .getClass() + .getMethod("create", StreamExecutionEnvironment.class); + + return createMethod.invoke(executorFactory, sEnv); + } catch (Exception e) { + throw new TableException( + "Could not instantiate the executor. 
Make sure a planner module is on the classpath", + e); + } + } + + @Override + public ImmutablePair createPlannerAndExecutor( + ClassLoader classLoader, Object environmentSettings, Object sEnv, + Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager) { + EnvironmentSettings settings = (EnvironmentSettings) environmentSettings; + Executor executor = (Executor) lookupExecutor(classLoader, environmentSettings, sEnv); + Planner planner = PlannerFactoryUtil.createPlanner(executor, + (TableConfig) tableConfig, + Thread.currentThread().getContextClassLoader(), + (ModuleManager) moduleManager, + (CatalogManager) catalogManager, + (FunctionCatalog) functionCatalog); + return ImmutablePair.of(planner, executor); + } + + @Override + public Object createBlinkPlannerEnvSettingBuilder() { + return EnvironmentSettings.newInstance(); + } + + @Override + public Object createOldPlannerEnvSettingBuilder() { + return EnvironmentSettings.newInstance(); + } + + public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) { + if (isBatch) { + return batchSqlInterpreter.runSqlList(st, context); + } else { + return streamSqlInterpreter.runSqlList(st, context); + } + } +} diff --git a/flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/Flink120SqlInterpreter.java b/flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/Flink120SqlInterpreter.java new file mode 100644 index 00000000000..ed4717a5072 --- /dev/null +++ b/flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/Flink120SqlInterpreter.java @@ -0,0 +1,590 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.flink; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.JobListener; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.SqlParserException; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.internal.TableEnvironmentInternal; +import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.operations.*; +import org.apache.flink.table.operations.command.HelpOperation; +import org.apache.flink.table.operations.command.SetOperation; +import org.apache.flink.table.operations.ddl.*; +import org.apache.flink.table.utils.EncodingUtils; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.ZeppelinContext; +import org.apache.zeppelin.interpreter.util.SqlSplitter; +import org.jline.utils.AttributedString; +import org.jline.utils.AttributedStringBuilder; +import org.jline.utils.AttributedStyle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + + +public class Flink120SqlInterpreter { + + private static final Logger LOGGER = LoggerFactory.getLogger(Flink120SqlInterpreter.class); + private static final String CMD_DESC_DELIMITER = "\t\t"; + + /** + * SQL Client HELP command helper class. 
+ */ + private static final class SQLCliCommandsDescriptions { + private int commandMaxLength; + private final Map commandsDescriptions; + + public SQLCliCommandsDescriptions() { + this.commandsDescriptions = new LinkedHashMap<>(); + this.commandMaxLength = -1; + } + + public SQLCliCommandsDescriptions commandDescription(String command, String description) { + Preconditions.checkState( + StringUtils.isNotBlank(command), "content of command must not be empty."); + Preconditions.checkState( + StringUtils.isNotBlank(description), + "content of command's description must not be empty."); + this.updateMaxCommandLength(command.length()); + this.commandsDescriptions.put(command, description); + return this; + } + + private void updateMaxCommandLength(int newLength) { + Preconditions.checkState(newLength > 0); + if (this.commandMaxLength < newLength) { + this.commandMaxLength = newLength; + } + } + + public AttributedString build() { + AttributedStringBuilder attributedStringBuilder = new AttributedStringBuilder(); + if (!this.commandsDescriptions.isEmpty()) { + this.commandsDescriptions.forEach( + (cmd, cmdDesc) -> { + attributedStringBuilder + .style(AttributedStyle.DEFAULT.bold()) + .append( + String.format( + String.format("%%-%ds", commandMaxLength), cmd)) + .append(CMD_DESC_DELIMITER) + .style(AttributedStyle.DEFAULT) + .append(cmdDesc) + .append('\n'); + }); + } + return attributedStringBuilder.toAttributedString(); + } + } + + private static final AttributedString SQL_CLI_COMMANDS_DESCRIPTIONS = + new SQLCliCommandsDescriptions() + .commandDescription("HELP", "Prints the available commands.") + .commandDescription( + "SET", + "Sets a session configuration property. Syntax: \"SET ''='';\". Use \"SET;\" for listing all properties.") + .commandDescription( + "RESET", + "Resets a session configuration property. Syntax: \"RESET '';\". Use \"RESET;\" for reset all session properties.") + .commandDescription( + "INSERT INTO", + "Inserts the results of a SQL SELECT query into a declared table sink.") + .commandDescription( + "INSERT OVERWRITE", + "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.") + .commandDescription( + "SELECT", "Executes a SQL SELECT query on the Flink cluster.") + .commandDescription( + "EXPLAIN", + "Describes the execution plan of a query or table with the given name.") + .commandDescription( + "BEGIN STATEMENT SET", + "Begins a statement set. Syntax: \"BEGIN STATEMENT SET;\"") + .commandDescription("END", "Ends a statement set. Syntax: \"END;\"") + // (TODO) zjffdu, ADD/REMOVE/SHOW JAR + .build(); + + // -------------------------------------------------------------------------------------------- + + public static final AttributedString MESSAGE_HELP = + new AttributedStringBuilder() + .append("The following commands are available:\n\n") + .append(SQL_CLI_COMMANDS_DESCRIPTIONS) + .style(AttributedStyle.DEFAULT.underline()) + .append("\nHint") + .style(AttributedStyle.DEFAULT) + .append( + ": Make sure that a statement ends with \";\" for finalizing (multi-line) statements.") + // About Documentation Link. 
+ .style(AttributedStyle.DEFAULT) + .append( + "\nYou can also type any Flink SQL statement, please visit https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/overview/ for more details.") + .toAttributedString(); + + private static final String MESSAGE_NO_STATEMENT_IN_STATEMENT_SET = "No statement in the statement set, skip submit."; + + private FlinkSqlContext flinkSqlContext; + private TableEnvironment tbenv; + private ZeppelinContext z; + private Parser sqlParser; + private SqlSplitter sqlSplitter; + // paragraphId -> list of ModifyOperation, used for statement set in 2 syntax: + // 1. runAsOne= true + // 2. begin statement set; + // ... + // end; + private Map> statementOperationsMap = new HashMap<>(); + private boolean isBatch; + private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock(); + + + public Flink120SqlInterpreter(FlinkSqlContext flinkSqlContext, boolean isBatch) { + this.flinkSqlContext = flinkSqlContext; + this.isBatch = isBatch; + if (isBatch) { + this.tbenv = (TableEnvironment) flinkSqlContext.getBtenv(); + } else { + this.tbenv = (TableEnvironment) flinkSqlContext.getStenv(); + } + this.z = (ZeppelinContext) flinkSqlContext.getZeppelinContext(); + this.sqlParser = ((TableEnvironmentInternal) tbenv).getParser(); + this.sqlSplitter = new SqlSplitter(); + JobListener jobListener = new JobListener() { + @Override + public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + LOGGER.info("UnLock JobSubmitLock"); + } + } + + @Override + public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) { + + } + }; + + ((ExecutionEnvironment) flinkSqlContext.getBenv()).registerJobListener(jobListener); + ((StreamExecutionEnvironment) flinkSqlContext.getSenv()).registerJobListener(jobListener); + } + + public InterpreterResult runSqlList(String st, InterpreterContext context) { + try { + boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false")); + if (runAsOne) { + statementOperationsMap.put(context.getParagraphId(), new ArrayList<>()); + } + + String jobName = context.getLocalProperties().get("jobName"); + if (StringUtils.isNotBlank(jobName)) { + tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName); + } + + List sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList()); + for (String sql : sqls) { + List operations = null; + try { + operations = sqlParser.parse(sql); + } catch (SqlParserException e) { + context.out.write("%text Invalid Sql statement: " + sql + "\n"); + context.out.write(MESSAGE_HELP.toString()); + return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString()); + } + + try { + callOperation(sql, operations.get(0), context); + context.out.flush(); + } catch (Throwable e) { + LOGGER.error("Fail to run sql:" + sql, e); + try { + context.out.write("%text Fail to run sql command: " + + sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n"); + } catch (IOException ex) { + LOGGER.warn("Unexpected exception:", ex); + return new InterpreterResult(InterpreterResult.Code.ERROR, + ExceptionUtils.getStackTrace(e)); + } + return new InterpreterResult(InterpreterResult.Code.ERROR); + } + } + + if (runAsOne && !statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()).isEmpty()) { + try { + lock.lock(); + List modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), 
new ArrayList<>()); + if (!modifyOperations.isEmpty()) { + callInserts(modifyOperations, context); + } + } catch (Exception e) { + LOGGER.error("Fail to execute sql as one job", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + } catch (Exception e) { + LOGGER.error("Fail to execute sql", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); + } finally { + statementOperationsMap.remove(context.getParagraphId()); + } + + return new InterpreterResult(InterpreterResult.Code.SUCCESS); + } + + private void callOperation(String sql, Operation operation, InterpreterContext context) throws IOException { + if (operation instanceof HelpOperation) { + // HELP + callHelp(context); + } else if (operation instanceof SetOperation) { + // SET + callSet((SetOperation) operation, context); + } else if (operation instanceof ModifyOperation) { + // INSERT INTO/OVERWRITE + callInsert((ModifyOperation) operation, context); + } else if (operation instanceof QueryOperation) { + // SELECT + callSelect(sql, (QueryOperation) operation, context); + } else if (operation instanceof ExplainOperation) { + // EXPLAIN + callExplain((ExplainOperation) operation, context); + } else if (operation instanceof BeginStatementSetOperation) { + // BEGIN STATEMENT SET + callBeginStatementSet(context); + } else if (operation instanceof EndStatementSetOperation) { + // END + callEndStatementSet(context); + } else if (operation instanceof ShowCreateTableOperation) { + // SHOW CREATE TABLE + callShowCreateTable((ShowCreateTableOperation) operation, context); + } else if (operation instanceof ShowCatalogsOperation) { + callShowCatalogs(context); + } else if (operation instanceof ShowCurrentCatalogOperation) { + callShowCurrentCatalog(context); + } else if (operation instanceof UseCatalogOperation) { + callUseCatalog(((UseCatalogOperation) operation).getCatalogName(), context); + } else if (operation instanceof CreateCatalogOperation) { + callDDL(sql, context, "Catalog has been created."); + } else if (operation instanceof DropCatalogOperation) { + callDDL(sql, context, "Catalog has been dropped."); + } else if (operation instanceof UseDatabaseOperation) { + UseDatabaseOperation useDBOperation = (UseDatabaseOperation) operation; + callUseDatabase(useDBOperation.getDatabaseName(), context); + } else if (operation instanceof CreateDatabaseOperation) { + callDDL(sql, context, "Database has been created."); + } else if (operation instanceof DropDatabaseOperation) { + callDDL(sql, context, "Database has been removed."); + } else if (operation instanceof AlterDatabaseOperation) { + callDDL(sql, context, "Alter database succeeded!"); + } else if (operation instanceof ShowDatabasesOperation) { + callShowDatabases(context); + } else if (operation instanceof ShowCurrentDatabaseOperation) { + callShowCurrentDatabase(context); + } else if (operation instanceof CreateTableOperation || operation instanceof CreateTableASOperation) { + callDDL(sql, context, "Table has been created."); + } else if (operation instanceof AlterTableOperation) { + callDDL(sql, context, "Alter table succeeded!"); + } else if (operation instanceof DropTableOperation) { + callDDL(sql, context, "Table has been dropped."); + } else if (operation instanceof DescribeTableOperation) { + DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation; + 
callDescribe(describeTableOperation.getSqlIdentifier().getObjectName(), context);
+    } else if (operation instanceof ShowTablesOperation) {
+      callShowTables(context);
+    } else if (operation instanceof CreateViewOperation) {
+      callDDL(sql, context, "View has been created.");
+    } else if (operation instanceof DropViewOperation) {
+      callDDL(sql, context, "View has been dropped.");
+    } else if (operation instanceof AlterViewOperation) {
+      callDDL(sql, context, "Alter view succeeded!");
+    } else if (operation instanceof CreateCatalogFunctionOperation || operation instanceof CreateTempSystemFunctionOperation) {
+      callDDL(sql, context, "Function has been created.");
+    } else if (operation instanceof DropCatalogFunctionOperation || operation instanceof DropTempSystemFunctionOperation) {
+      callDDL(sql, context, "Function has been removed.");
+    } else if (operation instanceof AlterCatalogFunctionOperation) {
+      callDDL(sql, context, "Alter function succeeded!");
+    } else if (operation instanceof ShowFunctionsOperation) {
+      callShowFunctions(context);
+    } else if (operation instanceof ShowModulesOperation) {
+      callShowModules(context);
+    } else if (operation instanceof ShowPartitionsOperation) {
+      ShowPartitionsOperation showPartitionsOperation = (ShowPartitionsOperation) operation;
+      callShowPartitions(showPartitionsOperation.asSummaryString(), context);
+    } else {
+      throw new IOException(operation.getClass().getName() + " is not supported");
+    }
+  }
+
+  private void callHelp(InterpreterContext context) throws IOException {
+    context.out.write(MESSAGE_HELP.toString() + "\n");
+  }
+
+  private void callInsert(ModifyOperation operation, InterpreterContext context) throws IOException {
+    if (statementOperationsMap.containsKey(context.getParagraphId())) {
+      // Inside a statement set: buffer the insert until the statement set is ended.
+      List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId());
+      modifyOperations.add(operation);
+    } else {
+      callInserts(Collections.singletonList(operation), context);
+    }
+  }
+
+  private void callInserts(List<ModifyOperation> operations, InterpreterContext context) throws IOException {
+    if (!isBatch) {
+      context.getLocalProperties().put("flink.streaming.insert_into", "true");
+    }
+    TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
+    checkState(tableResult.getJobClient().isPresent());
+    try {
+      tableResult.await();
+      JobClient jobClient = tableResult.getJobClient().get();
+      if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
+        context.out.write("Insertion succeeded.\n");
+      } else {
+        throw new IOException("Job failed, " + jobClient.getJobExecutionResult().get().toString());
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Flink job is interrupted", e);
+    } catch (ExecutionException e) {
+      throw new IOException("Flink job failed", e);
+    }
+  }
+
+  private void callShowCreateTable(ShowCreateTableOperation showCreateTableOperation, InterpreterContext context) throws IOException {
+    try {
+      lock.lock();
+      TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(showCreateTableOperation);
+      String explanation =
+              Objects.requireNonNull(tableResult.collect().next().getField(0)).toString();
+      context.out.write(explanation + "\n");
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+  }
+
+  private void callExplain(ExplainOperation explainOperation, InterpreterContext context) throws IOException {
+    try {
+      lock.lock();
+      TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(explainOperation);
+      String explanation =
+              Objects.requireNonNull(tableResult.collect().next().getField(0)).toString();
+      context.out.write(explanation + "\n");
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+  }
+
+  public void callSelect(String sql, QueryOperation queryOperation, InterpreterContext context) throws IOException {
+    try {
+      lock.lock();
+      if (isBatch) {
+        callBatchInnerSelect(sql, context);
+      } else {
+        callStreamInnerSelect(sql, context);
+      }
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+  }
+
+  public void callBatchInnerSelect(String sql, InterpreterContext context) throws IOException {
+    Table table = this.tbenv.sqlQuery(sql);
+    String result = z.showData(table);
+    context.out.write(result);
+  }
+
+  public void callStreamInnerSelect(String sql, InterpreterContext context) throws IOException {
+    flinkSqlContext.getStreamSqlSelectConsumer().accept(sql);
+  }
+
+  public void callSet(SetOperation setOperation, InterpreterContext context) throws IOException {
+    if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
+      // set a property
+      String key = setOperation.getKey().get().trim();
+      String value = setOperation.getValue().get().trim();
+      this.tbenv.getConfig().getConfiguration().setString(key, value);
+      LOGGER.info("Set table config: {}={}", key, value);
+    } else {
+      // show all properties
+      final Map<String, String> properties = this.tbenv.getConfig().getConfiguration().toMap();
+      List<String> prettyEntries = new ArrayList<>();
+      for (String key : properties.keySet()) {
+        prettyEntries.add(
+                String.format(
+                        "'%s' = '%s'",
+                        EncodingUtils.escapeSingleQuotes(key),
+                        EncodingUtils.escapeSingleQuotes(properties.get(key))));
+      }
+      prettyEntries.sort(String::compareTo);
+      prettyEntries.forEach(entry -> {
+        try {
+          context.out.write(entry + "\n");
+        } catch (IOException e) {
+          LOGGER.warn("Failed to write output", e);
+        }
+      });
+    }
+  }
+
+  private void callBeginStatementSet(InterpreterContext context) throws IOException {
+    statementOperationsMap.put(context.getParagraphId(), new ArrayList<>());
+  }
+
+  private void callEndStatementSet(InterpreterContext context) throws IOException {
+    List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId());
+    if (modifyOperations != null && !modifyOperations.isEmpty()) {
+      callInserts(modifyOperations, context);
+    } else {
+      context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET);
+    }
+  }
+
+  private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {
+    tbenv.executeSql("USE CATALOG `" + catalog + "`");
+  }
+
+  private void callUseDatabase(String databaseName,
+                               InterpreterContext context) throws IOException {
+    this.tbenv.executeSql("USE `" + databaseName + "`");
+  }
+
+  private void callShowCatalogs(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Catalogs");
+    List<String> catalogs = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n");
+  }
+
+  private void callShowCurrentCatalog(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Current Catalog");
+    String catalog = tableResult.collect().next().getField(0).toString();
+    context.out.write("%text current catalog: " + catalog + "\n");
+  }
+
+  private void callShowDatabases(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Databases");
+    List<String> databases = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table database\n" + StringUtils.join(databases, "\n") + "\n");
+  }
+
+  private void callShowCurrentDatabase(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Current Database");
+    String database = tableResult.collect().next().getField(0).toString();
+    context.out.write("%text current database: " + database + "\n");
+  }
+
+  private void callShowTables(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Tables");
+    List<String> tables = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .filter(tbl -> !tbl.startsWith("UnnamedTable"))
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table table\n" + StringUtils.join(tables, "\n") + "\n");
+  }
+
+  private void callShowFunctions(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Functions");
+    List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table function\n" + StringUtils.join(functions, "\n") + "\n");
+  }
+
+  private void callShowModules(InterpreterContext context) throws IOException {
+    String[] modules = this.tbenv.listModules();
+    context.out.write("%table modules\n" + StringUtils.join(modules, "\n") + "\n");
+  }
+
+  private void callShowPartitions(String sql, InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql(sql);
+    List<String> partitions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table partitions\n" + StringUtils.join(partitions, "\n") + "\n");
+  }
+
+  private void callDDL(String sql, InterpreterContext context, String message) throws IOException {
+    try {
+      lock.lock();
+      this.tbenv.executeSql(sql);
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+    context.out.write(message + "\n");
+  }
+
+  private void callDescribe(String name, InterpreterContext context) throws IOException {
+    TableResult tableResult = null;
+    try {
+      tableResult = tbenv.executeSql("DESCRIBE " + name);
+    } catch (Exception e) {
+      throw new IOException("Failed to describe table: " + name, e);
+    }
+    CloseableIterator<Row> result = tableResult.collect();
+    StringBuilder builder = new StringBuilder();
+    builder.append("Column\tType\n");
+    while (result.hasNext()) {
+      Row row = result.next();
+      builder.append(row.getField(0) + "\t" + row.getField(1) + "\n");
+    }
+    context.out.write("%table\n" + builder.toString());
+  }
+}
diff --git a/flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java b/flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java
new file mode 100644
index 00000000000..a35ad3a6cd1
--- /dev/null
+++ b/flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + + +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.*; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import java.sql.Time; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.zeppelin.flink.TimestampStringUtils.*; + +/** + * Copied from flink-project with minor modification. + * */ +public class PrintUtils { + + public static final String NULL_COLUMN = "(NULL)"; + private static final String COLUMN_TRUNCATED_FLAG = "..."; + + private PrintUtils() {} + + + public static String[] rowToString( + Row row, ResolvedSchema resolvedSchema, ZoneId sessionTimeZone) { + return rowToString(row, NULL_COLUMN, false, resolvedSchema, sessionTimeZone); + } + + public static String[] rowToString( + Row row, + String nullColumn, + boolean printRowKind, + ResolvedSchema resolvedSchema, + ZoneId sessionTimeZone) { + final int len = printRowKind ? row.getArity() + 1 : row.getArity(); + final List fields = new ArrayList<>(len); + if (printRowKind) { + fields.add(row.getKind().shortString()); + } + for (int i = 0; i < row.getArity(); i++) { + final Object field = row.getField(i); + final LogicalType fieldType = + resolvedSchema.getColumnDataTypes().get(i).getLogicalType(); + if (field == null) { + fields.add(nullColumn); + } else { + fields.add( + StringUtils.arrayAwareToString( + formattedTimestamp(field, fieldType, sessionTimeZone))); + } + } + return fields.toArray(new String[0]); + } + + /** + * Normalizes field that contains TIMESTAMP, TIMESTAMP_LTZ and TIME type data. + * + *
<p>
This method also supports nested type ARRAY, ROW, MAP. + */ + private static Object formattedTimestamp( + Object field, LogicalType fieldType, ZoneId sessionTimeZone) { + final LogicalTypeRoot typeRoot = fieldType.getTypeRoot(); + if (field == null) { + return "null"; + } + switch (typeRoot) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return formatTimestampField(field, fieldType, sessionTimeZone); + case TIME_WITHOUT_TIME_ZONE: + return formatTimeField(field); + case ARRAY: + LogicalType elementType = ((ArrayType) fieldType).getElementType(); + if (field instanceof List) { + List array = (List) field; + Object[] formattedArray = new Object[array.size()]; + for (int i = 0; i < array.size(); i++) { + formattedArray[i] = + formattedTimestamp(array.get(i), elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass().isArray()) { + // primitive type + if (field.getClass() == byte[].class) { + byte[] array = (byte[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == short[].class) { + short[] array = (short[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == int[].class) { + int[] array = (int[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == long[].class) { + long[] array = (long[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == float[].class) { + float[] array = (float[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == double[].class) { + double[] array = (double[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == boolean[].class) { + boolean[] array = (boolean[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == char[].class) { + char[] array = (char[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else { + // non-primitive type + Object[] array = (Object[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } 
+ } else { + return field; + } + case ROW: + if (fieldType instanceof RowType && field instanceof Row) { + Row row = (Row) field; + Row formattedRow = new Row(row.getKind(), row.getArity()); + for (int i = 0; i < ((RowType) fieldType).getFields().size(); i++) { + LogicalType type = ((RowType) fieldType).getFields().get(i).getType(); + formattedRow.setField( + i, formattedTimestamp(row.getField(i), type, sessionTimeZone)); + } + return formattedRow; + + } else if (fieldType instanceof RowType && field instanceof RowData) { + RowData rowData = (RowData) field; + Row formattedRow = new Row(rowData.getRowKind(), rowData.getArity()); + for (int i = 0; i < ((RowType) fieldType).getFields().size(); i++) { + LogicalType type = ((RowType) fieldType).getFields().get(i).getType(); + RowData.FieldGetter fieldGetter = RowData.createFieldGetter(type, i); + formattedRow.setField( + i, + formattedTimestamp( + fieldGetter.getFieldOrNull(rowData), + type, + sessionTimeZone)); + } + return formattedRow; + } else { + return field; + } + case MAP: + LogicalType keyType = ((MapType) fieldType).getKeyType(); + LogicalType valueType = ((MapType) fieldType).getValueType(); + if (fieldType instanceof MapType && field instanceof Map) { + Map map = ((Map) field); + Map formattedMap = new HashMap<>(map.size()); + for (Object key : map.keySet()) { + formattedMap.put( + formattedTimestamp(key, keyType, sessionTimeZone), + formattedTimestamp(map.get(key), valueType, sessionTimeZone)); + } + return formattedMap; + } else if (fieldType instanceof MapType && field instanceof MapData) { + MapData map = ((MapData) field); + Map formattedMap = new HashMap<>(map.size()); + Object[] keyArray = + (Object[]) formattedTimestamp(map.keyArray(), keyType, sessionTimeZone); + Object[] valueArray = + (Object[]) + formattedTimestamp( + map.valueArray(), valueType, sessionTimeZone); + for (int i = 0; i < keyArray.length; i++) { + formattedMap.put(keyArray[i], valueArray[i]); + } + return formattedMap; + } else { + return field; + } + default: + return field; + } + } + + /** + * Formats the print content of TIMESTAMP and TIMESTAMP_LTZ type data, consider the user + * configured time zone. 
+ */ + private static Object formatTimestampField( + Object timestampField, LogicalType fieldType, ZoneId sessionTimeZone) { + switch (fieldType.getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int precision = getPrecision(fieldType); + if (timestampField instanceof java.sql.Timestamp) { + // conversion between java.sql.Timestamp and TIMESTAMP_WITHOUT_TIME_ZONE + return timestampToString( + ((Timestamp) timestampField).toLocalDateTime(), precision); + } else if (timestampField instanceof java.time.LocalDateTime) { + return timestampToString(((LocalDateTime) timestampField), precision); + } else if (timestampField instanceof TimestampData) { + return timestampToString( + ((TimestampData) timestampField).toLocalDateTime(), precision); + } else { + return timestampField; + } + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + Instant instant = null; + if (timestampField instanceof java.time.Instant) { + instant = ((Instant) timestampField); + } else if (timestampField instanceof java.sql.Timestamp) { + Timestamp timestamp = ((Timestamp) timestampField); + // conversion between java.sql.Timestamp and TIMESTAMP_WITH_LOCAL_TIME_ZONE + instant = + TimestampData.fromEpochMillis( + timestamp.getTime(), timestamp.getNanos() % 1000_000) + .toInstant(); + } else if (timestampField instanceof TimestampData) { + instant = ((TimestampData) timestampField).toInstant(); + } else if (timestampField instanceof Integer) { + instant = Instant.ofEpochSecond((Integer) timestampField); + } else if (timestampField instanceof Long) { + instant = Instant.ofEpochMilli((Long) timestampField); + } + if (instant != null) { + return timestampToString( + instant.atZone(sessionTimeZone).toLocalDateTime(), + getPrecision(fieldType)); + } else { + return timestampField; + } + default: + return timestampField; + } + } + + /** Formats the print content of TIME type data. */ + private static Object formatTimeField(Object timeField) { + if (timeField.getClass().isAssignableFrom(int.class) || timeField instanceof Integer) { + return unixTimeToString((int) timeField); + } else if (timeField.getClass().isAssignableFrom(long.class) || timeField instanceof Long) { + return unixTimeToString(((Long) timeField).intValue()); + } else if (timeField instanceof Time) { + return unixTimeToString(timeToInternal((Time) timeField)); + } else if (timeField instanceof LocalTime) { + return unixTimeToString(localTimeToUnixDate((LocalTime) timeField)); + } else { + return timeField; + } + } +} diff --git a/flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java b/flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java new file mode 100644 index 00000000000..c52104e45af --- /dev/null +++ b/flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + +import java.sql.Time; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.TimeZone; + +/** + * Copied from flink-project with minor modification. + * */ +public class TimestampStringUtils { + + private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); + + public TimestampStringUtils() { + } + + public static String timestampToString(LocalDateTime ldt, int precision) { + String fraction; + for(fraction = pad(9, (long)ldt.getNano()); fraction.length() > precision && fraction.endsWith("0"); fraction = fraction.substring(0, fraction.length() - 1)) { + } + + StringBuilder ymdhms = ymdhms(new StringBuilder(), ldt.getYear(), ldt.getMonthValue(), ldt.getDayOfMonth(), ldt.getHour(), ldt.getMinute(), ldt.getSecond()); + if (fraction.length() > 0) { + ymdhms.append(".").append(fraction); + } + + return ymdhms.toString(); + } + + private static String pad(int length, long v) { + StringBuilder s = new StringBuilder(Long.toString(v)); + + while(s.length() < length) { + s.insert(0, "0"); + } + + return s.toString(); + } + + private static StringBuilder hms(StringBuilder b, int h, int m, int s) { + int2(b, h); + b.append(':'); + int2(b, m); + b.append(':'); + int2(b, s); + return b; + } + + private static StringBuilder ymdhms(StringBuilder b, int year, int month, int day, int h, int m, int s) { + ymd(b, year, month, day); + b.append(' '); + hms(b, h, m, s); + return b; + } + + private static StringBuilder ymd(StringBuilder b, int year, int month, int day) { + int4(b, year); + b.append('-'); + int2(b, month); + b.append('-'); + int2(b, day); + return b; + } + + private static void int4(StringBuilder buf, int i) { + buf.append((char)(48 + i / 1000 % 10)); + buf.append((char)(48 + i / 100 % 10)); + buf.append((char)(48 + i / 10 % 10)); + buf.append((char)(48 + i % 10)); + } + + private static void int2(StringBuilder buf, int i) { + buf.append((char)(48 + i / 10 % 10)); + buf.append((char)(48 + i % 10)); + } + + public static String unixTimeToString(int time) { + StringBuilder buf = new StringBuilder(8); + unixTimeToString(buf, time, 0); + return buf.toString(); + } + + private static void unixTimeToString(StringBuilder buf, int time, int precision) { + while(time < 0) { + time = (int)((long)time + 86400000L); + } + + int h = time / 3600000; + int time2 = time % 3600000; + int m = time2 / '\uea60'; + int time3 = time2 % '\uea60'; + int s = time3 / 1000; + int ms = time3 % 1000; + int2(buf, h); + buf.append(':'); + int2(buf, m); + buf.append(':'); + int2(buf, s); + if (precision > 0) { + buf.append('.'); + + while(precision > 0) { + buf.append((char)(48 + ms / 100)); + ms %= 100; + ms *= 10; + if (ms == 0) { + break; + } + + --precision; + } + } + + } + + public static int timeToInternal(Time time) { + long ts = time.getTime() + (long)LOCAL_TZ.getOffset(time.getTime()); + return (int)(ts % 86400000L); + } + + public static int localTimeToUnixDate(LocalTime time) { + return time.getHour() * 3600000 + time.getMinute() * '\uea60' + time.getSecond() * 1000 + time.getNano() / 1000000; + } +} diff --git a/flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/shims120/CollectStreamTableSink.java b/flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/shims120/CollectStreamTableSink.java new file mode 100644 index 00000000000..b9dfa45aac1 --- /dev/null +++ 
b/flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/shims120/CollectStreamTableSink.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.shims120;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.UUID;
+
+/**
+ * Table sink for collecting the results locally using sockets.
+ */
+public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class);
+
+  private final InetAddress targetAddress;
+  private final int targetPort;
+  private final TypeSerializer<Tuple2<Boolean, Row>> serializer;
+
+  private String[] fieldNames;
+  private TypeInformation<?>[] fieldTypes;
+
+  public CollectStreamTableSink(InetAddress targetAddress,
+                                int targetPort,
+                                TypeSerializer<Tuple2<Boolean, Row>> serializer) {
+    LOGGER.info("Use address: {}:{}", targetAddress.getHostAddress(), targetPort);
+    this.targetAddress = targetAddress;
+    this.targetPort = targetPort;
+    this.serializer = serializer;
+  }
+
+  @Override
+  public String[] getFieldNames() {
+    return fieldNames;
+  }
+
+  @Override
+  public TypeInformation<?>[] getFieldTypes() {
+    return fieldTypes;
+  }
+
+  @Override
+  public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+    final CollectStreamTableSink copy =
+            new CollectStreamTableSink(targetAddress, targetPort, serializer);
+    copy.fieldNames = fieldNames;
+    copy.fieldTypes = fieldTypes;
+    return copy;
+  }
+
+  @Override
+  public TypeInformation<Row> getRecordType() {
+    return Types.ROW_NAMED(fieldNames, fieldTypes);
+  }
+
+  @Override
+  public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
+    // add sink
+    return stream
+            .addSink(new CollectSink<>(targetAddress, targetPort, serializer))
+            .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID())
+            .setParallelism(1);
+  }
+
+  @Override
+  public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+    return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
+  }
+}
diff --git a/flink/pom.xml b/flink/pom.xml
index 833c068a8c6..227c9d6d917 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -38,6 +38,7 @@
     <module>flink1.15-shims</module>
     <module>flink1.16-shims</module>
     <module>flink1.17-shims</module>
+    <module>flink1.20-shims</module>
@@ -46,7 +47,7 @@
     <flink1.15.version>1.15.1</flink1.15.version>
     <flink1.16.version>1.16.0</flink1.16.version>
     <flink1.17.version>1.17.1</flink1.17.version>
-
+    <flink1.20.version>1.20.2</flink1.20.version>
     <flink.scala.version>2.12.7</flink.scala.version>
     <flink.scala.binary.version>2.12</flink.scala.binary.version>
diff --git a/testing/env_python_3_with_flink_120.yml b/testing/env_python_3_with_flink_120.yml
new file mode 100644
index 00000000000..ac5f3538c74
--- /dev/null
+++ b/testing/env_python_3_with_flink_120.yml
@@ -0,0 +1,30 @@
+name: python_3_with_flink
+channels:
+  - conda-forge
+  - defaults
+dependencies:
+  - pycodestyle
+  - scipy
+  - numpy=1.19.5
+  - grpcio
+  - pandasql
+  - ipython
+  - ipython_genutils
+  - ipykernel
+  - jupyter_client=5
+  - hvplot
+  - holoviews=1.16
+  - plotnine
+  - seaborn
+  - intake
+  - intake-parquet
+  - intake-xarray
+  - altair
+  - vega_datasets
+  - plotly
+  - jinja2=3.0.3
+  - pip
+  - pip:
+    - apache-flink==1.20.2
+    - protobuf==3.20.*
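
Reviewer note: as a quick sanity check of the formatting helpers copied into flink1.20-shims, the minimal sketch below exercises the public static methods of the TimestampStringUtils class added in this patch. The demo class name is invented for illustration and is not part of the change; it assumes the flink1.20-shims classes are on the classpath, and the expected values in the comments are hand-derived from the code above rather than taken from an existing test.

import org.apache.zeppelin.flink.TimestampStringUtils;

import java.time.LocalDateTime;
import java.time.LocalTime;

public class TimestampStringUtilsDemo {
  public static void main(String[] args) {
    // Trailing zeros of the nanosecond fraction are trimmed down to the requested
    // precision, so this should print: 2024-01-02 03:04:05.123
    System.out.println(
        TimestampStringUtils.timestampToString(
            LocalDateTime.of(2024, 1, 2, 3, 4, 5, 123_000_000), 3));

    // A LocalTime is first converted to milliseconds since midnight and then rendered
    // as HH:mm:ss, so this should print: 13:45:30
    int millisOfDay = TimestampStringUtils.localTimeToUnixDate(LocalTime.of(13, 45, 30));
    System.out.println(TimestampStringUtils.unixTimeToString(millisOfDay));
  }
}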