Skip to content

Conversation

@sarutak
Copy link
Member

@sarutak sarutak commented Oct 23, 2025

What changes were proposed in this pull request?

This PR aims to reenable SparkSessionJobTaggingAndCancellationSuite.Cancellation APIs in SparkSession are isolated.

#48736 disabled this test because of it was flaky. In this test, futures ran on threads managed by ForkJoinPool.
Each future invokes SparkSession#addTag and SparkSession#getTag, and tags are implemented using InheritableThreadLocal. So the root cause of this issue is same as #52417.

But #48906 replaced ForkJoinPool with Executors.newFixedThreadPool(3) so I believe this issue no longer occurs.
In fact, this issue can be reproduced by replacing Executors.newFixedThreadPool(3) with new ForkJoinPool(3) and inserting a sleep like as follows.

     // global ExecutionContext has only 2 threads in Apache Spark CI
     // create own thread pool for four Futures used in this test
-    val threadPool = Executors.newFixedThreadPool(3)
+    val threadPool = new ForkJoinPool(3)

...

+      Thread.sleep(1000)
       val jobB = Future {
         sessionB = globalSession.cloneSession()
         import globalSession.implicits._

Then, run the test as follows.

$ build/sbt 'sql/testOnly org.apache.spark.sql.SparkSessionJobTaggingAndCancellationSuite -- -z "Cancellation APIs in Spark\
Session are isolated"'
info] - Cancellation APIs in SparkSession are isolated *** FAILED *** (2 seconds, 726 milliseconds)
[info]   ArrayBuffer({"spark.app.startTime"="1761192376305", "spark.rdd.scope"="{"id":"3","name":"Exchange"}", "spark.hadoop.fs.s3a.vectored.read.min.seek.size"="128K", "spark.hadoop.hadoop.caller.context.enabled"="true", "spark.memory.debugFill"="true", "spark.master.rest.enabled"="false", "spark.sql.warehouse.dir"="file:/Users/sarutak/oss/spark/sql/core/spark-warehouse", "spark.master"="local[2]", "spark.job.interruptOnCancel"="true", "spark.app.name"="test", "spark.driver.host"="192.168.1.109", "spark.app.id"="local-1761192376735", "spark.job.tags"="spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c,spark-session-8c09c25f-089c-41ee-add1-1de463658349-thread-6b832f9d-3a55-4d1f-b47d-418fc2ed05e4-one,spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c-execution-root-id-0,spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c-thread-a4b5b347-6e56-4416-b3a5-37a312bdfe34-one,spark-session-8c09c25f-089c-41ee-add1-1de463658349,spark-session-8c09c25f-089c-41ee-add1-1de463658349-thread-6b832f9d-3a55-4d1f-b47d-418fc2ed05e4-two", "spark.unsafe.exceptionOnMemoryLeak"="true", "spark.sql.execution.root.id"="0", "spark.ui.showConsoleProgress"="false", "spark.driver.extraJavaOptions"="-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true -Dio.netty.allocator.type=pooled -Dio.netty.handler.ssl.defaultEndpointVerificationAlgorithm=NONE --enable-native-access=ALL-UNNAMED", "spark.driver.port"="56972", "spark.testing"="true", "spark.hadoop.fs.s3a.vectored.read.max.merged.size"="2M", "spark.sql.execution.id"="1", "spark.rdd.scope.noOverride"="true", "spark.executor.id"="driver", "spark.port.maxRetries"="100", "spark.executor.extraJavaOptions"="-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true -Dio.netty.allocator.type=pooled -Dio.netty.handler.ssl.defaultEndpointVerificationAlgorithm=NONE --enable-native-access=ALL-UNNAMED", "spark.test.home"="/Users/sarutak/oss/spark", "spark.ui.enabled"="false"}, {"spark.app.startTime"="1761192376305", "spark.rdd.scope"="{"id":"5","name":"Exchange"}", "spark.hadoop.fs.s3a.vectored.read.min.seek.size"="128K", "spark.hadoop.hadoop.caller.context.enabled"="true", "spark.memory.debugFill"="true", "spark.master.rest.enabled"="false", "spark.sql.warehouse.dir"="file:/Users/sarutak/oss/spark/sql/core/spark-warehouse", "spark.master"="local[2]", "spark.job.interruptOnCancel"="true", "spark.app.name"="test", "spark.driver.host"="192.168.1.109", "spark.app.id"="local-1761192376735", "spark.job.tags"="spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c-execution-root-id-0,spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c-thread-a4b5b347-6e56-4416-b3a5-37a312bdfe34-one,spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c", "spark.unsafe.exceptionOnMemoryLeak"="true", "spark.sql.execution.root.id"="0", "spark.ui.showConsoleProgress"="false", "spark.driver.extraJavaOptions"="-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true -Dio.netty.allocator.type=pooled -Dio.netty.handler.ssl.defaultEndpointVerificationAlgorithm=NONE --enable-native-access=ALL-UNNAMED", "spark.driver.port"="56972", "spark.testing"="true", "spark.hadoop.fs.s3a.vectored.read.max.merged.size"="2M", "spark.sql.execution.id"="0", "spark.rdd.scope.noOverride"="true", "spark.executor.id"="driver", "spark.port.maxRetries"="100", "spark.executor.extraJavaOptions"="-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true -Dio.netty.allocator.type=pooled -Dio.netty.handler.ssl.defaultEndpointVerificationAlgorithm=NONE --enable-native-access=ALL-UNNAMED", "spark.test.home"="/Users/sarutak/oss/spark", "spark.ui.enabled"="false"}) had size 2 instead of expected size 1 (SparkSessionJobTaggingAndCancellationSuite.scala:229)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.SparkSessionJobTaggingAndCancellationSuite.$anonfun$new$13(SparkSessionJobTaggingAndCancellationSuite.scala:229)
[info]   at scala.collection.immutable.List.foreach(List.scala:323)
[info]   at org.apache.spark.sql.SparkSessionJobTaggingAndCancellationSuite.$anonfun$new$6(SparkSessionJobTaggingAndCancellationSuite.scala:226)
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:68)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:154)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:226)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:68)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:68)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:323)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:68)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info]   at java.base/java.lang.Thread.run(Thread.java:840)

On the other hand, if inserting sleep but leaving Executors.newFixedThreadPool(3) as it is, this test always seems to pass.
So, we can now reenable this test.

Why are the changes needed?

For better test coverage.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

The test always passes on my dev environment even if inserting sleep like explained above.

Was this patch authored or co-authored using generative AI tooling?

No.

@sarutak sarutak marked this pull request as ready for review October 23, 2025 11:07
@github-actions github-actions bot added the SQL label Oct 23, 2025
Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM. Thank you, @sarutak .

I agree with his assessment and we can verify more in our various CIs by merging this PR.

Merged to master for Apache Spark 4.1.0-preview3.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants