Skip to content

[FLINK-37604] Generate static UIDs for pipeline operators #3977

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 30, 2025

Conversation

morozov
Copy link
Contributor

@morozov morozov commented Apr 2, 2025

This PR adds a new configuration parameter operator.uid.prefix1. Once it's specified, the pipeline composer will generate static UIDs for all its operators using this prefix. This way, for a given job, all operator UIDs will be preserved if the job graph changes.

By default, the operator UID prefix is not set. In this case, the pipeline composer will not generate operator UIDs in order to preserve backward compatibility of the existing jobs with their state that includes operator UIDs generated by Flink.

It should be recommended to set this parameter as it is recommended by Flink.

Please let me know if this a directionally right change, and I will update the documentation.

Footnotes

  1. A similar approach is used in Apache Iceberg's FlinkSink.Builder (reference).

@yuxiqian
Copy link
Member

yuxiqian commented Apr 3, 2025

It seems we already have the pipeline option schema.operator.uid to configure UID, but only for Schema Operators. So maybe the description of operator.uid.prefix isn't precise as it doesn't apply to “all pipeline operators”.

Agree that keeping all operators UID fixed (not only for schema operators) is the right thing, so perhaps we can deprecate schema.operator.uid and favor operator.uid.prefix? We may keep state backwards compatibility with extra checking:

schema.operator.uid set schema.operator.uid not set
operator.uid.prefix set Incompatible configurations
Throw exceptions (?)
Set fixed UID for all operators
(including schema operators)
operator.uid.prefix not set Only set UID for schema operators
(behavior unchanged for state compatibility)
Only set UID for schema operators
with the default value of schema.operator.uid
(behavior unchanged for state compatibility)

and remove schema.operator.uid as a breaking change later. WDYT?

@morozov
Copy link
Contributor Author

morozov commented Apr 3, 2025

@yuxiqian that sounds like a plan. I will work on making these changes.

@lvyanquan
Copy link
Contributor

Hi @yuxiqian please take a look at this if you have time.

@yuxiqian
Copy link
Member

Thanks for @morozov's quick response. Do you think we need an IT case to verify if operator UIDs are correctly set by examining Flink execution plan JSON (like #3887)?

@morozov
Copy link
Contributor Author

morozov commented Apr 23, 2025

Do you think we need an IT case to verify if operator UIDs are correctly set by examining Flink execution plan JSON (like #3887)?

@yuxiqian, it doesn't look like the execution plan JSON contains operator UIDs. It contains IDs (the numeric identifiers that start with 1 and increment by 1). For example (from the test you referenced):

{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Distributed Source",
    "pact" : "Data Source",
    "contents" : "Source: Distributed Source",
    "parallelism" : 9
  }, {
    "id" : 2,
    "type" : "Partitioning",
    "pact" : "Operator",
    "contents" : "Partitioning",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "REBALANCE",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "SchemaMapper",
    "pact" : "Operator",
    "contents" : "SchemaMapper",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "CUSTOM",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "Sink: Sink Writer: Value Sink",
    "pact" : "Data Sink",
    "contents" : "Sink: Sink Writer: Value Sink",
    "parallelism" : 10,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "REBALANCE",
      "side" : "second"
    } ]
  } ]
}

@morozov morozov force-pushed the FLINK-37604-operator-uid-prefix branch from aeb5798 to 481dccb Compare April 23, 2025 01:31
@github-actions github-actions bot added the build label Apr 23, 2025
@morozov morozov force-pushed the FLINK-37604-operator-uid-prefix branch from 481dccb to 0343c5a Compare April 23, 2025 01:38
@github-actions github-actions bot removed the build label Apr 23, 2025
@yuxiqian
Copy link
Member

Thanks for double-checking this; it seems we could only get the hashed VertexId in the JobGraph, and could not verify this in unit tests reliably. It should not be a blocker.

@morozov morozov force-pushed the FLINK-37604-operator-uid-prefix branch from 0343c5a to 26410c2 Compare April 23, 2025 16:42
@github-actions github-actions bot added the docs Improvements or additions to documentation label Apr 23, 2025
@morozov
Copy link
Contributor Author

morozov commented Apr 23, 2025

It looks like there's some active development happening in the code modified by this PR. Recently, I resolved conflicts with the changes from #3812, and then with the ones from #3986.

@yuxiqian, is there anything else I can do before the merge?

I added the new parameter to the documentation in English and can update the Chinese if I know what to put there.

@yuxiqian
Copy link
Member

Thanks for @morozov's nice work, LGTM. Would @lvyanquan like to take a further look?


Suggested Chinese translation:

Index: docs/content.zh/docs/core-concept/data-pipeline.md
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/docs/content.zh/docs/core-concept/data-pipeline.md b/docs/content.zh/docs/core-concept/data-pipeline.md
--- a/docs/content.zh/docs/core-concept/data-pipeline.md	(revision 54768703ec976b712e62f4184b7b24c8319f8e69)
+++ b/docs/content.zh/docs/core-concept/data-pipeline.md	(date 1745460782234)
@@ -111,9 +111,10 @@
 # Pipeline 配置
 下面 是 Data Pipeline 的一些可选配置:
 
-| 参数                      | 含义                                                  | optional/required |
-|-------------------------|-----------------------------------------------------|-------------------|
-| name                    | 这个 pipeline 的名称,会用在 Flink 集群中作为作业的名称。               | optional          |
-| parallelism             | pipeline的全局并发度,默认值是1。                               | optional          |
-| local-time-zone         | 作业级别的本地时区。                                          | optional          |
-| execution.runtime-mode  | pipeline 的运行模式,包含 STREAMING 和 BATCH,默认值是 STREAMING。 | optional          |
\ No newline at end of file
+| 参数                     | 含义                                                  | optional/required |
+|------------------------|-----------------------------------------------------|-------------------|
+| name                   | 这个 Pipeline 的名称,会用在 Flink 集群中作为作业的名称。               | optional          |
+| parallelism            | Pipeline 的全局并发度,默认值是1。                              | optional          |
+| local-time-zone        | 作业级别的本地时区。                                          | optional          |
+| execution.runtime-mode | Pipeline 的运行模式,包含 STREAMING 和 BATCH,默认值是 STREAMING。 | optional          |
+| operator.uid.prefix    | Pipeline 中算子 UID 的前缀。如果不设置,Flink 会为每个算子生成唯一的 UID。   | optional          |
\ No newline at end of file

@morozov morozov force-pushed the FLINK-37604-operator-uid-prefix branch from 26410c2 to 105c741 Compare April 24, 2025 17:10
@morozov morozov force-pushed the FLINK-37604-operator-uid-prefix branch from 105c741 to 3ed00f4 Compare April 29, 2025 16:41
@lvyanquan lvyanquan self-assigned this May 27, 2025
.noDefaultValue()
.withDescription(
"The prefix to use for all pipeline operator UIDs. If not set, all pipeline operator UIDs will be generated by Flink.");

public static final ConfigOption<String> PIPELINE_SCHEMA_OPERATOR_UID =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add an annotation of @deprecated.

| `schema-operator.rpc-timeout` | The timeout time for SchemaOperator to wait downstream SchemaChangeEvent applying finished, the default value is 3 minutes. | optional |
| `operator.uid.prefix` | The prefix to use for all pipeline operator UIDs. If not set, all pipeline operator UIDs will be generated by Flink. | optional |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should clarify that this is a recommended parameter to set and explain the reason behind it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

......
It is recommended to set this parameter to ensure stable and recognizable operator UIDs, which can help with stateful upgrades, troubleshooting, and Flink UI diagnostics.

| `schema-operator.rpc-timeout` | The timeout time for SchemaOperator to wait downstream SchemaChangeEvent applying finished, the default value is 3 minutes. | optional |
| `operator.uid.prefix` | The prefix to use for all pipeline operator UIDs. If not set, all pipeline operator UIDs will be generated by Flink. | optional |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

......
It is recommended to set this parameter to ensure stable and recognizable operator UIDs, which can help with stateful upgrades, troubleshooting, and Flink UI diagnostics.

| local-time-zone | 作业级别的本地时区。 | optional |
| execution.runtime-mode | pipeline 的运行模式,包含 STREAMING 和 BATCH,默认值是 STREAMING。 | optional |
| operator.uid.prefix | Pipeline 中算子 UID 的前缀。如果不设置,Flink 会为每个算子生成唯一的 UID。 | optional |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

......
建议设置这个参数以提供稳定和可识别的算子 ID,这有助于有状态升级、问题排查和在 Flink UI 上的诊断。

Copy link
Contributor

@lvyanquan lvyanquan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@lvyanquan lvyanquan merged commit e8f9ff0 into apache:master May 30, 2025
23 checks passed
@morozov morozov deleted the FLINK-37604-operator-uid-prefix branch May 30, 2025 06:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants