[FLINK-39377][table] Add initial implementation of ProcessTableFunctionTestHarness#27928

Open
autophagy wants to merge 1 commit into apache:master from autophagy:FLINK-39377

@autophagy
Contributor

@autophagy autophagy commented Apr 14, 2026

What is the purpose of the change

This change introduces an initial implementation of a test harness for PTFs, according to FLIP-567, for use in unit tests that do not require a running Flink cluster.

At a basic level, the harness lets users set up test conditions with a builder API. On build, it validates both the test configuration and the PTF, and returns an AutoCloseable harness that manages the function's open/close lifecycle. With this harness, users can pipe input rows into their PTF and observe the collected output.

A motivating example:

@DataTypeHint("ROW<value INT>")
public class FilterPTF extends ProcessTableFunction<Row> {
    public void eval(
            @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input,
            @ArgumentHint(ArgumentTrait.SCALAR) int threshold) {
        int value = input.getFieldAs("value");
        if (value > threshold) {
            collect(Row.of(value));
        }
    }
}

@Test
void testFilter() throws Exception {
    try (ProcessTableFunctionTestHarness<Row> harness =
            ProcessTableFunctionTestHarness.ofClass(FilterPTF.class)
                    .withTableArgument("input", DataTypes.of("ROW<value INT>"))
                    .withScalarArgument("threshold", 50)  // Configure scalar value
                    .build()) {

        harness.processElement(Row.of(30));
        harness.processElement(Row.of(70));

        List<Row> output = harness.getOutput();
        assertThat(output).containsExactly(Row.of(70));
    }
}
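
To make the lifecycle described above concrete without depending on Flink, here is a toy model of the same pattern: open on creation, collect emitted values into a list, close via try-with-resources. `ToyFunction`, `ToyHarness` and `ToyHarnessDemo` are hypothetical names invented for this sketch; they are not part of this PR and do not reflect how ProcessTableFunctionTestHarness is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy stand-in for a function under test; NOT Flink's ProcessTableFunction. */
interface ToyFunction<IN, OUT> {
    default void open() {}

    void eval(IN input, Consumer<OUT> out);

    default void close() {}
}

/** Hypothetical miniature harness: opens on creation, collects output, closes once. */
final class ToyHarness<IN, OUT> implements AutoCloseable {
    private final ToyFunction<IN, OUT> function;
    private final List<OUT> output = new ArrayList<>();
    private boolean closed;

    ToyHarness(ToyFunction<IN, OUT> function) {
        this.function = function;
        function.open();
    }

    /** Feeds one input to the function; anything it emits is appended to the output list. */
    void processElement(IN input) {
        if (closed) {
            throw new IllegalStateException("harness is closed");
        }
        function.eval(input, output::add);
    }

    /** Returns an immutable snapshot of everything collected so far. */
    List<OUT> getOutput() {
        return List.copyOf(output);
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            function.close();
        }
    }
}

final class ToyHarnessDemo {
    /** Mirrors the FilterPTF example: forwards values strictly above the threshold. */
    static List<Integer> filterAbove(int threshold, int... values) {
        ToyFunction<Integer, Integer> filter =
                (value, out) -> {
                    if (value > threshold) {
                        out.accept(value);
                    }
                };
        try (ToyHarness<Integer, Integer> harness = new ToyHarness<>(filter)) {
            for (int v : values) {
                harness.processElement(v);
            }
            return harness.getOutput();
        }
    }

    public static void main(String[] args) {
        System.out.println(filterAbove(50, 30, 70)); // prints [70]
    }
}
```

The real harness additionally deals with Flink data types, argument traits and multiple table arguments, none of which this sketch attempts to model.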

It currently supports:

  • Row-semantic tables (ROW_SEMANTIC_TABLE)
  • Set-semantic tables (SET_SEMANTIC_TABLE) with partition-by configuration
  • Scalar arguments (including scalar-only PTFs via invoke())
  • Multi-table PTFs with per-table element routing
  • Inline type annotations via @ArgumentHint(type = ...)
  • PTFs that use Rows and structured types as their input and output types
  • PASS_COLUMNS_THROUGH and OPTIONAL_PARTITION_BY argument traits

The harness currently does not support the following, which will be added in subsequent PRs:

  • State with @StateHint
  • Timers (and the use of Context in general)
  • Update traits like SUPPORTS_UPDATES, REQUIRE_UPDATE_BEFORE

The missing feature set has been documented, along with some quickstart examples.

Brief change log

  • Added ProcessTableFunctionTestHarness
  • Added tests to validate behaviour of PTF test harness
  • Added initial user facing documentation

Verifying this change

This change added tests and can be verified as follows:

  • Added behaviour tests via ProcessTableFunctionTestHarnessTest.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs / JavaDocs)

@flinkbot
Collaborator

flinkbot commented Apr 14, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@autophagy autophagy marked this pull request as ready for review April 15, 2026 07:13
…onTestHarness

This commit introduces an initial implementation of a test harness for PTFs, for use
in unit tests that do not require a running Flink cluster.

The implementation supports setting up the harness by configuring various test
parameters, such as fixtures for scalar arguments, data types for table arguments, and
partition settings for table arguments with set semantics.

On build, the harness performs type and structure validation and ensures that the
test setup can handle the arguments defined on the PTF.
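
As a rough illustration of what a build-time check of this kind can look like, the sketch below locates an `eval` method reflectively and compares its declared argument names against the names configured in a test. It is a simplified, Flink-free model: the `@Arg` annotation, `BuildTimeValidation` and `ToyFilter` are hypothetical names made up for this example, and the "exactly one eval method" rule is an assumption of the sketch, not a statement about the harness.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical stand-in for an argument-naming annotation; not Flink's @ArgumentHint. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface Arg {
    String value();
}

final class BuildTimeValidation {

    /** Locates the single public "eval" method; this sketch rejects zero or several. */
    static Method findEval(Class<?> functionClass) {
        List<Method> candidates = new ArrayList<>();
        for (Method m : functionClass.getMethods()) {
            if (m.getName().equals("eval")) {
                candidates.add(m);
            }
        }
        if (candidates.size() != 1) {
            throw new IllegalStateException(
                    "expected exactly one public eval method on "
                            + functionClass.getSimpleName()
                            + " but found "
                            + candidates.size());
        }
        return candidates.get(0);
    }

    /** Fails unless the configured names match the annotated eval parameters exactly. */
    static void validate(Class<?> functionClass, Set<String> configured) {
        Set<String> declared = new TreeSet<>();
        for (Parameter p : findEval(functionClass).getParameters()) {
            Arg arg = p.getAnnotation(Arg.class);
            if (arg == null) {
                throw new IllegalStateException("eval parameter without @Arg: " + p);
            }
            declared.add(arg.value());
        }
        Set<String> missing = new TreeSet<>(declared);
        missing.removeAll(configured);
        Set<String> unknown = new TreeSet<>(configured);
        unknown.removeAll(declared);
        if (!missing.isEmpty() || !unknown.isEmpty()) {
            throw new IllegalStateException(
                    "missing arguments: " + missing + ", unknown arguments: " + unknown);
        }
    }

    public static void main(String[] args) {
        // Passes: both declared arguments are configured.
        validate(ToyFilter.class, Set.of("input", "threshold"));
        System.out.println("validation passed");
    }
}

/** Toy function whose eval signature loosely mirrors FilterPTF. */
final class ToyFilter {
    public void eval(@Arg("input") int value, @Arg("threshold") int threshold) {}
}
```

Failing at build time rather than on the first processed element means a misconfigured test reports the missing or unknown argument names before any input is piped in.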

It supports PTFs that use scalar, set-semantic table, and row-semantic table arguments,
as well as PTFs that take several of each. It supports the PASS_COLUMNS_THROUGH and
OPTIONAL_PARTITION_BY traits.

It currently does not support State, or Context (so no timers). It also does not enforce
some static argument traits like SUPPORTS_UPDATES, REQUIRE_UPDATE_BEFORE.
Contributor

@spuru9 spuru9 left a comment

Some nits from early review. PTAL

private final HarnessCollector collector;

private final String defaultTableArgument;
private final java.lang.reflect.Method evalMethod;
java.lang.reflect.Method can be imported at the top of the file instead of being fully qualified here.
The same applies to the other java.lang.reflect.* types used in this class.

@@ -0,0 +1,361 @@
---
Contributor

@spuru9 spuru9 Apr 15, 2026

The testing/ directory is missing an _index.md (needed to show the section in the sidebar). Could you add one following the pattern of the sibling directories like functions/?

&& activeTableArg.partitionColumnNames.length > 0
? String.format(
", partition columns: %s",
java.util.Arrays.toString(
nit: import java.util.Arrays

* <pre>{@code
* ProcessTableFunctionTestHarness<Row> harness =
* ProcessTableFunctionTestHarness.ofClass(MyPTF.class)
* .withTableArgumentType("input", DataTypes.of("ROW<id INT, name STRING>"))
Suggested change
- * .withTableArgumentType("input", DataTypes.of("ROW<id INT, name STRING>"))
+ * .withTableArgument("input", DataTypes.of("ROW<id INT, name STRING>"))

3 participants