Skip to content

Commit aae9bd9

Browse files
committed
[FLINK-37720][table-planner] Enable sink reuse table optimizer by default and apply plan changes to existing test cases
1 parent 9fb7cc3 commit aae9bd9

File tree

7 files changed

+166
-210
lines changed

7 files changed

+166
-210
lines changed

docs/layouts/shortcodes/generated/optimizer_config_configuration.html

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@
8585
</tr>
8686
<tr>
8787
<td><h5>table.optimizer.reuse-sink-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
88-
<td style="word-wrap: break-word;">false</td>
88+
<td style="word-wrap: break-word;">true</td>
8989
<td>Boolean</td>
9090
<td>When it is true, the optimizer will try to find out duplicated table sinks and reuse them. This works only when table.optimizer.reuse-sub-plan-enabled is true.</td>
9191
</tr>

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public class OptimizerConfigOptions {
109109
public static final ConfigOption<Boolean> TABLE_OPTIMIZER_REUSE_SINK_ENABLED =
110110
key("table.optimizer.reuse-sink-enabled")
111111
.booleanType()
112-
.defaultValue(false)
112+
.defaultValue(true)
113113
.withDescription(
114114
"When it is true, the optimizer will try to find out duplicated table sinks and "
115115
+ "reuse them. This works only when "

flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out

+7-27
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,15 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[first])
99

1010
== Optimized Physical Plan ==
1111
Sink(table=[default_catalog.default_database.MySink], fields=[last])
12-
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
13-
14-
Sink(table=[default_catalog.default_database.MySink], fields=[first])
15-
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
12+
+- Union(all=[true], union=[last])
13+
:- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
14+
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
1615

1716
== Optimized Execution Plan ==
1817
Sink(table=[default_catalog.default_database.MySink], fields=[last])
19-
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
20-
21-
Sink(table=[default_catalog.default_database.MySink], fields=[first])
22-
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
18+
+- Union(all=[true], union=[last])
19+
:- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
20+
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
2321

2422
== Physical Execution Plan ==
2523
{
@@ -51,17 +49,6 @@ Sink(table=[default_catalog.default_database.MySink], fields=[first])
5149
"ship_strategy" : "FORWARD",
5250
"side" : "second"
5351
} ]
54-
}, {
55-
"id" : ,
56-
"type" : "StreamingFileWriter",
57-
"pact" : "Operator",
58-
"contents" : "StreamingFileWriter",
59-
"parallelism" : 1,
60-
"predecessors" : [ {
61-
"id" : ,
62-
"ship_strategy" : "FORWARD",
63-
"side" : "second"
64-
} ]
6552
}, {
6653
"id" : ,
6754
"type" : "Source: Custom File source",
@@ -100,14 +87,7 @@ Sink(table=[default_catalog.default_database.MySink], fields=[first])
10087
"id" : ,
10188
"ship_strategy" : "FORWARD",
10289
"side" : "second"
103-
} ]
104-
}, {
105-
"id" : ,
106-
"type" : "end: Writer",
107-
"pact" : "Operator",
108-
"contents" : "end: Writer",
109-
"parallelism" : 1,
110-
"predecessors" : [ {
90+
}, {
11191
"id" : ,
11292
"ship_strategy" : "FORWARD",
11393
"side" : "second"

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml

+10-15
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,10 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
157157
<![CDATA[
158158
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
159159
+- Union(all=[true], union=[a, b, c])
160-
:- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=0}]]])
161-
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])
162-
163-
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
164-
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=2}]]])
160+
:- Union(all=[true], union=[a, b, c])
161+
: :- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=0}]]])
162+
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])
163+
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=2}]]])
165164
]]>
166165
</Resource>
167166
</TestCase>
@@ -196,13 +195,10 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
196195
</Resource>
197196
<Resource name="optimized exec plan">
198197
<![CDATA[
199-
TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])(reuse_id=[1])
200-
201-
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
202-
+- Reused(reference_id=[1])
203-
204198
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
205-
+- Reused(reference_id=[1])
199+
+- Union(all=[true], union=[a, b, c])
200+
:- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])(reuse_id=[1])
201+
+- Reused(reference_id=[1])
206202
]]>
207203
</Resource>
208204
</TestCase>
@@ -221,10 +217,9 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
221217
<Resource name="optimized exec plan">
222218
<![CDATA[
223219
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
224-
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])
225-
226-
Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
227-
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=2}]]])
220+
+- Union(all=[true], union=[a, b, c])
221+
:- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=1}]]])
222+
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{source.num-element-to-skip=2}]]])
228223
]]>
229224
</Resource>
230225
</TestCase>

0 commit comments

Comments
 (0)