Skip to content

Commit b92435e

Browse files
committed
[FLINK-37720][table-planner] Enable sink reuse table optimizer by default and apply plan changes to existing test cases
1 parent 9fb7cc3 commit b92435e

File tree

9 files changed

+100
-151
lines changed

9 files changed

+100
-151
lines changed

docs/layouts/shortcodes/generated/optimizer_config_configuration.html

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@
8585
</tr>
8686
<tr>
8787
<td><h5>table.optimizer.reuse-sink-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
88-
<td style="word-wrap: break-word;">false</td>
88+
<td style="word-wrap: break-word;">true</td>
8989
<td>Boolean</td>
9090
<td>When it is true, the optimizer will try to find out duplicated table sinks and reuse them. This works only when table.optimizer.reuse-sub-plan-enabled is true.</td>
9191
</tr>

flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q

+6-10
Original file line numberDiff line numberDiff line change
@@ -105,19 +105,15 @@ LogicalSink(table=[default_catalog.default_database.StreamingTable], fields=[EXP
105105

106106
== Optimized Physical Plan ==
107107
Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1])
108-
+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])
109-
110-
Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1])
111-
+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])
108+
+- Union(all=[true], union=[EXPR$0, EXPR$1])
109+
:- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])
110+
+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])
112111

113112
== Optimized Execution Plan ==
114-
Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1])
115-
116113
Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1])
117-
+- Reused(reference_id=[1])
118-
119-
Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1])
120-
+- Reused(reference_id=[1])
114+
+- Union(all=[true], union=[EXPR$0, EXPR$1])
115+
:- Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1])
116+
+- Reused(reference_id=[1])
121117
!ok
122118

123119
EXECUTE STATEMENT SET BEGIN

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public class OptimizerConfigOptions {
109109
public static final ConfigOption<Boolean> TABLE_OPTIMIZER_REUSE_SINK_ENABLED =
110110
key("table.optimizer.reuse-sink-enabled")
111111
.booleanType()
112-
.defaultValue(false)
112+
.defaultValue(true)
113113
.withDescription(
114114
"When it is true, the optimizer will try to find out duplicated table sinks and "
115115
+ "reuse them. This works only when "

flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out

+7-27
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,15 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[first])
99

1010
== Optimized Physical Plan ==
1111
Sink(table=[default_catalog.default_database.MySink], fields=[last])
12-
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
13-
14-
Sink(table=[default_catalog.default_database.MySink], fields=[first])
15-
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
12+
+- Union(all=[true], union=[last])
13+
:- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
14+
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
1615

1716
== Optimized Execution Plan ==
1817
Sink(table=[default_catalog.default_database.MySink], fields=[last])
19-
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
20-
21-
Sink(table=[default_catalog.default_database.MySink], fields=[first])
22-
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
18+
+- Union(all=[true], union=[last])
19+
:- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
20+
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
2321

2422
== Physical Execution Plan ==
2523
{
@@ -51,17 +49,6 @@ Sink(table=[default_catalog.default_database.MySink], fields=[first])
5149
"ship_strategy" : "FORWARD",
5250
"side" : "second"
5351
} ]
54-
}, {
55-
"id" : ,
56-
"type" : "StreamingFileWriter",
57-
"pact" : "Operator",
58-
"contents" : "StreamingFileWriter",
59-
"parallelism" : 1,
60-
"predecessors" : [ {
61-
"id" : ,
62-
"ship_strategy" : "FORWARD",
63-
"side" : "second"
64-
} ]
6552
}, {
6653
"id" : ,
6754
"type" : "Source: Custom File source",
@@ -100,14 +87,7 @@ Sink(table=[default_catalog.default_database.MySink], fields=[first])
10087
"id" : ,
10188
"ship_strategy" : "FORWARD",
10289
"side" : "second"
103-
} ]
104-
}, {
105-
"id" : ,
106-
"type" : "end: Writer",
107-
"pact" : "Operator",
108-
"contents" : "end: Writer",
109-
"parallelism" : 1,
110-
"predecessors" : [ {
90+
}, {
11191
"id" : ,
11292
"ship_strategy" : "FORWARD",
11393
"side" : "second"

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml

+10-15
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,10 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
157157
<![CDATA[
158158
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
159159
+- Union(all=[true], union=[a, b, c])
160-
:- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=0}]]])
161-
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])
162-
163-
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
164-
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=2}]]])
160+
:- Union(all=[true], union=[a, b, c])
161+
: :- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=0}]]])
162+
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])
163+
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=2}]]])
165164
]]>
166165
</Resource>
167166
</TestCase>
@@ -196,13 +195,10 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
196195
</Resource>
197196
<Resource name="optimized exec plan">
198197
<![CDATA[
199-
TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])(reuse_id=[1])
200-
201-
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
202-
+- Reused(reference_id=[1])
203-
204198
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
205-
+- Reused(reference_id=[1])
199+
+- Union(all=[true], union=[a, b, c])
200+
:- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])(reuse_id=[1])
201+
+- Reused(reference_id=[1])
206202
]]>
207203
</Resource>
208204
</TestCase>
@@ -221,10 +217,9 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
221217
<Resource name="optimized exec plan">
222218
<![CDATA[
223219
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
224-
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])
225-
226-
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
227-
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=2}]]])
220+
+- Union(all=[true], union=[a, b, c])
221+
:- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])
222+
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=2}]]])
228223
]]>
229224
</Resource>
230225
</TestCase>

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml

+25-26
Original file line numberDiff line numberDiff line change
@@ -367,32 +367,31 @@ LogicalSink(table=[default_catalog.default_database.sink2], fields=[number, word
367367
<Resource name="optimized rel plan">
368368
<![CDATA[
369369
Sink(table=[default_catalog.default_database.sink1], fields=[number, word], changelogMode=[NONE])
370-
+- Calc(select=[number, word], where=[>(word, 'a')], changelogMode=[I,UB,UA])
371-
+- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
372-
:- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA])
373-
: +- Calc(select=[number, word], changelogMode=[I,UB,UA])
374-
: +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
375-
: +- Exchange(distribution=[hash[word]], changelogMode=[I])
376-
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
377-
+- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA])
378-
+- Calc(select=[number, word], changelogMode=[I,UB,UA])
379-
+- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
380-
+- Exchange(distribution=[hash[word]], changelogMode=[I])
381-
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
382-
383-
Sink(table=[default_catalog.default_database.sink1], fields=[number, word], changelogMode=[NONE])
384-
+- Calc(select=[number, word], where=[<(word, 'a')], changelogMode=[I,UB,UA])
385-
+- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
386-
:- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA])
387-
: +- Calc(select=[number, word], changelogMode=[I,UB,UA])
388-
: +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
389-
: +- Exchange(distribution=[hash[word]], changelogMode=[I])
390-
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
391-
+- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA])
392-
+- Calc(select=[number, word], changelogMode=[I,UB,UA])
393-
+- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
394-
+- Exchange(distribution=[hash[word]], changelogMode=[I])
395-
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
370+
+- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
371+
:- Calc(select=[number, word], where=[>(word, 'a')], changelogMode=[I,UB,UA])
372+
: +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
373+
: :- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA])
374+
: : +- Calc(select=[number, word], changelogMode=[I,UB,UA])
375+
: : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
376+
: : +- Exchange(distribution=[hash[word]], changelogMode=[I])
377+
: : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
378+
: +- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA])
379+
: +- Calc(select=[number, word], changelogMode=[I,UB,UA])
380+
: +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
381+
: +- Exchange(distribution=[hash[word]], changelogMode=[I])
382+
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
383+
+- Calc(select=[number, word], where=[<(word, 'a')], changelogMode=[I,UB,UA])
384+
+- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
385+
:- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA])
386+
: +- Calc(select=[number, word], changelogMode=[I,UB,UA])
387+
: +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
388+
: +- Exchange(distribution=[hash[word]], changelogMode=[I])
389+
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
390+
+- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA])
391+
+- Calc(select=[number, word], changelogMode=[I,UB,UA])
392+
+- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
393+
+- Exchange(distribution=[hash[word]], changelogMode=[I])
394+
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
396395
397396
Sink(table=[default_catalog.default_database.sink2], fields=[number, word], changelogMode=[NONE])
398397
+- Calc(select=[number, word], changelogMode=[I,UB,UA])

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml

+23-35
Original file line numberDiff line numberDiff line change
@@ -129,16 +129,13 @@ LogicalSink(table=[default_catalog.default_database.t_keyed_sink], fields=[name,
129129
</Resource>
130130
<Resource name="optimized exec plan">
131131
<![CDATA[
132-
Exchange(distribution=[hash[name]])(reuse_id=[1])
133-
+- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
134-
135-
Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
136-
+- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT(), _UTF-16LE'a')], uid=[a], select=[name,out], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])
137-
+- Reused(reference_id=[1])
138-
139132
Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
140-
+- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT(), _UTF-16LE'b')], uid=[b], select=[name,out], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])
141-
+- Reused(reference_id=[1])
133+
+- Union(all=[true], union=[name, out])
134+
:- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT(), _UTF-16LE'a')], uid=[a], select=[name,out], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])
135+
: +- Exchange(distribution=[hash[name]])(reuse_id=[1])
136+
: +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
137+
+- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT(), _UTF-16LE'b')], uid=[b], select=[name,out], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])
138+
+- Reused(reference_id=[1])
142139
]]>
143140
</Resource>
144141
</TestCase>
@@ -179,15 +176,12 @@ LogicalSink(table=[default_catalog.default_database.t_sink], fields=[out])
179176
</Resource>
180177
<Resource name="optimized exec plan">
181178
<![CDATA[
182-
Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])(reuse_id=[1])
183-
184-
Sink(table=[default_catalog.default_database.t_sink], fields=[out])
185-
+- ProcessTableFunction(invocation=[f(TABLE(#0), 1, DEFAULT(), DEFAULT())], uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
186-
+- Reused(reference_id=[1])
187-
188179
Sink(table=[default_catalog.default_database.t_sink], fields=[out])
189-
+- ProcessTableFunction(invocation=[f(TABLE(#0), 42, DEFAULT(), DEFAULT())], uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
190-
+- Reused(reference_id=[1])
180+
+- Union(all=[true], union=[out])
181+
:- ProcessTableFunction(invocation=[f(TABLE(#0), 1, DEFAULT(), DEFAULT())], uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
182+
: +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])(reuse_id=[1])
183+
+- ProcessTableFunction(invocation=[f(TABLE(#0), 42, DEFAULT(), DEFAULT())], uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
184+
+- Reused(reference_id=[1])
191185
]]>
192186
</Resource>
193187
</TestCase>
@@ -252,15 +246,12 @@ LogicalSink(table=[default_catalog.default_database.t_keyed_sink], fields=[name,
252246
</Resource>
253247
<Resource name="optimized exec plan">
254248
<![CDATA[
255-
ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT(), _UTF-16LE'same')], uid=[same], select=[name,out], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])(reuse_id=[1])
256-
+- Exchange(distribution=[hash[name]])
257-
+- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
258-
259-
Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
260-
+- Reused(reference_id=[1])
261-
262249
Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
263-
+- Reused(reference_id=[1])
250+
+- Union(all=[true], union=[name, out])
251+
:- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT(), _UTF-16LE'same')], uid=[same], select=[name,out], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])(reuse_id=[1])
252+
: +- Exchange(distribution=[hash[name]])
253+
: +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
254+
+- Reused(reference_id=[1])
264255
]]>
265256
</Resource>
266257
</TestCase>
@@ -286,17 +277,14 @@ LogicalSink(table=[default_catalog.default_database.t_keyed_sink], fields=[name,
286277
</Resource>
287278
<Resource name="optimized exec plan">
288279
<![CDATA[
289-
ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT(), _UTF-16LE'same')], uid=[same], select=[name,out], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])(reuse_id=[1])
290-
+- Exchange(distribution=[hash[name]])
291-
+- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
292-
293-
Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
294-
+- Calc(select=['Bob' AS name, out], where=[(name = 'Bob')])
295-
+- Reused(reference_id=[1])
296-
297280
Sink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, out])
298-
+- Calc(select=['Alice' AS name, out], where=[(name = 'Alice')])
299-
+- Reused(reference_id=[1])
281+
+- Union(all=[true], union=[name, out])
282+
:- Calc(select=['Bob' AS name, out], where=[(name = 'Bob')])
283+
: +- ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), 1, DEFAULT(), _UTF-16LE'same')], uid=[same], select=[name,out], rowType=[RecordType(VARCHAR(5) name, VARCHAR(2147483647) out)])(reuse_id=[1])
284+
: +- Exchange(distribution=[hash[name]])
285+
: +- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])
286+
+- Calc(select=['Alice' AS name, out], where=[(name = 'Alice')])
287+
+- Reused(reference_id=[1])
300288
]]>
301289
</Resource>
302290
</TestCase>

0 commit comments

Comments
 (0)