diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
index 50206761a19..561b1efac3c 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -270,6 +270,18 @@ pipeline:
         此为实验性选项，默认值为 false。
       </td>
     </tr>
+    <tr>
+      <td>table-id.include-database</td>
+      <td>optional</td>
+      <td>false</td>
+      <td>Boolean</td>
+      <td>
+        是否在生成的 Table ID 中包含数据库名称。<br>
+        如果设置为 true，Table ID 的格式为 (数据库, 模式, 表)。<br>
+        如果设置为 false，Table ID 的格式为 (模式, 表)。<br>
+        默认值为 false。
+      </td>
+    </tr>
diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md b/docs/content/docs/connectors/pipeline-connectors/postgres.md
index 03dd4e5a314..fa5ce5b105f 100644
--- a/docs/content/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -262,6 +262,18 @@ pipeline:
         Experimental option, defaults to false.
       </td>
     </tr>
+    <tr>
+      <td>table-id.include-database</td>
+      <td>optional</td>
+      <td>false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to include the database name in the generated Table ID.<br>
+        If set to true, the Table ID will be in the format (database, schema, table).<br>
+        If set to false, the Table ID will be in the format (schema, table).<br>
+        Defaults to false.
+      </td>
+    </tr>
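Before the code changes, a quick orientation for reviewers: the sketch below enables the new option programmatically, mirroring how the updated PostgresPipelineITCaseTest (later in this patch) sets it. Only options that actually appear in this patch are used; the table selector and username values are placeholders.

```java
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions;

public class TableIdOptionSketch {
    public static void main(String[] args) {
        Configuration sourceConfiguration = new Configuration();
        sourceConfiguration.set(PostgresDataSourceOptions.USERNAME, "postgres");
        sourceConfiguration.set(PostgresDataSourceOptions.TABLES, "inventory.products");
        // New in this patch: emit (database, schema, table) Table IDs.
        sourceConfiguration.set(PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE, true);
        System.out.println(
                sourceConfiguration.get(PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE));
    }
}
```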
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
index 918d479f6df..5e6c446c5d7 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
@@ -80,6 +80,7 @@
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES;
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES_EXCLUDE;
+import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE;
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.USERNAME;
 import static org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
 import static org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
@@ -129,6 +130,7 @@ public DataSource createDataSource(Context context) {
         int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
         boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
+        boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE);
 
         validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
         validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -169,6 +171,7 @@ public DataSource createDataSource(Context context) {
                         .skipSnapshotBackfill(skipSnapshotBackfill)
                         .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
                         .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
+                        .includeDatabaseInTableId(tableIdIncludeDatabase)
                         .getConfigFactory();
 
         List<TableId> tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null);
@@ -197,6 +200,7 @@ public DataSource createDataSource(Context context) {
         String metadataList = config.get(METADATA_LIST);
         List readableMetadataList = listReadableMetadata(metadataList);
 
+        // The includeDatabaseInTableId flag reaches the data source through the configFactory.
         return new PostgresDataSource(configFactory, readableMetadataList);
     }
 
@@ -257,6 +261,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
         options.add(METADATA_LIST);
         options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
+        options.add(TABLE_ID_INCLUDE_DATABASE);
         return options;
     }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
index 767fb5fc33a..6084df1f36f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java
@@ -65,8 +65,14 @@ public PostgresDataSource(
 
     @Override
     public EventSourceProvider getEventSourceProvider() {
+        String databaseName = postgresSourceConfig.getDatabaseList().get(0);
+        boolean includeDatabaseInTableId = postgresSourceConfig.isIncludeDatabaseInTableId();
         DebeziumEventDeserializationSchema deserializer =
-                new PostgresEventDeserializer(DebeziumChangelogMode.ALL, readableMetadataList);
+                new PostgresEventDeserializer(
+                        DebeziumChangelogMode.ALL,
+                        readableMetadataList,
+                        includeDatabaseInTableId,
+                        databaseName);
 
         PostgresOffsetFactory postgresOffsetFactory = new PostgresOffsetFactory();
         PostgresDialect postgresDialect = new PostgresDialect(postgresSourceConfig);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
index ec51de8b680..7e9ac4b0c58 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
@@ -264,4 +264,13 @@ public class PostgresDataSourceOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");
+
+    public static final ConfigOption<Boolean> TABLE_ID_INCLUDE_DATABASE =
+            ConfigOptions.key("table-id.include-database")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to include the database name in the generated Table ID. "
+                                    + "If set to true, the Table ID will be in the format (database, schema, table). "
+                                    + "If set to false, the Table ID will be in the format (schema, table). Defaults to false.");
 }
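One reviewer-facing consequence of the definition above: leaving the option unset resolves to the default of false, so existing pipelines keep producing (schema, table) Table IDs. A minimal sketch of that contract, using only the option defined in this patch:

```java
import org.apache.flink.cdc.common.configuration.Configuration;

import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE;

public class DefaultBehaviourSketch {
    public static void main(String[] args) {
        // An empty configuration resolves the option to its default, false,
        // so the pre-patch (schema, table) behaviour is preserved.
        Configuration config = new Configuration();
        boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE);
        System.out.println(tableIdIncludeDatabase); // false
    }
}
```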
Defaults to false."); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java index a37c35c5216..c31dbc73b7f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java @@ -50,18 +50,37 @@ public class PostgresEventDeserializer extends DebeziumEventDeserializationSchem private static final long serialVersionUID = 1L; private List readableMetadataList; + private final boolean includeDatabaseInTableId; + private final String databaseName; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) { - super(new PostgresSchemaDataTypeInference(), changelogMode); + this(changelogMode, new ArrayList<>(), false, null); } public PostgresEventDeserializer( DebeziumChangelogMode changelogMode, List readableMetadataList) { + this(changelogMode, readableMetadataList, false, null); + } + + public PostgresEventDeserializer( + DebeziumChangelogMode changelogMode, + List readableMetadataList, + boolean includeDatabaseInTableId) { + this(changelogMode, readableMetadataList, includeDatabaseInTableId, null); + } + + public PostgresEventDeserializer( + DebeziumChangelogMode changelogMode, + List readableMetadataList, + boolean includeDatabaseInTableId, + String databaseName) { super(new PostgresSchemaDataTypeInference(), changelogMode); this.readableMetadataList = readableMetadataList; + this.includeDatabaseInTableId = includeDatabaseInTableId; + this.databaseName = databaseName; } @Override @@ -87,7 +106,11 @@ protected boolean isSchemaChangeRecord(SourceRecord record) { @Override protected TableId getTableId(SourceRecord record) { String[] parts = record.topic().split("\\."); - return TableId.tableId(parts[1], parts[2]); + if (includeDatabaseInTableId && databaseName != null) { + return TableId.tableId(databaseName, parts[1], parts[2]); + } else { + return TableId.tableId(parts[1], parts[2]); + } } @Override diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java index 02761d8f311..dd862354c25 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java @@ -61,6 +61,7 @@ import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent; import static 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
index 02761d8f311..dd862354c25 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
@@ -61,6 +61,7 @@
 import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent;
 import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
 import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
+import static org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toCdcTableId;
 
 /** The {@link RecordEmitter} implementation for PostgreSQL pipeline connector. */
 public class PostgresPipelineRecordEmitter extends IncrementalSourceRecordEmitter {
@@ -73,6 +74,7 @@ public class PostgresPipelineRecordEmitter extends IncrementalSourceRecordEmi
     // Used when startup mode is not initial
     private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
     private boolean isBounded = false;
+    private boolean includeDatabaseInTableId = false;
 
     private final Map<TableId, CreateTableEvent> createTableEventCache;
 
@@ -88,6 +90,7 @@ public PostgresPipelineRecordEmitter(
                 sourceConfig.isIncludeSchemaChanges(),
                 offsetFactory);
         this.sourceConfig = sourceConfig;
+        this.includeDatabaseInTableId = sourceConfig.isIncludeDatabaseInTableId();
         this.postgresDialect = postgresDialect;
         this.alreadySendCreateTableTables = new HashSet<>();
         this.createTableEventCache =
@@ -103,10 +106,17 @@ public void applySplit(SourceSplitBase split) {
             // TableSchemas in SnapshotSplit only contains one table.
             createTableEventCache.putAll(generateCreateTableEvent(sourceConfig));
         } else {
-            for (TableChanges.TableChange tableChange : split.getTableSchemas().values()) {
+            for (Map.Entry<TableId, TableChanges.TableChange> entry :
+                    split.getTableSchemas().entrySet()) {
+                TableId tableId =
+                        entry.getKey(); // the map key carries the fully qualified identifier
+                TableChanges.TableChange tableChange = entry.getValue();
                 CreateTableEvent createTableEvent =
                         new CreateTableEvent(
-                                toCdcTableId(tableChange.getId()),
+                                toCdcTableId(
+                                        tableId,
+                                        sourceConfig.getDatabaseList().get(0),
+                                        includeDatabaseInTableId),
                                 buildSchemaFromTable(tableChange.getTable()));
                 ((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
                         .applyChangeEvent(createTableEvent);
@@ -128,10 +138,8 @@ protected void processElement(
         } else if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
             TableId tableId = splitState.asSnapshotSplitState().toSourceSplit().getTableId();
             if (!alreadySendCreateTableTables.contains(tableId)) {
-                try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
-                    sendCreateTableEvent(jdbc, tableId, (SourceOutput<Event>) output);
-                    alreadySendCreateTableTables.add(tableId);
-                }
+                sendCreateTableEvent(tableId, (SourceOutput<Event>) output);
+                alreadySendCreateTableTables.add(tableId);
             }
         } else {
             boolean isDataChangeRecord = isDataChangeRecord(element);
@@ -189,21 +197,8 @@ private Schema buildSchemaFromTable(Table table) {
         return tableBuilder.build();
     }
 
-    private void sendCreateTableEvent(
-            PostgresConnection jdbc, TableId tableId, SourceOutput<Event> output) {
-        Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
-        output.collect(
-                new CreateTableEvent(
-                        org.apache.flink.cdc.common.event.TableId.tableId(
-                                tableId.schema(), tableId.table()),
-                        schema));
-    }
-
-    private org.apache.flink.cdc.common.event.TableId toCdcTableId(
-            io.debezium.relational.TableId dbzTableId) {
-        String schemaName =
-                dbzTableId.catalog() == null ? dbzTableId.schema() : dbzTableId.catalog();
-        return org.apache.flink.cdc.common.event.TableId.tableId(schemaName, dbzTableId.table());
+    private void sendCreateTableEvent(TableId tableId, SourceOutput<Event> output) {
+        output.collect(getCreateTableEvent(sourceConfig, tableId));
     }
 
     private CreateTableEvent getCreateTableEvent(
@@ -211,8 +206,10 @@ private CreateTableEvent getCreateTableEvent(
         try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
             Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
             return new CreateTableEvent(
-                    org.apache.flink.cdc.common.event.TableId.tableId(
-                            tableId.schema(), tableId.table()),
+                    toCdcTableId(
+                            tableId,
+                            sourceConfig.getDatabaseList().get(0),
+                            includeDatabaseInTableId),
                     schema);
         }
     }
@@ -244,8 +241,10 @@ private Map generateCreateTableEvent(
             createTableEventCache.put(
                     tableId,
                     new CreateTableEvent(
-                            org.apache.flink.cdc.common.event.TableId.tableId(
-                                    tableId.schema(), tableId.table()),
+                            toCdcTableId(
+                                    tableId,
+                                    this.sourceConfig.getDatabaseList().get(0),
+                                    includeDatabaseInTableId),
                             schema));
         }
         return createTableEventCache;
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java
index 0f3b0580644..93a7099ac39 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java
@@ -208,12 +208,26 @@ public static Column toColumn(
 
     public static io.debezium.relational.TableId toDbzTableId(TableId tableId) {
         return new io.debezium.relational.TableId(
-                tableId.getSchemaName(), null, tableId.getTableName());
+                tableId.getNamespace(), tableId.getSchemaName(), tableId.getTableName());
     }
 
     public static org.apache.flink.cdc.common.event.TableId toCdcTableId(
             io.debezium.relational.TableId dbzTableId) {
-        return org.apache.flink.cdc.common.event.TableId.tableId(
-                dbzTableId.schema(), dbzTableId.table());
+        return toCdcTableId(dbzTableId, null, false);
+    }
+
+    public static org.apache.flink.cdc.common.event.TableId toCdcTableId(
+            io.debezium.relational.TableId dbzTableId,
+            String databaseName,
+            boolean includeDatabaseInTableId) {
+        String schema = dbzTableId.schema();
+        String table = dbzTableId.table();
+        if (includeDatabaseInTableId && databaseName != null) {
+            return org.apache.flink.cdc.common.event.TableId.tableId(databaseName, schema, table);
+        } else if (schema != null) {
+            return org.apache.flink.cdc.common.event.TableId.tableId(schema, table);
+        } else {
+            return org.apache.flink.cdc.common.event.TableId.tableId(table);
+        }
     }
 }
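Assuming a Debezium identifier whose catalog position carries the database name, the new toCdcTableId overload renders both shapes like this (all values hypothetical):

```java
import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils;

public class TableIdConversionSketch {
    public static void main(String[] args) {
        // Debezium identifier: (catalog = database, schema, table).
        io.debezium.relational.TableId dbzId =
                new io.debezium.relational.TableId("app_db", "inventory", "products");

        // Legacy one-argument overload: the database is dropped.
        System.out.println(PostgresSchemaUtils.toCdcTableId(dbzId)); // inventory.products

        // New overload with the flag on: the database becomes the namespace.
        System.out.println(
                PostgresSchemaUtils.toCdcTableId(dbzId, "app_db", true)); // app_db.inventory.products
    }
}
```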
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
index bd6c46e2548..b2aef9eed9c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
@@ -57,7 +57,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,6 +75,7 @@
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
@@ -103,6 +105,14 @@ public void after() throws SQLException {
         inventoryDatabase.removeSlot(slotName);
     }
 
+    static Stream<Arguments> provideParameters() {
+        return Stream.of(
+                Arguments.of(true, true),
+                Arguments.of(false, false),
+                Arguments.of(true, false),
+                Arguments.of(false, true));
+    }
+
     @Test
     public void testInitialStartupMode() throws Exception {
         inventoryDatabase.createAndInitialize();
@@ -341,9 +351,10 @@ private CollectResultIterator addCollector(
         return iterator;
     }
 
-    @ParameterizedTest(name = "unboundedChunkFirst: {0}")
-    @ValueSource(booleans = {true, false})
-    public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws Exception {
+    @ParameterizedTest
+    @MethodSource("provideParameters")
+    public void testInitialStartupModeWithOpts(
+            boolean unboundedChunkFirst, boolean isTableIdIncludeDatabase) throws Exception {
         inventoryDatabase.createAndInitialize();
         org.apache.flink.cdc.common.configuration.Configuration sourceConfiguration =
                 new org.apache.flink.cdc.common.configuration.Configuration();
@@ -365,6 +376,8 @@ public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws E
         sourceConfiguration.set(
                 PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED,
                 unboundedChunkFirst);
+        sourceConfiguration.set(
+                PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE, isTableIdIncludeDatabase);
 
         Factory.Context context =
                 new FactoryHelper.DefaultContext(
@@ -384,7 +397,12 @@ public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws E
                                 new EventTypeInfo())
                         .executeAndCollect();
 
-        TableId tableId = TableId.tableId("inventory", "products");
+        TableId tableId;
+        if (isTableIdIncludeDatabase) {
+            tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "inventory", "products");
+        } else {
+            tableId = TableId.tableId("inventory", "products");
+        }
         CreateTableEvent createTableEvent = getProductsCreateTableEvent(tableId);
 
         // generate snapshot data
@@ -582,16 +600,6 @@ private static List fetchResultsExcept(Iterator iter, int size, T side
         return result;
     }
 
-    // Helper method to create a temporary directory for savepoint
-    private Path createTempSavepointDir() throws Exception {
-        return Files.createTempDirectory("postgres-savepoint");
-    }
-
-    // Helper method to execute the job and create a savepoint
-    private String createSavepoint(JobClient jobClient, Path savepointDir) throws Exception {
-        return jobClient.stopWithSavepoint(true, savepointDir.toAbsolutePath().toString()).get();
-    }
-
     private List getSnapshotExpected(TableId tableId) {
         RowType rowType =
                 RowType.of(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
index bbd65b5e9ab..0f550ca96b3 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
@@ -305,6 +305,12 @@ public PostgresSourceBuilder includePartitionedTables(boolean includePartitio
         return this;
     }
 
+    /** Whether to include the database name in the generated Table ID. */
+    public PostgresSourceBuilder<T> includeDatabaseInTableId(boolean includeDatabaseInTableId) {
+        this.configFactory.setIncludeDatabaseInTableId(includeDatabaseInTableId);
+        return this;
+    }
+
     /**
      * Build the {@link PostgresIncrementalSource}.
     *
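For DataStream users of flink-connector-postgres-cdc, the new builder method chains like the existing flags. A sketch following the builder usage pattern from the connector documentation — connection values are placeholders, and everything except includeDatabaseInTableId predates this patch:

```java
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class BuilderUsageSketch {
    public static void main(String[] args) {
        PostgresIncrementalSource<String> source =
                PostgresIncrementalSource.<String>builder()
                        .hostname("localhost")
                        .port(5432)
                        .database("app_db")
                        .schemaList("inventory")
                        .tableList("inventory.products")
                        .username("postgres")
                        .password("postgres")
                        .slotName("flink_cdc_slot")
                        .decodingPluginName("pgoutput")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .includeDatabaseInTableId(true) // new in this patch
                        .build();
    }
}
```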
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
index 30271612800..4402219a647 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
@@ -39,6 +39,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
     private final int subtaskId;
     private final int lsnCommitCheckpointsDelay;
     private final boolean includePartitionedTables;
+    private final boolean includeDatabaseInTableId;
 
     public PostgresSourceConfig(
             int subtaskId,
@@ -69,7 +70,8 @@ public PostgresSourceConfig(
             boolean isScanNewlyAddedTableEnabled,
             int lsnCommitCheckpointsDelay,
             boolean assignUnboundedChunkFirst,
-            boolean includePartitionedTables) {
+            boolean includePartitionedTables,
+            boolean includeDatabaseInTableId) {
         super(
                 startupOptions,
                 databaseList,
@@ -100,6 +102,7 @@ public PostgresSourceConfig(
         this.subtaskId = subtaskId;
         this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
         this.includePartitionedTables = includePartitionedTables;
+        this.includeDatabaseInTableId = includeDatabaseInTableId;
     }
 
     /**
@@ -148,4 +151,13 @@ public String getJdbcUrl() {
     public PostgresConnectorConfig getDbzConnectorConfig() {
         return new PostgresConnectorConfig(getDbzConfiguration());
     }
+
+    /**
+     * Returns whether to include the database name in the generated Table ID.
+     *
+     * @return whether to include the database name in the generated Table ID
+     */
+    public boolean isIncludeDatabaseInTableId() {
+        return includeDatabaseInTableId;
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
index 670d4f37a56..847b1547461 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
@@ -54,6 +54,9 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
 
     private boolean includePartitionedTables;
 
+    private boolean includeDatabaseInTableId =
+            PostgresSourceOptions.TABLE_ID_INCLUDE_DATABASE.defaultValue();
+
     /** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */
     @Override
     public PostgresSourceConfig create(int subtaskId) {
@@ -136,7 +139,8 @@ public PostgresSourceConfig create(int subtaskId) {
                 scanNewlyAddedTableEnabled,
                 lsnCommitCheckpointsDelay,
                 assignUnboundedChunkFirst,
-                includePartitionedTables);
+                includePartitionedTables,
+                includeDatabaseInTableId);
     }
 
     /**
@@ -189,4 +193,9 @@ public void setLsnCommitCheckpointsDelay(int lsnCommitCheckpointsDelay) {
     public void setIncludePartitionedTables(boolean includePartitionedTables) {
         this.includePartitionedTables = includePartitionedTables;
     }
+
+    /** Set whether to include the database name in the generated Table ID. */
+    public void setIncludeDatabaseInTableId(boolean includeDatabaseInTableId) {
+        this.includeDatabaseInTableId = includeDatabaseInTableId;
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
index f498c264532..db04ac14237 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
@@ -97,4 +97,13 @@ public class PostgresSourceOptions extends JdbcSourceOptions {
                                     + "If enabled:\n"
                                     + "(1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true\n"
                                     + "(2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice.");
+
+    public static final ConfigOption<Boolean> TABLE_ID_INCLUDE_DATABASE =
+            ConfigOptions.key("table-id.include-database")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to include the database name in the generated Table ID.\n"
+                                    + "If set to true, the Table ID will be in the format (database, schema, table).\n"
+                                    + "If set to false, the Table ID will be in the format (schema, table). Defaults to false.");
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
index b3b3e778d16..49b62cb3977 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
@@ -132,9 +132,8 @@ public void configure(SourceSplitBase sourceSplitBase) {
                             .edit()
                             .with(
                                     "table.include.list",
-                                    ((SnapshotSplit) sourceSplitBase)
-                                            .getTableId()
-                                            .toString())
+                                    getTableList(
+                                            ((SnapshotSplit) sourceSplitBase).getTableId()))
                             .with(
                                     SLOT_NAME.name(),
                                     ((PostgresSourceConfig) sourceConfig)
@@ -151,13 +150,13 @@ public void configure(SourceSplitBase sourceSplitBase) {
             // when backfilled split, only current table schema should be scan
             builder.with(
                     "table.include.list",
-                    sourceSplitBase
-                            .asStreamSplit()
-                            .getTableSchemas()
-                            .keySet()
-                            .iterator()
-                            .next()
-                            .toString());
+                    getTableList(
+                            sourceSplitBase
+                                    .asStreamSplit()
+                                    .getTableSchemas()
+                                    .keySet()
+                                    .iterator()
+                                    .next()));
         }
 
         dbzConfig =
@@ -385,4 +384,11 @@ private boolean isBackFillSplit(SourceSplitBase sourceSplitBase) {
                 && !StreamSplit.STREAM_SPLIT_ID.equalsIgnoreCase(
                         sourceSplitBase.asStreamSplit().splitId());
     }
+
+    private String getTableList(TableId tableId) {
+        if (tableId.schema() == null || tableId.schema().isEmpty()) {
+            return tableId.table();
+        }
+        return tableId.schema() + "." + tableId.table();
+    }
 }