diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MaxComputeE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MaxComputeE2eITCase.java
index 59d966bc2af..24c72bd2ff1 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MaxComputeE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MaxComputeE2eITCase.java
@@ -19,13 +19,13 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.cdc.common.exceptions.ValidationException;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
import org.apache.flink.cdc.connectors.maxcompute.utils.MaxComputeUtils;
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.table.api.ValidationException;
import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToHudiE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToHudiE2eITCase.java
index 76936b6987b..3de85c4d178 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToHudiE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToHudiE2eITCase.java
@@ -20,6 +20,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.cdc.common.exceptions.ValidationException;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
@@ -27,7 +28,6 @@
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.table.api.ValidationException;
import org.apache.hudi.common.model.HoodieTableType;
import org.assertj.core.api.Assertions;
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
index 641180580c7..fa3ecec7d20 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
@@ -20,6 +20,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.cdc.common.exceptions.ValidationException;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
@@ -28,7 +29,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.TestLogger;
import com.github.dockerjava.api.DockerClient;
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
index 96173989c38..0d1ae70f674 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
@@ -19,6 +19,7 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.cdc.common.exceptions.ValidationException;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
@@ -28,7 +29,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.TestLogger;
import com.fasterxml.jackson.core.Version;
diff --git a/flink-cdc-flink-compat/flink-cdc-flink-compat-api/pom.xml b/flink-cdc-flink-compat/flink-cdc-flink-compat-api/pom.xml
new file mode 100644
index 00000000000..5bc997d783f
--- /dev/null
+++ b/flink-cdc-flink-compat/flink-cdc-flink-compat-api/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-cdc-flink-compat</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-cdc-flink-compat-api</artifactId>
+    <description>API for Flink version-specific source/sink binding (Flink 1.x vs 2.x).</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/flink-cdc-flink-compat/flink-cdc-flink-compat-api/src/main/java/org/apache/flink/cdc/flink/compat/FlinkPipelineBridge.java b/flink-cdc-flink-compat/flink-cdc-flink-compat-api/src/main/java/org/apache/flink/cdc/flink/compat/FlinkPipelineBridge.java
new file mode 100644
index 00000000000..5595491df34
--- /dev/null
+++ b/flink-cdc-flink-compat/flink-cdc-flink-compat-api/src/main/java/org/apache/flink/cdc/flink/compat/FlinkPipelineBridge.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.flink.compat;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.sink.EventSinkProvider;
+import org.apache.flink.cdc.common.source.EventSourceProvider;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.function.Function;
+
+/**
+ * Version-specific bridge for binding {@link EventSourceProvider} / {@link EventSinkProvider} to
+ * Flink DataStream API.
+ *
+ * <p>Flink 1.x implementation supports both Source/Sink V2 and legacy SourceFunction/SinkFunction.
+ * Flink 2.x implementation supports only Source/Sink V2.
+ *
+ *
+ * <p>Implementations are loaded via {@link java.util.ServiceLoader}; exactly one compat JAR
+ * (flink-cdc-flink-compat-flink1 or flink-cdc-flink-compat-flink2) must be on the classpath.
+ */
+@Internal
+public interface FlinkPipelineBridge {
+
+ /**
+ * Creates a {@link DataStreamSource} from the given {@link EventSourceProvider}.
+ *
+ * @param env execution environment
+ * @param eventSourceProvider provider (e.g. FlinkSourceProvider or FlinkSourceFunctionProvider
+ * on 1.x)
+ * @param sourceName name for the source operator
+ * @return the data stream source
+ * @throws IllegalStateException if the provider type is not supported by this bridge
+ */
+ DataStreamSource<Event> createDataStreamSource(
+ StreamExecutionEnvironment env,
+ EventSourceProvider eventSourceProvider,
+ String sourceName);
+
+ /**
+ * Adds the given {@link EventSinkProvider} as a sink to the input stream.
+ *
+ * @param input input event stream
+ * @param eventSinkProvider provider (e.g. FlinkSinkProvider or FlinkSinkFunctionProvider on
+ * 1.x)
+ * @param sinkName name for the sink operator
+ * @param isBatchMode whether the job is in batch mode
+ * @param schemaOperatorID schema operator ID for coordination
+ * @param uidGenerator generates operator UID from a suffix (e.g. "sink-writer" ->
+ * "prefix-sink-writer")
+ */
+ void sinkTo(
+ DataStream<Event> input,
+ EventSinkProvider eventSinkProvider,
+ String sinkName,
+ boolean isBatchMode,
+ OperatorID schemaOperatorID,
+ Function<String, String> uidGenerator);
+}
diff --git a/flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/pom.xml b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/pom.xml
new file mode 100644
index 00000000000..3fc361a7380
--- /dev/null
+++ b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-cdc-flink-compat</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-cdc-flink-compat-flink1</artifactId>
+    <description>Flink 1.x compatibility: SourceFunction/SinkFunction providers and operators.</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-flink-compat-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-runtime</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/FlinkSinkFunctionProvider.java b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/java/org/apache/flink/cdc/common/sink/FlinkSinkFunctionProvider.java
similarity index 92%
rename from flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/FlinkSinkFunctionProvider.java
rename to flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/java/org/apache/flink/cdc/common/sink/FlinkSinkFunctionProvider.java
index 5e907bef6cb..56e3ebf451d 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/FlinkSinkFunctionProvider.java
+++ b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/java/org/apache/flink/cdc/common/sink/FlinkSinkFunctionProvider.java
@@ -23,7 +23,8 @@
/**
* {@code FlinkSinkFunctionProvider} is used to provide a Flink {@link SinkFunction} for writing
- * events to external systems.
+ * events to external systems. This interface lives in the Flink 1.x compat module because
+ * SinkFunction was removed in Flink 2.x.
*/
@PublicEvolving
public interface FlinkSinkFunctionProvider extends EventSinkProvider {
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/FlinkSourceFunctionProvider.java b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/java/org/apache/flink/cdc/common/source/FlinkSourceFunctionProvider.java
similarity index 92%
rename from flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/FlinkSourceFunctionProvider.java
rename to flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/java/org/apache/flink/cdc/common/source/FlinkSourceFunctionProvider.java
index 5713aac21a9..d08f865c3f5 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/FlinkSourceFunctionProvider.java
+++ b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/java/org/apache/flink/cdc/common/source/FlinkSourceFunctionProvider.java
@@ -23,7 +23,8 @@
/**
* {@code FlinkSourceFunctionProvider} is used to provide a Flink {@link SourceFunction} for reading
- * events from external systems.
+ * events from external systems. This interface lives in the Flink 1.x compat module because
+ * SourceFunction was removed in Flink 2.x.
*/
@PublicEvolving
public interface FlinkSourceFunctionProvider extends EventSourceProvider {
diff --git a/flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/java/org/apache/flink/cdc/flink/compat/flink1/Flink1PipelineBridge.java b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/java/org/apache/flink/cdc/flink/compat/flink1/Flink1PipelineBridge.java
new file mode 100644
index 00000000000..2d5d17f0370
--- /dev/null
+++ b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/java/org/apache/flink/cdc/flink/compat/flink1/Flink1PipelineBridge.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.flink.compat.flink1;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.sink.EventSinkProvider;
+import org.apache.flink.cdc.common.sink.FlinkSinkFunctionProvider;
+import org.apache.flink.cdc.common.source.EventSourceProvider;
+import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider;
+import org.apache.flink.cdc.common.source.FlinkSourceProvider;
+import org.apache.flink.cdc.flink.compat.FlinkPipelineBridge;
+import org.apache.flink.cdc.runtime.operators.sink.BatchDataSinkFunctionOperator;
+import org.apache.flink.cdc.runtime.operators.sink.DataSinkFunctionOperator;
+import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+
+import java.util.function.Function;
+
+/** Flink 1.x implementation of {@link FlinkPipelineBridge}. */
+@Internal
+public class Flink1PipelineBridge implements FlinkPipelineBridge {
+
+ private static final String SINK_WRITER_PREFIX = "Sink Writer: ";
+
+ @Override
+ public DataStreamSource<Event> createDataStreamSource(
+ StreamExecutionEnvironment env,
+ EventSourceProvider eventSourceProvider,
+ String sourceName) {
+ if (eventSourceProvider instanceof FlinkSourceProvider) {
+ FlinkSourceProvider sourceProvider = (FlinkSourceProvider) eventSourceProvider;
+ return env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ sourceName,
+ new EventTypeInfo());
+ }
+ if (eventSourceProvider instanceof FlinkSourceFunctionProvider) {
+ FlinkSourceFunctionProvider sourceFunctionProvider =
+ (FlinkSourceFunctionProvider) eventSourceProvider;
+ DataStreamSource<Event> stream =
+ env.addSource(sourceFunctionProvider.getSourceFunction(), new EventTypeInfo());
+ stream.name(sourceName);
+ return stream;
+ }
+ throw new IllegalStateException(
+ String.format(
+ "Unsupported EventSourceProvider type \"%s\"",
+ eventSourceProvider.getClass().getCanonicalName()));
+ }
+
+ @Override
+ public void sinkTo(
+ DataStream<Event> input,
+ EventSinkProvider eventSinkProvider,
+ String sinkName,
+ boolean isBatchMode,
+ OperatorID schemaOperatorID,
+ Function<String, String> uidGenerator) {
+ if (!(eventSinkProvider instanceof FlinkSinkFunctionProvider)) {
+ throw new IllegalStateException(
+ String.format(
+ "Unsupported EventSinkProvider type \"%s\" for SinkFunction path",
+ eventSinkProvider.getClass().getCanonicalName()));
+ }
+ FlinkSinkFunctionProvider sinkFunctionProvider =
+ (FlinkSinkFunctionProvider) eventSinkProvider;
+ SinkFunction<Event> sinkFunction = sinkFunctionProvider.getSinkFunction();
+
+ StreamSink<Event> sinkOperator;
+ if (isBatchMode) {
+ sinkOperator = new BatchDataSinkFunctionOperator(sinkFunction);
+ } else {
+ sinkOperator = new DataSinkFunctionOperator(sinkFunction, schemaOperatorID);
+ }
+ StreamExecutionEnvironment executionEnvironment = input.getExecutionEnvironment();
+ PhysicalTransformation<Event> transformation =
+ new LegacySinkTransformation<>(
+ input.getTransformation(),
+ SINK_WRITER_PREFIX + sinkName,
+ sinkOperator,
+ executionEnvironment.getParallelism(),
+ false);
+ String uid = (uidGenerator != null) ? uidGenerator.apply("sink-writer") : null;
+ if (uid != null) {
+ transformation.setUid(uid);
+ }
+ executionEnvironment.addOperator(transformation);
+ }
+}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkFunctionOperator.java b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkFunctionOperator.java
similarity index 82%
rename from flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkFunctionOperator.java
rename to flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkFunctionOperator.java
index 560909006d3..1e5c3109803 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkFunctionOperator.java
+++ b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkFunctionOperator.java
@@ -21,23 +21,18 @@
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.runtime.operators.sink.exception.SinkWrapperException;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
/**
- * An operator that processes records to be written into a {@link SinkFunction} in batch mode.
- *
- * <p>The operator is a proxy of {@link StreamSink} in Flink.
- *
- * <p>The operator is always part of a sink pipeline and is the first operator.
+ * Batch-mode operator that writes into a {@link SinkFunction}. Lives in the Flink 1.x compat module
+ * because SinkFunction was removed in Flink 2.x.
*/
@Internal
public class BatchDataSinkFunctionOperator extends StreamSink<Event> {
public BatchDataSinkFunctionOperator(SinkFunction<Event> userFunction) {
super(userFunction);
- this.chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java
similarity index 83%
rename from flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java
rename to flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java
index df0e7802b27..76261373947 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java
+++ b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java
@@ -30,7 +30,6 @@
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -41,13 +40,8 @@
import java.util.Set;
/**
- * An operator that processes records to be written into a {@link
- * org.apache.flink.streaming.api.functions.sink.SinkFunction}.
- *
- * <p>The operator is a proxy of {@link org.apache.flink.streaming.api.operators.StreamSink} in
- * Flink.
- *
- * <p>The operator is always part of a sink pipeline and is the first operator.
+ * An operator that processes records to be written into a {@link SinkFunction}. Lives in the Flink
+ * 1.x compat module because SinkFunction was removed in Flink 2.x.
*/
@Internal
public class DataSinkFunctionOperator extends StreamSink<Event> {
@@ -55,14 +49,12 @@ public class DataSinkFunctionOperator extends StreamSink<Event> {
private SchemaEvolutionClient schemaEvolutionClient;
private final OperatorID schemaOperatorID;
- /** A set of {@link TableId} that already processed {@link CreateTableEvent}. */
private final Set<TableId> processedTableIds;
public DataSinkFunctionOperator(SinkFunction<Event> userFunction, OperatorID schemaOperatorID) {
super(userFunction);
this.schemaOperatorID = schemaOperatorID;
processedTableIds = new HashSet<>();
- this.chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
@@ -88,23 +80,17 @@ public void processElement(StreamRecord<Event> element) throws Exception {
Event event = element.getValue();
try {
- // FlushEvent triggers flush
if (event instanceof FlushEvent) {
handleFlushEvent(((FlushEvent) event));
return;
}
- // CreateTableEvent marks the table as processed directly
if (event instanceof CreateTableEvent) {
processedTableIds.add(((CreateTableEvent) event).tableId());
super.processElement(element);
return;
}
- // Check if the table is processed before emitting all other events, because we have to
- // make
- // sure that sink have a view of the full schema before processing any change events,
- // including schema changes.
ChangeEvent changeEvent = (ChangeEvent) event;
if (!processedTableIds.contains(changeEvent.tableId())) {
emitLatestSchema(changeEvent.tableId());
@@ -117,7 +103,6 @@ public void processElement(StreamRecord<Event> element) throws Exception {
}
}
- // ----------------------------- Helper functions -------------------------------
private void handleFlushEvent(FlushEvent event) throws Exception {
userFunction.finish();
if (event.getSchemaChangeEventType() != SchemaChangeEventType.CREATE_TABLE) {
@@ -141,8 +126,6 @@ private void handleFlushEvent(FlushEvent event) throws Exception {
private void emitLatestSchema(TableId tableId) throws Exception {
Optional<Schema> schema = schemaEvolutionClient.getLatestEvolvedSchema(tableId);
if (schema.isPresent()) {
- // request and process CreateTableEvent because SinkFunction need to retrieve
- // Schema to deserialize RecordData after resuming job.
super.processElement(new StreamRecord<>(new CreateTableEvent(tableId, schema.get())));
processedTableIds.add(tableId);
} else {
diff --git a/flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/resources/META-INF/services/org.apache.flink.cdc.flink.compat.FlinkPipelineBridge b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/resources/META-INF/services/org.apache.flink.cdc.flink.compat.FlinkPipelineBridge
new file mode 100644
index 00000000000..7b72cdc6ae1
--- /dev/null
+++ b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink1/src/main/resources/META-INF/services/org.apache.flink.cdc.flink.compat.FlinkPipelineBridge
@@ -0,0 +1 @@
+org.apache.flink.cdc.flink.compat.flink1.Flink1PipelineBridge
diff --git a/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/pom.xml b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/pom.xml
new file mode 100644
index 00000000000..a79c78c70aa
--- /dev/null
+++ b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-cdc-flink-compat</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-cdc-flink-compat-flink2</artifactId>
+    <description>Flink 2.x compatibility: Source/Sink V2 only (no SourceFunction/SinkFunction).</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-flink-compat-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-runtime</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/src/main/java/org/apache/flink/api/connector/sink2/Sink$InitContext.java b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/src/main/java/org/apache/flink/api/connector/sink2/Sink$InitContext.java
new file mode 100644
index 00000000000..b5060b995cf
--- /dev/null
+++ b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/src/main/java/org/apache/flink/api/connector/sink2/Sink$InitContext.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+/**
+ * Compatibility stub for {@code Sink.InitContext} which was removed in Flink 2.x. Sink connector
+ * classes compiled against Flink 1.20 still reference {@code Sink$InitContext} in their bytecode
+ * (via {@code createWriter(Sink.InitContext)} method signatures). Java serialization introspects
+ * all method parameter types via {@code getDeclaredMethods0()}, causing {@code
+ * NoClassDefFoundError} on Flink 2.x if this class is absent. This empty stub makes the class
+ * loadable without providing any actual implementation.
+ */
+// CHECKSTYLE.OFF: TypeName
+public interface Sink$InitContext extends InitContext {}
+// CHECKSTYLE.ON: TypeName
diff --git a/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/src/main/java/org/apache/flink/cdc/flink/compat/flink2/Flink2PipelineBridge.java b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/src/main/java/org/apache/flink/cdc/flink/compat/flink2/Flink2PipelineBridge.java
new file mode 100644
index 00000000000..840eafaebb5
--- /dev/null
+++ b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/src/main/java/org/apache/flink/cdc/flink/compat/flink2/Flink2PipelineBridge.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.flink.compat.flink2;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.sink.EventSinkProvider;
+import org.apache.flink.cdc.common.source.EventSourceProvider;
+import org.apache.flink.cdc.common.source.FlinkSourceProvider;
+import org.apache.flink.cdc.flink.compat.FlinkPipelineBridge;
+import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.function.Function;
+
+/**
+ * Flink 2.x implementation of {@link FlinkPipelineBridge}.
+ *
+ * <p>Supports only Source/Sink V2 ({@link FlinkSourceProvider}, {@link FlinkSinkProvider}).
+ * SourceFunction and SinkFunction were removed in Flink 2.x; use compat-flink1 for Flink 1.x.
+ */
+@Internal
+public class Flink2PipelineBridge implements FlinkPipelineBridge {
+
+ private static final String UNSUPPORTED_SOURCE_MSG =
+ "Flink 2.x supports only FlinkSourceProvider (Source V2). "
+ + "SourceFunction was removed in Flink 2.x. Use flink-cdc-flink-compat-flink1 for Flink 1.x.";
+ private static final String UNSUPPORTED_SINK_MSG =
+ "Flink 2.x supports only FlinkSinkProvider (Sink V2). "
+ + "SinkFunction was removed in Flink 2.x. Use flink-cdc-flink-compat-flink1 for Flink 1.x.";
+
+ @Override
+ public DataStreamSource<Event> createDataStreamSource(
+ StreamExecutionEnvironment env,
+ EventSourceProvider eventSourceProvider,
+ String sourceName) {
+ if (eventSourceProvider instanceof FlinkSourceProvider) {
+ FlinkSourceProvider sourceProvider = (FlinkSourceProvider) eventSourceProvider;
+ return env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ sourceName,
+ new EventTypeInfo());
+ }
+ throw new UnsupportedOperationException(
+ UNSUPPORTED_SOURCE_MSG
+ + " Got: "
+ + (eventSourceProvider == null
+ ? "null"
+ : eventSourceProvider.getClass().getName()));
+ }
+
+ @Override
+ public void sinkTo(
+ DataStream<Event> input,
+ EventSinkProvider eventSinkProvider,
+ String sinkName,
+ boolean isBatchMode,
+ OperatorID schemaOperatorID,
+ Function<String, String> uidGenerator) {
+ // In Flink 2.x only FlinkSinkProvider (Sink V2) is supported; the translator handles it
+ // directly. This path is only hit for legacy SinkFunction-based providers, which are
+ // unsupported in 2.x.
+ throw new UnsupportedOperationException(
+ UNSUPPORTED_SINK_MSG
+ + " Got: "
+ + (eventSinkProvider == null
+ ? "null"
+ : eventSinkProvider.getClass().getName()));
+ }
+}
diff --git a/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/src/main/java/org/apache/flink/table/catalog/Catalog.java
new file mode 100644
index 00000000000..4d1185f3fd4
--- /dev/null
+++ b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+/**
+ * Minimal compatibility stub for Flink's Catalog from older table APIs. It exists so that
+ * connectors compiled against Flink 1.x table modules (for example certain Paimon artifacts) can
+ * still be loaded on Flink 2.2 runtimes where this type was removed or relocated.
+ */
+// CHECKSTYLE.OFF: TypeName
+public interface Catalog {}
+// CHECKSTYLE.ON: TypeName
diff --git a/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/src/main/java/org/apache/flink/table/factories/CatalogFactory.java b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/src/main/java/org/apache/flink/table/factories/CatalogFactory.java
new file mode 100644
index 00000000000..128e4d85269
--- /dev/null
+++ b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/src/main/java/org/apache/flink/table/factories/CatalogFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+/**
+ * Minimal compatibility stub for Flink's CatalogFactory which existed in older Flink table APIs and
+ * is still referenced by some connector artifacts (for example Paimon compiled against Flink 1.x).
+ * This interface is intentionally empty and only restores the missing type so that such connectors
+ * can be loaded on Flink 2.2 runtimes.
+ */
+// CHECKSTYLE.OFF: TypeName
+public interface CatalogFactory {}
+// CHECKSTYLE.ON: TypeName
diff --git a/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/src/main/resources/META-INF/services/org.apache.flink.cdc.flink.compat.FlinkPipelineBridge b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/src/main/resources/META-INF/services/org.apache.flink.cdc.flink.compat.FlinkPipelineBridge
new file mode 100644
index 00000000000..71b7f1083c0
--- /dev/null
+++ b/flink-cdc-flink-compat/flink-cdc-flink-compat-flink2/src/main/resources/META-INF/services/org.apache.flink.cdc.flink.compat.FlinkPipelineBridge
@@ -0,0 +1 @@
+org.apache.flink.cdc.flink.compat.flink2.Flink2PipelineBridge
diff --git a/flink-cdc-flink-compat/pom.xml b/flink-cdc-flink-compat/pom.xml
new file mode 100644
index 00000000000..220fbc74ce1
--- /dev/null
+++ b/flink-cdc-flink-compat/pom.xml
@@ -0,0 +1,37 @@
+
+
+
+    <parent>
+        <artifactId>flink-cdc-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-cdc-flink-compat</artifactId>
+    <packaging>pom</packaging>
+    <name>Flink version compatibility layer: Flink 1.x (SourceFunction/SinkFunction) vs Flink 2.x (Source/Sink V2).</name>
+
+    <modules>
+        <module>flink-cdc-flink-compat-api</module>
+        <module>flink-cdc-flink-compat-flink1</module>
+        <module>flink-cdc-flink-compat-flink2</module>
+    </modules>
+</project>
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
index 15c0571ba16..45f91afaf98 100755
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
@@ -97,7 +97,7 @@ public SchemaOperator(
@Override
public void open() throws Exception {
super.open();
- subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+ subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
upstreamSchemaTable = HashBasedTable.create();
evolvedSchemaMap = new HashMap<>();
tableIdRouter = new TableIdRouter(routingRules);
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
index df990425afc..dacd8aa7626 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
@@ -32,7 +32,6 @@
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -70,7 +69,6 @@ public class BatchSchemaOperator extends AbstractStreamOperator
public BatchSchemaOperator(
 List<RouteRule> routingRules, MetadataApplier metadataApplier, String timezone) {
- this.chainingStrategy = ChainingStrategy.ALWAYS;
this.timezone = timezone;
this.routingRules = routingRules;
this.metadataApplier = metadataApplier;
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperatorFactory.java
new file mode 100644
index 00000000000..b9fc87ff30c
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperatorFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.schema.regular;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+
+import java.util.List;
+
+/**
+ * Factory to create {@link BatchSchemaOperator} with chaining strategy set for Flink 2.x
+ * compatibility.
+ */
+@Internal
+public class BatchSchemaOperatorFactory extends SimpleOperatorFactory<Event>
+ implements OneInputStreamOperatorFactory<Event, Event> {
+
+ public BatchSchemaOperatorFactory(
+ List<RouteRule> routingRules, MetadataApplier metadataApplier, String timezone) {
+ super(new BatchSchemaOperator(routingRules, metadataApplier, timezone));
+ setChainingStrategy(ChainingStrategy.ALWAYS);
+ }
+}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
index 901ba168db0..276cd059ced 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
@@ -39,7 +39,6 @@
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -109,7 +108,6 @@ public SchemaOperator(
Duration rpcTimeOut,
SchemaChangeBehavior schemaChangeBehavior,
String timezone) {
- this.chainingStrategy = ChainingStrategy.ALWAYS;
this.rpcTimeout = rpcTimeOut;
this.schemaChangeBehavior = schemaChangeBehavior;
this.timezone = timezone;
@@ -131,7 +129,7 @@ public void open() throws Exception {
this.schemaOperatorMetrics =
new SchemaOperatorMetrics(
getRuntimeContext().getMetricGroup(), schemaChangeBehavior);
- this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+ this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
this.originalSchemaMap = new HashMap<>();
this.evolvedSchemaMap = new HashMap<>();
this.router = new TableIdRouter(routingRules);
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java
index 630acc90463..550d3f9fec6 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java
@@ -24,6 +24,7 @@
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
@@ -50,6 +51,7 @@ public SchemaOperatorFactory(
SchemaChangeBehavior schemaChangeBehavior,
String timezone) {
super(new SchemaOperator(routingRules, rpcTimeout, schemaChangeBehavior, timezone));
+ setChainingStrategy(ChainingStrategy.ALWAYS);
this.metadataApplier = metadataApplier;
this.routingRules = routingRules;
this.schemaChangeBehavior = schemaChangeBehavior;
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java
index 52900fef740..9deeb63b21f 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/BatchDataSinkWriterOperator.java
@@ -30,7 +30,6 @@
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -41,6 +40,7 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
/**
* An operator that processes records to be written into a {@link Sink} in batch mode.
@@ -78,7 +78,6 @@ public BatchDataSinkWriterOperator(
this.sink = sink;
this.processingTimeService = processingTimeService;
this.mailboxExecutor = mailboxExecutor;
- this.chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
@@ -88,40 +87,43 @@ public void setup(
 Output<StreamRecord<CommittableMessage<CommT>>> output) {
super.setup(containingTask, config, output);
flinkWriterOperator = createFlinkWriterOperator();
- this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
- .setup(containingTask, config, output);
+ invokeFlinkWriterOperatorMethod(
+ "setup",
+ new Class<?>[] {StreamTask.class, StreamConfig.class, Output.class},
+ containingTask,
+ config,
+ output);
}
@Override
public void open() throws Exception {
- this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator().open();
+ invokeFlinkWriterOperatorMethod("open", new Class<?>[0]);
copySinkWriter = getFieldValue("sinkWriter");
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
- this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
- .initializeState(context);
+ invokeFlinkWriterOperatorMethod(
+ "initializeState", new Class<?>[] {StateInitializationContext.class}, context);
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
- this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
- .snapshotState(context);
+ invokeFlinkWriterOperatorMethod(
+ "snapshotState", new Class<?>[] {StateSnapshotContext.class}, context);
}
@Override
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
- this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
- .processWatermark(mark);
+ invokeFlinkWriterOperatorMethod("processWatermark", new Class<?>[] {Watermark.class}, mark);
}
@Override
public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
super.processWatermarkStatus(watermarkStatus);
- this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
- .processWatermarkStatus(watermarkStatus);
+ invokeFlinkWriterOperatorMethod(
+ "processWatermarkStatus", new Class<?>[] {WatermarkStatus.class}, watermarkStatus);
}
@Override
@@ -138,19 +140,18 @@ public void processElement(StreamRecord element) throws Exception {
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
- this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
- .prepareSnapshotPreBarrier(checkpointId);
+ invokeFlinkWriterOperatorMethod(
+ "prepareSnapshotPreBarrier", new Class<?>[] {long.class}, checkpointId);
}
@Override
public void close() throws Exception {
- this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
- .close();
+ invokeFlinkWriterOperatorMethod("close", new Class<?>[0]);
}
@Override
public void endInput() throws Exception {
- this.<BoundedOneInput>getFlinkWriterOperator().endInput();
+ invokeFlinkWriterOperatorMethod("endInput", new Class<?>[0]);
}
// ----------------------------- Helper functions -------------------------------
@@ -168,11 +169,68 @@ private Object createFlinkWriterOperator() {
.getUserCodeClassLoader()
.loadClass(
"org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator");
- Constructor<?> constructor =
- flinkWriterClass.getDeclaredConstructor(
- Sink.class, ProcessingTimeService.class, MailboxExecutor.class);
- constructor.setAccessible(true);
- return constructor.newInstance(sink, processingTimeService, mailboxExecutor);
+
+ // Try to find a constructor whose first three parameters are compatible with
+ // (Sink, ProcessingTimeService, MailboxExecutor). This makes the code resilient
+ // against Flink 2.x adding extra parameters to the constructor.
+ Constructor<?> target = null;
+ for (Constructor<?> c : flinkWriterClass.getDeclaredConstructors()) {
+ Class<?>[] p = c.getParameterTypes();
+ if (p.length >= 3
+ && Sink.class.isAssignableFrom(p[0])
+ && ProcessingTimeService.class.isAssignableFrom(p[1])
+ && MailboxExecutor.class.isAssignableFrom(p[2])) {
+ target = c;
+ break;
+ }
+ }
+
+ if (target == null) {
+ // Fallback: use the first declared constructor and best-effort argument mapping
+ // below. This covers Flink 2.2 where the constructor signature may have changed.
+ Constructor<?>[] all = flinkWriterClass.getDeclaredConstructors();
+ if (all.length == 0) {
+ throw new RuntimeException(
+ "No constructors found on SinkWriterOperator in Flink runtime");
+ }
+ target = all[0];
+ }
+
+ target.setAccessible(true);
+ Class<?>[] paramTypes = target.getParameterTypes();
+ Object[] args = new Object[paramTypes.length];
+ for (int i = 0; i < paramTypes.length; i++) {
+ Class<?> t = paramTypes[i];
+ if (Sink.class.isAssignableFrom(t)) {
+ args[i] = sink;
+ } else if (ProcessingTimeService.class.isAssignableFrom(t)) {
+ args[i] = processingTimeService;
+ } else if (MailboxExecutor.class.isAssignableFrom(t)) {
+ args[i] = mailboxExecutor;
+ } else if (t.isPrimitive()) {
+ if (t == boolean.class) {
+ args[i] = false;
+ } else if (t == char.class) {
+ args[i] = '\0';
+ } else if (t == byte.class) {
+ args[i] = (byte) 0;
+ } else if (t == short.class) {
+ args[i] = (short) 0;
+ } else if (t == int.class) {
+ args[i] = 0;
+ } else if (t == long.class) {
+ args[i] = 0L;
+ } else if (t == float.class) {
+ args[i] = 0.0f;
+ } else if (t == double.class) {
+ args[i] = 0.0d;
+ }
+ } else {
+ // Best effort: pass null for any extra, unknown parameters.
+ args[i] = null;
+ }
+ }
+ return target.newInstance(args);
} catch (Exception e) {
throw new RuntimeException("Failed to create SinkWriterOperator in Flink", e);
}
@@ -204,4 +262,24 @@ private <T> T getFieldValue(String fieldName) throws IllegalAccessException {
 private <T> T getFlinkWriterOperator() {
return (T) flinkWriterOperator;
}
+
+ private void invokeFlinkWriterOperatorMethod(
+ String methodName, Class<?>[] parameterTypes, Object... args) {
+ try {
+ Method m = flinkWriterOperator.getClass().getDeclaredMethod(methodName, parameterTypes);
+ m.setAccessible(true);
+ m.invoke(flinkWriterOperator, args);
+ } catch (NoSuchMethodException e) {
+ // Method does not exist in this Flink version (for example, open() signature changed);
+ // ignore for compatibility.
+ return;
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to invoke method "
+ + methodName
+ + " on wrapped flink writer operator "
+ + flinkWriterOperator.getClass().getName(),
+ e);
+ }
+ }
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
index 0432d1f54cf..e24b077ca03 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
@@ -36,7 +36,6 @@
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -47,6 +46,7 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
@@ -97,7 +97,6 @@ public DataSinkWriterOperator(
this.mailboxExecutor = mailboxExecutor;
this.schemaOperatorID = schemaOperatorID;
this.processedTableIds = new HashSet<>();
- this.chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
@@ -107,8 +106,14 @@ public void setup(
 Output<StreamRecord<CommittableMessage<CommT>>> output) {
super.setup(containingTask, config, output);
flinkWriterOperator = createFlinkWriterOperator();
- this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
- .setup(containingTask, config, output);
+ // Call protected AbstractStreamOperator.setup(...) on the wrapped operator via reflection
+ // to avoid JVM verifier issues with protected access across packages.
+ invokeFlinkWriterOperatorMethod(
+ "setup",
+ new Class<?>[] {StreamTask.class, StreamConfig.class, Output.class},
+ containingTask,
+ config,
+ output);
schemaEvolutionClient =
new SchemaEvolutionClient(
containingTask.getEnvironment().getOperatorCoordinatorEventGateway(),
@@ -117,35 +122,82 @@ public void setup(
@Override
public void open() throws Exception {
- this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator().open();
+ invokeFlinkWriterOperatorMethod("open", new Class<?>[0]);
copySinkWriter = getFieldValue("sinkWriter");
+ ensureEmitDownstream();
+ }
+
+ /**
+ * Flink 2.2 {@code SinkWriterOperator} sets {@code emitDownstream = sink instanceof
+ * SupportsCommitter}. Sinks like Paimon implement the older {@code TwoPhaseCommittingSink}
+ * rather than Flink 2.x {@code SupportsCommitter}, so committables are silently discarded. This
+ * method forces the flag to {@code true} and fills in the committable serializer so that the
+ * wrapped operator properly emits committables to the downstream committer.
+ */
+ private void ensureEmitDownstream() {
+ try {
+ Field emitField = findField(flinkWriterOperator.getClass(), "emitDownstream");
+ if (emitField == null) {
+ return;
+ }
+ emitField.setAccessible(true);
+ if (!emitField.getBoolean(flinkWriterOperator)) {
+ emitField.setBoolean(flinkWriterOperator, true);
+
+ Field serField = findField(flinkWriterOperator.getClass(), "committableSerializer");
+ if (serField != null) {
+ serField.setAccessible(true);
+ if (serField.get(flinkWriterOperator) == null) {
+ try {
+ Method getSerializer =
+ sink.getClass().getMethod("getCommittableSerializer");
+ getSerializer.setAccessible(true);
+ serField.set(flinkWriterOperator, getSerializer.invoke(sink));
+ } catch (NoSuchMethodException ignored) {
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Could not force emitDownstream on wrapped SinkWriterOperator", e);
+ }
+ }
+
+ private static Field findField(Class<?> clazz, String name) {
+ while (clazz != null) {
+ try {
+ return clazz.getDeclaredField(name);
+ } catch (NoSuchFieldException e) {
+ clazz = clazz.getSuperclass();
+ }
+ }
+ return null;
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
- schemaEvolutionClient.registerSubtask(getRuntimeContext().getIndexOfThisSubtask());
- this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
- .initializeState(context);
+ schemaEvolutionClient.registerSubtask(getSubtaskIndexCompat());
+ invokeFlinkWriterOperatorMethod(
+ "initializeState", new Class<?>[] {StateInitializationContext.class}, context);
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
- this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
- .snapshotState(context);
+ invokeFlinkWriterOperatorMethod(
+ "snapshotState", new Class<?>[] {StateSnapshotContext.class}, context);
}
@Override
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
- this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
- .processWatermark(mark);
+ invokeFlinkWriterOperatorMethod("processWatermark", new Class<?>[] {Watermark.class}, mark);
}
@Override
public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
super.processWatermarkStatus(watermarkStatus);
- this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
- .processWatermarkStatus(watermarkStatus);
+ invokeFlinkWriterOperatorMethod(
+ "processWatermarkStatus", new Class<?>[] {WatermarkStatus.class}, watermarkStatus);
}
@Override
@@ -188,19 +240,18 @@ public void processElement(StreamRecord element) throws Exception {
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
- this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
- .prepareSnapshotPreBarrier(checkpointId);
+ invokeFlinkWriterOperatorMethod(
+ "prepareSnapshotPreBarrier", new Class<?>[] {long.class}, checkpointId);
}
@Override
public void close() throws Exception {
- this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
- .close();
+ invokeFlinkWriterOperatorMethod("close", new Class<?>[0]);
}
@Override
public void endInput() throws Exception {
- this.<BoundedOneInput>getFlinkWriterOperator().endInput();
+ invokeFlinkWriterOperatorMethod("endInput", new Class<?>[0]);
}
// ----------------------------- Helper functions -------------------------------
@@ -223,7 +274,7 @@ private void handleFlushEvent(FlushEvent event) throws Exception {
});
}
schemaEvolutionClient.notifyFlushSuccess(
- getRuntimeContext().getIndexOfThisSubtask(), event.getSourceSubTaskId());
+ getSubtaskIndexCompat(), event.getSourceSubTaskId());
}
private void emitLatestSchema(TableId tableId) throws Exception {
@@ -250,16 +301,106 @@ private Object createFlinkWriterOperator() {
.getUserCodeClassLoader()
.loadClass(
"org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator");
- Constructor<?> constructor =
- flinkWriterClass.getDeclaredConstructor(
- Sink.class, ProcessingTimeService.class, MailboxExecutor.class);
- constructor.setAccessible(true);
- return constructor.newInstance(sink, processingTimeService, mailboxExecutor);
+
+ // Try to find a constructor whose first three parameters are compatible with
+ // (Sink, ProcessingTimeService, MailboxExecutor). This makes the code resilient
+ // against Flink 2.x adding extra parameters to the constructor.
+ Constructor<?> target = null;
+ for (Constructor<?> c : flinkWriterClass.getDeclaredConstructors()) {
+ Class<?>[] p = c.getParameterTypes();
+ if (p.length >= 3
+ && Sink.class.isAssignableFrom(p[0])
+ && ProcessingTimeService.class.isAssignableFrom(p[1])
+ && MailboxExecutor.class.isAssignableFrom(p[2])) {
+ target = c;
+ break;
+ }
+ }
+
+ if (target == null) {
+ // Fallback: use the first declared constructor and best-effort argument mapping
+ // below. This covers Flink 2.2 where the constructor signature may have changed.
+ Constructor<?>[] all = flinkWriterClass.getDeclaredConstructors();
+ if (all.length == 0) {
+ throw new RuntimeException(
+ "No constructors found on SinkWriterOperator in Flink runtime");
+ }
+ target = all[0];
+ }
+
+ target.setAccessible(true);
+ Class<?>[] paramTypes = target.getParameterTypes();
+ Object[] args = new Object[paramTypes.length];
+ for (int i = 0; i < paramTypes.length; i++) {
+ Class<?> t = paramTypes[i];
+ if (Sink.class.isAssignableFrom(t)) {
+ args[i] = sink;
+ } else if (ProcessingTimeService.class.isAssignableFrom(t)) {
+ args[i] = processingTimeService;
+ } else if (MailboxExecutor.class.isAssignableFrom(t)) {
+ args[i] = mailboxExecutor;
+ } else if (t.isPrimitive()) {
+ if (t == boolean.class) {
+ args[i] = false;
+ } else if (t == char.class) {
+ args[i] = '\0';
+ } else if (t == byte.class) {
+ args[i] = (byte) 0;
+ } else if (t == short.class) {
+ args[i] = (short) 0;
+ } else if (t == int.class) {
+ args[i] = 0;
+ } else if (t == long.class) {
+ args[i] = 0L;
+ } else if (t == float.class) {
+ args[i] = 0.0f;
+ } else if (t == double.class) {
+ args[i] = 0.0d;
+ }
+ } else {
+ // Best effort: pass null for any extra, unknown parameters.
+ args[i] = null;
+ }
+ }
+ return target.newInstance(args);
} catch (Exception e) {
throw new RuntimeException("Failed to create SinkWriterOperator in Flink", e);
}
}
+ /**
+ * Obtains the subtask index in a way that is compatible with both Flink 1.x and 2.x.
+ *
+ * Flink 2.x removed {@code StreamingRuntimeContext.getIndexOfThisSubtask()}. The correct
+ * replacement is {@code getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()}.
+ *
+ *
+ * <p>IMPORTANT: Returning a wrong (fixed) index here causes ALL sink subtasks to report
+ * the same ID to SchemaCoordinator. The coordinator then waits forever for the missing subtask
+ * IDs, causing a 3-minute timeout in SchemaOperator.
+ */
+ private int getSubtaskIndexCompat() {
+ try {
+ Object ctx = getRuntimeContext();
+ // Flink 2.x: getTaskInfo().getIndexOfThisSubtask()
+ try {
+ Method getTaskInfo = ctx.getClass().getMethod("getTaskInfo");
+ getTaskInfo.setAccessible(true);
+ Object taskInfo = getTaskInfo.invoke(ctx);
+ Method getIndex = taskInfo.getClass().getMethod("getIndexOfThisSubtask");
+ getIndex.setAccessible(true);
+ return (Integer) getIndex.invoke(taskInfo);
+ } catch (NoSuchMethodException ignored) {
+ // fall through to Flink 1.x path
+ }
+ // Flink 1.x: direct getIndexOfThisSubtask()
+ Method m = ctx.getClass().getMethod("getIndexOfThisSubtask");
+ m.setAccessible(true);
+ return (Integer) m.invoke(ctx);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to obtain subtask index from RuntimeContext", e);
+ }
+ }
+
/**
* Finds a field by name from its declaring class. This also searches for the field in super
* classes.
@@ -286,4 +427,24 @@ private <T> T getFieldValue(String fieldName) throws IllegalAccessException {
 private <T> T getFlinkWriterOperator() {
return (T) flinkWriterOperator;
}
+
+ private void invokeFlinkWriterOperatorMethod(
+ String methodName, Class<?>[] parameterTypes, Object... args) {
+ try {
+ Method m = flinkWriterOperator.getClass().getDeclaredMethod(methodName, parameterTypes);
+ m.setAccessible(true);
+ m.invoke(flinkWriterOperator, args);
+ } catch (NoSuchMethodException e) {
+ // Method does not exist in this Flink version (for example, open() signature changed);
+ // ignore for compatibility.
+ return;
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to invoke method "
+ + methodName
+ + " on wrapped flink writer operator "
+ + flinkWriterOperator.getClass().getName(),
+ e);
+ }
+ }
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java
index 278518f976e..a286ac2e70c 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java
@@ -23,17 +23,16 @@
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
-import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
/** Operator factory for {@link DataSinkWriterOperator}. */
@Internal
 public class DataSinkWriterOperatorFactory<CommT>
 extends AbstractStreamOperatorFactory<CommittableMessage<CommT>>
- implements OneInputStreamOperatorFactory<Event, CommittableMessage<CommT>>,
- YieldingOperatorFactory<CommittableMessage<CommT>> {
+ implements OneInputStreamOperatorFactory<Event, CommittableMessage<CommT>> {
private final Sink sink;
private final boolean isBounded;
@@ -41,6 +40,7 @@ public class DataSinkWriterOperatorFactory
public DataSinkWriterOperatorFactory(
Sink sink, boolean isBounded, OperatorID schemaOperatorID) {
+ setChainingStrategy(ChainingStrategy.ALWAYS);
this.sink = sink;
this.isBounded = isBounded;
this.schemaOperatorID = schemaOperatorID;
@@ -50,11 +50,10 @@ public DataSinkWriterOperatorFactory(
@Override
 public <T extends StreamOperator<CommittableMessage<CommT>>> T createStreamOperator(
 StreamOperatorParameters<CommittableMessage<CommT>> parameters) {
-
if (isBounded) {
 BatchDataSinkWriterOperator<CommT> writerOperator =
new BatchDataSinkWriterOperator<>(
- sink, processingTimeService, getMailboxExecutor());
+ sink, processingTimeService, parameters.getMailboxExecutor());
writerOperator.setup(
parameters.getContainingTask(),
parameters.getStreamConfig(),
@@ -63,7 +62,10 @@ public >> T createStreamOpera
}
DataSinkWriterOperator writerOperator =
new DataSinkWriterOperator<>(
- sink, processingTimeService, getMailboxExecutor(), schemaOperatorID);
+ sink,
+ processingTimeService,
+ parameters.getMailboxExecutor(),
+ schemaOperatorID);
writerOperator.setup(
parameters.getContainingTask(),
parameters.getStreamConfig(),
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/exception/SinkWrapperException.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/exception/SinkWrapperException.java
index 18dab43dd8b..273a6fb8cce 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/exception/SinkWrapperException.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/exception/SinkWrapperException.java
@@ -20,9 +20,9 @@
import org.apache.flink.cdc.common.event.Event;
/**
- * A generic {@link RuntimeException} thrown when an error occurs in {@link
- * org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator} and {@link
- * org.apache.flink.cdc.runtime.operators.sink.DataSinkFunctionOperator}.
+ * A generic {@link RuntimeException} thrown when an error occurs in sink operators (e.g. {@link
+ * org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator} or the SinkFunction-based
+ * operator in the Flink 1.x compat module).
*/
public class SinkWrapperException extends RuntimeException {
public SinkWrapperException(Event event, Throwable cause) {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index ff6cc512f44..235c5fc64bc 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -39,7 +39,6 @@
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -85,7 +84,6 @@ public static PreTransformOperatorBuilder newBuilder() {
this.preTransformChangeInfoMap = new HashMap<>();
this.preTransformProcessorMap = new HashMap<>();
this.schemaMetadataTransformers = new ArrayList<>();
- this.chainingStrategy = ChainingStrategy.ALWAYS;
this.transformRules = transformRules;
this.udfFunctions = udfFunctions;
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java
index bb4df65a5f6..32b12617183 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java
@@ -79,4 +79,10 @@ public PreTransformOperatorBuilder addUdfFunctions(
public PreTransformOperator build() {
return new PreTransformOperator(transformRules, udfFunctions);
}
+
+ /** Builds a factory with chaining strategy set (for Flink 2.x compatibility). */
+ public PreTransformOperatorFactory buildFactory() {
+ return new PreTransformOperatorFactory(
+ new PreTransformOperator(transformRules, udfFunctions));
+ }
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorFactory.java
new file mode 100644
index 00000000000..fea98e2d5f2
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+
+/**
+ * Factory for {@link PreTransformOperator} with chaining strategy set for Flink 2.x compatibility.
+ */
+@Internal
+public class PreTransformOperatorFactory extends SimpleOperatorFactory
+ implements OneInputStreamOperatorFactory {
+
+ public PreTransformOperatorFactory(PreTransformOperator operator) {
+ super(operator);
+ setChainingStrategy(ChainingStrategy.ALWAYS);
+ }
+}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/BatchRegularPrePartitionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/BatchRegularPrePartitionOperator.java
index d6142868f17..90901a7b1e0 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/BatchRegularPrePartitionOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/BatchRegularPrePartitionOperator.java
@@ -29,7 +29,6 @@
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -58,7 +57,6 @@ public class BatchRegularPrePartitionOperator extends AbstractStreamOperator hashFunctionProvider) {
- this.chainingStrategy = ChainingStrategy.ALWAYS;
this.downstreamParallelism = downstreamParallelism;
this.hashFunctionProvider = hashFunctionProvider;
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/BatchRegularPrePartitionOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/BatchRegularPrePartitionOperatorFactory.java
new file mode 100644
index 00000000000..4f9ad2288bc
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/BatchRegularPrePartitionOperatorFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.partitioning;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+
+/**
+ * Factory for {@link BatchRegularPrePartitionOperator} with chaining strategy set for Flink 2.x
+ * compatibility.
+ */
+@Internal
+public class BatchRegularPrePartitionOperatorFactory
+ extends SimpleOperatorFactory
+ implements OneInputStreamOperatorFactory {
+
+ public BatchRegularPrePartitionOperatorFactory(
+ int downstreamParallelism, HashFunctionProvider hashFunctionProvider) {
+ super(new BatchRegularPrePartitionOperator(downstreamParallelism, hashFunctionProvider));
+ setChainingStrategy(ChainingStrategy.ALWAYS);
+ }
+}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java
index bedbe8e8e09..02a9469bdb8 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java
@@ -30,7 +30,6 @@
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -55,7 +54,6 @@ public class DistributedPrePartitionOperator extends AbstractStreamOperator hashFunctionProvider) {
- this.chainingStrategy = ChainingStrategy.ALWAYS;
this.downstreamParallelism = downstreamParallelism;
this.hashFunctionProvider = hashFunctionProvider;
}
@@ -63,7 +61,7 @@ public DistributedPrePartitionOperator(
@Override
public void open() throws Exception {
super.open();
- subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+ subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
schemaMap = new HashMap<>();
hashFunctionMap = new HashMap<>();
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperatorFactory.java
new file mode 100644
index 00000000000..3a90065e51e
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperatorFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.partitioning;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+
+/**
+ * Factory for {@link DistributedPrePartitionOperator} with chaining strategy set for Flink 2.x
+ * compatibility.
+ */
+@Internal
+public class DistributedPrePartitionOperatorFactory extends SimpleOperatorFactory
+ implements OneInputStreamOperatorFactory {
+
+ public DistributedPrePartitionOperatorFactory(
+ int downstreamParallelism, HashFunctionProvider hashFunctionProvider) {
+ super(new DistributedPrePartitionOperator(downstreamParallelism, hashFunctionProvider));
+ setChainingStrategy(ChainingStrategy.ALWAYS);
+ }
+}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionOperator.java
index 38f177472fb..106d189a9c9 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionOperator.java
@@ -33,7 +33,6 @@
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -67,7 +66,6 @@ public RegularPrePartitionOperator(
OperatorID schemaOperatorId,
int downstreamParallelism,
HashFunctionProvider hashFunctionProvider) {
- this.chainingStrategy = ChainingStrategy.ALWAYS;
this.schemaOperatorId = schemaOperatorId;
this.downstreamParallelism = downstreamParallelism;
this.hashFunctionProvider = hashFunctionProvider;
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionOperatorFactory.java
new file mode 100644
index 00000000000..eef12bdf116
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionOperatorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.partitioning;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+
+/**
+ * Factory for {@link RegularPrePartitionOperator} with chaining strategy set for Flink 2.x
+ * compatibility.
+ */
+@Internal
+public class RegularPrePartitionOperatorFactory extends SimpleOperatorFactory
+ implements OneInputStreamOperatorFactory {
+
+ public RegularPrePartitionOperatorFactory(
+ OperatorID schemaOperatorId,
+ int downstreamParallelism,
+ HashFunctionProvider hashFunctionProvider) {
+ super(
+ new RegularPrePartitionOperator(
+ schemaOperatorId, downstreamParallelism, hashFunctionProvider));
+ setChainingStrategy(ChainingStrategy.ALWAYS);
+ }
+}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/EnumSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/EnumSerializer.java
index 5b93c320d4f..c7828108960 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/EnumSerializer.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/EnumSerializer.java
@@ -238,7 +238,8 @@ public TypeSerializer restoreSerializer() {
return new EnumSerializer<>(enumClass, previousEnums);
}
- @Override
+ // Flink 1.x abstract method - removed in Flink 2.x, @Override omitted for cross-version
+ // compatibility
public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
TypeSerializer newSerializer) {
if (!(newSerializer instanceof EnumSerializer)) {
@@ -269,6 +270,32 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(
reconfiguredSerializer);
}
+
+ // Flink 2.x new abstract method - no @Override so this also compiles against Flink 1.x
+ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+ TypeSerializerSnapshot oldSerializerSnapshot) {
+ if (!(oldSerializerSnapshot instanceof EnumSerializerSnapshot)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ @SuppressWarnings("unchecked")
+ EnumSerializerSnapshot oldSnapshot =
+ (EnumSerializerSnapshot) oldSerializerSnapshot;
+ if (!enumClass.equals(oldSnapshot.enumClass)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ if (Arrays.equals(oldSnapshot.previousEnums, previousEnums)) {
+ return TypeSerializerSchemaCompatibility.compatibleAsIs();
+ }
+ Set reconfiguredEnumSet =
+ new LinkedHashSet<>(Arrays.asList(oldSnapshot.previousEnums));
+ reconfiguredEnumSet.addAll(Arrays.asList(previousEnums));
+ @SuppressWarnings("unchecked")
+ T[] reconfiguredEnums =
+ reconfiguredEnumSet.toArray(
+ (T[]) Array.newInstance(enumClass, reconfiguredEnumSet.size()));
+ return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(
+ new EnumSerializer<>(enumClass, reconfiguredEnums));
+ }
}
// --------------------------------------------------------------------------------------------
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/NestedSerializersSnapshotDelegate.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/NestedSerializersSnapshotDelegate.java
index 66c62630dac..8eedaeef14e 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/NestedSerializersSnapshotDelegate.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/NestedSerializersSnapshotDelegate.java
@@ -184,7 +184,14 @@ public static NestedSerializersSnapshotDelegate readNestedSerializerSnapshots(
// Utilities
// ------------------------------------------------------------------------
- /** Utility method to conjure up a new scope for the generic parameters. */
+ /**
+ * Utility method to conjure up a new scope for the generic parameters.
+ *
+ * <p>In Flink 1.x, {@code TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)} is
+ * the abstract method. In Flink 2.x, it was replaced by {@code
+ * resolveSchemaCompatibility(TypeSerializerSnapshot)}. This method uses reflection to invoke the
+ * correct variant at runtime.
+ */
@SuppressWarnings("unchecked")
private static TypeSerializerSchemaCompatibility resolveCompatibility(
TypeSerializer> serializer, TypeSerializerSnapshot> snapshot) {
@@ -192,7 +199,32 @@ private static TypeSerializerSchemaCompatibility resolveCompatibility(
TypeSerializer typedSerializer = (TypeSerializer) serializer;
TypeSerializerSnapshot typedSnapshot = (TypeSerializerSnapshot) snapshot;
- return typedSnapshot.resolveSchemaCompatibility(typedSerializer);
+ // Try Flink 1.x approach: oldSnapshot.resolveSchemaCompatibility(newSerializer)
+ try {
+ java.lang.reflect.Method m =
+ typedSnapshot
+ .getClass()
+ .getMethod("resolveSchemaCompatibility", TypeSerializer.class);
+ return (TypeSerializerSchemaCompatibility) m.invoke(typedSnapshot, typedSerializer);
+ } catch (NoSuchMethodException ignored) {
+ // fall through to Flink 2.x approach
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to resolve schema compatibility (Flink 1.x path)", e);
+ }
+
+ // Flink 2.x approach: newSnapshot.resolveSchemaCompatibility(oldSnapshot)
+ try {
+ TypeSerializerSnapshot newSnapshot = typedSerializer.snapshotConfiguration();
+ java.lang.reflect.Method m =
+ newSnapshot
+ .getClass()
+ .getMethod("resolveSchemaCompatibility", TypeSerializerSnapshot.class);
+ return (TypeSerializerSchemaCompatibility) m.invoke(newSnapshot, typedSnapshot);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to resolve schema compatibility (Flink 2.x path)", e);
+ }
}
private static TypeSerializer>[] snapshotsToRestoreSerializers(
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/NullableSerializerWrapper.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/NullableSerializerWrapper.java
index c230c8c9a8c..33eb92742fd 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/NullableSerializerWrapper.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/NullableSerializerWrapper.java
@@ -172,7 +172,8 @@ public TypeSerializer restoreSerializer() {
return new NullableSerializerWrapper<>(innerSerializer);
}
- @Override
+ // Flink 1.x abstract method - removed in Flink 2.x, @Override omitted for cross-version
+ // compatibility
public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
TypeSerializer newSerializer) {
if (!(newSerializer instanceof NullableSerializerWrapper)) {
@@ -187,5 +188,20 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
}
+
+ // Flink 2.x new abstract method - no @Override so this also compiles against Flink 1.x
+ @SuppressWarnings("unchecked")
+ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+ TypeSerializerSnapshot oldSerializerSnapshot) {
+ if (!(oldSerializerSnapshot instanceof NullableSerializerWrapperSnapshot)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ NullableSerializerWrapperSnapshot snapshot =
+ (NullableSerializerWrapperSnapshot) oldSerializerSnapshot;
+ if (!innerSerializer.equals(snapshot.innerSerializer)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ return TypeSerializerSchemaCompatibility.compatibleAsIs();
+ }
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/ArrayDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/ArrayDataSerializer.java
index 39939375d50..ecbc03d7e9d 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/ArrayDataSerializer.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/ArrayDataSerializer.java
@@ -265,7 +265,8 @@ public TypeSerializer restoreSerializer() {
return new ArrayDataSerializer(previousType, previousEleSer);
}
- @Override
+ // Flink 1.x abstract method - removed in Flink 2.x, @Override omitted for cross-version
+ // compatibility
public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
TypeSerializer newSerializer) {
if (!(newSerializer instanceof ArrayDataSerializer)) {
@@ -280,5 +281,20 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
}
+
+ // Flink 2.x new abstract method - no @Override so this also compiles against Flink 1.x
+ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+ TypeSerializerSnapshot oldSerializerSnapshot) {
+ if (!(oldSerializerSnapshot instanceof ArrayDataSerializerSnapshot)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ ArrayDataSerializerSnapshot snapshot =
+ (ArrayDataSerializerSnapshot) oldSerializerSnapshot;
+ if (!previousType.equals(snapshot.previousType)
+ || !previousEleSer.equals(snapshot.previousEleSer)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ return TypeSerializerSchemaCompatibility.compatibleAsIs();
+ }
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/DecimalDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/DecimalDataSerializer.java
index 4872330f37e..97d2e356147 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/DecimalDataSerializer.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/DecimalDataSerializer.java
@@ -178,7 +178,8 @@ public TypeSerializer restoreSerializer() {
return new DecimalDataSerializer(previousPrecision, previousScale);
}
- @Override
+ // Flink 1.x abstract method - removed in Flink 2.x, @Override omitted for cross-version
+ // compatibility
public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
TypeSerializer newSerializer) {
if (!(newSerializer instanceof DecimalDataSerializer)) {
@@ -193,5 +194,21 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
}
+
+    // Flink 2.x replaced the abstract method with
+    // resolveSchemaCompatibility(TypeSerializerSnapshot). @Override is omitted so that this
+    // also compiles against Flink 1.x, where that method does not exist.
+ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+ TypeSerializerSnapshot oldSerializerSnapshot) {
+ if (!(oldSerializerSnapshot instanceof DecimalSerializerSnapshot)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ DecimalSerializerSnapshot snapshot = (DecimalSerializerSnapshot) oldSerializerSnapshot;
+ if (previousPrecision != snapshot.previousPrecision
+ || previousScale != snapshot.previousScale) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ return TypeSerializerSchemaCompatibility.compatibleAsIs();
+ }
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/LocalZonedTimestampDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/LocalZonedTimestampDataSerializer.java
index c3622db2340..987f2082353 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/LocalZonedTimestampDataSerializer.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/LocalZonedTimestampDataSerializer.java
@@ -177,7 +177,8 @@ public TypeSerializer restoreSerializer() {
return new LocalZonedTimestampDataSerializer(previousPrecision);
}
- @Override
+ // Flink 1.x abstract method - removed in Flink 2.x, @Override omitted for cross-version
+ // compatibility
public TypeSerializerSchemaCompatibility
resolveSchemaCompatibility(TypeSerializer newSerializer) {
if (!(newSerializer instanceof LocalZonedTimestampDataSerializer)) {
@@ -192,5 +193,22 @@ public TypeSerializer restoreSerializer() {
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
}
+
+    // Flink 2.x replaced the abstract method with
+    // resolveSchemaCompatibility(TypeSerializerSnapshot). @Override is omitted so that this
+    // also compiles against Flink 1.x, where that method does not exist.
+ public TypeSerializerSchemaCompatibility
+ resolveSchemaCompatibility(
+ TypeSerializerSnapshot oldSerializerSnapshot) {
+ if (!(oldSerializerSnapshot instanceof LocalZonedTimestampDataSerializerSnapshot)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ LocalZonedTimestampDataSerializerSnapshot snapshot =
+ (LocalZonedTimestampDataSerializerSnapshot) oldSerializerSnapshot;
+ if (previousPrecision != snapshot.previousPrecision) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ return TypeSerializerSchemaCompatibility.compatibleAsIs();
+ }
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/MapDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/MapDataSerializer.java
index d8f92ee29f0..8b2e19cdef7 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/MapDataSerializer.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/MapDataSerializer.java
@@ -306,7 +306,8 @@ public TypeSerializer restoreSerializer() {
return new MapDataSerializer(keyType, valueType, keySerializer, valueSerializer);
}
- @Override
+ // Flink 1.x abstract method - removed in Flink 2.x, @Override omitted for cross-version
+ // compatibility
public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
TypeSerializer newSerializer) {
if (!(newSerializer instanceof MapDataSerializer)) {
@@ -323,5 +324,21 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
}
+
+ // Flink 2.x new abstract method - no @Override so this also compiles against Flink 1.x
+ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+ TypeSerializerSnapshot oldSerializerSnapshot) {
+ if (!(oldSerializerSnapshot instanceof MapDataSerializerSnapshot)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ MapDataSerializerSnapshot snapshot = (MapDataSerializerSnapshot) oldSerializerSnapshot;
+ if (!keyType.equals(snapshot.keyType)
+ || !valueType.equals(snapshot.valueType)
+ || !keySerializer.equals(snapshot.keySerializer)
+ || !valueSerializer.equals(snapshot.valueSerializer)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ return TypeSerializerSchemaCompatibility.compatibleAsIs();
+ }
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/TimestampDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/TimestampDataSerializer.java
index babc6345bc5..10dfc060631 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/TimestampDataSerializer.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/TimestampDataSerializer.java
@@ -171,7 +171,8 @@ public TypeSerializer restoreSerializer() {
return new TimestampDataSerializer(previousPrecision);
}
- @Override
+ // Flink 1.x abstract method - removed in Flink 2.x, @Override omitted for cross-version
+ // compatibility
public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
TypeSerializer newSerializer) {
if (!(newSerializer instanceof TimestampDataSerializer)) {
@@ -186,5 +187,21 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibili
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
}
+
+ // Flink 2.x changed the abstract method to
+ // resolveSchemaCompatibility(TypeSerializerSnapshot).
+ // No @Override so this also compiles against Flink 1.x where the method does not exist.
+ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+ TypeSerializerSnapshot oldSerializerSnapshot) {
+ if (!(oldSerializerSnapshot instanceof TimestampDataSerializerSnapshot)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ TimestampDataSerializerSnapshot snapshot =
+ (TimestampDataSerializerSnapshot) oldSerializerSnapshot;
+ if (previousPrecision != snapshot.previousPrecision) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ return TypeSerializerSchemaCompatibility.compatibleAsIs();
+ }
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/ZonedTimestampDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/ZonedTimestampDataSerializer.java
index ca9861761cf..8676d903f75 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/ZonedTimestampDataSerializer.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/ZonedTimestampDataSerializer.java
@@ -181,7 +181,8 @@ public TypeSerializer restoreSerializer() {
return new ZonedTimestampDataSerializer(previousPrecision);
}
- @Override
+ // Flink 1.x abstract method - removed in Flink 2.x, @Override omitted for cross-version
+ // compatibility
public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
TypeSerializer newSerializer) {
if (!(newSerializer instanceof ZonedTimestampDataSerializer)) {
@@ -196,5 +197,19 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompat
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
}
+
+ // Flink 2.x new abstract method - no @Override so this also compiles against Flink 1.x
+ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+ TypeSerializerSnapshot oldSerializerSnapshot) {
+ if (!(oldSerializerSnapshot instanceof ZonedTimestampDataSerializerSnapshot)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ ZonedTimestampDataSerializerSnapshot snapshot =
+ (ZonedTimestampDataSerializerSnapshot) oldSerializerSnapshot;
+ if (previousPrecision != snapshot.previousPrecision) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ return TypeSerializerSchemaCompatibility.compatibleAsIs();
+ }
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/EventTypeInfo.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/EventTypeInfo.java
index 8087746cdec..c126abca628 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/EventTypeInfo.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/EventTypeInfo.java
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.runtime.typeutils;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.event.Event;
@@ -58,11 +59,18 @@ public boolean isKeyType() {
return false;
}
- @Override
+ // Flink 1.x abstract method - removed in Flink 2.x, @Override omitted for cross-version
+ // compatibility
public TypeSerializer createSerializer(ExecutionConfig config) {
return EventSerializer.INSTANCE;
}
+ /** For Flink 2.2+ (TypeInformation.createSerializer now takes SerializerConfig). */
+ @Override
+ public TypeSerializer createSerializer(SerializerConfig config) {
+ return EventSerializer.INSTANCE;
+ }
+
@Override
public String toString() {
return "Event";
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/PartitioningEventTypeInfo.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/PartitioningEventTypeInfo.java
index 4a3018fb98a..1de112319e4 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/PartitioningEventTypeInfo.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/PartitioningEventTypeInfo.java
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.runtime.typeutils;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.annotation.Internal;
@@ -58,11 +59,18 @@ public boolean isKeyType() {
return false;
}
- @Override
+ // Flink 1.x abstract method - removed in Flink 2.x, @Override omitted for cross-version
+ // compatibility
public TypeSerializer createSerializer(ExecutionConfig config) {
return PartitioningEventSerializer.INSTANCE;
}
+ /** For Flink 2.2+ (TypeInformation.createSerializer now takes SerializerConfig). */
+ @Override
+ public TypeSerializer createSerializer(SerializerConfig config) {
+ return PartitioningEventSerializer.INSTANCE;
+ }
+
@Override
public String toString() {
return "PartitioningEvent";
diff --git a/pom.xml b/pom.xml
index 3d87b69054f..8f021aaffa5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,6 +39,7 @@ limitations under the License.
flink-cdc-dist
flink-cdc-connect
flink-cdc-runtime
+ flink-cdc-flink-compat
flink-cdc-e2e-tests
flink-cdc-pipeline-udf-examples
flink-cdc-pipeline-model
@@ -106,6 +107,8 @@ limitations under the License.
and thus is changing back to java 1.6 on each maven re-import -->
${source.java.version}
${target.java.version}
+
+ flink-cdc-flink-compat-flink1
@@ -732,6 +735,18 @@ limitations under the License.