12 changes: 12 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -270,6 +270,18 @@ pipeline:
Experimental option, defaults to false.
</td>
</tr>
<tr>
<td>table-id.include-database</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to include the database name in the generated Table ID.<br>
If set to true, the Table ID will be in the format (database, schema, table).<br>
If set to false, the Table ID will be in the format (schema, table).<br>
Defaults to false.
</td>
</tr>
</tbody>
</table>
</div>
12 changes: 12 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -262,6 +262,18 @@ pipeline:
Experimental option, defaults to false.
</td>
</tr>
<tr>
<td>table-id.include-database</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to include the database name in the generated Table ID.<br>
If set to true, the Table ID will be in the format (database, schema, table).<br>
If set to false, the Table ID will be in the format (schema, table).<br>
Defaults to false.
</td>
</tr>
</tbody>
</table>
</div>
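To make the difference concrete, here is a small illustrative sketch (not part of this change) of the two Table ID shapes, using the `org.apache.flink.cdc.common.event.TableId` factory methods the connector code below relies on; the database/schema/table names are placeholders.

```java
import org.apache.flink.cdc.common.event.TableId;

public class TableIdFormatSketch {
    public static void main(String[] args) {
        // table-id.include-database = false (default): (schema, table)
        TableId twoPart = TableId.tableId("inventory", "products");

        // table-id.include-database = true: (database, schema, table)
        TableId threePart = TableId.tableId("mydb", "inventory", "products");

        System.out.println(twoPart);   // expected: inventory.products
        System.out.println(threePart); // expected: mydb.inventory.products
    }
}
```

The three-part form matters when a pipeline reads from several PostgreSQL databases and downstream routing rules need to distinguish identically named schema/table pairs.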
@@ -80,6 +80,7 @@
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES_EXCLUDE;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.USERNAME;
import static org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
import static org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
@@ -129,6 +130,7 @@ public DataSource createDataSource(Context context) {
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE);

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -169,6 +171,7 @@ public DataSource createDataSource(Context context) {
.skipSnapshotBackfill(skipSnapshotBackfill)
.lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
.includeDatabaseInTableId(tableIdIncludeDatabase)
.getConfigFactory();

List<TableId> tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null);
@@ -197,6 +200,7 @@ public DataSource createDataSource(Context context) {
String metadataList = config.get(METADATA_LIST);
List<PostgreSQLReadableMetadata> readableMetadataList = listReadableMetadata(metadataList);

// The includeDatabaseInTableId flag is carried to PostgresDataSource via the configFactory built above
return new PostgresDataSource(configFactory, readableMetadataList);
}

@@ -257,6 +261,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
options.add(METADATA_LIST);
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
options.add(TABLE_ID_INCLUDE_DATABASE);
return options;
}

@@ -65,8 +65,14 @@ public PostgresDataSource(

@Override
public EventSourceProvider getEventSourceProvider() {
String databaseName = postgresSourceConfig.getDatabaseList().get(0);
boolean includeDatabaseInTableId = postgresSourceConfig.isIncludeDatabaseInTableId();
DebeziumEventDeserializationSchema deserializer =
new PostgresEventDeserializer(DebeziumChangelogMode.ALL, readableMetadataList);
new PostgresEventDeserializer(
DebeziumChangelogMode.ALL,
readableMetadataList,
includeDatabaseInTableId,
databaseName);

PostgresOffsetFactory postgresOffsetFactory = new PostgresOffsetFactory();
PostgresDialect postgresDialect = new PostgresDialect(postgresSourceConfig);
@@ -264,4 +264,13 @@ public class PostgresDataSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");

public static final ConfigOption<Boolean> TABLE_ID_INCLUDE_DATABASE =
ConfigOptions.key("table-id.include-database")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to include database in the generated Table ID. "
+ "If set to true, the Table ID will be in the format (database, schema, table). "
+ "If set to false, the Table ID will be in the format (schema, table). Defaults to false.");
}
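As a usage sketch (mirroring what the parameterized test further down does), the new option is set on the source configuration like any other `ConfigOption`; class and import paths follow the ones visible in this diff.

```java
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions;

public class OptionWiringSketch {
    public static Configuration postgresSourceConfig() {
        Configuration config = new Configuration();
        // Emit (database, schema, table) Table IDs instead of the default (schema, table).
        config.set(PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE, true);
        return config;
    }
}
```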
@@ -50,18 +50,37 @@ public class PostgresEventDeserializer extends DebeziumEventDeserializationSchema {

private static final long serialVersionUID = 1L;
private List<PostgreSQLReadableMetadata> readableMetadataList;
private final boolean includeDatabaseInTableId;
private final String databaseName;

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
super(new PostgresSchemaDataTypeInference(), changelogMode);
this(changelogMode, new ArrayList<>(), false, null);
}

public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList) {
this(changelogMode, readableMetadataList, false, null);
}

public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId) {
this(changelogMode, readableMetadataList, includeDatabaseInTableId, null);
}

public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId,
String databaseName) {
super(new PostgresSchemaDataTypeInference(), changelogMode);
this.readableMetadataList = readableMetadataList;
this.includeDatabaseInTableId = includeDatabaseInTableId;
this.databaseName = databaseName;
}

@Override
@@ -87,7 +106,11 @@ protected boolean isSchemaChangeRecord(SourceRecord record) {
@Override
protected TableId getTableId(SourceRecord record) {
String[] parts = record.topic().split("\\.");
return TableId.tableId(parts[1], parts[2]);
if (includeDatabaseInTableId && databaseName != null) {
return TableId.tableId(databaseName, parts[1], parts[2]);
} else {
return TableId.tableId(parts[1], parts[2]);
}
}

@Override
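For context on the `parts[1]`/`parts[2]` indexing above: Debezium's PostgreSQL connector names change-event topics `<topic.prefix>.<schema>.<table>`, so the schema and table sit at indices 1 and 2. A standalone sketch of the mapping, with a hypothetical topic prefix and database name:

```java
import org.apache.flink.cdc.common.event.TableId;

public class TopicToTableIdSketch {
    // Mirrors PostgresEventDeserializer#getTableId for a topic "<prefix>.<schema>.<table>".
    static TableId toTableId(String topic, boolean includeDatabaseInTableId, String databaseName) {
        String[] parts = topic.split("\\.");
        if (includeDatabaseInTableId && databaseName != null) {
            return TableId.tableId(databaseName, parts[1], parts[2]);
        }
        return TableId.tableId(parts[1], parts[2]);
    }

    public static void main(String[] args) {
        String topic = "postgres_cdc_source.inventory.products"; // hypothetical
        System.out.println(toTableId(topic, false, null));  // inventory.products
        System.out.println(toTableId(topic, true, "mydb")); // mydb.inventory.products
    }
}
```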
@@ -61,6 +61,7 @@
import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent;
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
import static org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toCdcTableId;

/** The {@link RecordEmitter} implementation for PostgreSQL pipeline connector. */
public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {
@@ -73,6 +74,7 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {
// Used when startup mode is not initial
private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
private boolean isBounded = false;
private boolean includeDatabaseInTableId = false;

private final Map<TableId, CreateTableEvent> createTableEventCache;

@@ -88,6 +90,7 @@ public PostgresPipelineRecordEmitter(
sourceConfig.isIncludeSchemaChanges(),
offsetFactory);
this.sourceConfig = sourceConfig;
this.includeDatabaseInTableId = sourceConfig.isIncludeDatabaseInTableId();
this.postgresDialect = postgresDialect;
this.alreadySendCreateTableTables = new HashSet<>();
this.createTableEventCache =
@@ -103,10 +106,17 @@ public void applySplit(SourceSplitBase split) {
// TableSchemas in SnapshotSplit only contains one table.
createTableEventCache.putAll(generateCreateTableEvent(sourceConfig));
} else {
for (TableChanges.TableChange tableChange : split.getTableSchemas().values()) {
for (Map.Entry<TableId, TableChanges.TableChange> entry :
split.getTableSchemas().entrySet()) {
TableId tableId =
entry.getKey(); // use the TableId from the map key, which carries the fully qualified identifier
TableChanges.TableChange tableChange = entry.getValue();
CreateTableEvent createTableEvent =
new CreateTableEvent(
toCdcTableId(tableChange.getId()),
toCdcTableId(
tableId,
sourceConfig.getDatabaseList().get(0),
includeDatabaseInTableId),
buildSchemaFromTable(tableChange.getTable()));
((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
.applyChangeEvent(createTableEvent);
@@ -128,10 +138,8 @@ protected void processElement(
} else if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
TableId tableId = splitState.asSnapshotSplitState().toSourceSplit().getTableId();
if (!alreadySendCreateTableTables.contains(tableId)) {
try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
sendCreateTableEvent(jdbc, tableId, (SourceOutput<Event>) output);
alreadySendCreateTableTables.add(tableId);
}
sendCreateTableEvent(tableId, (SourceOutput<Event>) output);
alreadySendCreateTableTables.add(tableId);
}
} else {
boolean isDataChangeRecord = isDataChangeRecord(element);
@@ -189,30 +197,19 @@ private Schema buildSchemaFromTable(Table table) {
return tableBuilder.build();
}

private void sendCreateTableEvent(
PostgresConnection jdbc, TableId tableId, SourceOutput<Event> output) {
Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
output.collect(
new CreateTableEvent(
org.apache.flink.cdc.common.event.TableId.tableId(
tableId.schema(), tableId.table()),
schema));
}

private org.apache.flink.cdc.common.event.TableId toCdcTableId(
io.debezium.relational.TableId dbzTableId) {
String schemaName =
dbzTableId.catalog() == null ? dbzTableId.schema() : dbzTableId.catalog();
return org.apache.flink.cdc.common.event.TableId.tableId(schemaName, dbzTableId.table());
private void sendCreateTableEvent(TableId tableId, SourceOutput<Event> output) {
output.collect(getCreateTableEvent(sourceConfig, tableId));
}

private CreateTableEvent getCreateTableEvent(
PostgresSourceConfig sourceConfig, TableId tableId) {
try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
return new CreateTableEvent(
org.apache.flink.cdc.common.event.TableId.tableId(
tableId.schema(), tableId.table()),
toCdcTableId(
tableId,
sourceConfig.getDatabaseList().get(0),
includeDatabaseInTableId),
schema);
}
}
@@ -244,8 +241,10 @@ private Map<TableId, CreateTableEvent> generateCreateTableEvent(
createTableEventCache.put(
tableId,
new CreateTableEvent(
org.apache.flink.cdc.common.event.TableId.tableId(
tableId.schema(), tableId.table()),
toCdcTableId(
tableId,
this.sourceConfig.getDatabaseList().get(0),
includeDatabaseInTableId),
schema));
}
return createTableEventCache;
@@ -208,12 +208,26 @@ public static Column toColumn(

public static io.debezium.relational.TableId toDbzTableId(TableId tableId) {
return new io.debezium.relational.TableId(
tableId.getSchemaName(), null, tableId.getTableName());
tableId.getNamespace(), tableId.getSchemaName(), tableId.getTableName());
}

public static org.apache.flink.cdc.common.event.TableId toCdcTableId(
io.debezium.relational.TableId dbzTableId) {
return org.apache.flink.cdc.common.event.TableId.tableId(
dbzTableId.schema(), dbzTableId.table());
return toCdcTableId(dbzTableId, null, false);
}

public static org.apache.flink.cdc.common.event.TableId toCdcTableId(
io.debezium.relational.TableId dbzTableId,
String databaseName,
boolean includeDatabaseInTableId) {
String schema = dbzTableId.schema();
String table = dbzTableId.table();
if (includeDatabaseInTableId && databaseName != null) {
return org.apache.flink.cdc.common.event.TableId.tableId(databaseName, schema, table);
} else if (schema != null) {
return org.apache.flink.cdc.common.event.TableId.tableId(schema, table);
} else {
return org.apache.flink.cdc.common.event.TableId.tableId(table);
}
}
}
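A quick round-trip sketch of the updated converters (assuming the connector classes above are on the classpath; names are illustrative): `toDbzTableId` now keeps the namespace in Debezium's catalog slot instead of pushing the schema name into it, so a three-part CDC Table ID survives the conversion both ways.

```java
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils;

public class TableIdRoundTripSketch {
    public static void main(String[] args) {
        TableId cdcId = TableId.tableId("mydb", "inventory", "products");

        io.debezium.relational.TableId dbzId = PostgresSchemaUtils.toDbzTableId(cdcId);
        // catalog = "mydb", schema = "inventory", table = "products"
        System.out.println(dbzId.catalog() + " | " + dbzId.schema() + " | " + dbzId.table());

        // With includeDatabaseInTableId = true, the three-part ID is reconstructed.
        System.out.println(PostgresSchemaUtils.toCdcTableId(dbzId, "mydb", true));
        // expected: mydb.inventory.products
    }
}
```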
@@ -57,7 +57,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -74,6 +75,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
@@ -103,6 +105,14 @@ public void after() throws SQLException {
inventoryDatabase.removeSlot(slotName);
}

static Stream<Arguments> provideParameters() {
return Stream.of(
Arguments.of(true, true),
Arguments.of(false, false),
Arguments.of(true, false),
Arguments.of(false, true));
}

@Test
public void testInitialStartupMode() throws Exception {
inventoryDatabase.createAndInitialize();
@@ -341,9 +351,10 @@ private <T> CollectResultIterator<T> addCollector(
return iterator;
}

@ParameterizedTest(name = "unboundedChunkFirst: {0}")
@ValueSource(booleans = {true, false})
public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws Exception {
@ParameterizedTest
@MethodSource("provideParameters")
public void testInitialStartupModeWithOpts(
boolean unboundedChunkFirst, boolean isTableIdIncludeDatabase) throws Exception {
inventoryDatabase.createAndInitialize();
org.apache.flink.cdc.common.configuration.Configuration sourceConfiguration =
new org.apache.flink.cdc.common.configuration.Configuration();
@@ -365,6 +376,8 @@ public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws Exception {
sourceConfiguration.set(
PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED,
unboundedChunkFirst);
sourceConfiguration.set(
PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE, isTableIdIncludeDatabase);

Factory.Context context =
new FactoryHelper.DefaultContext(
@@ -384,7 +397,12 @@ public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws Exception {
new EventTypeInfo())
.executeAndCollect();

TableId tableId = TableId.tableId("inventory", "products");
TableId tableId;
if (isTableIdIncludeDatabase) {
tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "inventory", "products");
} else {
tableId = TableId.tableId("inventory", "products");
}
CreateTableEvent createTableEvent = getProductsCreateTableEvent(tableId);

// generate snapshot data
@@ -582,16 +600,6 @@ private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, T side
return result;
}

// Helper method to create a temporary directory for savepoint
private Path createTempSavepointDir() throws Exception {
return Files.createTempDirectory("postgres-savepoint");
}

// Helper method to execute the job and create a savepoint
private String createSavepoint(JobClient jobClient, Path savepointDir) throws Exception {
return jobClient.stopWithSavepoint(true, savepointDir.toAbsolutePath().toString()).get();
}

private List<Event> getSnapshotExpected(TableId tableId) {
RowType rowType =
RowType.of(
@@ -305,6 +305,12 @@ public PostgresSourceBuilder<T> includePartitionedTables(boolean includePartitionedTables) {
return this;
}

/** Whether to include the database name in the generated Table ID. */
public PostgresSourceBuilder<T> includeDatabaseInTableId(boolean includeDatabaseInTableId) {
this.configFactory.setIncludeDatabaseInTableId(includeDatabaseInTableId);
return this;
}

/**
* Build the {@link PostgresIncrementalSource}.
*
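For DataStream API users, the new builder method composes with the existing ones. A hedged sketch, following the builder entry point documented for the Postgres incremental source; the connection settings and JSON deserializer are placeholders, not part of this PR:

```java
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class BuilderUsageSketch {
    public static PostgresIncrementalSource<String> buildSource() {
        return PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                .hostname("localhost") // placeholder connection settings
                .port(5432)
                .database("mydb")
                .schemaList("inventory")
                .tableList("inventory.products")
                .username("postgres")
                .password("secret")
                .slotName("flink_slot")
                .decodingPluginName("pgoutput")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .includeDatabaseInTableId(true) // emit (database, schema, table) IDs
                .build();
    }
}
```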