Commit d6be0aa

[FLINK-39261][table] Add Row Semantic for retract and Set Semantic for upsert mode
1 parent 3e9e416 commit d6be0aa

17 files changed: +234 −61 lines

docs/content/docs/sql/reference/queries/changelog.md

Lines changed: 11 additions & 10 deletions
````diff
@@ -43,17 +43,19 @@ This is useful when consuming Change Data Capture (CDC) streams from systems lik
 
 ```sql
 SELECT * FROM FROM_CHANGELOG(
-    input => TABLE source_table PARTITION BY key_col,
+    input => TABLE source_table [PARTITION BY key_col],
     [op => DESCRIPTOR(op_column_name),]
     [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
 )
 ```
 
+`PARTITION BY` is optional when the mapping includes `UPDATE_BEFORE` (retract mode). It is required when the mapping produces upsert mode (no `UPDATE_BEFORE`), because downstream operators need a key for updates and deletes. When provided, records are distributed by the partition key for parallel processing.
+
 ### Parameters
 
 | Parameter | Required | Description |
 |:-------------|:---------|:------------|
-| `input` | Yes | The input table. Must be append-only and include `PARTITION BY` for parallel execution. |
+| `input` | Yes | The input table. Must be append-only. `PARTITION BY` is optional for retract mode (with `UPDATE_BEFORE`) and required for upsert mode (without `UPDATE_BEFORE`). |
 | `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. |
 | `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined operation codes to change operation names. Keys are user codes (e.g., `'c'`, `'u'`, `'d'`), values are change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded - unmapped codes are dropped. Each change operation may appear at most once across all entries. |
@@ -73,7 +75,7 @@ When `op_mapping` is omitted, the following standard names are used:
 The output columns are ordered as:
 
 ```
-[partition_key_columns, remaining_columns_without_op]
+[all_input_columns_without_op]
 ```
 
 The op column is removed from the output. Output rows carry the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE).
@@ -90,7 +92,7 @@ The op column is removed from the output. Output rows carry the appropriate chan
 -- +I[id:2, op:'DELETE', name:'Bob']
 
 SELECT * FROM FROM_CHANGELOG(
-    input => TABLE cdc_stream PARTITION BY id
+    input => TABLE cdc_stream
 )
 
 -- Output (updating table):
@@ -122,7 +124,7 @@ SELECT * FROM FROM_CHANGELOG(
 
 ```sql
 SELECT * FROM FROM_CHANGELOG(
-    input => TABLE cdc_stream PARTITION BY id,
+    input => TABLE cdc_stream,
     op => DESCRIPTOR(operation)
 )
 -- The operation column named 'operation' is used instead of 'op'
@@ -131,12 +133,11 @@ SELECT * FROM FROM_CHANGELOG(
 #### Table API
 
 ```java
-// Default: reads 'op' column with standard change operation names
-Table result = cdcStream.partitionBy($("id")).fromChangelog();
+// Default (retract mode): reads 'op' column with standard change operation names
+Table result = cdcStream.fromChangelog();
 
-// Debezium-style mapping
-Table result = cdcStream.partitionBy($("id")).fromChangelog(
-        descriptor("__op").asArgument("op"),
+// Upsert mode requires PARTITION BY — use the generic process() method
+Table result = cdcStream.partitionBy($("id")).process("FROM_CHANGELOG",
         map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping")
 );
 ```
````
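The `op_mapping` contract documented above (comma-separated codes sharing one operation, unmapped codes dropped, each change operation used at most once) can be sketched as a standalone helper. This is a hypothetical illustration, not code from the commit:

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class OpMappingSketch {

    /**
     * Expands a mapping like {"c, r" -> "INSERT"} into per-code entries
     * ({"c" -> "INSERT", "r" -> "INSERT"}) and rejects a change operation
     * that appears more than once across all entries.
     */
    static Map<String, String> expand(Map<String, String> opMapping) {
        Map<String, String> expanded = new LinkedHashMap<>();
        Set<String> seenOperations = new HashSet<>();
        for (Map.Entry<String, String> entry : opMapping.entrySet()) {
            String operation = entry.getValue().trim();
            if (!seenOperations.add(operation)) {
                throw new IllegalArgumentException(
                        "Duplicate change operation: '" + operation + "'");
            }
            // Keys may bundle several user codes, e.g. "c, r"
            for (String code : entry.getKey().split(",")) {
                expanded.put(code.trim(), operation);
            }
        }
        return expanded;
    }

    public static void main(String[] args) {
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("c, r", "INSERT");
        mapping.put("u", "UPDATE_AFTER");
        mapping.put("d", "DELETE");
        Map<String, String> codes = expand(mapping);
        // A code absent from the expanded map corresponds to a dropped row.
        System.out.println(codes.getOrDefault("r", "<dropped>")); // INSERT
        System.out.println(codes.getOrDefault("t", "<dropped>")); // <dropped>
    }
}
```

A row whose code has no entry in the expanded map is simply not forwarded, matching the "unmapped codes are dropped" behavior in the parameter table.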

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java

Lines changed: 27 additions & 0 deletions
```diff
@@ -1323,6 +1323,33 @@ default TableResult executeInsert(
      */
     PartitionedTable partitionBy(Expression... fields);
 
+    /**
+     * Converts this append-only table with an explicit operation code column into a dynamic table
+     * using the built-in {@code FROM_CHANGELOG} process table function.
+     *
+     * <p>Each input row is expected to have a string operation code column (default: {@code "op"})
+     * that indicates the change operation (e.g., INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE). The
+     * output table is a dynamic table backed by a changelog stream.
+     *
+     * <p>Optional arguments can be passed using named expressions:
+     *
+     * <pre>{@code
+     * // Default: reads 'op' column with standard change operation names
+     * table.fromChangelog();
+     *
+     * // Custom op column name and mapping (Debezium-style codes)
+     * table.fromChangelog(
+     *     descriptor("__op").asArgument("op"),
+     *     map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping")
+     * );
+     * }</pre>
+     *
+     * @param arguments optional named arguments for {@code op} and {@code op_mapping}
+     * @return a dynamic {@link Table} with the op column removed and proper change operation
+     *     semantics
+     */
+    Table fromChangelog(Expression... arguments);
+
     /**
      * Converts this table object into a named argument.
      *
```

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java

Lines changed: 5 additions & 0 deletions
```diff
@@ -497,6 +497,11 @@ public PartitionedTable partitionBy(Expression... fields) {
         return new PartitionedTableImpl(this, Arrays.asList(fields));
     }
 
+    @Override
+    public Table fromChangelog(Expression... arguments) {
+        return process(BuiltInFunctionDefinitions.FROM_CHANGELOG.getName(), (Object[]) arguments);
+    }
+
     @Override
     public ApiExpression asArgument(String name) {
         return createArgumentExpression(operationTree, tableEnvironment, name);
```

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java

Lines changed: 0 additions & 9 deletions
```diff
@@ -137,15 +137,6 @@ public boolean isInternal() {
         return isInternal;
     }
 
-    /**
-     * Returns the optional changelog mode resolver for declaring the output changelog mode of a
-     * built-in process table function (e.g., FROM_CHANGELOG). Receives the {@link ChangelogContext}
-     * so it can inspect function arguments to dynamically determine the changelog mode.
-     */
-    public Optional<Function<ChangelogContext, ChangelogMode>> getChangelogModeResolver() {
-        return Optional.ofNullable(changelogModeResolver);
-    }
-
     public String getQualifiedName() {
         if (isInternal) {
             return name;
```

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java

Lines changed: 2 additions & 1 deletion
```diff
@@ -823,7 +823,8 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                             false,
                             EnumSet.of(
                                     StaticArgumentTrait.TABLE,
-                                    StaticArgumentTrait.SET_SEMANTIC_TABLE)),
+                                    StaticArgumentTrait.SET_SEMANTIC_TABLE,
+                                    StaticArgumentTrait.OPTIONAL_PARTITION_BY)),
                     StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true),
                     StaticArgument.scalar(
                             "op_mapping",
```

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java

Lines changed: 4 additions & 2 deletions
```diff
@@ -63,8 +63,10 @@
  * </ol>
  *
  * <p>Emitting changelogs is only valid for PTFs that take table arguments with set semantics (see
- * {@link ArgumentTrait#SET_SEMANTIC_TABLE}). In case of upserts, the upsert key must be equal to
- * the PARTITION BY key.
+ * {@link ArgumentTrait#SET_SEMANTIC_TABLE}). When using {@code OPTIONAL_PARTITION_BY}, the
+ * PARTITION BY clause can be omitted for retract mode (with {@link RowKind#UPDATE_BEFORE}), since
+ * the stream is self-describing. In case of upserts, the upsert key must be equal to the PARTITION
+ * BY key.
  *
  * <p>It is perfectly valid for a {@link ChangelogFunction} implementation to return a fixed {@link
  * ChangelogMode}, regardless of the {@link ChangelogContext}. This approach may be appropriate when
```
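The distinction this Javadoc draws can be made concrete with a small, self-contained sketch (plain Java, not the Flink API): a retract stream carries the old value in the stream itself via UPDATE_BEFORE, while an upsert stream ships only new values, so the consumer needs a key to locate the row being replaced or deleted.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of materializing an upsert changelog into keyed state.
 * Without a key (the PARTITION BY / upsert key), there would be no way to
 * find the previous row for an UPDATE_AFTER or DELETE that lacks UPDATE_BEFORE.
 */
public class ChangelogStateSketch {
    private final Map<String, String> stateByKey = new HashMap<>();

    // Upsert mode: the key locates the row to overwrite; null means DELETE.
    void applyUpsert(String key, String newValueOrNull) {
        if (newValueOrNull == null) {
            stateByKey.remove(key);
        } else {
            stateByKey.put(key, newValueOrNull);
        }
    }

    String get(String key) {
        return stateByKey.get(key);
    }

    public static void main(String[] args) {
        ChangelogStateSketch state = new ChangelogStateSketch();
        state.applyUpsert("1", "Alice");
        state.applyUpsert("1", "Alicia"); // UPDATE_AFTER without UPDATE_BEFORE: key finds the old row
        state.applyUpsert("2", "Bob");
        state.applyUpsert("2", null);     // DELETE by key
        System.out.println(state.get("1")); // Alicia
        System.out.println(state.get("2")); // null
    }
}
```

In retract mode no such lookup is needed: each update arrives as an explicit (UPDATE_BEFORE, UPDATE_AFTER) pair, which is why the PARTITION BY clause can be omitted there.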

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java

Lines changed: 21 additions & 7 deletions
```diff
@@ -36,7 +36,6 @@
 import org.apache.flink.table.types.inference.TypeStrategy;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.types.ColumnList;
-import org.apache.flink.types.RowKind;
 
 import java.util.HashSet;
 import java.util.List;
@@ -166,6 +165,16 @@ private static Optional<List<DataType>> validateInputs(
             }
         }
 
+        // Upsert mode (no UPDATE_BEFORE in mapping) requires PARTITION BY for key-based routing
+        final boolean isUpsertMode =
+                opMapping.isPresent() && !containsUpdateBefore(opMapping.get());
+        final boolean hasPartitionBy = tableSemantics.partitionByColumns().length > 0;
+        if (isUpsertMode && !hasPartitionBy) {
+            throw new ValidationException(
+                    "Upsert changelog mode (without UPDATE_BEFORE) requires a PARTITION BY clause. "
+                            + "Either add PARTITION BY or include UPDATE_BEFORE in the op_mapping.");
+        }
+
         return Optional.of(callContext.getArgumentDataTypes());
     }
 
@@ -223,6 +232,13 @@ private static String resolveOpColumnName(final CallContext callContext) {
                 .orElse(DEFAULT_OP_COLUMN_NAME);
     }
 
+    /** Returns true if the op_mapping values contain UPDATE_BEFORE. */
+    @SuppressWarnings("rawtypes")
+    private static boolean containsUpdateBefore(final Map opMapping) {
+        return opMapping.values().stream()
+                .anyMatch(v -> v instanceof String && "UPDATE_BEFORE".equals(((String) v).trim()));
+    }
+
     private static List<Field> buildOutputFields(
             final TableSemantics tableSemantics, final String opColumnName) {
         final Set<Integer> partitionKeys =
@@ -246,18 +262,16 @@ private static List<Field> buildOutputFields(
      * (default) or includes UPDATE_BEFORE, returns retract mode (all). Otherwise, returns upsert
      * mode (no UPDATE_BEFORE).
      */
-    @SuppressWarnings({"unchecked", "rawtypes"})
+    @SuppressWarnings("rawtypes")
     public static ChangelogMode resolveChangelogMode(final ChangelogContext changelogContext) {
         final Optional<Map> opMapping = changelogContext.getArgumentValue(2, Map.class);
         if (opMapping.isEmpty()) {
             // Default mapping includes UPDATE_BEFORE -> retract mode
             return ChangelogMode.all();
         }
-        final boolean hasUpdateBefore =
-                ((Map<String, String>) opMapping.get())
-                        .values().stream()
-                        .anyMatch(v -> RowKind.UPDATE_BEFORE.name().equals(v.trim()));
-        return hasUpdateBefore ? ChangelogMode.all() : ChangelogMode.upsert(false);
+        return containsUpdateBefore(opMapping.get())
+                ? ChangelogMode.all()
+                : ChangelogMode.upsert(false);
     }
 
     private FromChangelogTypeStrategy() {}
```
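The decision rule implemented by `resolveChangelogMode` and the new `validateInputs` check can be summarized in a standalone sketch. Names here are hypothetical and mirror, but do not use, the Flink classes:

```java
import java.util.Map;

/**
 * Hypothetical summary of the FROM_CHANGELOG mode decision:
 * retract mode when op_mapping is absent (default mapping) or contains
 * UPDATE_BEFORE; otherwise upsert mode, which additionally requires
 * a PARTITION BY clause.
 */
public class ModeResolutionSketch {
    enum Mode { RETRACT, UPSERT }

    static Mode resolve(Map<String, String> opMappingOrNull) {
        if (opMappingOrNull == null) {
            return Mode.RETRACT; // default mapping includes UPDATE_BEFORE
        }
        boolean hasUpdateBefore =
                opMappingOrNull.values().stream()
                        .anyMatch(v -> "UPDATE_BEFORE".equals(v.trim()));
        return hasUpdateBefore ? Mode.RETRACT : Mode.UPSERT;
    }

    static void validate(Mode mode, boolean hasPartitionBy) {
        if (mode == Mode.UPSERT && !hasPartitionBy) {
            throw new IllegalStateException(
                    "Upsert changelog mode (without UPDATE_BEFORE) requires a PARTITION BY clause.");
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));                        // RETRACT
        System.out.println(resolve(Map.of("u", "UPDATE_AFTER"))); // UPSERT
        validate(resolve(Map.of("u", "UPDATE_AFTER")), true);     // passes with PARTITION BY
    }
}
```

Factoring the UPDATE_BEFORE check into one helper, as the commit does with `containsUpdateBefore`, keeps the validation and the mode resolution from drifting apart.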

flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java

Lines changed: 24 additions & 4 deletions
```diff
@@ -46,10 +46,15 @@ class FromChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase {
     @Override
     protected Stream<TestSpec> testData() {
         return Stream.of(
-                // Valid: all three arguments with Debezium-style mapping
-                TestSpec.forStrategy("Valid with all arguments", FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                // Valid: upsert mapping with PARTITION BY
+                TestSpec.forStrategy(
+                                "Valid upsert with partition by",
+                                FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
                         .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE)
-                        .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE))
+                        .calledWithTableSemanticsAt(
+                                0,
+                                new TableSemanticsMock(
+                                        TABLE_TYPE, new int[] {0}, new int[0], -1, null))
                         .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
                         .calledWithLiteralAt(
                                 2,
@@ -126,6 +131,21 @@ protected Stream<TestSpec> testData() {
                         .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE))
                         .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
                         .calledWithLiteralAt(2, Map.of("c", "INSERT", "r", "INSERT"))
-                        .expectErrorMessage("Duplicate change operation: 'INSERT'"));
+                        .expectErrorMessage("Duplicate change operation: 'INSERT'"),
+
+                // Error: upsert mode without PARTITION BY
+                TestSpec.forStrategy(
+                                "Upsert without PARTITION BY", FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE)
+                        .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+                        .calledWithLiteralAt(
+                                2,
+                                Map.of(
+                                        "c", "INSERT",
+                                        "u", "UPDATE_AFTER",
+                                        "d", "DELETE"))
+                        .expectErrorMessage(
+                                "Upsert changelog mode (without UPDATE_BEFORE) requires a PARTITION BY"));
     }
 }
```

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala

Lines changed: 14 additions & 2 deletions
```diff
@@ -1713,7 +1713,10 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
           val changelogContext =
             toPtfChangelogContext(process, inputChangelogModes, requiredChangelogMode)
           val changelogMode = changelogFunction.getChangelogMode(changelogContext)
-          if (!changelogMode.containsOnly(RowKind.INSERT)) {
+          if (
+            !changelogMode.containsOnly(RowKind.INSERT) &&
+            !changelogMode.contains(RowKind.UPDATE_BEFORE)
+          ) {
             verifyPtfTableArgsForUpdates(call)
           }
           toTraitSet(changelogMode)
@@ -1722,6 +1725,15 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
       }
     }
 
+  /**
+   * Verifies that PTFs with upsert output (without UPDATE_BEFORE) use set semantics.
+   *
+   * Retract mode (with UPDATE_BEFORE) is self-describing — each update carries both the old and
+   * new value, so downstream can process it without a key. Row semantics is safe.
+   *
+   * Upsert mode (without UPDATE_BEFORE) requires a key to look up previous values, so set
+   * semantics with PARTITION BY is required.
+   */
   private def verifyPtfTableArgsForUpdates(call: RexCall): Unit = {
     StreamPhysicalProcessTableFunction
       .getProvidedInputArgs(call)
@@ -1730,7 +1742,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
         tableArg =>
           if (tableArg.is(StaticArgumentTrait.ROW_SEMANTIC_TABLE)) {
             throw new ValidationException(
-              s"PTFs that take table arguments with row semantics don't support updating output. " +
+              s"PTFs that take table arguments with row semantics don't support upsert output. " +
               s"Table argument '${tableArg.getName}' of function '${call.getOperator.toString}' " +
               s"must use set semantics.")
           }
```

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java

Lines changed: 3 additions & 2 deletions
```diff
@@ -43,8 +43,9 @@ public List<TableTestProgram> programs() {
                 FromChangelogTestPrograms.DEBEZIUM_MAPPING,
                 FromChangelogTestPrograms.UNMAPPED_CODES_DROPPED,
                 FromChangelogTestPrograms.CUSTOM_OP_NAME,
-                FromChangelogTestPrograms.TABLE_API_DEFAULT,
-                FromChangelogTestPrograms.MISSING_PARTITION_BY
+                FromChangelogTestPrograms.TABLE_API_DEFAULT
+                // TODO: UPSERT_WITHOUT_PARTITION_BY validation is tested in
+                // FromChangelogInputTypeStrategyTest
                 // TODO: enable after TO_CHANGELOG switches to row semantics (PR #27911)
                 // FromChangelogTestPrograms.ROUND_TRIP
                 );
```
