Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36675][table-planner] Always serialize dynamic options to exec json plan #25626

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
Expand All @@ -44,12 +45,10 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class DynamicTableSinkSpec extends DynamicTableSpecBase {

public static final String FIELD_NAME_CATALOG_TABLE = "table";
public static final String FIELD_NAME_SINK_ABILITIES = "abilities";

public static final String FIELD_NAME_TARGET_COLUMNS = "targetColumns";

private final ContextResolvedTable contextResolvedTable;
private final @Nullable List<SinkAbilitySpec> sinkAbilities;

private final @Nullable int[][] targetColumns;
Expand All @@ -59,18 +58,14 @@ public class DynamicTableSinkSpec extends DynamicTableSpecBase {
@JsonCreator
public DynamicTableSinkSpec(
@JsonProperty(FIELD_NAME_CATALOG_TABLE) ContextResolvedTable contextResolvedTable,
@Nullable @JsonProperty(FIELD_NAME_DYNAMIC_OPTIONS) Map<String, String> dynamicOptions,
@Nullable @JsonProperty(FIELD_NAME_SINK_ABILITIES) List<SinkAbilitySpec> sinkAbilities,
@Nullable @JsonProperty(FIELD_NAME_TARGET_COLUMNS) int[][] targetColumns) {
this.contextResolvedTable = contextResolvedTable;
super(contextResolvedTable, dynamicOptions);
this.sinkAbilities = sinkAbilities;
this.targetColumns = targetColumns;
}

@JsonGetter(FIELD_NAME_CATALOG_TABLE)
public ContextResolvedTable getContextResolvedTable() {
return contextResolvedTable;
}

@JsonGetter(FIELD_NAME_SINK_ABILITIES)
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Nullable
Expand Down Expand Up @@ -120,21 +115,25 @@ public boolean equals(Object o) {
}
DynamicTableSinkSpec that = (DynamicTableSinkSpec) o;
return Objects.equals(contextResolvedTable, that.contextResolvedTable)
&& Objects.equals(dynamicOptions, that.dynamicOptions)
&& Objects.equals(sinkAbilities, that.sinkAbilities)
&& Objects.equals(tableSink, that.tableSink)
&& Objects.equals(targetColumns, that.targetColumns);
}

@Override
public int hashCode() {
return Objects.hash(contextResolvedTable, sinkAbilities, targetColumns, tableSink);
return Objects.hash(
contextResolvedTable, dynamicOptions, sinkAbilities, targetColumns, tableSink);
}

@Override
public String toString() {
return "DynamicTableSinkSpec{"
+ "contextResolvedTable="
+ contextResolvedTable
+ ", dynamicOptions="
+ dynamicOptions
+ ", sinkAbilities="
+ sinkAbilities
+ ", targetColumns="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
Expand All @@ -51,20 +52,19 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class DynamicTableSourceSpec extends DynamicTableSpecBase {

public static final String FIELD_NAME_CATALOG_TABLE = "table";
public static final String FIELD_NAME_SOURCE_ABILITIES = "abilities";

private final ContextResolvedTable contextResolvedTable;
private final @Nullable List<SourceAbilitySpec> sourceAbilities;

private DynamicTableSource tableSource;

@JsonCreator
public DynamicTableSourceSpec(
@JsonProperty(FIELD_NAME_CATALOG_TABLE) ContextResolvedTable contextResolvedTable,
@Nullable @JsonProperty(FIELD_NAME_DYNAMIC_OPTIONS) Map<String, String> dynamicOptions,
@Nullable @JsonProperty(FIELD_NAME_SOURCE_ABILITIES)
List<SourceAbilitySpec> sourceAbilities) {
this.contextResolvedTable = contextResolvedTable;
super(contextResolvedTable, dynamicOptions);
this.sourceAbilities = sourceAbilities;
}

Expand Down Expand Up @@ -143,11 +143,6 @@ public LookupTableSource getLookupTableSource(
}
}

@JsonGetter(FIELD_NAME_CATALOG_TABLE)
public ContextResolvedTable getContextResolvedTable() {
return contextResolvedTable;
}

@JsonGetter(FIELD_NAME_SOURCE_ABILITIES)
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Nullable
Expand All @@ -169,20 +164,23 @@ public boolean equals(Object o) {
}
DynamicTableSourceSpec that = (DynamicTableSourceSpec) o;
return Objects.equals(contextResolvedTable, that.contextResolvedTable)
&& Objects.equals(dynamicOptions, that.dynamicOptions)
&& Objects.equals(sourceAbilities, that.sourceAbilities)
&& Objects.equals(tableSource, that.tableSource);
}

@Override
public int hashCode() {
return Objects.hash(contextResolvedTable, sourceAbilities, tableSource);
return Objects.hash(contextResolvedTable, dynamicOptions, sourceAbilities, tableSource);
}

@Override
public String toString() {
return "DynamicTableSourceSpec{"
+ "contextResolvedTable="
+ contextResolvedTable
+ ", dynamicOptions="
+ dynamicOptions
+ ", sourceAbilities="
+ sourceAbilities
+ ", tableSource="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,44 @@
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.planner.calcite.FlinkContext;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Base class for {@link DynamicTableSinkSpec} and {@link DynamicTableSourceSpec}. */
class DynamicTableSpecBase {

public static final String FIELD_NAME_CATALOG_TABLE = "table";
public static final String FIELD_NAME_DYNAMIC_OPTIONS = "dynamicOptions";

protected final ContextResolvedTable contextResolvedTable;
protected final @Nullable Map<String, String> dynamicOptions;

/**
 * Creates the spec base.
 *
 * @param contextResolvedTable the table resolved from the catalog; if dynamic options are given,
 *     the stored table is a copy with those options merged on top of the table's own options
 * @param dynamicOptions options supplied dynamically (e.g. via hints) that override the table's
 *     options; may be {@code null} or empty, in which case the table is kept as-is
 */
public DynamicTableSpecBase(
ContextResolvedTable contextResolvedTable,
@Nullable Map<String, String> dynamicOptions) {
this.dynamicOptions = dynamicOptions;
// Merge the dynamic options into the resolved table eagerly so that every consumer of
// getContextResolvedTable() observes the overridden options.
this.contextResolvedTable =
computeContextResolvedTableWithDynamicOptions(contextResolvedTable, dynamicOptions);
}

/**
 * Returns the resolved table, with any dynamic options already merged into its options (see the
 * constructor).
 */
@JsonGetter(FIELD_NAME_CATALOG_TABLE)
public ContextResolvedTable getContextResolvedTable() {
return contextResolvedTable;
}

/**
 * Returns the raw dynamic options as passed to the constructor, or {@code null} if none were
 * supplied. Serialized to the exec plan under {@link #FIELD_NAME_DYNAMIC_OPTIONS}; omitted from
 * JSON when {@code null} or empty.
 */
@JsonGetter(FIELD_NAME_DYNAMIC_OPTIONS)
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Nullable
public Map<String, String> getDynamicOptions() {
return dynamicOptions;
}

Map<String, String> loadOptionsFromCatalogTable(
ContextResolvedTable contextResolvedTable, FlinkContext context) {
// We need to load options from the catalog only if PLAN_RESTORE_CATALOG_OBJECTS == ALL and
Expand All @@ -50,4 +82,15 @@ Map<String, String> loadOptionsFromCatalogTable(
.map(CatalogBaseTable::getOptions)
.orElse(Collections.emptyMap());
}

/**
 * Overlays the given dynamic options on top of the resolved table's own options.
 *
 * @param oldResolvedTable the table as resolved from the catalog
 * @param dynamicOptions overriding options; may be {@code null} or empty
 * @return the original table when there is nothing to merge, otherwise a copy carrying the
 *     merged option map
 */
private static ContextResolvedTable computeContextResolvedTableWithDynamicOptions(
        ContextResolvedTable oldResolvedTable, @Nullable Map<String, String> dynamicOptions) {
    // Nothing to overlay: hand back the original instance untouched.
    if (dynamicOptions == null || dynamicOptions.isEmpty()) {
        return oldResolvedTable;
    }
    // Start from the table's own options, then let the dynamic ones win on key clashes.
    final Map<String, String> mergedOptions =
            new HashMap<>(oldResolvedTable.getResolvedTable().getOptions());
    mergedOptions.putAll(dynamicOptions);
    return oldResolvedTable.copy(mergedOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.Map;

/**
* TemporalTableSourceSpec describes how the right table of a lookupJoin is serialized/deserialized.
Expand All @@ -57,14 +58,16 @@ public class TemporalTableSourceSpec {

@JsonIgnore private RelOptTable temporalTable;

public TemporalTableSourceSpec(RelOptTable temporalTable) {
public TemporalTableSourceSpec(
RelOptTable temporalTable, @Nullable Map<String, String> extraOptions) {
this.temporalTable = temporalTable;
if (temporalTable instanceof TableSourceTable) {
TableSourceTable tableSourceTable = (TableSourceTable) temporalTable;
outputType = tableSourceTable.getRowType();
this.tableSourceSpec =
new DynamicTableSourceSpec(
tableSourceTable.contextResolvedTable(),
extraOptions,
Arrays.asList(tableSourceTable.abilitySpecs()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.table.planner.calcite.FlinkContext;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
import org.apache.flink.table.planner.hint.FlinkHints;
import org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.ReadingMetadataSpec;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
Expand All @@ -35,6 +36,7 @@
import org.apache.flink.table.planner.plan.rules.logical.PushProjectIntoTableSourceScanRule;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexBuilder;
Expand Down Expand Up @@ -205,8 +207,20 @@ public List<RelNode> reuseDuplicatedScan(List<RelNode> relNodes) {
}

// 2.4 Create a new ScanTableSource. ScanTableSource can not be pushed down twice.
List<Map<String, String>> deduplicatedDynamicOptions =
reusableNodes.stream()
.map(scan -> FlinkHints.getHintedOptions(scan.getHints()))
.distinct()
.collect(Collectors.toList());
// We have ensured that only table scans with same hints can be reused.
// See more at ScanReuserUtils#getDigest
Preconditions.checkState(deduplicatedDynamicOptions.size() == 1);

DynamicTableSourceSpec tableSourceSpec =
new DynamicTableSourceSpec(pickTable.contextResolvedTable(), specs);
new DynamicTableSourceSpec(
pickTable.contextResolvedTable(),
deduplicatedDynamicOptions.get(0),
specs);
ScanTableSource newTableSource =
tableSourceSpec.getScanTableSource(flinkContext, flinkTypeFactory);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.physical.batch

import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.hint.FlinkHints
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec
Expand Down Expand Up @@ -85,6 +86,7 @@ class BatchPhysicalDynamicFilteringTableSourceScan(
override def translateToExecNode(): ExecNode[_] = {
val tableSourceSpec = new DynamicTableSourceSpec(
tableSourceTable.contextResolvedTable,
FlinkHints.getHintedOptions(getHints),
util.Arrays.asList(tableSourceTable.abilitySpecs: _*))
tableSourceSpec.setTableSource(tableSourceTable.tableSource)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class BatchPhysicalLookupJoin(
temporalTable: RelOptTable,
tableCalcProgram: Option[RexProgram],
joinInfo: JoinInfo,
joinType: JoinRelType)
joinType: JoinRelType,
dynamicOptionsOnTemporalTable: util.Map[String, String])
extends CommonPhysicalLookupJoin(
cluster,
traitSet,
Expand All @@ -61,7 +62,8 @@ class BatchPhysicalLookupJoin(
temporalTable,
tableCalcProgram,
joinInfo,
joinType)
joinType,
dynamicOptionsOnTemporalTable)
}

override def translateToExecNode(): ExecNode[_] = {
Expand All @@ -78,7 +80,7 @@ class BatchPhysicalLookupJoin(
JoinTypeUtil.getFlinkJoinType(joinType),
finalPreFilterCondition.orNull,
finalRemainingCondition.orNull,
new TemporalTableSourceSpec(temporalTable),
new TemporalTableSourceSpec(temporalTable, dynamicOptionsOnTemporalTable),
allLookupKeys.map(item => (Int.box(item._1), item._2)).asJava,
projectionOnTemporalTable,
filterOnTemporalTable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch
import org.apache.flink.table.catalog.ContextResolvedTable
import org.apache.flink.table.connector.sink.DynamicTableSink
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.hint.FlinkHints
import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec
import org.apache.flink.table.planner.plan.nodes.calcite.Sink
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
Expand Down Expand Up @@ -62,6 +63,7 @@ class BatchPhysicalSink(
val tableSinkSpec =
new DynamicTableSinkSpec(
contextResolvedTable,
FlinkHints.getHintedOptions(hints),
util.Arrays.asList(abilitySpecs: _*),
targetColumns)
tableSinkSpec.setTableSink(tableSink)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.physical.batch

import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.hint.FlinkHints
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec
Expand Down Expand Up @@ -72,6 +73,7 @@ class BatchPhysicalTableSourceScan(
override def translateToExecNode(): ExecNode[_] = {
val tableSourceSpec = new DynamicTableSourceSpec(
tableSourceTable.contextResolvedTable,
FlinkHints.getHintedOptions(getHints),
util.Arrays.asList(tableSourceTable.abilitySpecs: _*))
tableSourceSpec.setTableSource(tableSourceTable.tableSource)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class StreamPhysicalLookupJoin(
joinInfo: JoinInfo,
joinType: JoinRelType,
lookupHint: Option[RelHint],
upsertMaterialize: Boolean)
upsertMaterialize: Boolean,
dynamicOptionsOnTemporalTable: util.Map[String, String])
extends CommonPhysicalLookupJoin(
cluster,
traitSet,
Expand All @@ -72,7 +73,8 @@ class StreamPhysicalLookupJoin(
joinInfo,
joinType,
lookupHint,
upsertMaterialize
upsertMaterialize,
dynamicOptionsOnTemporalTable
)
}

Expand All @@ -86,7 +88,8 @@ class StreamPhysicalLookupJoin(
joinInfo,
joinType,
lookupHint,
upsertMaterialize
upsertMaterialize,
dynamicOptionsOnTemporalTable
)
}

Expand All @@ -104,7 +107,7 @@ class StreamPhysicalLookupJoin(
JoinTypeUtil.getFlinkJoinType(joinType),
finalPreFilterCondition.orNull,
finalRemainingCondition.orNull,
new TemporalTableSourceSpec(temporalTable),
new TemporalTableSourceSpec(temporalTable, dynamicOptionsOnTemporalTable),
allLookupKeys.map(item => (Int.box(item._1), item._2)).asJava,
projectionOnTemporalTable,
filterOnTemporalTable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.stream
import org.apache.flink.table.catalog.ContextResolvedTable
import org.apache.flink.table.connector.sink.DynamicTableSink
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.hint.FlinkHints
import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.calcite.Sink
Expand Down Expand Up @@ -85,6 +86,7 @@ class StreamPhysicalSink(
val tableSinkSpec =
new DynamicTableSinkSpec(
contextResolvedTable,
FlinkHints.getHintedOptions(hints),
util.Arrays.asList(abilitySpecs: _*),
targetColumns)
tableSinkSpec.setTableSink(tableSink)
Expand Down
Loading