[KYUUBI #6832] Initial impl Spark DSv2 YARN Connector that supports reading YARN aggregated logs #7455

Closed
pan3793 wants to merge 9 commits into apache:master from pan3793:yarn-agg-log

Member

pan3793 commented May 17, 2026

Why are the changes needed?

Close #6832. This connector lets Hadoop administrators analyze YARN aggregated logs at the cluster level, for example, by aggregating container logs across applications by host to diagnose potential host hardware issues.

The current initial implementation has several limitations:

  • feat: only supports TFile, not IFile
  • perf: only supports pushing down app_id, user, and host filters; container_id and log_type filters are not pushed down, though they are expected to be selective
  • perf: listing aggregated log files runs in a single thread during the planning phase; for a large cluster, it should run in parallel on the driver side, or launch a job to do the listing on the executor side
  • etc.
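
As a rough illustration of what pushing down app_id, user, and host can mean for file listing, the sketch below substitutes equality-filter values into a glob template shaped like the one quoted in the diff excerpt later in this review, and leaves every unfiltered part as a wildcard. It is written in Java, like the Hadoop snippet quoted further down, and is not the connector's actual (Scala) code. The class name, the method name, and the `/tmp/logs` and `logs` values are hypothetical.

```java
import java.util.Map;

// Hypothetical sketch, not the connector's implementation.
final class GlobPathSketch {

    // Builds a glob for aggregated log files. eqFilters maps a column name
    // (user, app_id, host) to the value of an equality filter on that column.
    // Columns without a filter become "*". The bucket is always "*" here.
    static String globPath(String baseDir, String suffix, Map<String, String> eqFilters) {
        String template = baseDir + "/{{USER}}/bucket-" + suffix
            + "-tfile/{{BUCKET}}/{{APP_ID}}/{{HOST}}_*";
        return template
            .replace("{{USER}}", eqFilters.getOrDefault("user", "*"))
            .replace("{{BUCKET}}", "*")
            .replace("{{APP_ID}}", eqFilters.getOrDefault("app_id", "*"))
            .replace("{{HOST}}", eqFilters.getOrDefault("host", "*"));
    }

    public static void main(String[] args) {
        // Base directory and suffix are placeholder values for illustration.
        String glob = globPath("/tmp/logs", "logs",
            Map.of("user", "hadoop", "host", "spark-dev2.foo.bar"));
        // prints /tmp/logs/hadoop/bucket-logs-tfile/*/*/spark-dev2.foo.bar_*
        System.out.println(glob);
    }
}
```

A narrower glob means fewer directories to list during planning, which is why only filters that map onto path components (user, app_id, host) help here; container_id and log_type live inside the files.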

How was this patch tested?

Unit tests are not added yet, as they require a real YARN cluster with historical aggregated logs.

$ kyuubi-beeline -u 'jdbc:kyuubi://spark-dev1.foo.bar:10009/default' \
  --conf spark.jars=/tmp/kyuubi-spark-connector-yarn_2.12-1.12.0-SNAPSHOT.jar \
  --conf spark.sql.catalog.yarn=org.apache.kyuubi.spark.connector.yarn.YarnCatalog
0: > select
. .>   mtime, app_id, container_id, host, log_type, message
. .> from yarn.app_logs
. .> where
. .>   user = 'hadoop'
. .>   and host = 'spark-dev2.foo.bar'
. .>   and message like '%ERROR%'
. .>   and message not like '%RECEIVED SIGNAL TERM%'
. .>   and message not like '%Aborting task%'
. .> limit 2;
...
+--------------------------+---------------------------------+-----------------------------------------+---------------------+-----------+----------------------------------------------------+
|          mtime           |             app_id              |              container_id               |        host         | log_type  |                      message                       |
+--------------------------+---------------------------------+-----------------------------------------+---------------------+-----------+----------------------------------------------------+
| 2025-04-03 18:07:18.893  | application_1743671377509_0001  | container_1743671377509_0001_01_000001  | spark-dev2.foo.bar  | stdout    | 25/04/03 18:07:15 ERROR ApplicationMaster$AMEndpoint: Driver terminated with exit code 1! Shutting down. spark-dev1.foo.bar:16601 |
| 2025-04-03 18:07:18.893  | application_1743671377509_0001  | container_1743671377509_0001_01_000001  | spark-dev2.foo.bar  | stdout    | 25/04/03 18:07:15 ERROR ApplicationMaster$AMEndpoint: Driver terminated with exit code 1! Shutting down. spark-dev1.foo.bar:16601 |
+--------------------------+---------------------------------+-----------------------------------------+---------------------+-----------+----------------------------------------------------+
2 rows selected (0.648 seconds)

Was this patch authored or co-authored using generative AI tooling?

Assisted-by: Claude Opus 4.7.


<artifactId>kyuubi-spark-connector-yarn_${scala.binary.version}</artifactId>
<packaging>jar</packaging>
<name>Kyuubi Spark Hadoop YARN Connector</name>
Member Author

Use a generic name, since we may extend it to cover other interactions with YARN, for example, using YarnClient to retrieve the app list from the RM, or implementing stored procedures equivalent to the yarn commands.

Member Author

pan3793 commented May 17, 2026

This is a long-standing missing feature for Hadoop. YARN-1440, raised in 2013, notes that YARN aggregated logs are difficult for external tools to understand. Large-scale log processing is a typical use case for Hadoop, so I'm surprised that in 2026 there is still no out-of-the-box solution for batch analysis of logs from applications run on Hadoop YARN.

@aajisaka @wForget @cxzl25, could you please take a look? I would also like to know if you have any better ideas for processing YARN aggregated logs across applications.

@aajisaka
Member

I agree it's a long-standing missing feature. I'll take a closer look.

private def listFilesWithFilters(): Array[FileStatus] = {
  val baseDir = remoteAppLogDir
  val bucketDir = s"bucket-$remoteAppLogDirSuffix-tfile"
  var path = s"$baseDir/{{USER}}/$bucketDir/{{BUCKET}}/{{APP_ID}}/{{HOST}}_*"
Contributor

In the future, we should also support the path layout from before YARN-6929, which does not have buckets.

Member Author

Thank you for pointing this out! YARN-6929 landed in Hadoop 3.3.0, and I was reading the branch-3.3 code, so I missed this part. I will add a TODO here for now.

Member Author

The bucket folder layout seems to be an exact fit for TABLESAMPLE SYSTEM, introduced by SPARK-55978.

  logWarning(s"Unsupported filter: $f")
}
val globPath = path
  .replace("{{BUCKET}}", "*") // TODO parallelize bucket listing
Contributor

If an app_id filter is given, the bucket can also be computed in advance to improve listing performance.

org.apache.hadoop.yarn.logaggregation.LogAggregationUtils#getRemoteBucketDir

    int bucket = appId.getId() % 10000;
    String bucketDir = String.format("%04d", bucket);
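
A minimal sketch of applying that calculation to an application ID string, so that a pushed-down app_id filter could pin the bucket directory instead of globbing over all buckets. It assumes the usual `application_<clusterTimestamp>_<sequence>` form seen in the query output above. The class and method names are hypothetical, and the real connector would more likely parse the ID with Hadoop's own ApplicationId class.

```java
// Hypothetical sketch: derives the bucket directory name from an application ID string.
final class BucketDirSketch {

    // Mirrors the two quoted lines: bucket = id % 10000, zero-padded to 4 digits.
    static String bucketDir(String appId) {
        // Assumed format: application_<clusterTimestamp>_<sequence>
        String[] parts = appId.split("_");
        if (parts.length != 3 || !parts[0].equals("application")) {
            throw new IllegalArgumentException("Not an application ID: " + appId);
        }
        int id = Integer.parseInt(parts[2]);
        int bucket = id % 10000;
        return String.format("%04d", bucket);
    }

    public static void main(String[] args) {
        // prints 0001
        System.out.println(bucketDir("application_1743671377509_0001"));
        // prints 2345
        System.out.println(bucketDir("application_1743671377509_12345"));
    }
}
```

With the bucket known, the `{{BUCKET}}` placeholder in the glob can be replaced by a single directory name rather than `*`, so only one bucket directory needs to be listed.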

Member Author

good point, thanks for pointing this out

Member Author

Added in lines 85-86.

Member

aajisaka left a comment

Can we create a sample YARN aggregated log and create a unit test to verify?

Comment on lines +40 to +42
override def listTables(namespace: Array[String]): Array[Identifier] = {
  Array(Identifier.of(namespace, "app_logs"))
}
Member

This catalog is not namespace-aware, so catalog.any_namespace.app_logs resolves to the same table for any namespace. We need to at least document the limitation.

Member Author

That's not intended; I will limit it to catalog.app_logs.

Member Author

pan3793 commented May 18, 2026

> Can we create a sample YARN aggregated log and create a unit test to verify?

Yeah, I plan to do that.

Member

wForget left a comment

Thanks @pan3793, LGTM

Member Author

pan3793 commented May 23, 2026

> Can we create a sample YARN aggregated log and create a unit test to verify?

@aajisaka I collected a set of app logs by setting up a Hadoop cluster and running 3 Spark applications, and then added some basic unit tests to verify reading app_logs, with and without filter pushdown.

- /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
- def withSparkSession[T](sc: SparkSession)(f: SparkSession => T): T = {
+ /** Runs `f` by passing in `spark` and ensures that `spark` is stopped. */
+ def withSparkSession[T](spark: SparkSession)(f: SparkSession => T): T = {
Member Author

It's a naming fix. We should follow the naming convention used in spark-shell: sc refers to the SparkContext, and spark refers to the SparkSession.

pan3793 self-assigned this May 26, 2026
pan3793 added this to the v1.12.0 milestone May 26, 2026
Member Author

pan3793 commented May 26, 2026

Thanks all, merging to master.

pan3793 closed this in 36fd762 May 26, 2026
