
[FLINK-39256][table] Support ORDER BY clause in Process Table Functions #27929

Open
twalthr wants to merge 1 commit into master from FLINK-39256


@twalthr twalthr commented Apr 14, 2026

What is the purpose of the change

This PR implements ORDER BY support for Process Table Functions (PTFs), enabling guaranteed ordered processing of rows within partitions based on time attributes and additional sort keys.

Brief change log

  1. ORDER BY Clause Support
  • PTFs can now specify ORDER BY in table arguments with set semantics
  • The first column must be a time attribute (TIMESTAMP/TIMESTAMP_LTZ) in ascending order
  • Secondary sort columns support ASC/DESC and NULLS FIRST/LAST options
  • Example: SELECT * FROM my_ptf(input => TABLE t PARTITION BY k ORDER BY (ts ASC, priority DESC))
  2. Enhanced Time Context API
  • Added TimeContext#tableWatermark(), which returns the watermark of the specific input table currently being processed
  • Improved the TimeContext#currentWatermark() documentation to clarify that it returns the global minimum watermark across all inputs
  • Critical for multi-input scenarios where each table has independent watermarks
  3. Runtime Implementation
  • InputSortBuffer: timer-based sorting service that buffers rows by timestamp and emits them in sorted order when watermarks advance
  • MailboxPartialWatermarkProcessor: manages partial watermark tracking per input table for multi-input PTFs
  • Reduced timer usage from O(K × T) to O(K), where K is the number of keys and T the number of unique timestamps
  • Late events (arriving after the watermark has passed their timestamp) are automatically dropped to maintain ordering guarantees
  4. TableSemantics Enhancements
  • Added orderByColumns(), which returns the ORDER BY column indices
  • Added orderByDirections(), which returns the sort directions including null handling
  • Added the SortDirection enum (ASC_NULLS_FIRST, ASC_NULLS_LAST, DESC_NULLS_FIRST, DESC_NULLS_LAST)
  • Enables runtime access to ordering metadata within PTF implementations
  5. Planner Integration
  • Extended RexTableArgCall to track ORDER BY information
  • Updated the type inference system to validate and propagate ordering constraints
  • Enhanced serialization/deserialization for plan persistence
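
To make the buffering behavior described under Runtime Implementation more concrete, here is a minimal sketch in plain Java. It is not the PR's InputSortBuffer: the class name `SortBufferSketch`, its methods, the `String` row type, and the use of a `TreeMap` are all assumptions for illustration. It buffers rows per timestamp, releases them in timestamp order once the watermark reaches them, and drops rows whose timestamp is at or below the current watermark (the exact lateness condition in the PR may differ).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration only; not the InputSortBuffer from this PR. */
public class SortBufferSketch {
    // Rows buffered per timestamp; TreeMap keeps timestamps sorted ascending.
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;

    /** Buffers a row, or drops it (returns false) if it is late. */
    public boolean add(long timestamp, String row) {
        if (timestamp <= watermark) {
            return false; // late: emitting it now would break the order (assumed rule)
        }
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(row);
        return true;
    }

    /** Advances the watermark and returns all rows with timestamp <= watermark, in order. */
    public List<String> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<String> emitted = new ArrayList<>();
        // headMap is a live view, so clearing it removes the entries from the buffer.
        Map<Long, List<String>> ready = buffer.headMap(watermark, true);
        for (List<String> rows : ready.values()) {
            emitted.addAll(rows);
        }
        ready.clear();
        return emitted;
    }
}
```

The sketch ignores keys, state backends, timers, and secondary sort columns; rows that share a timestamp come out in arrival order here, whereas the feature described above would additionally apply the remaining ORDER BY columns.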

Verifying this change

This change added tests and can be verified as follows:

  • ProcessTableFunctionTestPrograms.PROCESS_ORDER_BY
  • ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT_ORDER_BY
  • ProcessTableFunctionTestPrograms.PROCESS_ORDER_BY_TABLE_API
  • ProcessTableFunctionTestPrograms.PROCESS_ORDER_BY_SINGLE_PARTITION_TABLE_API
  • ProcessTableFunctionTestPrograms.PROCESS_ORDER_BY_RESTORE
  • ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT_ORDER_BY_RESTORE

and various unit tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs


flinkbot commented Apr 14, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

```java
// Function that processes events in order and captures the ordering
public static class OrderedProcessor extends ProcessTableFunction<List<Event>> {
    public static class Event {
```

Will it work with Event as a record?
I wonder if we should apply that to the examples as well.

Optional<ChangelogMode> changelogMode();

/** Sort direction for ORDER BY columns in table arguments with set semantics. */
enum SortDirection implements Serializable {

Do we need it to implement Serializable?
Aren't Java enums serializable by default?
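
On the question above: `java.lang.Enum` itself implements `Serializable`, so every enum type is serializable without declaring the interface. The sketch below checks this with a hypothetical enum `Direction` (not the PR's `SortDirection`) that declares no interfaces; the class and method names are assumptions for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class EnumSerializableCheck {
    /** Hypothetical enum for illustration; it declares no interfaces. */
    enum Direction { ASC, DESC }

    /** True because java.lang.Enum implements Serializable. */
    static boolean isSerializable() {
        return Serializable.class.isAssignableFrom(Direction.class);
    }

    /** Serializes and deserializes a value with standard Java serialization. */
    static Object roundTrip(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Enum constants are deserialized by name, so the round trip yields the same singleton instance. An explicit `implements Serializable` is therefore redundant at the language level, although it can still serve as documentation.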

Comment on lines +134 to +144
/** Ascending order with nulls first. */
ASC_NULLS_FIRST(false, true),
/** Ascending order with nulls last. */
ASC_NULLS_LAST(false, false),
/** Descending order with nulls first. */
DESC_NULLS_FIRST(true, true),
/** Descending order with nulls last. */
DESC_NULLS_LAST(true, false);

private final boolean descending;
private final boolean nullsFirst;

@snuyanzin snuyanzin Apr 14, 2026


Also, if we know more about the data, for example that a column is monotonic, then the ORDER BY might be optimized (Calcite's RelFieldCollation already has some functionality for that).
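
As a side note on the two flags each constant carries (descending, nullsFirst): they are enough to derive a null-safe comparator. A minimal sketch, using a hypothetical `Direction` enum that mirrors the four constants quoted above; the `comparator()` and `sorted()` helpers are assumptions and not part of the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SortDirectionSketch {
    /** Hypothetical mirror of the four constants; not the PR's enum. */
    enum Direction {
        ASC_NULLS_FIRST(false, true),
        ASC_NULLS_LAST(false, false),
        DESC_NULLS_FIRST(true, true),
        DESC_NULLS_LAST(true, false);

        private final boolean descending;
        private final boolean nullsFirst;

        Direction(boolean descending, boolean nullsFirst) {
            this.descending = descending;
            this.nullsFirst = nullsFirst;
        }

        /** Builds a null-safe comparator for values of a comparable type. */
        <T extends Comparable<T>> Comparator<T> comparator() {
            Comparator<T> base = Comparator.naturalOrder();
            if (descending) {
                base = base.reversed();
            }
            // Null placement wraps the (possibly reversed) order, so it is
            // independent of the sort direction.
            if (nullsFirst) {
                return Comparator.nullsFirst(base);
            }
            return Comparator.nullsLast(base);
        }
    }

    /** Returns a sorted copy of the given values, which may contain nulls. */
    static List<Integer> sorted(Direction direction, Integer... values) {
        List<Integer> copy = new ArrayList<>(Arrays.asList(values));
        copy.sort(direction.<Integer>comparator());
        return copy;
    }
}
```

For example, `sorted(Direction.DESC_NULLS_FIRST, 1, null, 3)` places the null first and then the values in descending order.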

}

@Override
public PartitionedTable orderBy(Expression... fields) {

Suggested change
- public PartitionedTable orderBy(Expression... fields) {
+ public PartitionedTable orderBy(Expression... expressions) {

In general, these could be arbitrary expressions and not only fields, right?

Comment on lines +63 to +64
* @param fields expressions for ordering (e.g., {@code $("ts").asc(), $("score").desc()})
* @return a partitioned and ordered table

We have some unusual tests with expressions in ORDER BY, for instance:

"SELECT d, h, dense_rank() over (order by cast(d as tinyint), cast(h as smallint) desc)" +
" FROM Table5",

Are there any limitations for the PTF case?

Can there be an ORDER BY by some PTF?
Do we have any validation around this?


public PartitionQueryOperation(
List<ResolvedExpression> partitionExpressions, QueryOperation child) {
this(partitionExpressions, Collections.emptyList(), child);

Suggested change
- this(partitionExpressions, Collections.emptyList(), child);
+ this(partitionExpressions, List.of(), child);
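
For context on this kind of suggestion: `List.of()` (Java 9+) and `Collections.emptyList()` both return an immutable empty list, and the two compare equal, so the swap is behavior-preserving for typical use. One difference worth knowing is how lookups treat null. A small sketch; the class and method names are only for illustration.

```java
import java.util.Collections;
import java.util.List;

public class EmptyListComparison {
    /** Both factories yield equal empty lists. */
    static boolean equalEmptyLists() {
        return Collections.emptyList().equals(List.of());
    }

    /** Returns true if adding to the list is rejected. */
    static boolean rejectsAdd(List<String> list) {
        try {
            list.add("x");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    /** Returns true if contains(null) throws instead of answering false. */
    static boolean containsNullThrows(List<String> list) {
        try {
            list.contains(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }
}
```

Both lists reject `add`, but `List.of().contains(null)` throws a `NullPointerException`, while `Collections.emptyList().contains(null)` simply returns false. Code that later probes the list with possibly-null values would notice the change.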

private final List<Expression> orderKeys;

private PartitionedTableImpl(TableImpl table, List<Expression> partitionKeys) {
this(table, partitionKeys, Collections.emptyList());

Suggested change
- this(table, partitionKeys, Collections.emptyList());
+ this(table, partitionKeys, List.of());


final MailboxPartialWatermarkProcessor processor =
new MailboxPartialWatermarkProcessor(
"test", mailboxExecutor, Collections.emptyList(), consumedWatermarks::add);

Suggested change
- "test", mailboxExecutor, Collections.emptyList(), consumedWatermarks::add);
+ "test", mailboxExecutor, List.of(), consumedWatermarks::add);

new MailboxPartialWatermarkProcessor(
"test",
mailboxExecutor,
Arrays.asList(service1, service2, service3),

@snuyanzin snuyanzin Apr 14, 2026


Suggested change
- Arrays.asList(service1, service2, service3),
+ List.of(service1, service2, service3),
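
Replacing `Arrays.asList(...)` with `List.of(...)` is not purely cosmetic: `Arrays.asList` returns a fixed-size list that still permits `set` and null elements, while `List.of` is fully immutable and rejects nulls at construction time. For the test code quoted here this presumably does not matter as long as the services are non-null and the list is never modified. A sketch with illustrative names:

```java
import java.util.Arrays;
import java.util.List;

public class ListFactoryComparison {
    /** Returns true if set(0, value) succeeds on the given list. */
    static boolean allowsSet(List<String> list, String value) {
        try {
            list.set(0, value);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    /** Arrays.asList accepts null elements and can look them up. */
    static boolean arraysAsListAcceptsNull() {
        return Arrays.asList("a", null).contains(null);
    }

    /** Returns true if List.of rejects a null element at construction time. */
    static boolean listOfRejectsNull() {
        String missing = null;
        try {
            List.of("a", missing);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }
}
```

The same null restriction applies when swapping `Collections.singletonList(x)` for `List.of(x)`: the former accepts a null element, the latter throws.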

new MailboxPartialWatermarkProcessor(
"test",
mailboxExecutor,
Collections.singletonList(timerService),

Suggested change
- Collections.singletonList(timerService),
+ List.of(timerService),

new MailboxPartialWatermarkProcessor(
"test",
mailboxExecutor,
Collections.singletonList(timerService),

Suggested change
- Collections.singletonList(timerService),
+ List.of(timerService),

new MailboxPartialWatermarkProcessor(
"test",
mailboxExecutor,
Collections.singletonList(timerService),

Suggested change
- Collections.singletonList(timerService),
+ List.of(timerService),

new MailboxPartialWatermarkProcessor(
"test",
mailboxExecutor,
Collections.singletonList(timerService),

Suggested change
- Collections.singletonList(timerService),
+ List.of(timerService),

new MailboxPartialWatermarkProcessor(
"test",
mailboxExecutor,
Collections.singletonList(timerService),

Suggested change
- Collections.singletonList(timerService),
+ List.of(timerService),

new MailboxPartialWatermarkProcessor(
"test",
mailboxExecutor,
Collections.singletonList(timerService),

Suggested change
- Collections.singletonList(timerService),
+ List.of(timerService),
