Skip to content

[FLINK-38729] Add support for Flink 2.2.0#4294

Draft
lvyanquan wants to merge 1 commit intomasterfrom
FLINK-38729-2
Draft

[FLINK-38729] Add support for Flink 2.2.0#4294
lvyanquan wants to merge 1 commit intomasterfrom
FLINK-38729-2

Conversation

@lvyanquan
Copy link
Contributor

@lvyanquan lvyanquan commented Mar 2, 2026

Introduction

Building upon the foundation where existing modules continue to use Flink 1.20 dependencies, support for Flink 2.x versions is provided through newly added modules.

Development Plan

I plan to complete full support for Flink 2.x versions through three steps:

  1. The first step is to provide support for Flink 2.x versions in the common/runtime/composer modules,
    and perform integration tests and end-to-end tests on these modules based on a simple values pipeline
    connector to verify correctness.
  2. The second step is to implement a MySQL Pipeline connector that supports Flink 2.x versions, as it is
    our most commonly used CDC connector.
  3. The third step is to add support for Flink 2.x versions to existing source/pipeline connectors, if
    feasible.

This PR will complete the work of the first step.

Topics for Discussion

1. Module Design

Question: Is it necessary to design each module with a structure consisting of a common module, a module with 1.x API, and a module with 2.x API, as Paimon does?
My Answer: This would require creating three modules for every module in the project. I think this introduces too many additional modules. Therefore, I will keep existing modules' dependency on Flink 1.x unchanged, and only add new modules that depend on Flink 2.x. I will rewrite classes that depend on the new API, and use the shade plugin to reduce the number of classes that need to be rewritten in the new modules.

2. Test Coverage

Question: Is it necessary to add tests equivalent to those in the 1.x modules for each newly added 2.x module?
My Answer: This is a difficult decision point. Adding sufficient tests can guarantee the correctness and reliability of 2.x modules, but it would introduce a large amount of duplicate code and also increase the time required for CI runs. To avoid the burden of review, I have only added composer tests and e2e tests in this PR to ensure that the support for Flink 2.x is functional. I plan to add more complete tests in subsequent PRs (if necessary).

The above lists the points that I still consider uncertain during the implementation of this PR. Discussions are welcome.

Copy link
Member

@yuxiqian yuxiqian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just investigated the implementation in apache/fluss#1176. Seems instead of creating a replica for each file that uses incompatible API (between Flink 1.x and 2.x), Hongshun introduces a compatible layer in fluss-flink-common package and puts all incompatible API usages inside, and dispatch it in fluss-flink-1.x and fluss-flink-2.x. I think if we took this approach, the only module that needs to be versioned is flink-cdc-common.

The obvious advantage is we don't need so much code duplication. This PR adds ~20k sloc code (mostly duplicate, and it's hard to review what's changed) while Fluss support is merely +856 -36. Though two codebase could not be compared directly, such huge difference is not negligible. This design also causes maintenance issue, as any following PR modifying related file must remember updating both 1.x and 2.x implementation and keep them in-sync.

I'm not strongly against the approach used in this PR, just wondering if the alternative solution is possible / why it's impossible.

Comment on lines +19 to +22
# Setup FLINK_HOME
args=("$@")
# Check if FLINK_HOME is set in command-line arguments by "--flink-home"
for ((i=0; i < ${#args[@]}; i++)); do
Copy link
Member

@yuxiqian yuxiqian Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we don't need two flink-cdc-dist module. Can we just ship both (1.x and 2.x) jars into dist and use a command line tool to decide which version to choose, or use ./bin/flink --version to detect it automatically?

@macdoor
Copy link

macdoor commented Mar 5, 2026

Hi @yuxiqian,

We've been working on Flink 2.2 compatibility in a fork and have a working implementation following the approach referenced in fluss#1176 — i.e., introducing a version-specific compat module with runtime bridges.

Our branch: https://github.com/macdoor/flink-cdc/tree/feature/opengauss-flink22-compat

Key changes we made to get OpenGauss → Paimon pipelines running on Flink 2.2:

flink-cdc-flink-compat module — two sub-modules: flink-cdc-flink-compat-flink1 (Flink 1.x bridge with SinkFunction/SourceFunction) and flink-cdc-flink-compat-flink2 (Flink 2.x stub classes: Sink$InitContext, CatalogFactory, Catalog).
DataSinkWriterOperator — reflection-based wrapping of SinkWriterOperator to find compatible constructors; fixed getSubtaskIndexCompat() to use getTaskInfo().getIndexOfThisSubtask() (previously returned hardcoded 0, breaking SchemaCoordinator flush synchronization).
DataSinkTranslator — getMethods() (not getDeclaredMethods()) to detect two-phase commit across superclasses; SupportsCommitter adapter proxy (serializable) for sinks that declare createCommitter() without implementing the interface.
Serializers — added resolveSchemaCompatibility(TypeSerializerSnapshot) (Flink 2.x new abstract method) to all custom TypeSerializerSnapshot implementations without @OverRide, so they compile against both 1.x and 2.x.
SourceSplitSerializer — reflection-based LogicalTypeParser.parse() to handle the removed single-arg overload.
PreCommitOperator / schema operators — replaced getRuntimeContext().getIndexOfThisSubtask() with getRuntimeContext().getTaskInfo().getIndexOfThisSubtask().

The pipeline now runs end-to-end on Flink 2.2.0 with a standalone session cluster (OpenGauss source → Paimon sink, all 4 operator stages visible including Sink Committer).

Happy to share details, open a draft PR, or contribute directly to this effort. Let us know what would be most helpful!

@lvyanquan
Copy link
Contributor Author

Hi @macdoor, I pulled your branch, but the code in this branch doesn't seem to be complete (it doesn't even compile). Did I miss something? Of course, your implementation in the PR is more concise. If you can get it to compile and run on Flink 2.2, feel free to submit a PR first so we can verify that the tests pass.

@macdoor
Copy link

macdoor commented Mar 6, 2026

Hi @lvyanquan,

Thanks for trying the branch. We’ve since opened a dedicated PR with a complete, compilable implementation:
PR: #4307feat: Flink 2.2 compatibility and dual-dist packaging

  • Build: two-step Maven commands (Java 17) to produce both flink-cdc-dist--1.20.jar and flink-cdc-dist--2.2.jar
  • Deployment: where to put each dist JAR and how to reuse the same connector JARs on Flink 1.20 and 2.2

You can clone/build from branch macdoor:feature/flink22-compat and run the described steps to verify; we’ve already run a full build and conflict resolution on that branch. If anything doesn’t work on your side, we can iterate in that PR.

@lvyanquan lvyanquan marked this pull request as draft March 6, 2026 11:41
@lvyanquan lvyanquan force-pushed the FLINK-38729-2 branch 2 times, most recently from 375a622 to 5b47571 Compare March 6, 2026 18:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants