Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCHEMA_CHANGE_ENABLED;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SERVER_TIME_ZONE;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SLOT_NAME;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
Expand Down Expand Up @@ -131,6 +132,7 @@ public DataSource createDataSource(Context context) {
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE);
boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED);

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
Expand Down Expand Up @@ -172,6 +174,7 @@ public DataSource createDataSource(Context context) {
.lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
.includeDatabaseInTableId(tableIdIncludeDatabase)
.includeSchemaChanges(includeSchemaChanges)
.getConfigFactory();

List<TableId> tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.common.source.EventSourceProvider;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
Expand All @@ -34,13 +35,16 @@
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
import org.apache.flink.cdc.connectors.postgres.source.reader.PostgresPipelineRecordEmitter;
import org.apache.flink.cdc.connectors.postgres.table.PostgreSQLReadableMetadata;
import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
import org.apache.flink.connector.base.source.reader.RecordEmitter;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** A {@link DataSource} for Postgres cdc connector. */
@Internal
Expand All @@ -50,6 +54,7 @@ public class PostgresDataSource implements DataSource {
private final PostgresSourceConfig postgresSourceConfig;

private final List<PostgreSQLReadableMetadata> readableMetadataList;
Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps = new HashMap<>();

public PostgresDataSource(PostgresSourceConfigFactory configFactory) {
this(configFactory, new ArrayList<>());
Expand All @@ -67,12 +72,17 @@ public PostgresDataSource(
public EventSourceProvider getEventSourceProvider() {
String databaseName = postgresSourceConfig.getDatabaseList().get(0);
boolean includeDatabaseInTableId = postgresSourceConfig.isIncludeDatabaseInTableId();
beforeTableColumnsOidMaps =
PostgresSchemaUtils.getAllTablesColumnOids(
postgresSourceConfig, postgresSourceConfig.getTableList());
Comment on lines +75 to +77
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource leak risk: The getAllTablesColumnOids method at line 76 opens a database connection internally. If this method throws an exception, the connection may not be properly closed. While the method uses try-with-resources internally, if an exception occurs between line 75 and 85 during deserializer construction, the partially initialized state could cause issues. Consider wrapping this in proper error handling.

Copilot uses AI. Check for mistakes.
Comment on lines +75 to +77
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the parallelism of PostgresDataSource is greater than 1, will there be any inconsistency issues here?

DebeziumEventDeserializationSchema deserializer =
new PostgresEventDeserializer(
DebeziumChangelogMode.ALL,
readableMetadataList,
includeDatabaseInTableId,
databaseName);
databaseName,
postgresSourceConfig,
beforeTableColumnsOidMaps);

PostgresOffsetFactory postgresOffsetFactory = new PostgresOffsetFactory();
PostgresDialect postgresDialect = new PostgresDialect(postgresSourceConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,4 +273,12 @@ public class PostgresDataSourceOptions {
"Whether to include database in the generated Table ID. "
+ "If set to true, the Table ID will be in the format (database, schema, table). "
+ "If set to false, the Table ID will be in the format (schema, table). Defaults to false.");

@Experimental
public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
ConfigOptions.key("schema-change.enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether send schema change events, by default is true. If set to false, the schema changes will not be sent.");
}
Loading
Loading