Skip to content

[FLINK-37730][Job Manager] Expose JM exception as K8s exceptions #978

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from

Conversation

vsantwana
Copy link

@vsantwana vsantwana commented May 6, 2025

What is the purpose of the change

(For example: This pull request adds a new feature to periodically create and maintain savepoints through the FlinkDeployment custom resource.)

This pull requests adds a new feature to periodically check for job exceptions using the FLINK REST API for getting the exceptions and raise them as kubernetes events. This feature will be helpful for monitoring systems that want to do a post processing on the job exceptions.

This is ONLY introduced for Application mode and NOT Session mode.

Brief change log

(for example:)

  • Periodic pulling of job exceptions (only done when the job manager is NOT in a terminal state)
  • New SYSTEM_ADVANCED config for configuring the max number of exceptions reported and max lenght for stacktrace (defaults are 5 and 10 respectively)
  • Most of the information is in the k8s event
  • Introduced new utility method to trigger the event with annotations
  • Introduced new test method to help testing the configuration

Verifying this change

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (100MB)
  • Extended integration test for recovery after master (JobManager) failure
  • Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changes to the CustomResourceDescriptors: (yes / no)
  • Core observer or reconciler logic that is regularly executed: Yes, this changes the observer.

Documentation

  • Does this pull request introduce a new feature? Yes
  • If yes, how is the feature documented? I have to figure out the documentation part

Copy link

@rmetzger rmetzger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this draft a lot.

@gyfora can you take a quick look as well, or would you prefer this to be fully ready before you take a look?

@gyfora
Copy link
Contributor

gyfora commented May 6, 2025

I like this draft a lot.

@gyfora can you take a quick look as well, or would you prefer this to be fully ready before you take a

I like this draft a lot.

@gyfora can you take a quick look as well, or would you prefer this to be fully ready before you take a look?

Thank you!
I will try to take a look in the next 1-2 days :)

@morhidi
Copy link
Contributor

morhidi commented May 6, 2025

I like this draft a lot.
@gyfora can you take a quick look as well, or would you prefer this to be fully ready before you take a

I like this draft a lot.
@gyfora can you take a quick look as well, or would you prefer this to be fully ready before you take a look?

Thank you! I will try to take a look in the next 1-2 days :)

I like this draft a lot.
@gyfora can you take a quick look as well, or would you prefer this to be fully ready before you take a

I like this draft a lot.
@gyfora can you take a quick look as well, or would you prefer this to be fully ready before you take a look?

Thank you! I will try to take a look in the next 1-2 days :)

I like this draft a lot.
@gyfora can you take a quick look as well, or would you prefer this to be fully ready before you take a

I like this draft a lot.
@gyfora can you take a quick look as well, or would you prefer this to be fully ready before you take a look?

Thank you! I will try to take a look in the next 1-2 days :)

It'd be great to catch and turn every job exception into a k8s event, not just for terminal job failures. It'd simplify collecting historical diagnostic data before an actual crash occurs.

@vsantwana
Copy link
Author

It'd be great to catch and turn every job exception into a k8s event, not just for terminal job failures. It'd simplify collecting historical diagnostic data before an actual crash occurs.

@morhidi Sorry I do not understand this. I am not checking for only terminal job failures. I am checking for all the failures, when the job is not in one of the terminal states.
Lmk if I have misunderstood your comment.

@morhidi
Copy link
Contributor

morhidi commented May 7, 2025

It'd be great to catch and turn every job exception into a k8s event, not just for terminal job failures. It'd simplify collecting historical diagnostic data before an actual crash occurs.

@morhidi Sorry I do not understand this. I am not checking for only terminal job failures. I am checking for all the failures, when the job is not in one of the terminal states. Lmk if I have misunderstood your comment.

nm I miss-read it at first glance

@vsantwana vsantwana marked this pull request as ready for review May 16, 2025 09:09
@vsantwana vsantwana requested review from gyfora, rmetzger and morhidi May 16, 2025 09:09
Comment on lines 137 to 139
public static boolean createIfNotExists(
KubernetesClient client,
HasMetadata target,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we call createWithAnnotationsIfNotExists() from this method to avoid code duplication?

Copy link
Author

@vsantwana vsantwana May 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had thought about it but I did not do it because of the event time. In our case we had decided to set the exception time as event time, but I am not aware of how should it happen for other k8s events, so I kept them separate with the cost of duplicated code.
cc @gyfora

@vsantwana vsantwana requested review from gyfora and rmetzger May 19, 2025 10:28
Copy link
Contributor

@gyfora gyfora left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is looking pretty good now, I added a few minor comments still

@vsantwana vsantwana requested a review from gyfora May 20, 2025 17:10
Copy link
Contributor

@gyfora gyfora left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good! As a future followup we could think about reducing the number of REST API calls we make to fetch exceptions.

At the moment this is done on every step but based on the job details that we get in previous steps in the observers we may be able to deduct that the job did not fail since the last time we checked so exceptions do not need to be queried.

If you could open a follow up ticket for that I think that would be nice :)

@gyfora
Copy link
Contributor

gyfora commented May 21, 2025

You need to regenerate the docs: mvn clean install -DskipTests -Pgenerate-doc

@gyfora
Copy link
Contributor

gyfora commented May 21, 2025

I hit the following error while running locally:

2025-05-21 15:42:22,998 o.a.f.k.o.o.JobStatusObserver  [WARN ][default/basic-example] Failed to fetch JobManager exception info.
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://10.96.0.1:443/api/v1/namespaces/default/events. Message: Event in version "v1" cannot be handled as a Event: parsing time "2025-05-21T15:41:41.602Z[UTC]" as "2006-01-02T15:04:05.000000Z07:00": cannot parse ".602Z[UTC]" as ".000000". Received status: Status(apiVersion=v1, code=400, details=null, kind=Status, message=Event in version "v1" cannot be handled as a Event: parsing time "2025-05-21T15:41:41.602Z[UTC]" as "2006-01-02T15:04:05.000000Z07:00": cannot parse ".602Z[UTC]" as ".000000", metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=BadRequest, status=Failure, additionalProperties={}).

So something seems to be off with the time handling

Comment on lines +161 to +171
for (var exception : exceptions) {
Instant exceptionTime = Instant.ofEpochMilli(exception.getTimestamp());
if (lastRecorded != null && exceptionTime.isBefore(lastRecorded)) {
continue;
}

emitJobManagerExceptionEvent(ctx, exception, exceptionTime, maxStackTraceLines);
if (++count >= maxEvents) {
break;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should take the last maxEvents exceptions from the list and start with those first so that new errors are always reported, but that may cause us to not report some later

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants