Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Set;

import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_POSITION;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
Expand All @@ -54,7 +55,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

/** Unit test for {@link org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser}. */
/** Unit test for {@link YamlPipelineDefinitionParser}. */
class YamlPipelineDefinitionParserTest {

@Test
Expand Down Expand Up @@ -206,6 +207,7 @@ void testSchemaEvolutionTypesConfiguration() throws Exception {
ImmutableSet.of(
ADD_COLUMN,
ALTER_COLUMN_TYPE,
ALTER_COLUMN_POSITION,
CREATE_TABLE,
DROP_COLUMN,
DROP_TABLE,
Expand All @@ -218,6 +220,7 @@ void testSchemaEvolutionTypesConfiguration() throws Exception {
ImmutableSet.of(
ADD_COLUMN,
ALTER_COLUMN_TYPE,
ALTER_COLUMN_POSITION,
CREATE_TABLE,
DROP_COLUMN,
DROP_TABLE,
Expand All @@ -230,6 +233,7 @@ void testSchemaEvolutionTypesConfiguration() throws Exception {
ImmutableSet.of(
ADD_COLUMN,
ALTER_COLUMN_TYPE,
ALTER_COLUMN_POSITION,
CREATE_TABLE,
RENAME_COLUMN,
TRUNCATE_TABLE));
Expand All @@ -238,14 +242,20 @@ void testSchemaEvolutionTypesConfiguration() throws Exception {
null,
null,
ImmutableSet.of(
ADD_COLUMN, ALTER_COLUMN_TYPE, CREATE_TABLE, DROP_COLUMN, RENAME_COLUMN));
ADD_COLUMN,
ALTER_COLUMN_TYPE,
ALTER_COLUMN_POSITION,
CREATE_TABLE,
DROP_COLUMN,
RENAME_COLUMN));
testSchemaEvolutionTypesParsing(
"lenient",
null,
"[]",
ImmutableSet.of(
ADD_COLUMN,
ALTER_COLUMN_TYPE,
ALTER_COLUMN_POSITION,
CREATE_TABLE,
DROP_COLUMN,
DROP_TABLE,
Expand Down Expand Up @@ -532,6 +542,7 @@ void testParsingFullDefinitionFromString() throws Exception {
ImmutableSet.of(
DROP_COLUMN,
ALTER_COLUMN_TYPE,
ALTER_COLUMN_POSITION,
ADD_COLUMN,
CREATE_TABLE,
RENAME_COLUMN)),
Expand All @@ -558,6 +569,7 @@ void testParsingFullDefinitionFromString() throws Exception {
ImmutableSet.of(
DROP_COLUMN,
ALTER_COLUMN_TYPE,
ALTER_COLUMN_POSITION,
ADD_COLUMN,
CREATE_TABLE,
RENAME_COLUMN)),
Expand Down Expand Up @@ -644,6 +656,7 @@ void testParsingFullDefinitionFromString() throws Exception {
ImmutableSet.of(
DROP_COLUMN,
ALTER_COLUMN_TYPE,
ALTER_COLUMN_POSITION,
ADD_COLUMN,
CREATE_TABLE,
RENAME_COLUMN)),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.event;

import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A {@link SchemaChangeEvent} that represents column position changes in a table, such as {@code
* ALTER TABLE ... MODIFY COLUMN ... AFTER/BEFORE} DDL operations.
*/
@PublicEvolving
public class AlterColumnPositionEvent implements SchemaChangeEventWithPreSchema, SchemaChangeEvent {

private static final long serialVersionUID = 1L;

private final TableId tableId;

/** key => column name, value => new position information. */
private final Map<String, ColumnPosition> positionMapping;

/** key => column name, value => old position (0-based). */
private final Map<String, Integer> oldPositionMapping;

public AlterColumnPositionEvent(TableId tableId, Map<String, ColumnPosition> positionMapping) {
this.tableId = tableId;
this.positionMapping = positionMapping;
this.oldPositionMapping = new HashMap<>();
}

public AlterColumnPositionEvent(
TableId tableId,
Map<String, ColumnPosition> positionMapping,
Map<String, Integer> oldPositionMapping) {
this.tableId = tableId;
this.positionMapping = positionMapping;
this.oldPositionMapping = oldPositionMapping;
}

/** Returns the position mapping. */
public Map<String, ColumnPosition> getPositionMapping() {
return positionMapping;
}

/** Returns the old position mapping. */
public Map<String, Integer> getOldPositionMapping() {
return oldPositionMapping;
}

@Override
public TableId tableId() {
return tableId;
}

@Override
public SchemaChangeEventType getType() {
return SchemaChangeEventType.ALTER_COLUMN_POSITION;
}

@Override
public SchemaChangeEvent copy(TableId newTableId) {
return new AlterColumnPositionEvent(newTableId, positionMapping, oldPositionMapping);
}

@Override
public boolean hasPreSchema() {
return !oldPositionMapping.isEmpty();
}

@Override
public void fillPreSchema(Schema oldSchema) {
oldPositionMapping.clear();
List<Column> columns = oldSchema.getColumns();
for (int i = 0; i < columns.size(); i++) {
String columnName = columns.get(i).getName();
if (positionMapping.containsKey(columnName)) {
oldPositionMapping.put(columnName, i);
}
}
}

@Override
public boolean trimRedundantChanges() {
if (hasPreSchema()) {
Set<String> redundantChanges =
positionMapping.entrySet().stream()
.filter(
entry -> {
String columnName = entry.getKey();
ColumnPosition newPos = entry.getValue();
Integer oldPos = oldPositionMapping.get(columnName);
return oldPos != null
&& oldPos.equals(newPos.getAbsolutePosition());
})
.map(Map.Entry::getKey)
.collect(Collectors.toSet());

positionMapping.keySet().removeAll(redundantChanges);
oldPositionMapping.keySet().removeAll(redundantChanges);
}
return !positionMapping.isEmpty();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof AlterColumnPositionEvent)) {
return false;
}
AlterColumnPositionEvent that = (AlterColumnPositionEvent) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(positionMapping, that.positionMapping)
&& Objects.equals(oldPositionMapping, that.oldPositionMapping);
}

@Override
public int hashCode() {
return Objects.hash(tableId, positionMapping, oldPositionMapping);
}

@Override
public String toString() {
if (hasPreSchema()) {
return "AlterColumnPositionEvent{"
+ "tableId="
+ tableId
+ ", positionMapping="
+ positionMapping
+ ", oldPositionMapping="
+ oldPositionMapping
+ '}';
} else {
return "AlterColumnPositionEvent{"
+ "tableId="
+ tableId
+ ", positionMapping="
+ positionMapping
+ '}';
}
}

/** Represents the position information for a column. */
public static class ColumnPosition implements Serializable {

private static final long serialVersionUID = 1L;

private final int absolutePosition; // 0-based absolute position
private final @Nullable String afterColumn; // relative position: after this column
private final boolean isFirst; // whether to move to first position

/** Create position for first column. */
public static ColumnPosition first() {
return new ColumnPosition(0, null, true);
}

/** Create position after specified column. */
public static ColumnPosition after(String columnName) {
return new ColumnPosition(-1, columnName, false);
}

/** Create position at absolute index. */
public static ColumnPosition at(int position) {
return new ColumnPosition(position, null, false);
}

private ColumnPosition(
int absolutePosition, @Nullable String afterColumn, boolean isFirst) {
this.absolutePosition = absolutePosition;
this.afterColumn = afterColumn;
this.isFirst = isFirst;
}

public int getAbsolutePosition() {
return absolutePosition;
}

@Nullable
public String getAfterColumn() {
return afterColumn;
}

public boolean isFirst() {
return isFirst;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (!(o instanceof ColumnPosition)) {
return false;
}

ColumnPosition that = (ColumnPosition) o;
return absolutePosition == that.absolutePosition
&& isFirst == that.isFirst
&& Objects.equals(afterColumn, that.afterColumn);
}

@Override
public int hashCode() {
return Objects.hash(absolutePosition, afterColumn, isFirst);
}

@Override
public String toString() {
return "ColumnPosition{"
+ "absolutePosition="
+ absolutePosition
+ ", afterColumn='"
+ afterColumn
+ '\''
+ ", isFirst="
+ isFirst
+ '}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ public enum SchemaChangeEventType {
DROP_COLUMN("drop.column"),
DROP_TABLE("drop.table"),
RENAME_COLUMN("rename.column"),
TRUNCATE_TABLE("truncate.table");
TRUNCATE_TABLE("truncate.table"),
ALTER_COLUMN_POSITION("alter.column.position"),
ALTER_COLUMN_COMMENT("alter.column.comment");

private final String tag;

Expand All @@ -55,6 +57,8 @@ public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) {
return RENAME_COLUMN;
} else if (event instanceof TruncateTableEvent) {
return TRUNCATE_TABLE;
} else if (event instanceof AlterColumnPositionEvent) {
return ALTER_COLUMN_POSITION;
} else {
throw new RuntimeException("Unknown schema change event type: " + event.getClass());
}
Expand All @@ -76,6 +80,10 @@ public static SchemaChangeEventType ofTag(String tag) {
return RENAME_COLUMN;
case "truncate.table":
return TRUNCATE_TABLE;
case "alter.column.position":
return ALTER_COLUMN_POSITION;
case "alter.column.comment":
return ALTER_COLUMN_COMMENT;
default:
throw new RuntimeException("Unknown schema change event type: " + tag);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.cdc.common.annotation.PublicEvolving;

import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_POSITION;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
Expand All @@ -36,7 +37,7 @@ public class SchemaChangeEventTypeFamily {

public static final SchemaChangeEventType[] ADD = {ADD_COLUMN};

public static final SchemaChangeEventType[] ALTER = {ALTER_COLUMN_TYPE};
public static final SchemaChangeEventType[] ALTER = {ALTER_COLUMN_TYPE, ALTER_COLUMN_POSITION};

public static final SchemaChangeEventType[] CREATE = {CREATE_TABLE};

Expand All @@ -47,12 +48,13 @@ public class SchemaChangeEventTypeFamily {
public static final SchemaChangeEventType[] TABLE = {CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE};

public static final SchemaChangeEventType[] COLUMN = {
ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN
ADD_COLUMN, ALTER_COLUMN_TYPE, ALTER_COLUMN_POSITION, DROP_COLUMN, RENAME_COLUMN
};

public static final SchemaChangeEventType[] ALL = {
ADD_COLUMN,
ALTER_COLUMN_TYPE,
ALTER_COLUMN_POSITION,
CREATE_TABLE,
DROP_COLUMN,
DROP_TABLE,
Expand Down
Loading
Loading