Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.table.catalog.TableChange.ModifyDefinitionQuery;
import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler;
import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus;
import org.apache.flink.table.catalog.TableChange.ModifyStartMode;
import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;

import java.util.List;
Expand Down Expand Up @@ -120,6 +121,9 @@ private static String toString(TableChange tableChange) {
ModifyDefinitionQuery definitionQuery = (ModifyDefinitionQuery) tableChange;
return String.format(
" MODIFY DEFINITION QUERY TO '%s'", definitionQuery.getDefinitionQuery());
} else if (tableChange instanceof ModifyStartMode) {
ModifyStartMode startMode = (ModifyStartMode) tableChange;
return String.format(" MODIFY START_MODE TO '%s'", startMode.getStartMode());
} else {
return AlterTableChangeOperation.toString(tableChange);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.Column.MetadataColumn;
import org.apache.flink.table.catalog.StartMode;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.TableChange.AddColumn;
import org.apache.flink.table.catalog.TableChange.AddDistribution;
Expand All @@ -44,6 +45,7 @@
import org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType;
import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler;
import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus;
import org.apache.flink.table.catalog.TableChange.ModifyStartMode;
import org.apache.flink.table.catalog.TableChange.ModifyUniqueConstraint;
import org.apache.flink.table.catalog.TableChange.ModifyWatermark;
import org.apache.flink.table.catalog.TableChange.ResetOption;
Expand Down Expand Up @@ -84,6 +86,7 @@ public class MaterializedTableChangeHandler {
private int droppedPersistedCnt = 0;
private String originalQuery;
private String expandedQuery;
private StartMode startMode;
private final Map<String, String> options;
private final List<String> validationErrors = new ArrayList<>();

Expand All @@ -102,6 +105,7 @@ public MaterializedTableChangeHandler(CatalogMaterializedTable oldTable) {
}
originalQuery = oldTable.getOriginalQuery();
expandedQuery = oldTable.getExpandedQuery();
startMode = oldTable.getStartMode().orElse(null);
this.oldTable = oldTable;
this.options = new HashMap<>(oldTable.getOptions());
}
Expand Down Expand Up @@ -170,6 +174,7 @@ public static CatalogMaterializedTable buildNewMaterializedTable(
.refreshStatus(context.getRefreshStatus())
.refreshHandlerDescription(context.getRefreshHandlerDesc())
.serializedRefreshHandler(context.getRefreshHandlerBytes())
.startMode(context.getStartMode())
.build();
}

Expand Down Expand Up @@ -222,6 +227,8 @@ private static HandlerRegistry createHandlerRegistry() {
registry.register(SetOption.class, MaterializedTableChangeHandler::setTableOption);
registry.register(ResetOption.class, MaterializedTableChangeHandler::resetTableOption);

registry.register(ModifyStartMode.class, MaterializedTableChangeHandler::modifyStartMode);

return registry;
}

Expand Down Expand Up @@ -284,6 +291,10 @@ public byte[] getRefreshHandlerBytes() {
return refreshHandlerBytes;
}

public StartMode getStartMode() {
return startMode;
}

@Nullable
public String getRefreshHandlerDesc() {
return refreshHandlerDesc;
Expand Down Expand Up @@ -409,6 +420,10 @@ private void modifyRefreshStatus(ModifyRefreshStatus modifyRefreshStatus) {
refreshStatus = modifyRefreshStatus.getRefreshStatus();
}

private void modifyStartMode(ModifyStartMode modifyStartMode) {
startMode = modifyStartMode.getStartMode();
}

private void addDistribution(AddDistribution addDistribution) {
distribution = addDistribution.getDistribution();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -243,8 +242,8 @@ class Builder {
private Schema schema;
private String comment;
private TableDistribution distribution = null;
private List<String> partitionKeys = Collections.emptyList();
private Map<String, String> options = Collections.emptyMap();
private List<String> partitionKeys = List.of();
private Map<String, String> options = Map.of();
private @Nullable Long snapshot;
private String originalQuery;
private String expandedQuery;
Expand All @@ -254,6 +253,7 @@ class Builder {
private RefreshStatus refreshStatus;
private @Nullable String refreshHandlerDescription;
private @Nullable byte[] serializedRefreshHandler;
private StartMode startMode;

private Builder() {}

Expand Down Expand Up @@ -341,6 +341,11 @@ public Builder distribution(@Nullable TableDistribution distribution) {
return this;
}

public Builder startMode(StartMode startMode) {
this.startMode = startMode;
return this;
}

public CatalogMaterializedTable build() {
return new DefaultCatalogMaterializedTable(
schema,
Expand All @@ -356,7 +361,8 @@ public CatalogMaterializedTable build() {
refreshMode,
refreshStatus,
refreshHandlerDescription,
serializedRefreshHandler);
serializedRefreshHandler,
startMode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class DefaultCatalogMaterializedTable implements CatalogMaterializedTable
private final RefreshStatus refreshStatus;
private final @Nullable String refreshHandlerDescription;
private final @Nullable byte[] serializedRefreshHandler;
private final @Nullable StartMode startMode;

protected DefaultCatalogMaterializedTable(
Schema schema,
Expand All @@ -67,7 +68,8 @@ protected DefaultCatalogMaterializedTable(
@Nullable RefreshMode refreshMode,
RefreshStatus refreshStatus,
@Nullable String refreshHandlerDescription,
@Nullable byte[] serializedRefreshHandler) {
@Nullable byte[] serializedRefreshHandler,
@Nullable StartMode startMode) {
this.schema = checkNotNull(schema, "Schema must not be null.");
this.comment = comment;
this.distribution = distribution;
Expand All @@ -83,6 +85,7 @@ protected DefaultCatalogMaterializedTable(
this.refreshStatus = checkNotNull(refreshStatus, "Refresh status must not be null.");
this.refreshHandlerDescription = refreshHandlerDescription;
this.serializedRefreshHandler = serializedRefreshHandler;
this.startMode = startMode;

checkArgument(
options.entrySet().stream()
Expand Down Expand Up @@ -136,7 +139,8 @@ public CatalogBaseTable copy() {
refreshMode,
refreshStatus,
refreshHandlerDescription,
serializedRefreshHandler);
serializedRefreshHandler,
startMode);
}

@Override
Expand All @@ -155,7 +159,8 @@ public CatalogMaterializedTable copy(Map<String, String> options) {
refreshMode,
refreshStatus,
refreshHandlerDescription,
serializedRefreshHandler);
serializedRefreshHandler,
startMode);
}

@Override
Expand All @@ -177,7 +182,8 @@ public CatalogMaterializedTable copy(
refreshMode,
refreshStatus,
refreshHandlerDescription,
serializedRefreshHandler);
serializedRefreshHandler,
startMode);
}

@Override
Expand Down Expand Up @@ -225,6 +231,11 @@ public RefreshStatus getRefreshStatus() {
return refreshStatus;
}

@Override
public Optional<StartMode> getStartMode() {
return Optional.ofNullable(startMode);
}

@Override
public Optional<String> getRefreshHandlerDescription() {
return Optional.ofNullable(refreshHandlerDescription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.sql.parser.ddl.materializedtable.SqlCreateMaterializedTable;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.MaterializedTableConfigOptions;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
Expand All @@ -29,6 +30,7 @@
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.StartMode;
import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
Expand Down Expand Up @@ -78,6 +80,8 @@ protected interface MergeContext {
ResolvedSchema getMergedQuerySchema();

RefreshMode getMergedRefreshMode();

StartMode getMergedStartMode();
}

protected abstract MergeContext getMergeContext(
Expand All @@ -99,6 +103,18 @@ protected final IntervalFreshness getDerivedFreshness(T sqlCreateMaterializedTab
.orElse(null);
}

protected final StartMode getStartMode(T sqlCreateMaterializedTable, ConvertContext context) {
StartMode startMode =
MaterializedTableUtils.getMaterializedTableStartMode(
sqlCreateMaterializedTable.getStartMode());
if (startMode != null) {
return startMode;
}
return StartMode.of(
context.getTableConfig()
.get(MaterializedTableConfigOptions.MATERIALIZED_TABLE_DEFAULT_START_MODE));
}

protected final ResolvedSchema getQueryResolvedSchema(
T sqlCreateMaterializedTable, ConvertContext context) {
SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
Expand Down Expand Up @@ -160,6 +176,8 @@ protected final ResolvedCatalogMaterializedTable getResolvedCatalogMaterializedT

final RefreshMode refreshMode = getDerivedRefreshMode(logicalRefreshMode);

final StartMode startMode = mergeContext.getMergedStartMode();

return context.getCatalogManager()
.resolveCatalogMaterializedTable(
CatalogMaterializedTable.newBuilder()
Expand All @@ -174,6 +192,7 @@ protected final ResolvedCatalogMaterializedTable getResolvedCatalogMaterializedT
.logicalRefreshMode(logicalRefreshMode)
.refreshMode(refreshMode)
.refreshStatus(RefreshStatus.INITIALIZING)
.startMode(startMode)
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.SchemaResolver;
import org.apache.flink.table.catalog.StartMode;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.catalog.UniqueConstraint;
Expand Down Expand Up @@ -137,6 +138,15 @@ private Function<ResolvedCatalogMaterializedTable, List<TableChange>> buildTable
throw new ValidationException("Changing of REFRESH MODE is unsupported");
}

final StartMode newStartMode = mergeContext.getMergedStartMode();
final StartMode oldStartMode =
oldTable.getStartMode()
.orElseThrow(
() -> new ValidationException("START_MODE must not be null"));
if (!Objects.equals(oldStartMode, newStartMode)) {
changes.add(TableChange.modifyStartMode(newStartMode));
}

return changes;
};
}
Expand Down Expand Up @@ -213,7 +223,7 @@ private List<TableChange> getSchemaTableChanges(
private Optional<TableChange> getConstraintChange(
final ResolvedSchema oldSchema,
final ResolvedSchema newSchema,
boolean hasConstraintDefinition) {
final boolean hasConstraintDefinition) {
final UniqueConstraint oldConstraint = oldSchema.getPrimaryKey().orElse(null);
final UniqueConstraint newConstraint = newSchema.getPrimaryKey().orElse(null);
if (hasConstraintDefinition && !Objects.equals(oldConstraint, newConstraint)) {
Expand Down Expand Up @@ -279,11 +289,8 @@ public boolean hasSchemaDefinition() {

@Override
public boolean hasConstraintDefinition() {
if (!sqlCreateMaterializedTable.getTableConstraints().isEmpty()) {
return true;
}

return hasSchemaDefinition();
return !sqlCreateMaterializedTable.getTableConstraints().isEmpty()
|| hasSchemaDefinition();
}

@Override
Expand Down Expand Up @@ -354,6 +361,11 @@ public RefreshMode getMergedRefreshMode() {
return getDerivedRefreshMode(
getDerivedLogicalRefreshMode(sqlCreateMaterializedTable));
}

@Override
public StartMode getMergedStartMode() {
return getStartMode(sqlCreateMaterializedTable, context);
}
};
}
}
Loading