diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java index 6fd2914f4d929..a77d1bf7e51cc 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java @@ -35,6 +35,7 @@ import javax.annotation.Nullable; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -44,12 +45,10 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class DynamicTableSinkSpec extends DynamicTableSpecBase { - public static final String FIELD_NAME_CATALOG_TABLE = "table"; public static final String FIELD_NAME_SINK_ABILITIES = "abilities"; public static final String FIELD_NAME_TARGET_COLUMNS = "targetColumns"; - private final ContextResolvedTable contextResolvedTable; private final @Nullable List<SinkAbilitySpec> sinkAbilities; private final @Nullable int[][] targetColumns; @@ -59,18 +58,14 @@ public class DynamicTableSinkSpec extends DynamicTableSpecBase { @JsonCreator public DynamicTableSinkSpec( @JsonProperty(FIELD_NAME_CATALOG_TABLE) ContextResolvedTable contextResolvedTable, + @Nullable @JsonProperty(FIELD_NAME_DYNAMIC_OPTIONS) Map<String, String> dynamicOptions, @Nullable @JsonProperty(FIELD_NAME_SINK_ABILITIES) List<SinkAbilitySpec> sinkAbilities, @Nullable @JsonProperty(FIELD_NAME_TARGET_COLUMNS) int[][] targetColumns) { - this.contextResolvedTable = contextResolvedTable; + super(contextResolvedTable, dynamicOptions); this.sinkAbilities = sinkAbilities; this.targetColumns = targetColumns; } - @JsonGetter(FIELD_NAME_CATALOG_TABLE) - public ContextResolvedTable getContextResolvedTable() { - return contextResolvedTable; - } - @JsonGetter(FIELD_NAME_SINK_ABILITIES) @JsonInclude(JsonInclude.Include.NON_EMPTY) @Nullable @@ -120,6 +115,7 @@ public boolean equals(Object o) { } DynamicTableSinkSpec that = (DynamicTableSinkSpec) o; return Objects.equals(contextResolvedTable, that.contextResolvedTable) + && Objects.equals(dynamicOptions, that.dynamicOptions) && Objects.equals(sinkAbilities, that.sinkAbilities) && Objects.equals(tableSink, that.tableSink) && Objects.equals(targetColumns, that.targetColumns); @@ -127,7 +123,8 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(contextResolvedTable, sinkAbilities, targetColumns, tableSink); + return Objects.hash( + contextResolvedTable, dynamicOptions, sinkAbilities, targetColumns, tableSink); } @Override @@ -135,6 +132,8 @@ public String toString() { return "DynamicTableSinkSpec{" + "contextResolvedTable=" + contextResolvedTable + + ", dynamicOptions=" + + dynamicOptions + ", sinkAbilities=" + sinkAbilities + ", targetColumns="
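A note on compatibility: because the new "dynamicOptions" creator argument above is @Nullable, plans compiled before this change, whose JSON carries no such field, should still deserialize; Jackson passes null for a missing @JsonProperty creator parameter. A minimal stand-in sketch of that behavior, using plain Jackson rather than Flink's shaded copy; the Spec POJO below is hypothetical, not Flink code:

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.Map;

    public class BackCompatSketch {
        // Stand-in for the spec classes: one nullable creator property.
        public static class Spec {
            final Map<String, String> dynamicOptions;

            @JsonCreator
            Spec(@JsonProperty("dynamicOptions") Map<String, String> dynamicOptions) {
                this.dynamicOptions = dynamicOptions;
            }
        }

        public static void main(String[] args) throws Exception {
            // An "old" plan without the field still binds; the map is simply null.
            Spec legacy = new ObjectMapper().readValue("{}", Spec.class);
            System.out.println(legacy.dynamicOptions); // null
        }
    }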
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java index 21d0e31354171..6556abcd70b7e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java @@ -42,6 +42,7 @@ import javax.annotation.Nullable; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -51,10 +52,8 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class DynamicTableSourceSpec extends DynamicTableSpecBase { - public static final String FIELD_NAME_CATALOG_TABLE = "table"; public static final String FIELD_NAME_SOURCE_ABILITIES = "abilities"; - private final ContextResolvedTable contextResolvedTable; private final @Nullable List<SourceAbilitySpec> sourceAbilities; private DynamicTableSource tableSource; @@ -62,9 +61,10 @@ public class DynamicTableSourceSpec extends DynamicTableSpecBase { @JsonCreator public DynamicTableSourceSpec( @JsonProperty(FIELD_NAME_CATALOG_TABLE) ContextResolvedTable contextResolvedTable, + @Nullable @JsonProperty(FIELD_NAME_DYNAMIC_OPTIONS) Map<String, String> dynamicOptions, @Nullable @JsonProperty(FIELD_NAME_SOURCE_ABILITIES) List<SourceAbilitySpec> sourceAbilities) { - this.contextResolvedTable = contextResolvedTable; + super(contextResolvedTable, dynamicOptions); this.sourceAbilities = sourceAbilities; } @@ -143,11 +143,6 @@ public LookupTableSource getLookupTableSource( } } - @JsonGetter(FIELD_NAME_CATALOG_TABLE) - public ContextResolvedTable getContextResolvedTable() { - return contextResolvedTable; - } - @JsonGetter(FIELD_NAME_SOURCE_ABILITIES) @JsonInclude(JsonInclude.Include.NON_EMPTY) @Nullable @@ -169,13 +164,14 @@ public boolean equals(Object o) { } DynamicTableSourceSpec that = (DynamicTableSourceSpec) o; return Objects.equals(contextResolvedTable, that.contextResolvedTable) + && Objects.equals(dynamicOptions, that.dynamicOptions) && Objects.equals(sourceAbilities, that.sourceAbilities) && Objects.equals(tableSource, that.tableSource); } @Override public int hashCode() { - return Objects.hash(contextResolvedTable, sourceAbilities, tableSource); + return Objects.hash(contextResolvedTable, dynamicOptions, sourceAbilities, tableSource); } @Override @@ -183,6 +179,8 @@ public String toString() { return "DynamicTableSourceSpec{" + "contextResolvedTable=" + contextResolvedTable + + ", dynamicOptions=" + + dynamicOptions + ", sourceAbilities=" + sourceAbilities + ", tableSource="
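Both spec constructors above receive their dynamic options from FlinkHints.getHintedOptions, which the planner-side changes further below call to flatten the OPTIONS hints Calcite attached to a scan or sink into a plain map. A minimal sketch of that helper's contract, assuming Calcite's RelHint builder API; the option key and value are invented for illustration:

    import org.apache.calcite.rel.hint.RelHint;
    import org.apache.flink.table.planner.hint.FlinkHints;

    import java.util.Collections;
    import java.util.Map;

    public class HintedOptionsSketch {
        public static void main(String[] args) {
            // Models /*+ OPTIONS('scan.parallelism' = '2') */ on a table reference.
            RelHint optionsHint =
                    RelHint.builder("OPTIONS").hintOption("scan.parallelism", "2").build();
            Map<String, String> hinted =
                    FlinkHints.getHintedOptions(Collections.singletonList(optionsHint));
            System.out.println(hinted); // {scan.parallelism=2}
        }
    }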
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSpecBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSpecBase.java index ff0babb3875fc..9e1510fd9d53a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSpecBase.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSpecBase.java @@ -24,12 +24,44 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; + +import javax.annotation.Nullable; + import java.util.Collections; +import java.util.HashMap; import java.util.Map; /** Base class for {@link DynamicTableSinkSpec} and {@link DynamicTableSourceSpec}. */ class DynamicTableSpecBase { + public static final String FIELD_NAME_CATALOG_TABLE = "table"; + public static final String FIELD_NAME_DYNAMIC_OPTIONS = "dynamicOptions"; + + protected final ContextResolvedTable contextResolvedTable; + protected final @Nullable Map<String, String> dynamicOptions; + + public DynamicTableSpecBase( + ContextResolvedTable contextResolvedTable, + @Nullable Map<String, String> dynamicOptions) { + this.dynamicOptions = dynamicOptions; + this.contextResolvedTable = + computeContextResolvedTableWithDynamicOptions(contextResolvedTable, dynamicOptions); + } + + @JsonGetter(FIELD_NAME_CATALOG_TABLE) + public ContextResolvedTable getContextResolvedTable() { + return contextResolvedTable; + } + + @JsonGetter(FIELD_NAME_DYNAMIC_OPTIONS) + @JsonInclude(JsonInclude.Include.NON_EMPTY) + @Nullable + public Map<String, String> getDynamicOptions() { + return dynamicOptions; + } + Map<String, String> loadOptionsFromCatalogTable( ContextResolvedTable contextResolvedTable, FlinkContext context) { // We need to load options from the catalog only if PLAN_RESTORE_CATALOG_OBJECTS == ALL and @@ -50,4 +82,15 @@ Map<String, String> loadOptionsFromCatalogTable( .map(CatalogBaseTable::getOptions) .orElse(Collections.emptyMap()); } + + private static ContextResolvedTable computeContextResolvedTableWithDynamicOptions( + ContextResolvedTable oldResolvedTable, @Nullable Map<String, String> dynamicOptions) { + if (dynamicOptions == null || dynamicOptions.isEmpty()) { + return oldResolvedTable; + } + Map<String, String> newOptions = + new HashMap<>(oldResolvedTable.getResolvedTable().getOptions()); + newOptions.putAll(dynamicOptions); + return oldResolvedTable.copy(newOptions); + } }
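The merge rule above is the heart of the change: dynamic (hinted) options are laid over the catalog table's own options, so a hint wins on key collisions, while a null or empty hint map leaves the resolved table untouched. A self-contained sketch of the same rule, with plain maps standing in for ContextResolvedTable and invented keys:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class MergeOrderSketch {
        // Mirrors computeContextResolvedTableWithDynamicOptions, minus the catalog types.
        static Map<String, String> merge(
                Map<String, String> tableOptions, Map<String, String> dynamicOptions) {
            if (dynamicOptions == null || dynamicOptions.isEmpty()) {
                return tableOptions; // same short-circuit as the patch
            }
            Map<String, String> merged = new HashMap<>(tableOptions);
            merged.putAll(dynamicOptions); // hinted options override on collision
            return merged;
        }

        public static void main(String[] args) {
            Map<String, String> table = new HashMap<>();
            table.put("connector", "values");
            table.put("data-id", "old");
            // Prints a map holding connector=values and data-id=new.
            System.out.println(merge(table, Collections.singletonMap("data-id", "new")));
        }
    }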
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/TemporalTableSourceSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/TemporalTableSourceSpec.java index 6974d60d8a6e6..0266ea36beba6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/TemporalTableSourceSpec.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/TemporalTableSourceSpec.java @@ -37,6 +37,7 @@ import javax.annotation.Nullable; import java.util.Arrays; +import java.util.Map; /** * TemporalTableSpec describes how the right table of lookupJoin ser/des. @@ -57,7 +58,8 @@ public class TemporalTableSourceSpec { @JsonIgnore private RelOptTable temporalTable; - public TemporalTableSourceSpec(RelOptTable temporalTable) { + public TemporalTableSourceSpec( + RelOptTable temporalTable, @Nullable Map<String, String> extraOptions) { this.temporalTable = temporalTable; if (temporalTable instanceof TableSourceTable) { TableSourceTable tableSourceTable = (TableSourceTable) temporalTable; @@ -65,6 +67,7 @@ public TemporalTableSourceSpec(RelOptTable temporalTable) { this.tableSourceSpec = new DynamicTableSourceSpec( tableSourceTable.contextResolvedTable(), + extraOptions, Arrays.asList(tableSourceTable.abilitySpecs())); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java index 9128e1103463a..056e9f300e5b4 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java @@ -26,6 +26,7 @@ import org.apache.flink.table.planner.calcite.FlinkContext; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.connectors.DynamicSourceUtils; +import org.apache.flink.table.planner.hint.FlinkHints; import org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec; import org.apache.flink.table.planner.plan.abilities.source.ReadingMetadataSpec; import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; @@ -35,6 +36,7 @@ import org.apache.flink.table.planner.plan.rules.logical.PushProjectIntoTableSourceScanRule; import org.apache.flink.table.planner.plan.schema.TableSourceTable; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rex.RexBuilder; @@ -205,8 +207,20 @@ public List<RelNode> reuseDuplicatedScan(List<RelNode> relNodes) { } // 2.4 Create a new ScanTableSource. ScanTableSource can not be pushed down twice. + List<Map<String, String>> deduplicatedDynamicOptions = + reusableNodes.stream() + .map(scan -> FlinkHints.getHintedOptions(scan.getHints())) + .distinct() + .collect(Collectors.toList()); + // We have ensured that only table scans with the same hints can be reused.
+ // See more at ScanReuserUtils#getDigest + Preconditions.checkState(deduplicatedDynamicOptions.size() == 1); + DynamicTableSourceSpec tableSourceSpec = - new DynamicTableSourceSpec(pickTable.contextResolvedTable(), specs); + new DynamicTableSourceSpec( + pickTable.contextResolvedTable(), + deduplicatedDynamicOptions.get(0), + specs); ScanTableSource newTableSource = tableSourceSpec.getScanTableSource(flinkContext, flinkTypeFactory); diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala index 1826e3b0c04b6..a75cf92215513 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch import org.apache.flink.table.planner.calcite.FlinkTypeFactory +import org.apache.flink.table.planner.hint.FlinkHints import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty} import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec @@ -85,6 +86,7 @@ class BatchPhysicalDynamicFilteringTableSourceScan( override def translateToExecNode(): ExecNode[_] = { val tableSourceSpec = new DynamicTableSourceSpec( tableSourceTable.contextResolvedTable, + FlinkHints.getHintedOptions(getHints), util.Arrays.asList(tableSourceTable.abilitySpecs: _*)) tableSourceSpec.setTableSource(tableSourceTable.tableSource) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLookupJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLookupJoin.scala index 364f71991c67d..8c29900c6965e 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLookupJoin.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLookupJoin.scala @@ -42,7 +42,8 @@ class BatchPhysicalLookupJoin( temporalTable: RelOptTable, tableCalcProgram: Option[RexProgram], joinInfo: JoinInfo, - joinType: JoinRelType) + joinType: JoinRelType, + dynamicOptionsOnTemporalTable: util.Map[String, String]) extends CommonPhysicalLookupJoin( cluster, traitSet, @@ -61,7 +62,8 @@ class BatchPhysicalLookupJoin( temporalTable, tableCalcProgram, joinInfo, - joinType) + joinType, + dynamicOptionsOnTemporalTable) } override def translateToExecNode(): ExecNode[_] = { @@ -78,7 +80,7 @@ class BatchPhysicalLookupJoin( JoinTypeUtil.getFlinkJoinType(joinType), finalPreFilterCondition.orNull, finalRemainingCondition.orNull, - new TemporalTableSourceSpec(temporalTable), + new TemporalTableSourceSpec(temporalTable, dynamicOptionsOnTemporalTable), allLookupKeys.map(item => (Int.box(item._1), item._2)).asJava, projectionOnTemporalTable, filterOnTemporalTable, diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala index 8080ec163f3d3..0e7057d4dce8b 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch import org.apache.flink.table.catalog.ContextResolvedTable import org.apache.flink.table.connector.sink.DynamicTableSink import org.apache.flink.table.planner.calcite.FlinkTypeFactory +import org.apache.flink.table.planner.hint.FlinkHints import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec import org.apache.flink.table.planner.plan.nodes.calcite.Sink import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty} @@ -62,6 +63,7 @@ class BatchPhysicalSink( val tableSinkSpec = new DynamicTableSinkSpec( contextResolvedTable, + FlinkHints.getHintedOptions(hints), util.Arrays.asList(abilitySpecs: _*), targetColumns) tableSinkSpec.setTableSink(tableSink) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalTableSourceScan.scala index 37a8d270ad541..52dd7d075f012 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalTableSourceScan.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalTableSourceScan.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch import org.apache.flink.table.planner.calcite.FlinkTypeFactory +import org.apache.flink.table.planner.hint.FlinkHints import org.apache.flink.table.planner.plan.nodes.exec.ExecNode import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec @@ -72,6 +73,7 @@ class BatchPhysicalTableSourceScan( override def translateToExecNode(): ExecNode[_] = { val tableSourceSpec = new DynamicTableSourceSpec( tableSourceTable.contextResolvedTable, + FlinkHints.getHintedOptions(getHints), util.Arrays.asList(tableSourceTable.abilitySpecs: _*)) tableSourceSpec.setTableSource(tableSourceTable.tableSource) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala index cd4261803fa79..575084058388a 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala @@ -47,7 +47,8 @@ class StreamPhysicalLookupJoin( joinInfo: JoinInfo, joinType: JoinRelType, lookupHint: Option[RelHint], - upsertMaterialize: Boolean) + upsertMaterialize: Boolean, + 
dynamicOptionsOnTemporalTable: util.Map[String, String]) extends CommonPhysicalLookupJoin( cluster, traitSet, @@ -72,7 +73,8 @@ class StreamPhysicalLookupJoin( joinInfo, joinType, lookupHint, - upsertMaterialize + upsertMaterialize, + dynamicOptionsOnTemporalTable ) } @@ -86,7 +88,8 @@ class StreamPhysicalLookupJoin( joinInfo, joinType, lookupHint, - upsertMaterialize + upsertMaterialize, + dynamicOptionsOnTemporalTable ) } @@ -104,7 +107,7 @@ class StreamPhysicalLookupJoin( JoinTypeUtil.getFlinkJoinType(joinType), finalPreFilterCondition.orNull, finalRemainingCondition.orNull, - new TemporalTableSourceSpec(temporalTable), + new TemporalTableSourceSpec(temporalTable, dynamicOptionsOnTemporalTable), allLookupKeys.map(item => (Int.box(item._1), item._2)).asJava, projectionOnTemporalTable, filterOnTemporalTable, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala index a058c4b6e6894..b9c1703ca297b 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.stream import org.apache.flink.table.catalog.ContextResolvedTable import org.apache.flink.table.connector.sink.DynamicTableSink import org.apache.flink.table.planner.calcite.FlinkTypeFactory +import org.apache.flink.table.planner.hint.FlinkHints import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery import org.apache.flink.table.planner.plan.nodes.calcite.Sink @@ -85,6 +86,7 @@ class StreamPhysicalSink( val tableSinkSpec = new DynamicTableSinkSpec( contextResolvedTable, + FlinkHints.getHintedOptions(hints), util.Arrays.asList(abilitySpecs: _*), targetColumns) tableSinkSpec.setTableSink(tableSink) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTableSourceScan.scala index 550e873dadc99..64aa6c50c3828 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTableSourceScan.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTableSourceScan.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.stream import org.apache.flink.table.planner.calcite.FlinkTypeFactory +import org.apache.flink.table.planner.hint.FlinkHints import org.apache.flink.table.planner.plan.nodes.exec.ExecNode import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan @@ -74,6 +75,7 @@ class StreamPhysicalTableSourceScan( override def translateToExecNode(): ExecNode[_] = { val tableSourceSpec = new DynamicTableSourceSpec( tableSourceTable.contextResolvedTable, + FlinkHints.getHintedOptions(getHints), util.Arrays.asList(tableSourceTable.abilitySpecs: _*)) 
tableSourceSpec.setTableSource(tableSource) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLookupJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLookupJoinRule.scala index 6acc226424027..bdb29ae56600d 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLookupJoinRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLookupJoinRule.scala @@ -27,6 +27,8 @@ import org.apache.flink.table.planner.plan.rules.physical.common.{BaseSnapshotOn import org.apache.calcite.plan.{RelOptRule, RelOptTable} import org.apache.calcite.rex.RexProgram +import java.util + /** * Rules that convert [[FlinkLogicalJoin]] on a [[FlinkLogicalSnapshot]] into * [[BatchPhysicalLookupJoin]]. @@ -47,8 +49,9 @@ object BatchPhysicalLookupJoinRule { join: FlinkLogicalJoin, input: FlinkLogicalRel, temporalTable: RelOptTable, - calcProgram: Option[RexProgram]): CommonPhysicalLookupJoin = { - doTransform(join, input, temporalTable, calcProgram) + calcProgram: Option[RexProgram], + dynamicOptionsOnTemporalTable: util.Map[String, String]): CommonPhysicalLookupJoin = { + doTransform(join, input, temporalTable, calcProgram, dynamicOptionsOnTemporalTable) } } @@ -59,8 +62,9 @@ object BatchPhysicalLookupJoinRule { join: FlinkLogicalJoin, input: FlinkLogicalRel, temporalTable: RelOptTable, - calcProgram: Option[RexProgram]): CommonPhysicalLookupJoin = { - doTransform(join, input, temporalTable, calcProgram) + calcProgram: Option[RexProgram], + dynamicOptionsOnTemporalTable: util.Map[String, String]): CommonPhysicalLookupJoin = { + doTransform(join, input, temporalTable, calcProgram, dynamicOptionsOnTemporalTable) } } @@ -69,7 +73,8 @@ object BatchPhysicalLookupJoinRule { join: FlinkLogicalJoin, input: FlinkLogicalRel, temporalTable: RelOptTable, - calcProgram: Option[RexProgram]): BatchPhysicalLookupJoin = { + calcProgram: Option[RexProgram], + dynamicOptionsOnTemporalTable: util.Map[String, String]): BatchPhysicalLookupJoin = { val joinInfo = join.analyzeCondition val cluster = join.getCluster @@ -83,6 +88,7 @@ object BatchPhysicalLookupJoinRule { temporalTable, calcProgram, joinInfo, - join.getJoinType) + join.getJoinType, + dynamicOptionsOnTemporalTable) } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala index 1435b42d828e5..1df46c6357ead 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.common import org.apache.flink.table.api.TableException import org.apache.flink.table.connector.source.LookupTableSource import org.apache.flink.table.legacy.sources.LookupableTableSource +import org.apache.flink.table.planner.hint.FlinkHints import org.apache.flink.table.planner.plan.nodes.logical._ import 
org.apache.flink.table.planner.plan.nodes.physical.common.{CommonPhysicalLegacyTableSourceScan, CommonPhysicalLookupJoin, CommonPhysicalTableSourceScan} import org.apache.flink.table.planner.plan.rules.common.CommonTemporalTableJoinRule @@ -112,7 +113,8 @@ trait CommonLookupJoinRule extends CommonTemporalTableJoinRule { join: FlinkLogicalJoin, input: FlinkLogicalRel, temporalTable: RelOptTable, - calcProgram: Option[RexProgram]): CommonPhysicalLookupJoin + calcProgram: Option[RexProgram], + dynamicOptionsOnTemporalTable: util.Map[String, String]): CommonPhysicalLookupJoin } abstract class BaseSnapshotOnTableScanRule(description: String) @@ -134,10 +136,12 @@ abstract class BaseSnapshotOnTableScanRule(description: String) override def onMatch(call: RelOptRuleCall): Unit = { val join = call.rel[FlinkLogicalJoin](0) val input = call.rel[FlinkLogicalRel](1) - val tableScan = call.rel[RelNode](3) + val tableScan = call.rel[TableScan](3) validateJoin(join) - val temporalJoin = transform(join, input, tableScan.getTable, None) + val dynamicOptionsOnTemporalTable = FlinkHints.getHintedOptions(tableScan.getHints) + val temporalJoin = + transform(join, input, tableScan.getTable, None, dynamicOptionsOnTemporalTable) call.transformTo(temporalJoin) } @@ -166,10 +170,16 @@ abstract class BaseSnapshotOnCalcTableScanRule(description: String) val join = call.rel[FlinkLogicalJoin](0) val input = call.rel[FlinkLogicalRel](1) val calc = call.rel[FlinkLogicalCalc](3) - val tableScan = call.rel[RelNode](4) + val tableScan = call.rel[TableScan](4) validateJoin(join) - val temporalJoin = transform(join, input, tableScan.getTable, Some(calc.getProgram)) + val dynamicOptionsOnTemporalTable = FlinkHints.getHintedOptions(tableScan.getHints) + val temporalJoin = transform( + join, + input, + tableScan.getTable, + Some(calc.getProgram), + dynamicOptionsOnTemporalTable) call.transformTo(temporalJoin) } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLookupJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLookupJoinRule.scala index b2f74b0ab353d..895077af34cfd 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLookupJoinRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLookupJoinRule.scala @@ -29,6 +29,8 @@ import org.apache.calcite.plan.{RelOptRule, RelOptTable} import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rex.RexProgram +import java.util + /** * Rules that convert [[FlinkLogicalJoin]] on a [[FlinkLogicalSnapshot]] into * [[StreamPhysicalLookupJoin]] @@ -49,8 +51,9 @@ object StreamPhysicalLookupJoinRule { join: FlinkLogicalJoin, input: FlinkLogicalRel, temporalTable: RelOptTable, - calcProgram: Option[RexProgram]): CommonPhysicalLookupJoin = { - doTransform(join, input, temporalTable, calcProgram) + calcProgram: Option[RexProgram], + dynamicOptionsOnTemporalTable: util.Map[String, String]): CommonPhysicalLookupJoin = { + doTransform(join, input, temporalTable, calcProgram, dynamicOptionsOnTemporalTable) } } @@ -61,8 +64,9 @@ object StreamPhysicalLookupJoinRule { join: FlinkLogicalJoin, input: FlinkLogicalRel, temporalTable: RelOptTable, - calcProgram: Option[RexProgram]): CommonPhysicalLookupJoin = { - doTransform(join, input, temporalTable, calcProgram) + 
calcProgram: Option[RexProgram], + dynamicOptionsOnTemporalTable: util.Map[String, String]): CommonPhysicalLookupJoin = { + doTransform(join, input, temporalTable, calcProgram, dynamicOptionsOnTemporalTable) } } @@ -70,7 +74,8 @@ object StreamPhysicalLookupJoinRule { join: FlinkLogicalJoin, input: FlinkLogicalRel, temporalTable: RelOptTable, - calcProgram: Option[RexProgram]): StreamPhysicalLookupJoin = { + calcProgram: Option[RexProgram], + dynamicOptionsOnTemporalTable: util.Map[String, String]): StreamPhysicalLookupJoin = { val joinInfo = join.analyzeCondition @@ -101,6 +106,7 @@ object StreamPhysicalLookupJoinRule { joinInfo, join.getJoinType, lookupHint, - false) + false, + dynamicOptionsOnTemporalTable) } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java index b0ccbd732ea93..038bce33dd921 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java @@ -22,10 +22,12 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.table.planner.utils.JsonPlanTestBase; import org.apache.flink.table.planner.utils.JsonTestUtils; import org.apache.flink.table.planner.utils.TableTestUtil; import org.apache.flink.testutils.junit.utils.TempDirUtils; +import org.apache.flink.types.Row; import org.apache.commons.io.FileUtils; import org.junit.jupiter.api.BeforeEach; @@ -37,13 +39,16 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -421,6 +426,79 @@ void testCompileAndExecutePlanBatchMode() throws Exception { assertResult(DATA, sinkPath); } + @Test + void testCompileThenExecutePlanSqlWithDynamicOptionHints() throws Exception { + List<Row> srcOldData = new ArrayList<>(); + srcOldData.add(changelogRow("+I", 1, "hi")); + srcOldData.add(changelogRow("-D", 1, "hi")); + String srcOldDataId = TestValuesTableFactory.registerData(srcOldData); + + List<Row> srcNewData = new ArrayList<>(); + srcNewData.add(changelogRow("+I", 2, "hello")); + srcNewData.add(changelogRow("-D", 2, "hello")); + String srcNewDataId = TestValuesTableFactory.registerData(srcNewData); + + tableEnv.executeSql( + String.format( + "CREATE TABLE CdcSource( " + + " a int primary key not enforced, " + + " b string, " + + " pt as proctime() " + + ") WITH ( " + + " 'connector' = 'values', " + + " 'data-id' = '%s' " + + ")", + srcOldDataId)); + + List<Row> dimOldData = new ArrayList<>(); + dimOldData.add(changelogRow("+I", 1, "hi2Dim")); + String dimOldDataId = TestValuesTableFactory.registerData(dimOldData); + + List<Row> dimNewData = new ArrayList<>(); +
dimNewData.add(changelogRow("+I", 2, "hello2Dim")); + String dimNewDataId = TestValuesTableFactory.registerData(dimNewData); + + tableEnv.executeSql( + String.format( + "CREATE TABLE Dim( " + + " a int primary key not enforced, " + + " c string " + + ") WITH ( " + + " 'connector' = 'values', " + + " 'data-id' = '%s' " + + ")", + dimOldDataId)); + + tableEnv.executeSql( + "CREATE TABLE Snk( " + + " a int primary key not enforced, " + + " b string," + + " c string" + + ") WITH ( " + + " 'connector' = 'values', " + + " 'sink-insert-only' = 'true' " + + ")"); + + String sql = + String.format( + "INSERT INTO Snk /*+ OPTIONS('sink-insert-only'='false') */ " + + "SELECT CdcSource.a, CdcSource.b, Dim.c FROM CdcSource /*+ OPTIONS('data-id' = '%s') */ " + + "JOIN Dim /*+ OPTIONS('data-id' = '%s') */ FOR SYSTEM_TIME AS OF CdcSource.pt " + + "ON CdcSource.a = Dim.a", + srcNewDataId, dimNewDataId); + + CompiledPlan compiledPlan = tableEnv.compilePlanSql(sql); + tableEnv.loadPlan(PlanReference.fromJsonString(compiledPlan.asJsonString())) + .execute() + .await(); + + List<String> expected = Arrays.asList("+I[2, hello, hello2Dim]", "-D[2, hello, hello2Dim]"); + List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("Snk"); + Collections.sort(expected); + Collections.sort(actual); + assertThat(actual).isEqualTo(expected); + } + private File createSourceSinkTables() throws IOException { createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION); return createTestCsvSinkTable("sink", COLUMNS_DEFINITION); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java index 41a85088b3d28..237bef0ab27b2 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java @@ -205,7 +205,7 @@ void testWithDynamicFilteringPlan() { BatchExecTableSourceScan scan = new BatchExecTableSourceScan( new Configuration(), - new DynamicTableSourceSpec(null, null), + new DynamicTableSourceSpec(null, null, null), InputProperty.DEFAULT, RowType.of(new IntType(), new IntType(), new IntType()), "DynamicFilteringTableSourceScan"); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java index dad7e47eefe95..bc4b0397180a7 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java @@ -103,6 +103,7 @@ static Stream<DynamicTableSinkSpec> testDynamicTableSinkSpecSerde() { "MyTable"), new ResolvedCatalogTable(catalogTable1, resolvedSchema1)), null, + null, null); Map<String, String> options2 = new HashMap<>(); @@ -133,6 +134,7 @@ static Stream<DynamicTableSinkSpec> testDynamicTableSinkSpecSerde() { CatalogManagerMocks.DEFAULT_DATABASE, "MyTable"), new ResolvedCatalogTable(catalogTable2, resolvedSchema2)), + Collections.singletonMap("path", "/tmp/hint"), Arrays.asList(
new OverwriteSpec(true), new PartitioningSpec( @@ -170,6 +172,7 @@ static Stream<DynamicTableSinkSpec> testDynamicTableSinkSpecSerde() { CatalogManagerMocks.DEFAULT_DATABASE, "MyTable"), new ResolvedCatalogTable(catalogTable3, resolvedSchema3)), + Collections.emptyMap(), Collections.singletonList( new WritingMetadataSpec( Collections.singletonList("m"), @@ -200,6 +203,7 @@ void testDynamicTableSinkSpecSerde(DynamicTableSinkSpec spec) throws IOException spec.getContextResolvedTable().getIdentifier(), catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get(), spec.getContextResolvedTable().getResolvedTable()), + spec.getDynamicOptions(), spec.getSinkAbilities(), null); @@ -262,6 +266,7 @@ void testDynamicTableSinkSpecSerdeWithEnrichmentOptions() throws Exception { identifier, catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get(), planResolvedCatalogTable), + null, Collections.emptyList(), null); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java index 4dbfdd013a984..1dfe09c1579ef 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java @@ -121,6 +121,7 @@ public static Stream<DynamicTableSourceSpec> testDynamicTableSinkSpecSerde() { TableConfigOptions.TABLE_DATABASE_NAME.defaultValue(), "MyTable"), new ResolvedCatalogTable(catalogTable1, resolvedSchema1)), + null, null); Map<String, String> options2 = new HashMap<>(); @@ -163,6 +164,7 @@ public static Stream<DynamicTableSourceSpec> testDynamicTableSinkSpecSerde() { TableConfigOptions.TABLE_DATABASE_NAME.defaultValue(), "MyTable"), new ResolvedCatalogTable(catalogTable2, resolvedSchema2)), + Collections.singletonMap("source.sleep-time", "1s"), Arrays.asList( new ProjectPushDownSpec( new int[][] {{0}, {1}, {4}, {6}}, @@ -268,6 +270,7 @@ void testDynamicTableSourceSpecSerde(DynamicTableSourceSpec spec) throws IOExcep spec.getContextResolvedTable().getIdentifier(), catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get(), spec.getContextResolvedTable().getResolvedTable()), + spec.getDynamicOptions(), spec.getSourceAbilities()); String actualJson = toJson(serdeCtx, spec); @@ -275,6 +278,7 @@ void testDynamicTableSourceSpecSerde(DynamicTableSourceSpec spec) throws IOExcep toObject(serdeCtx, actualJson, DynamicTableSourceSpec.class); assertThat(actual.getContextResolvedTable()).isEqualTo(spec.getContextResolvedTable()); + assertThat(actual.getDynamicOptions()).isEqualTo(spec.getDynamicOptions()); assertThat(actual.getSourceAbilities()).isEqualTo(spec.getSourceAbilities()); assertThat( @@ -333,6 +337,7 @@ void testDynamicTableSourceSpecSerdeWithEnrichmentOptions() throws Exception { identifier, catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get(), planResolvedCatalogTable), + null, Collections.emptyList()); String actualJson = toJson(serdeCtx, planSpec);
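One detail behind the null and Collections.emptyMap() arguments in the serde tests above: getDynamicOptions() carries JsonInclude.Include.NON_EMPTY, so an empty map, like null, serializes to an absent "dynamicOptions" field and reads back as null. A stand-in sketch with plain Jackson rather than Flink's shaded copy; the Spec POJO is illustrative only:

    import com.fasterxml.jackson.annotation.JsonInclude;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.Collections;
    import java.util.Map;

    public class NonEmptySketch {
        public static class Spec {
            @JsonInclude(JsonInclude.Include.NON_EMPTY)
            public Map<String, String> dynamicOptions;
        }

        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();

            Spec empty = new Spec();
            empty.dynamicOptions = Collections.emptyMap();
            System.out.println(mapper.writeValueAsString(empty)); // {}

            Spec hinted = new Spec();
            hinted.dynamicOptions = Collections.singletonMap("path", "/tmp/hint");
            // {"dynamicOptions":{"path":"/tmp/hint"}}
            System.out.println(mapper.writeValueAsString(hinted));
        }
    }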
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java index 7798e6a94e12c..a51ddee00b80e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java @@ -104,7 +104,7 @@ public static Stream<TemporalTableSourceSpec> testTemporalTableSourceSpecSerde() FACTORY, new SourceAbilitySpec[] {new LimitPushDownSpec(100)}); TemporalTableSourceSpec temporalTableSourceSpec1 = - new TemporalTableSourceSpec(tableSourceTable1); + new TemporalTableSourceSpec(tableSourceTable1, null); return Stream.of(temporalTableSourceSpec1); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LoadJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LoadJsonPlanTest.java new file mode 100644 index 0000000000000..2ef650aa6cc35 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/LoadJsonPlanTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.stream.jsonplan; + +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph; +import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec; +import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor; +import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitorImpl; +import org.apache.flink.table.planner.utils.JsonPlanTestBase; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; + +import org.apache.flink.shaded.guava32.com.google.common.collect.Sets; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for loading a compiled plan into an exec node graph.
*/ +@ExtendWith(ParameterizedTestExtension.class) +class LoadJsonPlanTest extends JsonPlanTestBase { + + private final TableConfigOptions.CatalogPlanCompilation compileCatalogObjectsLevel; + + @Parameters(name = "compileCatalogObjectsLevel = {0}") + private static Collection<TableConfigOptions.CatalogPlanCompilation> data() { + return Arrays.asList(TableConfigOptions.CatalogPlanCompilation.values()); + } + + LoadJsonPlanTest(TableConfigOptions.CatalogPlanCompilation compileCatalogObjectsLevel) { + this.compileCatalogObjectsLevel = compileCatalogObjectsLevel; + } + + @BeforeEach + protected void setup() throws Exception { + super.setup(); + + tableEnv.getConfig() + .set(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS, compileCatalogObjectsLevel); + + tableEnv.executeSql( + "CREATE TABLE src (\n" + + " a int,\n" + + " b varchar,\n" + + " c int,\n" + + " proctime as PROCTIME()" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'false'\n" + + ")"); + tableEnv.executeSql( + "CREATE TABLE dim (\n" + + " a int,\n" + + " b varchar,\n" + + " c int\n" + + ") with (\n" + + " 'connector' = 'values'\n" + + ")"); + + tableEnv.executeSql( + "CREATE TABLE snk (\n" + + " a int,\n" + + " b varchar,\n" + + " c int\n" + + ") with (\n" + + " 'connector' = 'values'\n" + + ")"); + } + + @TestTemplate + void testLoadPlanWithHintsOnLookupSource() { + String sql = + "insert into snk select src.a, src.b, src.c from src " + + "join dim /*+ OPTIONS('async'='true') */ for system_time as of " + + "src.proctime on src.a = dim.a"; + ExecNodeGraph execNodeGraph = compileSqlAndLoadPlan(sql); + StreamExecLookupJoin lookupJoin = + getSingleSpecificExecNodes(execNodeGraph, StreamExecLookupJoin.class); + + DynamicTableSourceSpec spec = lookupJoin.getTemporalTableSourceSpec().getTableSourceSpec(); + assertThat(spec).isNotNull(); + Map<String, String> expectedExtraOptions = Collections.singletonMap("async", "true"); + + // verify the dynamic options in spec + assertThat(spec.getDynamicOptions()).isNotNull().isEqualTo(expectedExtraOptions); + + // verify the table options in dim table + Map<String, String> tableOptions = spec.getContextResolvedTable().getTable().getOptions(); + assertThat(tableOptions).containsAllEntriesOf(expectedExtraOptions); + } + + @TestTemplate + void testLoadPlanWithHintsOnScanSource() { + String sql = + "insert into snk select src.a, src.b, src.c from src /*+ OPTIONS('source.sleep-time'='10s') */ "; + + ExecNodeGraph execNodeGraph = compileSqlAndLoadPlan(sql); + StreamExecTableSourceScan source = + getSingleSpecificExecNodes(execNodeGraph, StreamExecTableSourceScan.class); + + DynamicTableSourceSpec spec = source.getTableSourceSpec(); + Map<String, String> expectedExtraOptions = + Collections.singletonMap("source.sleep-time", "10s"); + + // verify the dynamic options in spec + assertThat(spec.getDynamicOptions()).isNotNull().isEqualTo(expectedExtraOptions); + + // verify the table options in table + Map<String, String> tableOptions = spec.getContextResolvedTable().getTable().getOptions(); + assertThat(tableOptions).containsAllEntriesOf(expectedExtraOptions); + } + + @TestTemplate + void testLoadPlanWithHintsOnSink() { + String sql = + "insert into snk /*+ OPTIONS('sink-insert-only'='false') */ select src.a, src.b, src.c from src"; + + ExecNodeGraph execNodeGraph = compileSqlAndLoadPlan(sql); + StreamExecSink sink = getSingleSpecificExecNodes(execNodeGraph, StreamExecSink.class); + + DynamicTableSinkSpec spec = sink.getTableSinkSpec(); + Map<String, String> expectedExtraOptions = + Collections.singletonMap("sink-insert-only", "false"); + + // verify the dynamic options in spec +
assertThat(spec.getDynamicOptions()).isNotNull().isEqualTo(expectedExtraOptions); + + // verify the table options in table + Map<String, String> tableOptions = spec.getContextResolvedTable().getTable().getOptions(); + assertThat(tableOptions).containsAllEntriesOf(expectedExtraOptions); + } + + private <T extends ExecNode<?>> T getSingleSpecificExecNodes( + ExecNodeGraph execNodeGraph, Class<T> expectExecNodeClazz) { + Set<T> collectResults = getSpecificExecNodes(execNodeGraph, expectExecNodeClazz); + if (collectResults.size() > 1) { + throw new IllegalArgumentException( + "There are more than one " + + expectExecNodeClazz.getSimpleName() + + " in the execNodeGraph"); + } + if (collectResults.isEmpty()) { + throw new IllegalArgumentException( + "There is no " + expectExecNodeClazz.getSimpleName() + " in the execNodeGraph"); + } + return collectResults.iterator().next(); + } + + private <T extends ExecNode<?>> Set<T> getSpecificExecNodes( + ExecNodeGraph execNodeGraph, Class<T> clazz) { + final Set<T> collectResults = Sets.newIdentityHashSet(); + + final ExecNodeVisitor collector = + new ExecNodeVisitorImpl() { + @Override + public void visit(ExecNode<?> node) { + if (clazz.isInstance(node)) { + collectResults.add((T) node); + } + super.visit(node); + } + }; + + execNodeGraph.getRootNodes().forEach(collector::visit); + return collectResults; + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java index 6d6dbcf438689..398cfdd00699d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java @@ -28,6 +28,7 @@ import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.internal.CompiledPlanUtils; import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.types.Row; @@ -98,6 +99,14 @@ protected TableResult compileSqlAndExecutePlan( return newCompiledPlan.execute(); } + protected ExecNodeGraph compileSqlAndLoadPlan(String sql) { + CompiledPlan compiledPlan = tableEnv.compilePlanSql(sql); + checkTransformationUids(compiledPlan); + CompiledPlan newCompiledPlan = + tableEnv.loadPlan(PlanReference.fromJsonString(compiledPlan.asJsonString())); + return CompiledPlanUtils.unwrap(newCompiledPlan).getExecNodeGraph(); + } + protected void checkTransformationUids(CompiledPlan compiledPlan) { List<Transformation<?>> transformations = CompiledPlanUtils.toTransformations(tableEnv, compiledPlan); diff --git a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out index 9b5893b048ed2..c364506a1a214 100644 --- a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out +++ b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out @@ -27,6 +27,10 @@ "scan.parallelism" : "2" } } + }, + "dynamicOptions" : { + "bounded" : "true", + "scan.parallelism" : "2" } }, "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index 0a818dfb0da3a..745b4ba938d3b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -2680,7 +2680,8 @@ class FlinkRelMdHandlerTestBase { batchScan.getTable, None, JoinInfo.of(ImmutableIntList.of(0), ImmutableIntList.of(0)), - JoinRelType.INNER + JoinRelType.INNER, + Collections.emptyMap() ) val streamSourceOp = new TableSourceQueryOperation[RowData](temporalTableSource, false) val streamScan = relBuilder.queryOperation(streamSourceOp).build().asInstanceOf[TableScan] @@ -2693,7 +2694,8 @@ class FlinkRelMdHandlerTestBase { JoinInfo.of(ImmutableIntList.of(0), ImmutableIntList.of(0)), JoinRelType.INNER, Option.empty[RelHint], - false + false, + Collections.emptyMap() ) (batchLookupJoin, streamLookupJoin) }
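Taken together, the contract these changes establish is that OPTIONS hints survive the compile/serialize/load round trip. A closing sketch against the public Table API; the tables, the hinted value, and the use of the built-in 'datagen'/'blackhole' connectors are illustrative assumptions, not part of this patch (OPTIONS hints also require table.dynamic-table-options.enabled, which defaults to true in recent Flink releases):

    import org.apache.flink.table.api.CompiledPlan;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.PlanReference;
    import org.apache.flink.table.api.TableEnvironment;

    public class HintedPlanRoundTrip {
        public static void main(String[] args) throws Exception {
            TableEnvironment env =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            env.executeSql(
                    "CREATE TABLE src (a INT) WITH ("
                            + " 'connector' = 'datagen', 'number-of-rows' = '10')");
            env.executeSql("CREATE TABLE snk (a INT) WITH ('connector' = 'blackhole')");

            CompiledPlan plan =
                    env.compilePlanSql(
                            "INSERT INTO snk SELECT a FROM src "
                                    + "/*+ OPTIONS('rows-per-second' = '5') */");
            // The hint now travels inside the plan as the "dynamicOptions" field.
            String json = plan.asJsonString();

            // Reloading re-applies the hinted options on top of the table's own options.
            env.loadPlan(PlanReference.fromJsonString(json)).execute().await();
        }
    }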