Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ jobs:
${{ runner.os }}-zeppelin-
- name: install environment
run: |
./mvnw install -DskipTests -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.12,jdbc,shell -am -Pweb-classic -Pflink-117 ${MAVEN_ARGS}
./mvnw install -DskipTests -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.12,jdbc,shell -am -Pweb-classic -Pflink-120 ${MAVEN_ARGS}
./mvnw package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS}
- name: Setup conda environment with python 3.9 and R
uses: conda-incubator/setup-miniconda@v3
Expand All @@ -239,7 +239,7 @@ jobs:
fail-fast: false
matrix:
python: [ 3.9 ]
flink: [116, 117]
flink: [116, 117, 120]
include:
# Flink 1.15 supports Python 3.6, 3.7, and 3.8
# https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/installation/
Expand Down
1 change: 1 addition & 0 deletions flink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Currently, it has the following modules clustered into two groups:
* flink1.15-shims
* flink1.16-shims
* flink1.17-shims
* flink1.20-shims

* flink-scala-2.12

Expand Down
42 changes: 41 additions & 1 deletion flink/flink-scala-2.12/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

<properties>
<!--library versions-->
<flink.version>${flink1.17.version}</flink.version>
<flink.version>${flink1.20.version}</flink.version>
<flink.scala.version>2.12.7</flink.scala.version>
<flink.scala.binary.version>2.12</flink.scala.binary.version>
<flink.scala.compile.version>${flink.scala.version}</flink.scala.compile.version>
Expand Down Expand Up @@ -71,6 +71,12 @@
<version>${project.version}</version>
</dependency>

<!-- Shim layer adapting Zeppelin's Flink interpreter to the Flink 1.20 API surface. -->
<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>flink1.20-shims</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-python</artifactId>
Expand Down Expand Up @@ -1300,5 +1306,39 @@
</dependency>
</dependencies>
</profile>
<profile>
<!-- Flink 1.20 build profile: activate with -Pflink-120. -->
<id>flink-120</id>
<properties>
<!-- Pin the Flink dependency line to 1.20, built against Scala 2.12. -->
<flink.version>${flink1.20.version}</flink.version>
<flink.scala.version>2.12.7</flink.scala.version>
<flink.scala.binary.version>2.12</flink.scala.binary.version>
</properties>
<dependencies>
<!-- All Flink artifacts are "provided": the Flink distribution on the
     cluster/classpath supplies them at runtime, so they must not be
     bundled into the interpreter jar. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-client</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,18 @@

package org.apache.zeppelin.flink

import java.io.{File, IOException}
import java.net.{URL, URLClassLoader}
import java.nio.file.Files
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.jar.JarFile

import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.{ExecutionEnvironmentFactory, ExecutionEnvironment => JExecutionEnvironment}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration._
import org.apache.flink.core.execution.{JobClient, JobListener}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironmentFactory, StreamExecutionEnvironment => JStreamExecutionEnvironment}
import org.apache.flink.api.java.{ExecutionEnvironmentFactory, ExecutionEnvironment => JExecutionEnvironment}
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironmentFactory, StreamExecutionEnvironment => JStreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableEnvironment}
Expand All @@ -46,19 +39,24 @@ import org.apache.flink.table.module.hive.HiveModule
import org.apache.flink.yarn.cli.FlinkYarnSessionCli
import org.apache.zeppelin.conf.ZeppelinConfiguration
import org.apache.zeppelin.dep.DependencyResolver
import org.apache.zeppelin.flink.internal.FlinkShell
import org.apache.zeppelin.flink.internal.FlinkShell._
import org.apache.zeppelin.flink.internal.FlinkILoop
import org.apache.zeppelin.flink.internal.FlinkShell._
import org.apache.zeppelin.interpreter.Interpreter.FormType
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterHookRegistry, InterpreterResult}
import org.slf4j.{Logger, LoggerFactory}

import java.io.{File, IOException}
import java.net.{URL, URLClassLoader}
import java.nio.file.Files
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.jar.JarFile
import scala.collection.JavaConversions
import scala.collection.JavaConverters._
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.{Completion, IMain, IR, JPrintWriter, Results, SimpleReader}
import scala.tools.nsc.interpreter.{Completion, IMain, IR, JPrintWriter, SimpleReader}

/**
* It instantiate flink scala shell and create env, senv, btenv, stenv.
Expand Down Expand Up @@ -211,8 +209,8 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
Some(ensureYarnConfig(config)
.copy(name = Some(appName))))

val slotNum = Integer.parseInt(properties.getProperty("taskmanager.numberOfTaskSlots",
properties.getProperty("flink.tm.slot", "1")))
val slotNum = Integer.parseInt(properties.getProperty("taskmanager.numberOfTaskSlots",
properties.getProperty("flink.tm.slot", "1")))
config = config.copy(yarnConfig =
Some(ensureYarnConfig(config)
.copy(slots = Some(slotNum))))
Expand Down Expand Up @@ -426,39 +424,103 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
}

/**
 * Creates and binds the table environments used by the shell
 * (btenv / java_btenv for batch, stenv / java_stenv for streaming).
 *
 * Flink <= 1.17: environments are built through TableEnvFactory using the
 * shims' (Blink) EnvironmentSettings builders, under the scala-shell
 * classloader. For Flink 1.14 and earlier the legacy "flink planner" batch
 * environments (btenv_2 / java_btenv_2) are also created.
 *
 * Flink 1.18+ (e.g. 1.20): the Blink/legacy planner split no longer exists,
 * so the unified TableEnvironment / StreamTableEnvironment factories are used
 * directly; btenv_2 / java_btenv_2 are set to null.
 */
private def createTableEnvs(): Unit = {
  if (flinkVersion.getMajorVersion == 1 && flinkVersion.getMinorVersion <= 17) {
    val originalClassLoader = Thread.currentThread().getContextClassLoader
    try {
      LOGGER.warn("Executing Table envs for 117")
      // Table environments must be created under the scala-shell classloader
      // so user jars are visible to the planner.
      Thread.currentThread().setContextClassLoader(getFlinkScalaShellLoader)
      val tableConfig = new TableConfig
      tableConfig.getConfiguration.addAll(configuration)

      this.tblEnvFactory = new TableEnvFactory(this.flinkVersion, this.flinkShims,
        this.benv, this.senv, tableConfig, this.userJars.map(new URL(_)).asJava)

      // Blink planner, batch mode.
      val btEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder()
        .asInstanceOf[EnvironmentSettings.Builder]
        .inBatchMode()
        .build()
      this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting, getFlinkScalaShellLoader)
      flinkILoop.bind("btenv", btenv.getClass().getCanonicalName(), btenv, List("@transient"))
      this.java_btenv = this.btenv

      // Blink planner, streaming mode (separate Scala and Java environments).
      val stEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder()
        .asInstanceOf[EnvironmentSettings.Builder]
        .inStreamingMode()
        .build()
      this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting, getFlinkScalaShellLoader)
      flinkILoop.bind("stenv", stenv.getClass().getCanonicalName(), stenv, List("@transient"))
      this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting, getFlinkScalaShellLoader)

      if (!flinkVersion.isAfterFlink114()) {
        // The legacy flink planner was removed after Flink 1.14.
        this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment()
        this.java_btenv_2 = tblEnvFactory.createJavaFlinkBatchTableEnvironment()
      }
    } finally {
      Thread.currentThread().setContextClassLoader(originalClassLoader)
    }
  } else {
    val originalCl = Thread.currentThread().getContextClassLoader
    try {
      Thread.currentThread().setContextClassLoader(getFlinkScalaShellLoader)

      // Unified planner settings (no Blink planner in Flink 1.18+).
      val btSettings = EnvironmentSettings.newInstance()
        .inBatchMode()
        .build()
      val stSettings = EnvironmentSettings.newInstance()
        .inStreamingMode()
        .build()

      // Batch TableEnvironment: one unified implementation serves both the
      // Java and Scala APIs. Configuration is applied AFTER creation since
      // the TableConfig constructor path is gone in newer Flink.
      val btEnv = org.apache.flink.table.api.TableEnvironment.create(btSettings)
      btEnv.getConfig.getConfiguration.addAll(configuration)
      this.btenv = btEnv
      this.java_btenv = btEnv
      flinkILoop.bind("btenv",
        classOf[org.apache.flink.table.api.TableEnvironment].getCanonicalName,
        btEnv,
        List("@transient"))

      // Streaming TableEnvironment, Scala bridge, bound into the shell.
      val stEnv =
        org.apache.flink.table.api.bridge.scala.StreamTableEnvironment.create(this.senv, stSettings)
      stEnv.getConfig.getConfiguration.addAll(configuration)
      this.stenv = stEnv
      flinkILoop.bind("stenv",
        classOf[org.apache.flink.table.api.bridge.scala.StreamTableEnvironment].getCanonicalName,
        stEnv,
        List("@transient"))

      // Streaming TableEnvironment, Java bridge, built on the Java
      // StreamExecutionEnvironment backing the Scala wrapper (senv.getJavaEnv).
      val jStreamEnv: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment =
        this.senv.getJavaEnv
      this.java_stenv =
        org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(jStreamEnv, stSettings)

      // NOTE(review): user jars are only exposed via the shell classloader
      // here; if jobs submitted from these envs must carry them, set
      // PipelineOptions.JARS on `configuration` BEFORE creating the envs
      // — TODO confirm against the job-submission path.

      // No legacy (pre-1.14) flink-planner environments in 1.18+.
      this.btenv_2 = null
      this.java_btenv_2 = null
    } finally {
      Thread.currentThread().setContextClassLoader(originalCl)
    }
  }
}

Expand Down Expand Up @@ -519,7 +581,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
// -6 because of .class
var className = je.getName.substring(0, je.getName.length - 6)
className = className.replace('/', '.')
if (udfPackages.isEmpty || udfPackages.exists( p => className.startsWith(p))) {
if (udfPackages.isEmpty || udfPackages.exists(p => className.startsWith(p))) {
val c = cl.loadClass(className)
val udf = c.newInstance()
if (udf.isInstanceOf[ScalarFunction]) {
Expand All @@ -539,7 +601,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
}
}
} catch {
case e : Throwable =>
case e: Throwable =>
LOGGER.info("Fail to inspect udf class: " + je.getName, e)
}
}
Expand Down Expand Up @@ -592,19 +654,61 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
* This is just a workaround to make table api work in multiple threads.
*/
/**
 * Re-creates the stream planner / streaming table environments for the
 * current thread. This is just a workaround to make the table api work in
 * multiple threads.
 *
 * Flink <= 1.17 recreates the planner through TableEnvFactory with the
 * shims' settings builder; Flink 1.18+ rebuilds the Scala and Java streaming
 * table environments directly with the unified planner.
 */
def createPlannerAgain(): Unit = {
  if (flinkVersion.getMajorVersion == 1 && flinkVersion.getMinorVersion <= 17) {
    val originalClassLoader = Thread.currentThread().getContextClassLoader
    try {
      Thread.currentThread().setContextClassLoader(getFlinkScalaShellLoader)
      val stEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder()
        .asInstanceOf[EnvironmentSettings.Builder]
        .inStreamingMode()
        .build()
      this.tblEnvFactory.createStreamPlanner(stEnvSetting)
    } finally {
      Thread.currentThread().setContextClassLoader(originalClassLoader)
    }
  } else {
    val originalCl = Thread.currentThread().getContextClassLoader
    try {
      Thread.currentThread().setContextClassLoader(getFlinkScalaShellLoader)

      val stSettings = org.apache.flink.table.api.EnvironmentSettings
        .newInstance()
        .inStreamingMode()
        .build()

      // Recreate the Scala StreamTableEnvironment and rebind it in the shell.
      val newScalaStEnv =
        org.apache.flink.table.api.bridge.scala.StreamTableEnvironment.create(this.senv, stSettings)
      newScalaStEnv.getConfig.getConfiguration.addAll(configuration)
      this.stenv = newScalaStEnv
      flinkILoop.bind(
        "stenv",
        classOf[org.apache.flink.table.api.bridge.scala.StreamTableEnvironment].getCanonicalName,
        newScalaStEnv,
        List("@transient")
      )

      // Recreate the Java StreamTableEnvironment from the Java execution
      // environment backing the Scala one.
      val jEnv: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment =
        this.senv.getJavaEnv
      val newJavaStEnv =
        org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(jEnv, stSettings)
      newJavaStEnv.getConfig.getConfiguration.addAll(configuration)
      this.java_stenv = newJavaStEnv
    } finally {
      Thread.currentThread().setContextClassLoader(originalCl)
    }
  }
}


def interpret(code: String, context: InterpreterContext): InterpreterResult = {
val originalStdOut = System.out
val originalStdErr = System.err;
Expand Down Expand Up @@ -671,7 +775,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
// flink 1.12 use copied version of configuration, so in order to update configuration we have to
// get the internal configuration of StreamExecutionEnvironment.
val internalConfiguration = getConfigurationOfStreamExecutionEnv()
if (!StringUtils.isBlank(savepointPath) && resumeFromSavepoint){
if (!StringUtils.isBlank(savepointPath) && resumeFromSavepoint) {
LOGGER.info("Resume job from savepoint , savepointPath = {}", savepointPath)
internalConfiguration.setString(SavepointConfigOptions.SAVEPOINT_PATH.key(), savepointPath);
return
Expand Down Expand Up @@ -913,7 +1017,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
}
}

/** Returns the user-supplied jar paths as a Java list for Java-side callers. */
// JavaConversions is deprecated since Scala 2.12; use the explicit
// JavaConverters `.asJava` (already imported in this file) instead.
def getUserJars: java.util.List[String] = userJars.asJava

def completion(buf: String, cursor: Int, context: InterpreterContext): java.util.List[InterpreterCompletion]

Expand Down
Loading
Loading