
Conversation


@Neuw84 commented Aug 20, 2025

What is this PR for?


Add Flink 1.20 support to the Apache Flink interpreter. As the latest 1.x release and the LTS one, it is important to support it; for example, it lets us use the latest Iceberg versions.

What type of PR is it?

Feature

What is the Jira issue?

https://issues.apache.org/jira/browse/ZEPPELIN-6284

Questions:

  • Do the license files need to be updated? No
  • Are there breaking changes for older versions? No
  • Does this need documentation? No

@Neuw84 (Author) commented Aug 20, 2025

Based on previous work on 1.18 support in these pull requests:

#4864
#4739

@jongyoul (Member) commented:
@Neuw84 Thank you for your contribution. By the way, the Flink tests are failing now. Could you please check and fix them?

@jongyoul requested a review from Copilot on August 21, 2025 05:48

Angel Conde Manjon and others added 3 commits August 21, 2025 12:26
# Conflicts:
#	flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/Flink120Shims.java
#	flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/shims120/CollectStreamTableSink.java
@Neuw84 (Author) commented Aug 21, 2025

@jongyoul I have addressed the Copilot comments and added the corresponding test file.

If all is good, let me know whether I need to clean up the commit log.

@jongyoul requested a review from Copilot on August 21, 2025 10:58
Copilot AI left a comment

Pull Request Overview

This PR adds an Apache Flink 1.20 interpreter to Zeppelin, bringing compatibility with the latest LTS release of the Flink 1.x series.

Key changes include:

  • Creation of Flink 1.20 shims implementation with necessary adapters and utilities
  • Addition of Python environment configuration for Flink 1.20 testing
  • Updates to build configuration and CI workflows to include Flink 1.20 support

Reviewed Changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 1 comment.

Summary per file:

  • testing/env_python_3_with_flink_120.yml - new conda environment configuration for Python 3 with Flink 1.20.0
  • flink/pom.xml - adds the flink1.20-shims module and a version property
  • flink/flink1.20-shims/ - new module containing Flink 1.20 specific implementations and adapters
  • flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java - updates the shim loader to recognize Flink 1.20
  • flink/flink-scala-2.12/pom.xml - updates the default Flink version and adds a Flink 1.20 profile
  • flink/README.md - updates documentation to include the flink1.20-shims module
  • .github/workflows/core.yml - updates CI to test with Flink 1.20
Comments suppressed due to low confidence (2)

flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java:109

  • Using a Unicode character literal '\uea60' (60000) for milliseconds conversion is unclear and error-prone. Replace with the explicit constant 60000 or a named constant like MILLISECONDS_PER_MINUTE.
            formattedArray[i] =

flink/flink1.20-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java:110

  • Using a Unicode character literal '\uea60' (60000) for milliseconds conversion is unclear and error-prone. Replace with the explicit constant 60000 or a named constant like MILLISECONDS_PER_MINUTE.
                    formattedTimestamp(array.get(i), elementType, sessionTimeZone);


}

public static int localTimeToUnixDate(LocalTime time) {
return time.getHour() * 3600000 + time.getMinute() * '\uea60' + time.getSecond() * 1000 + time.getNano() / 1000000;
Copilot AI commented Aug 21, 2025

Using a Unicode character literal '\uea60' (60000) for milliseconds conversion is unclear and error-prone. Replace with the explicit constant 60000 or a named constant like MILLISECONDS_PER_MINUTE.

Suggested change
return time.getHour() * 3600000 + time.getMinute() * '\uea60' + time.getSecond() * 1000 + time.getNano() / 1000000;
return time.getHour() * 3600000 + time.getMinute() * MILLISECONDS_PER_MINUTE + time.getSecond() * 1000 + time.getNano() / 1000000;
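For what it's worth, the original trick works because a Java char used in arithmetic widens to its int code point, and U+EA60 is decimal 60000. A minimal sketch of what the fix could look like, with assumed constant names (the real PrintUtils may organize these differently):

import java.time.LocalTime;

public class TimeConversions {
  // Assumed constant names, mirroring the reviewer's suggestion.
  private static final int MILLIS_PER_HOUR = 3_600_000;
  private static final int MILLISECONDS_PER_MINUTE = 60_000; // replaces the opaque char literal (U+EA60 == 60000)
  private static final int MILLIS_PER_SECOND = 1_000;
  private static final int NANOS_PER_MILLI = 1_000_000;

  /** Milliseconds since midnight for the given local time. */
  public static int localTimeToUnixDate(LocalTime time) {
    return time.getHour() * MILLIS_PER_HOUR
        + time.getMinute() * MILLISECONDS_PER_MINUTE
        + time.getSecond() * MILLIS_PER_SECOND
        + time.getNano() / NANOS_PER_MILLI;
  }
}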


@Neuw84 (Author) commented Aug 21, 2025

Hmm, looking at the logs I see this weird error. It seems it is not able to launch the cluster, but I am a bit lost here.

 WARN [2025-08-21 11:04:10,021] ({flink-metrics-pekko.actor.supervisor-dispatcher-5} SupervisorActor.java[rpcActorFailed]:144) - RpcActor pekko://flink-metrics/user/rpc/MetricQueryService has failed. Shutting it down now.
java.lang.NullPointerException
	at org.slf4j.impl.Reload4jMDCAdapter.setContextMap(Reload4jMDCAdapter.java:81)
	at org.slf4j.MDC.setContextMap(MDC.java:264)
	at org.apache.flink.util.MdcUtils.lambda$withContext$0(MdcUtils.java:48)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:208)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
 WARN [2025-08-21 11:04:10,031] ({main} HadoopUtils.java[getHadoopConfiguration]:139) - Could not find Hadoop configuration via any of the supported methods (Flink configuration, environment variables).

Any help is appreciated, as the exception does not provide much info.

Indeed, the code present here works, at least for the Scala and SQL interfaces (checked with some notebooks), with Java 11.

Hopefully this will fix the NPE with log4j.
@Neuw84 (Author) commented Aug 21, 2025

It seems the error comes from an NPE in Log4j.

Flink 1.20.2 seems to address the issue.

https://flink.apache.org/2025/07/10/apache-flink-1.20.2-release-announcement/

@Neuw84 (Author) commented Aug 21, 2025

The Log4j issue is solved by the upgrade to 1.20.2; now I am getting a fancy Scala error about some version mismatch.

error reading Scala signature of org.apache.flink.table.api.ImplicitExpressionConversions: unsafe symbol x$2 (child of value <local api>) in runtime reflection universe

To reproduce it:

mvn -pl flink/flink-scala-2.12 -Pflink-120  clean install -DskipTests -am
mvn -pl flink/flink-scala-2.12 -Pflink-120 test

I tried to fix it, but without luck yet.

Using a docker-compose setup to connect Zeppelin to a Flink 1.20.2 cluster, this code works as expected (Scala and SQL interfaces).

@jongyoul (Member) commented:
@zjffdu Could you please help fix the error, if you can spare some time to review it?

@Neuw84 (Author) commented Aug 21, 2025

After some research, I wasn't able to identify the offending library. I spent some time trying to fix the Scala versions via Maven, without luck.

The code works well, as said in my previous comment; it is probably an issue with the scala-reflect/scala-compiler versions used in the tests, but I have no clue.

@Reamer (Contributor) left a comment

We are now using a newer JDK version and should also be using the new Java tools.

} else {
String flinkHome = System.getenv("FLINK_HOME");
if (flinkHome != null && !flinkHome.isBlank()) {
pyDir = Paths.get(flinkHome).resolve("opt").resolve("python");
@Reamer (Contributor) commented Aug 22, 2025

Why do you use Path.resolve() in various places?
I would have shortened it to Paths.get(flinkHome, "opt", "python").
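Both spellings produce the same path; a quick self-contained illustration (the flinkHome value is just an example):

import java.nio.file.Path;
import java.nio.file.Paths;

public class PathStyles {
  public static void main(String[] args) {
    String flinkHome = "/opt/flink"; // example value for illustration
    // Chained resolve(), as currently in the PR:
    Path viaResolve = Paths.get(flinkHome).resolve("opt").resolve("python");
    // Varargs form suggested above:
    Path viaVarargs = Paths.get(flinkHome, "opt", "python");
    System.out.println(viaResolve.equals(viaVarargs)); // prints: true
  }
}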

@Neuw84 (Author) commented:

Fixed.

@Neuw84 (Author) commented Aug 22, 2025

After digging and digging through the Scala libraries, I think the error is not solvable this way. I don't see any Scala library collisions, and I tried forcing many versions.

It seems they changed how the environments need to be created, and that should be updated in ScalaInterpreter.

We should now have one code path for 1.17 and below and another for 1.20, as sketched below.
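Roughly, that split could look like the sketch below. This is a hypothetical illustration, not the actual FlinkShims loader; the class names follow the Flink1xxShims convention used in this PR (e.g. Flink120Shims):

public class ShimSelector {
  // Hypothetical dispatch on the Flink minor version; the real loader
  // in flink-shims resolves the shim class differently.
  static String shimsClassFor(int major, int minor) {
    if (major == 1 && minor >= 20) {
      // New code path for the 1.20 environment setup.
      return "org.apache.zeppelin.flink.Flink120Shims";
    } else if (major == 1 && minor <= 17) {
      // Legacy code path for 1.17 and below.
      return "org.apache.zeppelin.flink.Flink1" + minor + "Shims";
    }
    throw new IllegalArgumentException("Unsupported Flink version: " + major + "." + minor);
  }
}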

Angel Conde Manjon added 2 commits August 22, 2025 15:46
File "/usr/share/miniconda/envs/python_3_with_flink/lib/python3.9/site-packages/google/protobuf/descriptor.py", line 796, in __new__
                                     _message.Message._CheckCalledFromGeneratedFile()
                                 TypeError: Descriptors cannot not be created directly.
                                 If this call came from a _pb2.py file, your generated code is out of date and must be regenerated with protoc >= 3.19.0.
                                 If you cannot immediately regenerate your protos, some other possible workarounds are:
                                  1. Downgrade the protobuf package to 3.20.x or lower.
                                  2. Set PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python (but this will use pure-Python parsing and will be much slower).
@pan3793 (Member) commented Aug 25, 2025

now I am getting a fancy Scala error about some version mismatch.

error reading Scala signature of org.apache.flink.table.api.ImplicitExpressionConversions: unsafe symbol x$2 (child of value <local api>) in runtime reflection universe

This is why I abandoned the previous PR... recompiling Flink with a higher Scala version does not work either. The key point here is that FLINK-32560 added a @deprecated(since = "1.18.0") annotation to trait ImplicitExpressionConversions; the issue will go away if we revert FLINK-32560.

@pan3793 (Member) commented Aug 25, 2025

FLINK-36327 replaces the @deprecated(since = "1.18.0") with Java's @Deprecated; you may want to try Flink 2.0.

@A-Maniovich commented:
That would be great!

@A-Maniovich commented Aug 27, 2025

I don't know if you have noticed, but the error isn't actually fatal. If you run the cell a second time, it surprisingly works without any problems.

error reading Scala signature of org.apache.flink.table.api.ImplicitExpressionConversions: unsafe symbol x$2 (child of value <local api>) in runtime reflection universe

@pan3793 (Member) commented Aug 27, 2025

I don't know if you have noticed, but the error isn't actually fatal. If you run the cell a second time, it surprisingly works without any problems.

error reading Scala signature of org.apache.flink.table.api.ImplicitExpressionConversions: unsafe symbol x$2 (child of value <local api>) in runtime reflection universe

I also noticed that, but I have no idea why, due to my limited knowledge of Scala internals :(

@pan3793 (Member) commented Aug 27, 2025

I think there are two directions now:

  1. try Flink 2.0 to see if FLINK-36327 helps
  2. eliminate usage of Flink's Scala API, since it has been deprecated by Flink for a long time (see the Java sketch after this list)
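For direction 2, a minimal sketch of what dropping the Scala API means in practice: the Java Table API expresses the same queries without ever touching ImplicitExpressionConversions (the table contents here are made up for illustration):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;

public class JavaTableApiSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // Build a small in-memory table and query it with Java expressions
    // instead of Scala implicit conversions.
    Table orders = tEnv.fromValues(row(1, 10.0), row(2, 20.5)).as("id", "amount");
    orders.select($("id"), $("amount")).execute().print();
  }
}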

@Neuw84 (Author) commented Aug 27, 2025

The problem is that Flink 2.0 removes all Scala APIs; that would mean rewriting the Flink interpreter and working out the ideal options there. The SQL interface is fine, but then... a Python + Java REPL? It would need a huge rearchitecture.

I will try to spend more time on this (Flink 1.20), but I can't promise anything; any help would be appreciated. As it is now, it works, and I would like this to be merged so we have at least LTS support before migrating to Flink 2.0.

@tbonelee changed the title from "[ZeppelinZEPPELIN-6284] Added Flink 1.20 interpreter." to "[ZEPPELIN-6284] Added Flink 1.20 interpreter." on Sep 7, 2025