diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java index 5e6c446c5d7..41496bdf560 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java @@ -74,6 +74,7 @@ import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_MODE; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCHEMA_CHANGE_ENABLED; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SERVER_TIME_ZONE; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SLOT_NAME; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; @@ -131,6 +132,7 @@ public DataSource createDataSource(Context context) { boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY); boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE); + boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED); 
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); @@ -172,6 +174,7 @@ public DataSource createDataSource(Context context) { .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay) .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst) .includeDatabaseInTableId(tableIdIncludeDatabase) + .includeSchemaChanges(includeSchemaChanges) .getConfigFactory(); List tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java index 6084df1f36f..b2c72626afd 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java @@ -20,6 +20,7 @@ import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.source.DataSource; import org.apache.flink.cdc.common.source.EventSourceProvider; import org.apache.flink.cdc.common.source.FlinkSourceProvider; @@ -34,13 +35,16 @@ import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory; import org.apache.flink.cdc.connectors.postgres.source.reader.PostgresPipelineRecordEmitter; import org.apache.flink.cdc.connectors.postgres.table.PostgreSQLReadableMetadata; +import 
org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils; import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema; import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; import org.apache.flink.connector.base.source.reader.RecordEmitter; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** A {@link DataSource} for Postgres cdc connector. */ @Internal @@ -50,6 +54,7 @@ public class PostgresDataSource implements DataSource { private final PostgresSourceConfig postgresSourceConfig; private final List readableMetadataList; + Map> beforeTableColumnsOidMaps = new HashMap<>(); public PostgresDataSource(PostgresSourceConfigFactory configFactory) { this(configFactory, new ArrayList<>()); @@ -67,12 +72,17 @@ public PostgresDataSource( public EventSourceProvider getEventSourceProvider() { String databaseName = postgresSourceConfig.getDatabaseList().get(0); boolean includeDatabaseInTableId = postgresSourceConfig.isIncludeDatabaseInTableId(); + beforeTableColumnsOidMaps = + PostgresSchemaUtils.getAllTablesColumnOids( + postgresSourceConfig, postgresSourceConfig.getTableList()); DebeziumEventDeserializationSchema deserializer = new PostgresEventDeserializer( DebeziumChangelogMode.ALL, readableMetadataList, includeDatabaseInTableId, - databaseName); + databaseName, + postgresSourceConfig, + beforeTableColumnsOidMaps); PostgresOffsetFactory postgresOffsetFactory = new PostgresOffsetFactory(); PostgresDialect postgresDialect = new PostgresDialect(postgresSourceConfig); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java index 7e9ac4b0c58..3b2638f6dc1 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java @@ -273,4 +273,12 @@ public class PostgresDataSourceOptions { "Whether to include database in the generated Table ID. " + "If set to true, the Table ID will be in the format (database, schema, table). " + "If set to false, the Table ID will be in the format (schema, table). Defaults to false."); + + @Experimental + public static final ConfigOption SCHEMA_CHANGE_ENABLED = + ConfigOptions.key("schema-change.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether send schema change events, by default is true. 
If set to false, the schema changes will not be sent."); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java index c31dbc73b7f..851c474e895 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java @@ -19,9 +19,20 @@ import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.PhysicalColumn; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig; import org.apache.flink.cdc.connectors.postgres.table.PostgreSQLReadableMetadata; +import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils; import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema; import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; import 
org.apache.flink.table.data.TimestampData; @@ -31,56 +42,69 @@ import io.debezium.data.geometry.Geography; import io.debezium.data.geometry.Geometry; import io.debezium.data.geometry.Point; +import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.locationtech.jts.geom.Coordinate; import org.locationtech.jts.io.WKBReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; /** Event deserializer for {@link PostgresDataSource}. */ @Internal public class PostgresEventDeserializer extends DebeziumEventDeserializationSchema { - + private static final Logger LOG = LoggerFactory.getLogger(PostgresEventDeserializer.class); private static final long serialVersionUID = 1L; private List readableMetadataList; private final boolean includeDatabaseInTableId; private final String databaseName; + private Map schemaMap = new HashMap<>(); + private final PostgresSourceConfig postgresSourceConfig; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private Map> beforeTableColumnsOidMaps; public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) { - this(changelogMode, new ArrayList<>(), false, null); + this(changelogMode, new ArrayList<>(), false, null, null, null); } public PostgresEventDeserializer( DebeziumChangelogMode changelogMode, List readableMetadataList) { - this(changelogMode, readableMetadataList, false, null); + this(changelogMode, readableMetadataList, false, null, null, null); } public PostgresEventDeserializer( DebeziumChangelogMode changelogMode, List readableMetadataList, boolean includeDatabaseInTableId) { - this(changelogMode, readableMetadataList, 
includeDatabaseInTableId, null); + this(changelogMode, readableMetadataList, includeDatabaseInTableId, null, null, null); } public PostgresEventDeserializer( DebeziumChangelogMode changelogMode, List readableMetadataList, boolean includeDatabaseInTableId, - String databaseName) { + String databaseName, + PostgresSourceConfig postgresSourceConfig, + Map> beforeTableColumnsOidMaps) { super(new PostgresSchemaDataTypeInference(), changelogMode); this.readableMetadataList = readableMetadataList; this.includeDatabaseInTableId = includeDatabaseInTableId; this.databaseName = databaseName; + this.postgresSourceConfig = postgresSourceConfig; + this.beforeTableColumnsOidMaps = beforeTableColumnsOidMaps; } @Override @@ -88,6 +112,305 @@ protected List deserializeSchemaChangeRecord(SourceRecord rec return Collections.emptyList(); } + @Override + public List deserialize(SourceRecord record) throws Exception { + List result = new ArrayList<>(); + if (postgresSourceConfig.isIncludeSchemaChanges()) { + handleSchemaChange(record, result); + } + if (isDataChangeRecord(record)) { + LOG.trace("Process data change record: {}", record); + result.addAll(deserializeDataChangeRecord(record)); + } else if (isSchemaChangeRecord(record)) { + LOG.trace("Process schema change record: {}", record); + } else { + LOG.trace("Ignored other record: {}", record); + return Collections.emptyList(); + } + return result; + } + + private void handleSchemaChange(SourceRecord record, List result) { + TableId tableId = getTableId(record); + Schema valueSchema = record.valueSchema(); + Schema beforeSchema = schemaMap.get(tableId); + List beforeColumnNames; + List afterColumnNames; + Schema afterSchema = fieldSchema(valueSchema, Envelope.FieldName.AFTER); + List afterFields = afterSchema.fields(); + org.apache.flink.cdc.common.schema.Schema schema = + PostgresSchemaUtils.getTableSchema(postgresSourceConfig, tableId); + List columns = schema.getColumns(); + Map beforeColumnsOidMaps = + 
beforeTableColumnsOidMaps.get(TableId.tableId(tableId.getTableName())); + // When the first piece of data arrives, beforeSchema is empty + if (beforeSchema != null) { + beforeColumnNames = + beforeSchema.fields().stream().map(e -> e.name()).collect(Collectors.toList()); + afterColumnNames = afterFields.stream().map(e -> e.name()).collect(Collectors.toList()); + List newAddColumnNames = findAddedElements(beforeColumnNames, afterColumnNames); + List newDelColumnNames = + findRemovedElements(beforeColumnNames, afterColumnNames); + Map renameColumnMaps = new HashMap<>(); + // Process the fields of rename + if (!newAddColumnNames.isEmpty() && !newDelColumnNames.isEmpty()) { + renameColumnMaps = + getRenameColumnMaps( + tableId, + newAddColumnNames, + newDelColumnNames, + beforeSchema, + afterSchema, + beforeColumnsOidMaps); + if (!renameColumnMaps.isEmpty()) { + result.add(new RenameColumnEvent(tableId, renameColumnMaps)); + Map finalRenameColumnMaps1 = renameColumnMaps; + renameColumnMaps + .keySet() + .forEach( + e -> { + beforeColumnsOidMaps.put( + finalRenameColumnMaps1.get(e), + beforeColumnsOidMaps.get(e)); + }); + } + newAddColumnNames.removeAll(renameColumnMaps.values()); + newDelColumnNames.removeAll(renameColumnMaps.keySet()); + } + // Process the fields of add + if (!newAddColumnNames.isEmpty()) { + newAddColumns( + tableId, + afterSchema, + result, + columns, + newAddColumnNames, + beforeColumnsOidMaps); + } + // Process the fields of delete + if (!newDelColumnNames.isEmpty()) { + newDelColumns( + tableId, + beforeColumnNames, + result, + newDelColumnNames, + beforeColumnsOidMaps); + } + // Handling fields with changed types + findModifiedTypeColumns( + tableId, + beforeSchema, + beforeColumnNames, + afterSchema, + result, + columns, + renameColumnMaps); + } + schemaMap.put(tableId, afterSchema); + } + + private void newDelColumns( + TableId tableId, + List beforeColumnNames, + List result, + List newDelColumnNames, + Map beforeColumnsOidMaps) { + 
newDelColumnNames.forEach( + e -> { + result.add(new DropColumnEvent(tableId, Collections.singletonList(e))); + beforeColumnNames.removeAll(newDelColumnNames); + beforeColumnsOidMaps.remove(e); + }); + } + + private void newAddColumns( + TableId tableId, + Schema afterSchema, + List result, + List columns, + List newAddColumnNames, + Map beforeColumnsOidMaps) { + List newAddColumns = + columns.stream() + .filter(e -> newAddColumnNames.contains(e.getName())) + .collect(Collectors.toList()); + if (newAddColumns.size() != newAddColumnNames.size()) { + List notInCurrentTableColumns = + newAddColumnNames.stream() + .filter(e -> !newAddColumns.contains(e)) + .collect(Collectors.toList()); + notInCurrentTableColumns.forEach( + e -> { + Field field = afterSchema.field(e); + DataType dataType = convertKafkaTypeToFlintType(field.schema()); + PhysicalColumn column = + new PhysicalColumn(e, dataType, field.schema().doc()); + result.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + column, + AddColumnEvent.ColumnPosition.LAST, + null)))); + }); + } + newAddColumns.forEach( + e -> { + SchemaChangeEvent event = + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + e, AddColumnEvent.ColumnPosition.LAST, null))); + result.add(event); + }); + Map newAddColumnsOidMaps = + PostgresSchemaUtils.getColumnOids(postgresSourceConfig, tableId, newAddColumnNames); + newAddColumnNames.forEach( + e -> { + beforeColumnsOidMaps.put(e, newAddColumnsOidMaps.getOrDefault(e, null)); + }); + } + + private static DataType convertKafkaTypeToFlintType(Schema kafkaSchema) { + switch (kafkaSchema.type()) { + case STRING: + return DataTypes.STRING(); + case INT8: + return DataTypes.TINYINT(); + case INT16: + return DataTypes.SMALLINT(); + case INT32: + return DataTypes.INT(); + case INT64: + return DataTypes.BIGINT(); + case FLOAT32: + return DataTypes.FLOAT(); + case FLOAT64: + return 
DataTypes.DOUBLE(); + case BOOLEAN: + return DataTypes.BOOLEAN(); + case BYTES: + return DataTypes.BYTES(); + case ARRAY: + Schema elementSchema = kafkaSchema.valueSchema(); + DataType elementType = convertKafkaTypeToFlintType(elementSchema); + return DataTypes.ARRAY(elementType); + case STRUCT: + Schema schema = kafkaSchema.valueSchema(); + DataType dataType = convertKafkaTypeToFlintType(schema); + return DataTypes.ROW(dataType); + case MAP: + return DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()); + default: + return DataTypes.STRING(); + } + } + + private void findModifiedTypeColumns( + TableId tableId, + Schema oldSchema, + List oldColumnNames, + Schema afterSchema, + List result, + List columns, + Map renameColumnMaps) { + Map finalRenameColumnMaps = renameColumnMaps; + oldColumnNames.stream() + .forEach( + oldFieldName -> { + Field oldField = oldSchema.field(oldFieldName); + Field afterField = afterSchema.field(oldFieldName); + if (afterField == null) { + afterField = + afterSchema.field(finalRenameColumnMaps.get(oldFieldName)); + } + String afterFieldName = afterField.name(); + if (!oldField.schema() + .type() + .getName() + .equals(afterField.schema().type().getName()) + || (oldField.schema().defaultValue() != null + && afterField.schema().defaultValue() != null + && !oldField.schema() + .defaultValue() + .equals(afterField.schema().defaultValue())) + || (oldField.schema().parameters() != null + && afterField.schema().parameters() != null + && !oldField.schema() + .parameters() + .equals(afterField.schema().parameters()))) { + Map typeMapping = new HashMap<>(); + Column column = + columns.stream() + .filter(e -> e.getName().equals(afterFieldName)) + .findFirst() + .get(); + typeMapping.put(afterField.name(), column.getType()); + result.add(new AlterColumnTypeEvent(tableId, typeMapping)); + } + }); + } + + private Map getRenameColumnMaps( + TableId tableId, + List newAddColumnNames, + List newDelColumnNames, + Schema oldSchema, + Schema afterSchema, + 
Map beforeColumnsOidMaps) { + Map renameColumnMaps = new HashMap<>(); + Map newAddColumnsOidMaps = + PostgresSchemaUtils.getColumnOids(postgresSourceConfig, tableId, newAddColumnNames); + Map newDelColumnsOidMaps = new HashMap<>(); + + newDelColumnNames.forEach( + e -> { + if (beforeColumnsOidMaps.keySet().contains(e)) { + newDelColumnsOidMaps.put(e, beforeColumnsOidMaps.get(e)); + } else { + newDelColumnsOidMaps.put(e, null); + } + }); + + newAddColumnNames.forEach( + e -> { + if (!newAddColumnsOidMaps.keySet().contains(e)) { + newAddColumnsOidMaps.put(e, null); + } + }); + for (Map.Entry newDelEntry : newDelColumnsOidMaps.entrySet()) { + for (Map.Entry newAddEntry : newAddColumnsOidMaps.entrySet()) { + if ((newDelEntry.getValue() == null && newAddEntry.getValue() == null) + || (newDelEntry.getValue() == null && newAddEntry.getValue() != null)) { + int oldFieldIndex = oldSchema.field(newDelEntry.getKey()).index(); + int afterFieldIndex = afterSchema.field(newAddEntry.getKey()).index(); + if (oldFieldIndex == afterFieldIndex) { + renameColumnMaps.put(newDelEntry.getKey(), newAddEntry.getKey()); + } + } else if (newDelEntry.getValue().equals(newAddEntry.getValue())) { + renameColumnMaps.put(newDelEntry.getKey(), newAddEntry.getKey()); + } + } + } + return renameColumnMaps; + } + + public static List findAddedElements(List a, List b) { + Set setA = new HashSet<>(a); + Set setB = new HashSet<>(b); + setB.removeAll(setA); + return new ArrayList<>(setB); + } + + public static List findRemovedElements(List a, List b) { + Set setA = new HashSet<>(a); + Set setB = new HashSet<>(b); + setA.removeAll(setB); + return new ArrayList<>(setA); + } + @Override protected boolean isDataChangeRecord(SourceRecord record) { Schema valueSchema = record.valueSchema(); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java index 93a7099ac39..9e4936fc86b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect; import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig; +import org.apache.flink.util.FlinkRuntimeException; import io.debezium.connector.postgresql.PostgresConnectorConfig; import io.debezium.connector.postgresql.PostgresObjectUtils; @@ -40,6 +41,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -100,6 +102,64 @@ public static Schema getTableSchema(PostgresSourceConfig sourceConfig, TableId t } } + public static Map getColumnOids( + PostgresSourceConfig sourceConfig, TableId tableId, List columns) { + try (PostgresConnection jdbc = getPostgresDialect(sourceConfig).openJdbcConnection()) { + Map oidMaps = new HashMap<>(); + String inClause = + columns.stream() + .map(column -> "'" + column.replace("'", "''") + "'") + .collect(Collectors.joining(",", "in (", ")")); + try { + jdbc.query( + "SELECT a.attname attname,a.attnum as oid \n" + + "FROM pg_attribute a\n" + + "JOIN pg_class b ON a.attrelid = b.oid\n" + + "WHERE b.relname = '" + + tableId.getTableName() + + "' AND a.attname " + + inClause, + rs -> { + while (rs.next()) { + oidMaps.put(rs.getString(1), rs.getInt(2)); + } + }); + 
} catch (SQLException e) { + throw new FlinkRuntimeException(e); + } + return oidMaps; + } + } + + public static Map> getAllTablesColumnOids( + PostgresSourceConfig sourceConfig, List tableList) { + try (PostgresConnection jdbc = getPostgresDialect(sourceConfig).openJdbcConnection()) { + Map> tableOidMaps = new HashMap<>(); + String inClause = + tableList.stream() + .map(table -> "'" + table.split("\\.")[1].replace("'", "''") + "'") + .collect(Collectors.joining(",", "in (", ")")); + try { + jdbc.query( + "SELECT b.relname,a.attname attname,a.attnum AS oid FROM pg_attribute a JOIN pg_class b ON a.attrelid = b.oid WHERE b.relname " + + inClause + + " and a.attnum > 0 and a.attisdropped = 'f' group by b.relname,a.attname,a.attnum", + rs -> { + while (rs.next()) { + TableId tableId = TableId.tableId(rs.getString(1)); + if (tableOidMaps.get(tableId) == null) { + tableOidMaps.put(tableId, new HashMap<>()); + } + tableOidMaps.get(tableId).put(rs.getString(2), rs.getInt(3)); + } + }); + } catch (SQLException e) { + throw new FlinkRuntimeException(e); + } + return tableOidMaps; + } + } + public static PostgresDialect getPostgresDialect(PostgresSourceConfig sourceConfig) { String key = sourceConfig.getJdbcUrl(); return dialectCache.computeIfAbsent(key, k -> new PostgresDialect(sourceConfig)); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java index b2aef9eed9c..5cc35232343 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java +++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java @@ -21,13 +21,18 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.factories.Factory; import org.apache.flink.cdc.common.factories.FactoryHelper; +import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.source.FlinkSourceProvider; import org.apache.flink.cdc.common.types.DataType; @@ -59,6 +64,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -157,6 +163,278 @@ public void testInitialStartupMode() throws Exception { assertThat(inventoryDatabase.checkSlot(slotName)).isEqualTo(slotName); } + @ParameterizedTest(name = "testType: {0}") + @ValueSource( + strings = { + "modifyType", + "drop,add", + "rename", + "drop,add,rename", + "add,rename", + "add2column,rename2column", + "add,rename,add2column,rename2column", + "add,rename,drop,add" + }) + public void testPostgresSchemaEvolution(String testType) 
throws Exception { + inventoryDatabase.createAndInitialize(); + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new PostgresSourceConfigFactory() + .hostname(POSTGRES_CONTAINER.getHost()) + .port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(inventoryDatabase.getDatabaseName()) + .tableList("inventory.products") + .startupOptions(StartupOptions.initial()) + .serverTimeZone("UTC"); + configFactory.database(inventoryDatabase.getDatabaseName()); + configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); + configFactory.includeSchemaChanges(true); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new PostgresDataSource(configFactory).getEventSourceProvider(); + CloseableIterator events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + + TableId tableId = TableId.tableId("inventory", "products"); + CreateTableEvent createTableEvent = getProductsCreateTableEvent(tableId); + + // generate snapshot data + List expectedSnapshot = getSnapshotExpected(tableId); + + // In this configuration, several subtasks might emit their corresponding CreateTableEvent + // to downstream. Since it is not possible to predict how many CreateTableEvents should we + // expect, we simply filter them out from expected sets, and assert there's at least one. 
+ fetchResultsExcept(events, expectedSnapshot.size(), createTableEvent); + try (Connection conn = + getJdbcConnection(POSTGRES_CONTAINER, inventoryDatabase.getDatabaseName()); + Statement stmt = conn.createStatement()) { + stmt.execute( + "INSERT INTO inventory.products (name, description, weight) " + + "VALUES ('scooter', 'Small 2-wheel scooter', 3.14)"); + List actual; + List expected; + Map renameMap = new HashMap<>(); + switch (testType) { + case "modifyType": + stmt.execute( + "ALTER TABLE inventory.products ALTER COLUMN weight TYPE VARCHAR(50)"); + stmt.execute( + "INSERT INTO inventory.products (name,description, weight) " + + "VALUES ('football','A leather football', '0.45')"); + actual = fetchResults(events, 3); + expected = new ArrayList<>(); + expected.add( + new AlterColumnTypeEvent( + tableId, + Collections.singletonMap("weight", DataTypes.VARCHAR(50)))); + assertThat(actual).contains(expected.toArray(new Event[0])); + break; + case "drop,add": + stmt.execute("ALTER TABLE inventory.products DROP COLUMN name"); + stmt.execute("ALTER TABLE inventory.products ADD COLUMN age varchar(100)"); + stmt.execute( + "INSERT INTO inventory.products ( description, weight,age) " + + "VALUES ('A leather football', 0.45,11)"); + actual = fetchResults(events, 4); + expected = new ArrayList<>(); + expected.add(new DropColumnEvent(tableId, Collections.singletonList("name"))); + expected.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "age", DataTypes.VARCHAR(100)))))); + assertThat(actual).contains(expected.toArray(new Event[0])); + break; + case "rename": + stmt.execute("ALTER TABLE inventory.products RENAME COLUMN weight TO weighta"); + stmt.execute( + "INSERT INTO inventory.products (name, description, weighta) " + + "VALUES ('football','A leather football', 0.45)"); + actual = fetchResults(events, 3); + expected = new ArrayList<>(); + expected.add( + new RenameColumnEvent( + 
tableId, Collections.singletonMap("weight", "weighta"))); + assertThat(actual).contains(expected.toArray(new Event[0])); + break; + case "drop,add,rename": + stmt.execute("ALTER TABLE inventory.products DROP COLUMN name"); + stmt.execute("ALTER TABLE inventory.products ADD COLUMN age varchar(100)"); + stmt.execute("ALTER TABLE inventory.products RENAME COLUMN weight TO weighta"); + stmt.execute( + "INSERT INTO inventory.products ( description, weighta,age) " + + "VALUES ( 'A leather football', 0.45,11)"); + actual = fetchResults(events, 4); + expected = new ArrayList<>(); + expected.add( + new RenameColumnEvent( + tableId, Collections.singletonMap("weight", "weighta"))); + expected.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "age", DataTypes.VARCHAR(100)))))); + expected.add(new DropColumnEvent(tableId, Collections.singletonList("name"))); + assertThat(actual).contains(expected.toArray(new Event[0])); + break; + case "add,rename": + stmt.execute("ALTER TABLE inventory.products ADD COLUMN age varchar(100)"); + stmt.execute( + "INSERT INTO inventory.products (name, description, weight,age) " + + "VALUES ('football', 'A leather football', 0.45,11)"); + stmt.execute("ALTER TABLE inventory.products RENAME COLUMN age TO age1"); + stmt.execute( + "INSERT INTO inventory.products (name, description, weight,age1) " + + "VALUES ('football2', 'A leather football', 0.45,12)"); + actual = fetchResults(events, 5); + expected = new ArrayList<>(); + expected.add( + new RenameColumnEvent( + tableId, Collections.singletonMap("age", "age1"))); + expected.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "age", DataTypes.STRING()))))); + assertThat(actual).contains(expected.toArray(new Event[0])); + break; + case "add2column,rename2column": + stmt.execute("ALTER TABLE inventory.products ADD COLUMN age 
varchar(100)"); + stmt.execute("ALTER TABLE inventory.products ADD COLUMN age1 varchar(100)"); + stmt.execute( + "INSERT INTO inventory.products (name, description, weight,age,age1) " + + "VALUES ('football', 'A leather football', 0.45,11,11)"); + stmt.execute("ALTER TABLE inventory.products RENAME COLUMN age TO agea"); + stmt.execute("ALTER TABLE inventory.products RENAME COLUMN age1 TO age1a"); + stmt.execute( + "INSERT INTO inventory.products (name, description, weight,agea,age1a) " + + "VALUES ('football2', 'A leather football', 0.45,12,12)"); + actual = fetchResults(events, 6); + expected = new ArrayList<>(); + expected.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "age", DataTypes.STRING()))))); + expected.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "age1", DataTypes.STRING()))))); + renameMap.put("age", "agea"); + renameMap.put("age1", "age1a"); + expected.add(new RenameColumnEvent(tableId, renameMap)); + assertThat(actual).contains(expected.toArray(new Event[0])); + break; + case "add,rename,add2column,rename2column": + stmt.execute("ALTER TABLE inventory.products ADD COLUMN age varchar(100)"); + stmt.execute( + "INSERT INTO inventory.products (name, description, weight,age) " + + "VALUES ('football', 'A leather football', 0.45,11)"); + stmt.execute("ALTER TABLE inventory.products RENAME COLUMN age TO age1"); + stmt.execute( + "INSERT INTO inventory.products (name, description, weight,age1) " + + "VALUES ('football2', 'A leather football', 0.45,12)"); + + stmt.execute("ALTER TABLE inventory.products ADD COLUMN age2 varchar(100)"); + stmt.execute("ALTER TABLE inventory.products ADD COLUMN age3 varchar(100)"); + stmt.execute( + "INSERT INTO inventory.products (name, description, weight,age1,age2,age3) " + + "VALUES ('football', 'A leather football', 0.45,11,11,11)"); + 
stmt.execute("ALTER TABLE inventory.products RENAME COLUMN age2 TO age2a"); + stmt.execute("ALTER TABLE inventory.products RENAME COLUMN age3 TO age3a"); + stmt.execute( + "INSERT INTO inventory.products (name, description, weight,age1,age2a,age3a) " + + "VALUES ('football2', 'A leather football', 0.45,12,12,12)"); + actual = fetchResults(events, 10); + expected = new ArrayList<>(); + expected.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "age", DataTypes.STRING()))))); + + expected.add( + new RenameColumnEvent( + tableId, Collections.singletonMap("age", "age1"))); + expected.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "age2", DataTypes.STRING()))))); + expected.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "age3", DataTypes.STRING()))))); + renameMap = new HashMap<>(); + renameMap.put("age2", "age2a"); + renameMap.put("age3", "age3a"); + expected.add(new RenameColumnEvent(tableId, renameMap)); + assertThat(actual).contains(expected.toArray(new Event[0])); + break; + case "add,rename,drop,add": + stmt.execute("ALTER TABLE inventory.products ADD COLUMN age varchar(100)"); + stmt.execute( + "INSERT INTO inventory.products (name, description, weight,age) " + + "VALUES ('football', 'A leather football', 0.45,11)"); + stmt.execute("ALTER TABLE inventory.products RENAME COLUMN age TO age1"); + stmt.execute( + "INSERT INTO inventory.products (name, description, weight,age1) " + + "VALUES ('football2', 'A leather football', 0.45,12)"); + stmt.execute("ALTER TABLE inventory.products DROP COLUMN age1"); + stmt.execute("ALTER TABLE inventory.products ADD COLUMN email varchar(100)"); + stmt.execute( + "INSERT INTO inventory.products (name, description, weight,email) " + + "VALUES ('football', 'A leather 
football', 0.45,'aaa')"); + actual = fetchResults(events, 7); + expected = new ArrayList<>(); + expected.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "age", DataTypes.STRING()))))); + expected.add( + new RenameColumnEvent( + tableId, Collections.singletonMap("age", "age1"))); + expected.add( + new RenameColumnEvent( + tableId, Collections.singletonMap("age1", "email"))); + + assertThat(actual).contains(expected.toArray(new Event[0])); + break; + default: + } + } + } + @Test public void testLatestOffsetStartupMode() throws Exception { inventoryDatabase.createAndInitialize(); @@ -600,6 +878,16 @@ private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, T side return result; } + private static <T> List<T> fetchResults(Iterator<T> iter, int size) { + List<T> result = new ArrayList<>(size); + while (size > 0 && iter.hasNext()) { + T event = iter.next(); + result.add(event); + size--; + } + return result; + } + private List<Event> getSnapshotExpected(TableId tableId) { RowType rowType = RowType.of( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java index 0f550ca96b3..bbd60e57469 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java @@ -99,6 +99,12 @@ public PostgresSourceBuilder<T> schemaList(String... schemaList) { return this; } + /** Whether the {@link PostgresIncrementalSource} should output the schema changes or not.
*/ + public PostgresSourceBuilder<T> includeSchemaChanges(boolean includeSchemaChanges) { + this.configFactory.includeSchemaChanges(includeSchemaChanges); + return this; + } + /** * An required list of regular expressions that match fully-qualified table identifiers for * tables to be monitored; any table not included in the list will be excluded from monitoring.