[FLINK-38579] [table-planner] Fix incorrect UB drop for filter on non-upsert key #27371
+372
−80
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
What is the purpose
Linked issue: close https://issues.apache.org/jira/browse/FLINK-38579
This pull request fixes incorrect changelog mode inference when filters or non-equi join conditions are pushed down on non-upsert key columns. Without this fix, Flink incorrectly drops UPDATE_BEFORE events in these scenarios, leading to phantom rows in the output.
Problem: When a filter like
c < 2(wherecis a non-upsert key column) is pushed down to a changelog source, and a row withc=1is updated toc=2, the old rowc=1matches the filter but the new rowc=2doesn't. The planner was incorrectly allowingDropUpdateBeforeoptimization, which caused the UPDATE_BEFORE event to be lost. Without the UPDATE_BEFORE, downstream operators couldn't retract the oldc=1row, leaving it incorrectly in the result.Solution: The fix prevents
ONLY_UPDATE_AFTERandDELETE_BY_KEYchangelog mode when filters or non-equi join conditions reference non-upsert key columns, ensuring UPDATE_BEFORE events are preserved for correct retraction semantics.Brief change log
referencesNonUpsertKeyColumns()helper to check if RexNodes reference non-upsert key columnshasNonUpsertKeyFilterPushedDown()to detect filters on non-upsert keys in TableSourceScanhasNonUpsertKeyNonEquiCondition()to detect non-equi join conditions on non-upsert keys, with precise left/right input analysisSatisfyUpdateKindTraitVisitorto rejectONLY_UPDATE_AFTERfor TableSourceScan when filter references non-upsert keysSatisfyUpdateKindTraitVisitorto rejectONLY_UPDATE_AFTERfor StreamPhysicalJoin when non-equi condition references non-upsert keysVerifying this change
This change added tests and can be verified as follows:
ChangelogSourceITCase.testFilterPushedDownOnNonUpsertKey()- Tests filter pushed down on non-upsert key column with UPDATE_BEFORE preservation (7 parameterized configurations, 6 pass, 1 skipped for incompatible CDC duplicate + MiniBatch)ChangelogSourceITCase.testJoinWithNonEquivConditionOnNonUpsertKey()- Tests non-equi join condition on left side non-upsert key columnChangelogSourceITCase.testJoinWithNonEquivConditionOnRightNonUpsertKey()- Tests non-equi join condition on right side non-upsert key column to validate left/right split logicNote:
The CDC duplicate + MiniBatch configuration correctly fails to generate an execution plan for filter pushdown with this fix. In this case, the filter is evaluated at the source, where ChangelogNormalize enforces ONLY_UPDATE_AFTER to deduplicate updates, while filtering on a non-upsert key requires UPDATE_BEFORE for correctness. These requirements conflict at the source level, so the planner correctly rejects this configuration.
Join non-equi conditions are evaluated downstream in the join operator, so they do not conflict with ChangelogNormalize.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): noDocumentation