diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index bf4c377ee2c..ee21219ee79 100644 --- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -44,6 +44,7 @@ import java.util.Set; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_POSITION; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN; @@ -54,7 +55,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; -/** Unit test for {@link org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser}. */ +/** Unit test for {@link YamlPipelineDefinitionParser}. */ class YamlPipelineDefinitionParserTest { @Test @@ -206,6 +207,7 @@ void testSchemaEvolutionTypesConfiguration() throws Exception { ImmutableSet.of( ADD_COLUMN, ALTER_COLUMN_TYPE, + ALTER_COLUMN_POSITION, CREATE_TABLE, DROP_COLUMN, DROP_TABLE, @@ -218,6 +220,7 @@ void testSchemaEvolutionTypesConfiguration() throws Exception { ImmutableSet.of( ADD_COLUMN, ALTER_COLUMN_TYPE, + ALTER_COLUMN_POSITION, CREATE_TABLE, DROP_COLUMN, DROP_TABLE, @@ -230,6 +233,7 @@ void testSchemaEvolutionTypesConfiguration() throws Exception { ImmutableSet.of( ADD_COLUMN, ALTER_COLUMN_TYPE, + ALTER_COLUMN_POSITION, CREATE_TABLE, RENAME_COLUMN, TRUNCATE_TABLE)); @@ -238,7 +242,12 @@ void testSchemaEvolutionTypesConfiguration() throws Exception { null, null, ImmutableSet.of( - ADD_COLUMN, ALTER_COLUMN_TYPE, CREATE_TABLE, DROP_COLUMN, RENAME_COLUMN)); + ADD_COLUMN, + ALTER_COLUMN_TYPE, + ALTER_COLUMN_POSITION, + CREATE_TABLE, + DROP_COLUMN, + RENAME_COLUMN)); testSchemaEvolutionTypesParsing( "lenient", null, @@ -246,6 +255,7 @@ void testSchemaEvolutionTypesConfiguration() throws Exception { ImmutableSet.of( ADD_COLUMN, ALTER_COLUMN_TYPE, + ALTER_COLUMN_POSITION, CREATE_TABLE, DROP_COLUMN, DROP_TABLE, @@ -532,6 +542,7 @@ void testParsingFullDefinitionFromString() throws Exception { ImmutableSet.of( DROP_COLUMN, ALTER_COLUMN_TYPE, + ALTER_COLUMN_POSITION, ADD_COLUMN, CREATE_TABLE, RENAME_COLUMN)), @@ -558,6 +569,7 @@ void testParsingFullDefinitionFromString() throws Exception { ImmutableSet.of( DROP_COLUMN, ALTER_COLUMN_TYPE, + ALTER_COLUMN_POSITION, ADD_COLUMN, CREATE_TABLE, RENAME_COLUMN)), @@ -644,6 +656,7 @@ void testParsingFullDefinitionFromString() throws Exception { ImmutableSet.of( DROP_COLUMN, ALTER_COLUMN_TYPE, + ALTER_COLUMN_POSITION, ADD_COLUMN, CREATE_TABLE, RENAME_COLUMN)), diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnPositionEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnPositionEvent.java new file mode 100644 index 00000000000..a3d29f36e6a --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnPositionEvent.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.event; + +import org.apache.flink.cdc.common.annotation.PublicEvolving; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A {@link SchemaChangeEvent} that represents column position changes in a table, such as {@code + * ALTER TABLE ... MODIFY COLUMN ... AFTER/BEFORE} DDL operations. + */ +@PublicEvolving +public class AlterColumnPositionEvent implements SchemaChangeEventWithPreSchema, SchemaChangeEvent { + + private static final long serialVersionUID = 1L; + + private final TableId tableId; + + /** key => column name, value => new position information. */ + private final Map positionMapping; + + /** key => column name, value => old position (0-based). */ + private final Map oldPositionMapping; + + public AlterColumnPositionEvent(TableId tableId, Map positionMapping) { + this.tableId = tableId; + this.positionMapping = positionMapping; + this.oldPositionMapping = new HashMap<>(); + } + + public AlterColumnPositionEvent( + TableId tableId, + Map positionMapping, + Map oldPositionMapping) { + this.tableId = tableId; + this.positionMapping = positionMapping; + this.oldPositionMapping = oldPositionMapping; + } + + /** Returns the position mapping. */ + public Map getPositionMapping() { + return positionMapping; + } + + /** Returns the old position mapping. */ + public Map getOldPositionMapping() { + return oldPositionMapping; + } + + @Override + public TableId tableId() { + return tableId; + } + + @Override + public SchemaChangeEventType getType() { + return SchemaChangeEventType.ALTER_COLUMN_POSITION; + } + + @Override + public SchemaChangeEvent copy(TableId newTableId) { + return new AlterColumnPositionEvent(newTableId, positionMapping, oldPositionMapping); + } + + @Override + public boolean hasPreSchema() { + return !oldPositionMapping.isEmpty(); + } + + @Override + public void fillPreSchema(Schema oldSchema) { + oldPositionMapping.clear(); + List columns = oldSchema.getColumns(); + for (int i = 0; i < columns.size(); i++) { + String columnName = columns.get(i).getName(); + if (positionMapping.containsKey(columnName)) { + oldPositionMapping.put(columnName, i); + } + } + } + + @Override + public boolean trimRedundantChanges() { + if (hasPreSchema()) { + Set redundantChanges = + positionMapping.entrySet().stream() + .filter( + entry -> { + String columnName = entry.getKey(); + ColumnPosition newPos = entry.getValue(); + Integer oldPos = oldPositionMapping.get(columnName); + return oldPos != null + && oldPos.equals(newPos.getAbsolutePosition()); + }) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + + positionMapping.keySet().removeAll(redundantChanges); + oldPositionMapping.keySet().removeAll(redundantChanges); + } + return !positionMapping.isEmpty(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof AlterColumnPositionEvent)) { + return false; + } + AlterColumnPositionEvent that = (AlterColumnPositionEvent) o; + return Objects.equals(tableId, that.tableId) + && Objects.equals(positionMapping, that.positionMapping) + && Objects.equals(oldPositionMapping, that.oldPositionMapping); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, positionMapping, oldPositionMapping); + } + + @Override + public String toString() { + if (hasPreSchema()) { + return "AlterColumnPositionEvent{" + + "tableId=" + + tableId + + ", positionMapping=" + + positionMapping + + ", oldPositionMapping=" + + oldPositionMapping + + '}'; + } else { + return "AlterColumnPositionEvent{" + + "tableId=" + + tableId + + ", positionMapping=" + + positionMapping + + '}'; + } + } + + /** Represents the position information for a column. */ + public static class ColumnPosition implements Serializable { + + private static final long serialVersionUID = 1L; + + private final int absolutePosition; // 0-based absolute position + private final @Nullable String afterColumn; // relative position: after this column + private final boolean isFirst; // whether to move to first position + + /** Create position for first column. */ + public static ColumnPosition first() { + return new ColumnPosition(0, null, true); + } + + /** Create position after specified column. */ + public static ColumnPosition after(String columnName) { + return new ColumnPosition(-1, columnName, false); + } + + /** Create position at absolute index. */ + public static ColumnPosition at(int position) { + return new ColumnPosition(position, null, false); + } + + private ColumnPosition( + int absolutePosition, @Nullable String afterColumn, boolean isFirst) { + this.absolutePosition = absolutePosition; + this.afterColumn = afterColumn; + this.isFirst = isFirst; + } + + public int getAbsolutePosition() { + return absolutePosition; + } + + @Nullable + public String getAfterColumn() { + return afterColumn; + } + + public boolean isFirst() { + return isFirst; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof ColumnPosition)) { + return false; + } + + ColumnPosition that = (ColumnPosition) o; + return absolutePosition == that.absolutePosition + && isFirst == that.isFirst + && Objects.equals(afterColumn, that.afterColumn); + } + + @Override + public int hashCode() { + return Objects.hash(absolutePosition, afterColumn, isFirst); + } + + @Override + public String toString() { + return "ColumnPosition{" + + "absolutePosition=" + + absolutePosition + + ", afterColumn='" + + afterColumn + + '\'' + + ", isFirst=" + + isFirst + + '}'; + } + } +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java index bbe4b415c6a..121e590116b 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java @@ -28,7 +28,9 @@ public enum SchemaChangeEventType { DROP_COLUMN("drop.column"), DROP_TABLE("drop.table"), RENAME_COLUMN("rename.column"), - TRUNCATE_TABLE("truncate.table"); + TRUNCATE_TABLE("truncate.table"), + ALTER_COLUMN_POSITION("alter.column.position"), + ALTER_COLUMN_COMMENT("alter.column.comment"); private final String tag; @@ -55,6 +57,8 @@ public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) { return RENAME_COLUMN; } else if (event instanceof TruncateTableEvent) { return TRUNCATE_TABLE; + } else if (event instanceof AlterColumnPositionEvent) { + return ALTER_COLUMN_POSITION; } else { throw new RuntimeException("Unknown schema change event type: " + event.getClass()); } @@ -76,6 +80,10 @@ public static SchemaChangeEventType ofTag(String tag) { return RENAME_COLUMN; case "truncate.table": return TRUNCATE_TABLE; + case "alter.column.position": + return ALTER_COLUMN_POSITION; + case "alter.column.comment": + return ALTER_COLUMN_COMMENT; default: throw new RuntimeException("Unknown schema change event type: " + tag); } diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventTypeFamily.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventTypeFamily.java index c1adfd71618..0d7b515d2c1 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventTypeFamily.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventTypeFamily.java @@ -20,6 +20,7 @@ import org.apache.flink.cdc.common.annotation.PublicEvolving; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_POSITION; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN; @@ -36,7 +37,7 @@ public class SchemaChangeEventTypeFamily { public static final SchemaChangeEventType[] ADD = {ADD_COLUMN}; - public static final SchemaChangeEventType[] ALTER = {ALTER_COLUMN_TYPE}; + public static final SchemaChangeEventType[] ALTER = {ALTER_COLUMN_TYPE, ALTER_COLUMN_POSITION}; public static final SchemaChangeEventType[] CREATE = {CREATE_TABLE}; @@ -47,12 +48,13 @@ public class SchemaChangeEventTypeFamily { public static final SchemaChangeEventType[] TABLE = {CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE}; public static final SchemaChangeEventType[] COLUMN = { - ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN + ADD_COLUMN, ALTER_COLUMN_TYPE, ALTER_COLUMN_POSITION, DROP_COLUMN, RENAME_COLUMN }; public static final SchemaChangeEventType[] ALL = { ADD_COLUMN, ALTER_COLUMN_TYPE, + ALTER_COLUMN_POSITION, CREATE_TABLE, DROP_COLUMN, DROP_TABLE, diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterColumnPositionEventVisitor.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterColumnPositionEventVisitor.java new file mode 100644 index 00000000000..b3b8698ebdb --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/AlterColumnPositionEventVisitor.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.event.visitor; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.event.AlterColumnPositionEvent; + +/** Visitor for {@link AlterColumnPositionEvent}. */ +@Internal +public interface AlterColumnPositionEventVisitor { + T visit(AlterColumnPositionEvent event) throws E; +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java index c905c8dee4f..19e86277ef3 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/visitor/SchemaChangeEventVisitor.java @@ -19,6 +19,7 @@ import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnPositionEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; @@ -34,6 +35,7 @@ public static T visit( SchemaChangeEvent event, AddColumnEventVisitor addColumnVisitor, AlterColumnTypeEventVisitor alterColumnTypeEventVisitor, + AlterColumnPositionEventVisitor alterColumnPositionEventVisitor, CreateTableEventVisitor createTableEventVisitor, DropColumnEventVisitor dropColumnEventVisitor, DropTableEventVisitor dropTableEventVisitor, @@ -50,6 +52,11 @@ public static T visit( return null; } return alterColumnTypeEventVisitor.visit((AlterColumnTypeEvent) event); + } else if (event instanceof AlterColumnPositionEvent) { + if (alterColumnPositionEventVisitor == null) { + return null; + } + return alterColumnPositionEventVisitor.visit((AlterColumnPositionEvent) event); } else if (event instanceof CreateTableEvent) { if (createTableEventVisitor == null) { return null; diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java index 483752ce92e..60e71fef3e9 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java @@ -19,6 +19,7 @@ import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnPositionEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; @@ -77,6 +78,11 @@ public static SchemaChangeEvent recreateSchemaChangeEvent( tableId, alterColumnEvent.getTypeMapping(), alterColumnEvent.getOldTypeMapping()), + alterColumnPositionEvent -> + new AlterColumnPositionEvent( + tableId, + alterColumnPositionEvent.getPositionMapping(), + alterColumnPositionEvent.getOldPositionMapping()), createTableEvent -> new CreateTableEvent(tableId, createTableEvent.getSchema()), dropColumnEvent -> new DropColumnEvent(tableId, dropColumnEvent.getDroppedColumnNames()), diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java index 131a783f551..478383e636b 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java @@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnPositionEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; import org.apache.flink.cdc.common.event.RenameColumnEvent; @@ -111,6 +112,8 @@ public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent eve event, addColumnEvent -> applyAddColumnEvent(addColumnEvent, schema), alterColumnTypeEvent -> applyAlterColumnTypeEvent(alterColumnTypeEvent, schema), + alterColumnPositionEvent -> + applyAlterColumnPositionEvent(alterColumnPositionEvent, schema), createTableEvent -> createTableEvent.getSchema(), dropColumnEvent -> applyDropColumnEvent(dropColumnEvent, schema), dropTableEvent -> schema, @@ -211,6 +214,61 @@ private static Schema applyAlterColumnTypeEvent(AlterColumnTypeEvent event, Sche return oldSchema.copy(columns); } + private static Schema applyAlterColumnPositionEvent( + AlterColumnPositionEvent event, Schema oldSchema) { + List columns = new ArrayList<>(oldSchema.getColumns()); + + for (Map.Entry entry : + event.getPositionMapping().entrySet()) { + String columnName = entry.getKey(); + AlterColumnPositionEvent.ColumnPosition position = entry.getValue(); + + // Find and remove the column to be repositioned + Column targetColumn = null; + int oldIndex = -1; + for (int i = 0; i < columns.size(); i++) { + if (columns.get(i).getName().equals(columnName)) { + targetColumn = columns.get(i); + oldIndex = i; + break; + } + } + + if (targetColumn == null) { + throw new IllegalArgumentException("Column " + columnName + " not found in schema"); + } + + columns.remove(oldIndex); + + // Calculate new position and insert + int newIndex; + if (position.isFirst()) { + newIndex = 0; + } else if (position.getAfterColumn() != null) { + // Find position after specified column + int afterIndex = -1; + for (int i = 0; i < columns.size(); i++) { + if (columns.get(i).getName().equals(position.getAfterColumn())) { + afterIndex = i; + break; + } + } + if (afterIndex < 0) { + throw new IllegalArgumentException( + "Reference column " + position.getAfterColumn() + " not found"); + } + newIndex = afterIndex + 1; + } else { + // Use absolute position + newIndex = Math.min(position.getAbsolutePosition(), columns.size()); + } + + columns.add(newIndex, targetColumn); + } + + return oldSchema.copy(columns); + } + /** * This function determines if the given schema change event {@code event} should be sent to * downstream based on if the given transform rule has asterisk, and what columns are @@ -347,6 +405,63 @@ public static boolean isSchemaChangeEventRedundant( } return true; }, + alterColumnPositionEvent -> { + // It has not been applied if schema does not even exist + if (!latestSchema.isPresent()) { + return false; + } + Schema schema = latestSchema.get(); + List columns = schema.getColumns(); + + // Check if all columns are already in their expected positions + for (Map.Entry entry : + alterColumnPositionEvent.getPositionMapping().entrySet()) { + String columnName = entry.getKey(); + AlterColumnPositionEvent.ColumnPosition expectedPosition = + entry.getValue(); + + // Find current position of the column + int currentIndex = -1; + for (int i = 0; i < columns.size(); i++) { + if (columns.get(i).getName().equals(columnName)) { + currentIndex = i; + break; + } + } + + if (currentIndex < 0) { + return false; // Column not found + } + + // Calculate expected position + int expectedIndex; + if (expectedPosition.isFirst()) { + expectedIndex = 0; + } else if (expectedPosition.getAfterColumn() != null) { + int afterIndex = -1; + for (int i = 0; i < columns.size(); i++) { + if (columns.get(i) + .getName() + .equals(expectedPosition.getAfterColumn())) { + afterIndex = i; + break; + } + } + if (afterIndex < 0) { + return false; // Reference column not found + } + expectedIndex = afterIndex + 1; + } else { + expectedIndex = expectedPosition.getAbsolutePosition(); + } + + // Check if column is already at expected position + if (currentIndex != expectedIndex) { + return false; + } + } + return true; + }, createTableEvent -> { // It has been applied if such table already exists return latestSchema.isPresent(); diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java index 76d32db5128..a7af3595384 100644 --- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java @@ -29,6 +29,7 @@ import java.util.stream.Collectors; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_POSITION; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN; @@ -37,7 +38,7 @@ import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE; import static org.apache.flink.cdc.common.testutils.assertions.EventAssertions.assertThat; -/** A test for the {@link org.apache.flink.cdc.common.utils.ChangeEventUtils}. */ +/** A test for the {@link ChangeEventUtils}. */ class ChangeEventUtilsTest { @Test void testResolveSchemaEvolutionOptions() { @@ -54,6 +55,7 @@ void testResolveSchemaEvolutionOptions() { CREATE_TABLE, DROP_TABLE, ALTER_COLUMN_TYPE, + ALTER_COLUMN_POSITION, ADD_COLUMN, DROP_COLUMN)); @@ -64,6 +66,7 @@ void testResolveSchemaEvolutionOptions() { Sets.set( ADD_COLUMN, ALTER_COLUMN_TYPE, + ALTER_COLUMN_POSITION, RENAME_COLUMN, CREATE_TABLE, TRUNCATE_TABLE)); @@ -77,7 +80,12 @@ void testResolveSchemaEvolutionOptions() { ChangeEventUtils.resolveSchemaEvolutionOptions( Collections.singletonList("column"), Collections.singletonList("drop.column"))) - .isEqualTo(Sets.set(ADD_COLUMN, ALTER_COLUMN_TYPE, RENAME_COLUMN)); + .isEqualTo( + Sets.set( + ADD_COLUMN, + ALTER_COLUMN_TYPE, + ALTER_COLUMN_POSITION, + RENAME_COLUMN)); assertThat( ChangeEventUtils.resolveSchemaEvolutionOptions( @@ -89,6 +97,7 @@ void testResolveSchemaEvolutionOptions() { TRUNCATE_TABLE, RENAME_COLUMN, ALTER_COLUMN_TYPE, + ALTER_COLUMN_POSITION, CREATE_TABLE)); } @@ -99,6 +108,7 @@ void testResolveSchemaEvolutionTag() { Arrays.asList( ADD_COLUMN, ALTER_COLUMN_TYPE, + ALTER_COLUMN_POSITION, CREATE_TABLE, DROP_COLUMN, DROP_TABLE, @@ -107,7 +117,12 @@ void testResolveSchemaEvolutionTag() { assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("column")) .isEqualTo( - Arrays.asList(ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN)); + Arrays.asList( + ADD_COLUMN, + ALTER_COLUMN_TYPE, + ALTER_COLUMN_POSITION, + DROP_COLUMN, + RENAME_COLUMN)); assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("table")) .isEqualTo(Arrays.asList(CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE)); @@ -128,7 +143,7 @@ void testResolveSchemaEvolutionTag() { .isEqualTo(Collections.singletonList(CREATE_TABLE)); assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter")) - .isEqualTo(Collections.singletonList(ALTER_COLUMN_TYPE)); + .isEqualTo(Arrays.asList(ALTER_COLUMN_TYPE, ALTER_COLUMN_POSITION)); assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter.column.type")) .isEqualTo(Collections.singletonList(ALTER_COLUMN_TYPE)); @@ -138,5 +153,8 @@ void testResolveSchemaEvolutionTag() { assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add.column")) .isEqualTo(Collections.singletonList(ADD_COLUMN)); + + assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter.column.position")) + .isEqualTo(Collections.singletonList(ALTER_COLUMN_POSITION)); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java index fdf4b53669b..185575af909 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java @@ -121,6 +121,9 @@ public void applySchemaChange(SchemaChangeEvent event) { applyAlterColumnTypeEvent(alterColumnTypeEvent); return null; }, + alterColumnPositionEvent -> { + return null; + }, createTableEvent -> { applyCreateTableEvent(createTableEvent); return null; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java index d9d91328605..9cb3c1a509b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java @@ -109,6 +109,9 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) applyAlterColumnType(alterColumnTypeEvent); return null; }, + alterColumnPositionEvent -> { + return null; + }, createTableEvent -> { applyCreateTable(createTableEvent); return null; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index 82f5be7d976..f41f45fd525 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.connectors.mysql.source.parser; import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnPositionEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; @@ -362,7 +363,6 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) } Map typeMapping = new HashMap<>(); - typeMapping.put(oldColumnName, fromDbzColumn(column, tinyInt1isBit)); changes.add(new AlterColumnTypeEvent(currentTable, typeMapping)); @@ -371,6 +371,29 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) renameMap.put(oldColumnName, newColumnName); changes.add(new RenameColumnEvent(currentTable, renameMap)); } + + // Check for position changes + String targetColumnName = newColumnName != null ? newColumnName : oldColumnName; + if (ctx.FIRST() != null) { + Map positionMapping = + new HashMap<>(); + positionMapping.put( + targetColumnName, AlterColumnPositionEvent.ColumnPosition.first()); + changes.add(new AlterColumnPositionEvent(currentTable, positionMapping)); + } else if (ctx.AFTER() != null) { + String afterColumn = + parser.parseName(ctx.uid(2)); // ctx.uid(2) for CHANGE COLUMN + if (isTableIdCaseInsensitive) { + afterColumn = afterColumn.toLowerCase(Locale.ROOT); + } + Map positionMapping = + new HashMap<>(); + positionMapping.put( + targetColumnName, + AlterColumnPositionEvent.ColumnPosition.after(afterColumn)); + changes.add(new AlterColumnPositionEvent(currentTable, positionMapping)); + } + listeners.remove(columnDefinitionListener); }, columnDefinitionListener); @@ -416,13 +439,36 @@ public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) parser.runIfNotNull( () -> { Column column = columnDefinitionListener.getColumn(); - Map typeMapping = new HashMap<>(); - typeMapping.put( + String columnName = isTableIdCaseInsensitive ? column.name().toLowerCase(Locale.ROOT) - : column.name(), - fromDbzColumn(column, tinyInt1isBit)); + : column.name(); + + // Generate AlterColumnTypeEvent for type changes + Map typeMapping = new HashMap<>(); + typeMapping.put(columnName, fromDbzColumn(column, tinyInt1isBit)); changes.add(new AlterColumnTypeEvent(currentTable, typeMapping)); + + // Check for position changes + if (ctx.FIRST() != null) { + Map positionMapping = + new HashMap<>(); + positionMapping.put( + columnName, AlterColumnPositionEvent.ColumnPosition.first()); + changes.add(new AlterColumnPositionEvent(currentTable, positionMapping)); + } else if (ctx.AFTER() != null) { + String afterColumn = parser.parseName(ctx.uid(1)); + if (isTableIdCaseInsensitive) { + afterColumn = afterColumn.toLowerCase(Locale.ROOT); + } + Map positionMapping = + new HashMap<>(); + positionMapping.put( + columnName, + AlterColumnPositionEvent.ColumnPosition.after(afterColumn)); + changes.add(new AlterColumnPositionEvent(currentTable, positionMapping)); + } + listeners.remove(columnDefinitionListener); }, columnDefinitionListener); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/sink/OceanBaseMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/sink/OceanBaseMetadataApplier.java index 65aa715194f..4ccb1178ab8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/sink/OceanBaseMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/sink/OceanBaseMetadataApplier.java @@ -75,6 +75,9 @@ public void applySchemaChange(SchemaChangeEvent event) { applyAlterColumnTypeEvent(alterColumnTypeEvent); return null; }, + alterColumnPositionEvent -> { + return null; + }, createTableEvent -> { applyCreateTableEvent(createTableEvent); return null; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java index 51e0fb0ed0e..9a4e399cca1 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.connectors.paimon.sink; import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnPositionEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; @@ -112,7 +113,8 @@ public Set getSupportedSchemaEvolutionTypes() { SchemaChangeEventType.ADD_COLUMN, SchemaChangeEventType.DROP_COLUMN, SchemaChangeEventType.RENAME_COLUMN, - SchemaChangeEventType.ALTER_COLUMN_TYPE); + SchemaChangeEventType.ALTER_COLUMN_TYPE, + SchemaChangeEventType.ALTER_COLUMN_POSITION); } @Override @@ -131,6 +133,10 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) applyAlterColumnType(alterColumnTypeEvent); return null; }, + alterColumnPositionEvent -> { + applyAlterColumnPosition(alterColumnPositionEvent); + return null; + }, createTableEvent -> { applyCreateTable(createTableEvent); return null; @@ -341,6 +347,43 @@ private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolv } } + private void applyAlterColumnPosition(AlterColumnPositionEvent event) + throws SchemaEvolveException { + try { + List tableChangeList = new ArrayList<>(); + Table table = catalog.getTable(tableIdToIdentifier(event)); + List columnNames = table.rowType().getFieldNames(); + + for (Map.Entry entry : + event.getPositionMapping().entrySet()) { + String columnName = entry.getKey(); + AlterColumnPositionEvent.ColumnPosition position = entry.getValue(); + + SchemaChange.Move move; + if (position.isFirst()) { + move = SchemaChange.Move.first(columnName); + } else if (position.getAfterColumn() != null) { + move = SchemaChange.Move.after(columnName, position.getAfterColumn()); + } else { + int targetIndex = + Math.min(position.getAbsolutePosition(), columnNames.size() - 1); + if (targetIndex == 0) { + move = SchemaChange.Move.first(columnName); + } else { + move = + SchemaChange.Move.after( + columnName, columnNames.get(targetIndex - 1)); + } + } + tableChangeList.add(SchemaChange.updateColumnPosition(move)); + } + + catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); + } catch (Exception e) { + throw new SchemaEvolveException(event, e.getMessage(), e); + } + } + private void applyTruncateTable(TruncateTableEvent event) throws SchemaEvolveException { try { Table table = catalog.getTable(tableIdToIdentifier(event)); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java index b0da50c84c1..a3580488192 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java @@ -111,6 +111,9 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) applyAlterColumnType(alterColumnTypeEvent); return null; }, + alterColumnPositionEvent -> { + return null; + }, createTableEvent -> { applyCreateTable(createTableEvent); return null; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java index 7a5da9c7231..ae969c767db 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/SchemaChangeEventSerializer.java @@ -32,6 +32,7 @@ import java.io.IOException; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_POSITION; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN; @@ -81,6 +82,7 @@ public SchemaChangeEvent copy(SchemaChangeEvent from) { from, AddColumnEventSerializer.INSTANCE::copy, AlterColumnTypeEventSerializer.INSTANCE::copy, + AlterColumnPositionEventSerializer.INSTANCE::copy, CreateTableEventSerializer.INSTANCE::copy, DropColumnEventSerializer.INSTANCE::copy, DropTableEventSerializer.INSTANCE::copy, @@ -113,6 +115,12 @@ public void serialize(SchemaChangeEvent record, DataOutputView target) throws IO AlterColumnTypeEventSerializer.INSTANCE.serialize(alterColumnTypeEvent, target); return null; }, + alterColumnPositionEvent -> { + enumSerializer.serialize(ALTER_COLUMN_POSITION, target); + AlterColumnPositionEventSerializer.INSTANCE.serialize( + alterColumnPositionEvent, target); + return null; + }, createTableEvent -> { enumSerializer.serialize(CREATE_TABLE, target); CreateTableEventSerializer.INSTANCE.serialize(createTableEvent, target); @@ -154,6 +162,8 @@ public SchemaChangeEvent deserialize(DataInputView source) throws IOException { return RenameColumnEventSerializer.INSTANCE.deserialize(source); case ALTER_COLUMN_TYPE: return AlterColumnTypeEventSerializer.INSTANCE.deserialize(source); + case ALTER_COLUMN_POSITION: + return AlterColumnPositionEventSerializer.INSTANCE.deserialize(source); case DROP_TABLE: return DropTableEventSerializer.INSTANCE.deserialize(source); case TRUNCATE_TABLE: diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java index 740c77bd49d..a0be429cffb 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java @@ -20,6 +20,7 @@ import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnPositionEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; @@ -347,6 +348,62 @@ tableId, buildRecord(INT, 12, STRING, "Jane", FLOAT, 11f)), harness.clearOutputRecords(); } + // Test AlterColumnPositionEvent + { + List alterColumnPositionEvents = + Arrays.asList( + new AlterColumnPositionEvent( + tableId, + ImmutableMap.of("toshi",AlterColumnPositionEvent.ColumnPosition.after("id")), + ImmutableMap.of("toshi",2)), + DataChangeEvent.insertEvent( + tableId, + buildRecord(INT, 14, FLOAT, 23f, STRING, "David")), + DataChangeEvent.insertEvent( + tableId, + buildRecord(INT, 15, FLOAT, 30f, STRING, "Emma"))); + + processEvent(schemaOperator, alterColumnPositionEvents); + + List collect = harness.getOutputRecords().stream() + .map(StreamRecord::getValue) + .collect(Collectors.toList()); + + + List union = ListUtils.union( + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.ALTER_COLUMN_POSITION)), + alterColumnPositionEvents); + + Assertions.assertThat( + harness.getOutputRecords().stream() + .map(StreamRecord::getValue) + .collect(Collectors.toList())) + .isEqualTo( + ListUtils.union( + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.ALTER_COLUMN_POSITION)), + alterColumnPositionEvents)); + + Schema schemaV6 = + Schema.newBuilder() + .physicalColumn("id", INT) + .physicalColumn("toshi", FLOAT) + .physicalColumn("namae", STRING) + .primaryKey("id") + .build(); + Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV6); + Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV6); + + harness.clearOutputRecords(); + } + harness.close(); }