Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

package org.apache.flink.cdc.connectors.fluss.sink;

import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.fluss.sink.row.CdcAsFlussRow;
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEvent;
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEventSerializer;
Expand All @@ -43,19 +46,19 @@
import static org.apache.flink.cdc.connectors.fluss.sink.v2.FlussOperationType.APPEND;
import static org.apache.flink.cdc.connectors.fluss.sink.v2.FlussOperationType.DELETE;
import static org.apache.flink.cdc.connectors.fluss.sink.v2.FlussOperationType.UPSERT;
import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue;
import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.sameSchemaIgnoreCommentAndDefaultValue;
import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussSchema;

/** Serialization schema that converts a CDC data record to a Fluss event. */
public class FlussEventSerializationSchema implements FlussEventSerializer<Event> {
private static final long serialVersionUID = 1L;

private transient Map<TableId, TableSchemaInfo> tableInfoMap;
private transient Map<TableId, TableSchemaInfo> schemaMaps;
private transient Connection connection;

@Override
public void open(Connection connection) {
this.tableInfoMap = new HashMap<>();
this.schemaMaps = new HashMap<>();
this.connection = connection;
}

Expand All @@ -82,29 +85,45 @@ private void applySchemaChangeEvent(SchemaChangeEvent event) {
org.apache.flink.cdc.common.schema.Schema newSchema =
((CreateTableEvent) event).getSchema();
// if the table is not exist or the schema is changed, update the table info.
if (!tableInfoMap.containsKey(tableId)
|| !sameCdcColumnsIgnoreCommentAndDefaultValue(
tableInfoMap.get(tableId).upstreamCdcSchema, newSchema)) {
if (!schemaMaps.containsKey(tableId)
|| !sameSchemaIgnoreCommentAndDefaultValue(
schemaMaps.get(tableId).upstreamCdcSchema, newSchema)) {
Table table = connection.getTable(getTablePath(tableId));
TableSchemaInfo newSchemaInfo =
new TableSchemaInfo(newSchema, table.getTableInfo().getSchema());
tableInfoMap.put(tableId, newSchemaInfo);
schemaMaps.put(tableId, newSchemaInfo);
}
} else if (event instanceof AddColumnEvent) {
TableSchemaInfo schemaInfo = schemaMaps.get(event.tableId());
if (schemaInfo == null) {
throw new IllegalStateException(
"Cannot apply AddColumnEvent for table "
+ event.tableId()
+ ": table schema not found. Ensure CreateTableEvent is processed before AddColumnEvent.");
}
Schema schema = schemaInfo.upstreamCdcSchema;
if (!SchemaUtils.isSchemaChangeEventRedundant(schema, event)) {
Table table = connection.getTable(getTablePath(tableId));
TableSchemaInfo newSchemaInfo =
new TableSchemaInfo(
SchemaUtils.applySchemaChangeEvent(schema, event),
table.getTableInfo().getSchema());
schemaMaps.put(tableId, newSchemaInfo);
}
} else {
// TODO: Logics for altering tables are not supported yet.
// This is anticipated to be supported in Fluss version 0.8.0.
throw new RuntimeException(
"Schema change type not supported. Only CreateTableEvent is allowed at the moment.");
throw new UnsupportedOperationException(
String.format(
"Schema change type %s not supported. Only CreateTableEvent and AddColumnEvent are allowed at the moment.",
event.getClass()));
}
}

private FlussRowWithOp applyDataChangeEvent(DataChangeEvent record) {
OperationType op = record.op();
TableSchemaInfo tableSchemaInfo = tableInfoMap.get(record.tableId());
TableSchemaInfo tableSchemaInfo = schemaMaps.get(record.tableId());
Preconditions.checkNotNull(
tableSchemaInfo, "Table schema not found for table " + record.tableId());
int flussFieldCount =
tableSchemaInfo.downStreamFlusstreamSchema.getRowType().getFieldCount();
int flussFieldCount = tableSchemaInfo.downstreamFlussSchema.getRowType().getFieldCount();
boolean hasPrimaryKey = !tableSchemaInfo.upstreamCdcSchema.primaryKeys().isEmpty();
switch (op) {
case INSERT:
Expand All @@ -130,17 +149,17 @@ private TablePath getTablePath(TableId tableId) {

private static class TableSchemaInfo {
org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema;
org.apache.fluss.metadata.Schema downStreamFlusstreamSchema;
org.apache.fluss.metadata.Schema downstreamFlussSchema;
Map<Integer, Integer> indexMapping;

private TableSchemaInfo(
org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema,
org.apache.fluss.metadata.Schema downStreamFlusstreamSchema) {
org.apache.fluss.metadata.Schema downstreamFlussSchema) {
this.upstreamCdcSchema = upstreamCdcSchema;
this.downStreamFlusstreamSchema = downStreamFlusstreamSchema;
this.downstreamFlussSchema = downstreamFlussSchema;
this.indexMapping =
sanityCheckAndGenerateIndexMapping(
toFlussSchema(upstreamCdcSchema), downStreamFlusstreamSchema);
toFlussSchema(upstreamCdcSchema), downstreamFlussSchema);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package org.apache.flink.cdc.connectors.fluss.sink;

import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.table.api.ValidationException;

Expand All @@ -31,12 +33,14 @@
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand All @@ -47,6 +51,7 @@
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussTable;
import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussType;

/** {@link MetadataApplier} for fluss. */
public class FlussMetaDataApplier implements MetadataApplier {
Expand All @@ -58,9 +63,6 @@ public class FlussMetaDataApplier implements MetadataApplier {
private Set<SchemaChangeEventType> enabledEventTypes =
new HashSet<>(Arrays.asList(CREATE_TABLE, DROP_TABLE));

private transient Connection connection;
private transient Admin admin;

public FlussMetaDataApplier(
Configuration flussClientConfig,
Map<String, String> tableProperties,
Expand Down Expand Up @@ -92,22 +94,25 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
@Override
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
LOG.info("fluss metadata applier receive schemaChangeEvent {}", schemaChangeEvent);
Admin admin = getAdmin();
if (schemaChangeEvent instanceof CreateTableEvent) {
CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent;
applyCreateTable(admin, createTableEvent);
applyCreateTable(createTableEvent);
} else if (schemaChangeEvent instanceof DropTableEvent) {
DropTableEvent dropTableEvent = (DropTableEvent) schemaChangeEvent;
applyDropTable(admin, dropTableEvent);
applyDropTable(dropTableEvent);
} else if (schemaChangeEvent instanceof AddColumnEvent) {
AddColumnEvent addColumnEvent = (AddColumnEvent) schemaChangeEvent;
applyAddColumnTable(addColumnEvent);
} else {
throw new IllegalArgumentException(
"fluss metadata applier only support CreateTableEvent now but receives "
"fluss metadata applier only supports CreateTableEvent and AddColumnEvent now but receives "
+ schemaChangeEvent);
}
}

private void applyCreateTable(Admin admin, CreateTableEvent event) {
try {
private void applyCreateTable(CreateTableEvent event) {
try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
Admin admin = connection.getAdmin()) {
TableId tableId = event.tableId();
TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
String tableIdentifier = tablePath.getDatabaseName() + "." + tablePath.getTableName();
Expand All @@ -129,8 +134,9 @@ private void applyCreateTable(Admin admin, CreateTableEvent event) {
}
}

private void applyDropTable(Admin admin, DropTableEvent event) {
try {
private void applyDropTable(DropTableEvent event) {
try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
Admin admin = connection.getAdmin()) {
TableId tableId = event.tableId();
TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
admin.dropTable(tablePath, true).get();
Expand All @@ -140,21 +146,36 @@ private void applyDropTable(Admin admin, DropTableEvent event) {
}
}

/**
 * Lazily creates and caches the Fluss {@link Admin} client.
 *
 * <p>On first call, opens a {@link Connection} from {@code flussClientConfig} and obtains an
 * {@code Admin} from it; subsequent calls return the cached instance. The cached {@code
 * connection}/{@code admin} are released in {@code close()}.
 *
 * <p>NOTE(review): there is no synchronization here — assumes this applier is invoked from a
 * single thread; confirm against the caller's threading model.
 *
 * @return the shared {@code Admin} client for this applier
 */
private Admin getAdmin() {
// Null connection means nothing has been opened yet; create both lazily.
if (connection == null) {
connection = ConnectionFactory.createConnection(flussClientConfig);
admin = connection.getAdmin();
}
return admin;
}

@Override
public void close() throws Exception {
if (admin != null) {
admin.close();
}
if (connection != null) {
connection.close();
/**
 * Applies an upstream {@link AddColumnEvent} to the corresponding Fluss table by issuing an
 * ALTER TABLE with one {@link TableChange#addColumn} per added column.
 *
 * <p>Only columns appended at the {@code LAST} position are supported; any other position is
 * rejected with an {@link IllegalArgumentException} before any change is sent to Fluss.
 *
 * @param event the add-column schema change event from the CDC pipeline
 * @throws IllegalArgumentException if any added column is not positioned LAST
 * @throws RuntimeException if the Fluss alter-table call fails (original cause preserved)
 */
private void applyAddColumnTable(AddColumnEvent event) {
// Translate each CDC column addition into a Fluss TableChange before touching the cluster,
// so a validation failure leaves the Fluss table untouched.
List<TableChange> tableChanges = new ArrayList<>();
event.getAddedColumns()
.forEach(
columnWithPosition -> {
// Fluss only supports appending columns at the end of the schema here.
if (columnWithPosition.getPosition()
!= AddColumnEvent.ColumnPosition.LAST) {
throw new IllegalArgumentException(
"Fluss metadata applier only supports LAST position for adding columns now but receives "
+ columnWithPosition.getPosition()
+ ". Consider using 'schema.change.behavior' configuration with 'LENIENT' mode to handle schema changes more flexibly.");
}

// Convert the CDC column (name, type, comment) into a Fluss add-column change.
Column column = columnWithPosition.getAddColumn();
tableChanges.add(
TableChange.addColumn(
column.getName(),
toFlussType(column.getType()),
column.getComment(),
TableChange.ColumnPosition.last()));
});

// Connection and Admin are short-lived and closed via try-with-resources.
try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
Admin admin = connection.getAdmin()) {
TableId tableId = event.tableId();
// CDC schema name maps to the Fluss database name.
TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
// .get() blocks until the async alter completes so failures surface here.
// NOTE(review): the trailing 'true' flag presumably means ignore-if-not-exists — confirm
// against the Fluss Admin#alterTable API.
admin.alterTable(tablePath, tableChanges, true).get();
} catch (Exception e) {
LOG.error("Failed to apply schema change {}", event, e);
// Wrap with cause preserved; callers treat schema-apply failure as fatal.
throw new RuntimeException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.util.CollectionUtil;

import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableDescriptor;

Expand Down Expand Up @@ -94,28 +93,25 @@ public static org.apache.fluss.metadata.Schema toFlussSchema(
schemBuilder.primaryKey(cdcSchema.primaryKeys());
}

Schema schema =
schemBuilder
.fromColumns(
cdcSchema.getColumns().stream()
.map(
column ->
new Schema.Column(
column.getName(),
toFlussType(column.getType()),
column.getComment()))
.collect(Collectors.toList()))
.build();
return schema;
// use schemBuilder.column rather than schemBuilder.fromColumns to reassign nested row id.
cdcSchema
.getColumns()
.forEach(
column ->
schemBuilder
.column(
column.getName(),
column.getType().accept(TO_FLUSS_TYPE_INSTANCE))
.withComment(column.getComment()));
return schemBuilder.build();
}

@VisibleForTesting
private static org.apache.fluss.types.DataType toFlussType(
/**
 * Converts a Flink CDC {@code DataType} into the equivalent Fluss {@code DataType} by
 * dispatching through the shared {@code TO_FLUSS_TYPE_INSTANCE} visitor.
 *
 * @param flinkDataType the upstream Flink CDC data type
 * @return the corresponding Fluss data type
 */
public static org.apache.fluss.types.DataType toFlussType(
org.apache.flink.cdc.common.types.DataType flinkDataType) {
return flinkDataType.accept(TO_FLUSS_TYPE_INSTANCE);
}

public static Boolean sameCdcColumnsIgnoreCommentAndDefaultValue(
public static Boolean sameSchemaIgnoreCommentAndDefaultValue(
org.apache.flink.cdc.common.schema.Schema oldSchema,
org.apache.flink.cdc.common.schema.Schema newSchema) {
List<org.apache.flink.cdc.common.schema.Column> upstreamColumns = oldSchema.getColumns();
Expand Down
Loading
Loading