diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ResourceBoundedExecutorSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ResourceBoundedExecutorSuite.scala index b289d77de5f..d1da3e2a7f0 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ResourceBoundedExecutorSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ResourceBoundedExecutorSuite.scala @@ -17,6 +17,7 @@ package com.nvidia.spark.rapids import java.util.concurrent.{Future => JFuture} +import java.util.concurrent.locks.{Lock, ReentrantLock} import scala.collection.mutable @@ -39,12 +40,18 @@ class ResourceBoundedExecutorSuite extends AnyFunSuite with RmmSparkRetrySuiteBa * Builds a dummy function that simulates the CPU task, which returns the timestamp of the * completion of the task. */ - private def buildDummyFn(sleepMs: Long = 5L): () => Long = { + private def buildDummyFn(startBlk: Lock): () => Long = { () => { - // Make sure the task execution slower than the task submission, so that the execution - // order will be determined by the task priority. - Thread.sleep(sleepMs) - System.nanoTime() + // Block the start of task execution until the lock is released. + startBlk.lock() + try { + // Make sure the task execution slower than the task submission, so that the execution + // order will be determined by the task priority. + Thread.sleep(5L) + System.nanoTime() + } finally { + startBlk.unlock() + } } } @@ -52,20 +59,22 @@ class ResourceBoundedExecutorSuite extends AnyFunSuite with RmmSparkRetrySuiteBa // different priorities and memory requirements, then verifying the actual execution sequence // matches the expected priority-based task scheduling behavior. test("AsyncCpuTask task priority") { + // Lock to control the start of task execution. + val lck = new ReentrantLock() // Create a single-threaded bounded executor to control the execution order of tasks. val executor = createSingleThreadedBoundedExecutor(maxThreadNumber = 1) // Test the task scheduling based on priority: // Tasks with higher priority should be executed first even if they are submitted later. // Execution order: 4, 2, 3, 5, 1 val executionPriority = Seq(2, 4, 3, 1, 5) - executor.submit(AsyncRunner.newCpuTask(buildDummyFn(),1 << 20, priority = 1)) - // Guarantee the first task is executed first to avoid uncertain execution order. - Thread.sleep(1) - var results = executionPriority.map { p => - p -> executor.submit(AsyncRunner.newCpuTask(buildDummyFn(), 1 << 20, priority = p)) - }.sortBy(_._1).map { - case (_, future) => future.get().data - } + lck.lock() // Block tasks from starting execution. + executor.submit(AsyncRunner.newUnboundedTask(buildDummyFn(lck))) + Thread.sleep(1L) + var sortedFut = executionPriority.map { p => + p -> executor.submit(AsyncRunner.newCpuTask(buildDummyFn(lck), 1 << 20, priority = p)) + }.sortBy(_._1).map(_._2) + lck.unlock() // Allow tasks to start executing. + var results = sortedFut.map(_.get().data) (0 until results.length - 1).foreach { i => require(results(i) > results(i + 1), s"Unexpected order of wallTime: ${results.toList}") } @@ -73,49 +82,52 @@ class ResourceBoundedExecutorSuite extends AnyFunSuite with RmmSparkRetrySuiteBa // Test priority penalty by memory usage: // Tasks requiring less resource should be executed first even if they are submitted later. // Execution order: 1, 2, 3, 4, 5 - results = (1 to 5).map { i => - executor.submit(AsyncRunner.newCpuTask(buildDummyFn(), memoryBytes = i << 20)) - }.map { - future => future.get().data + lck.lock() // Block tasks from starting execution. + executor.submit(AsyncRunner.newUnboundedTask(buildDummyFn(lck))) + Thread.sleep(1L) + sortedFut = (1 to 5).map { i => + executor.submit(AsyncRunner.newCpuTask(buildDummyFn(lck), memoryBytes = i << 20)) } + lck.unlock() // Allow tasks to start executing. + results = sortedFut.map(_.get().data) (0 until results.length - 1).foreach { i => require(results(i) < results(i + 1), s"Unexpected order of wallTime: ${results.toList}") } - // Execution order: 1, 5, 4, 3, 2 - executor.submit(AsyncRunner.newCpuTask(buildDummyFn(), memoryBytes = 5 << 20)) - // Guarantee the first task is executed first to avoid uncertain execution order. - Thread.sleep(1) - results = (4 to 1 by -1).map { i => - executor.submit(AsyncRunner.newCpuTask(buildDummyFn(), memoryBytes = i << 20)) - }.map { - future => future.get().data + // Execution order: 4, 3, 2, 1 + lck.lock() // Block tasks from starting execution. + executor.submit(AsyncRunner.newUnboundedTask(buildDummyFn(lck))) + Thread.sleep(1L) + sortedFut = (4 to 1 by -1).map { i => + executor.submit(AsyncRunner.newCpuTask(buildDummyFn(lck), memoryBytes = i << 20)) } + lck.unlock() // Allow tasks to start executing. + results = sortedFut.map(_.get().data) (0 until results.length - 1).foreach { i => require(results(i) > results(i + 1), s"Unexpected order of wallTime: ${results.toList}") } // Comprehensive test for task priority and memory usage (including unbounded tasks): - // Execution order: 1, 4, 6, 2, 5, 3 + // Execution order: 2(prior=4), 3(prior=2), 5(prior=-46), 1(prior=MAX), 4(prior=-3) val futures = mutable.ArrayBuffer[JFuture[AsyncResult[Long]]]() + lck.lock() // Block tasks from starting execution. + executor.submit(AsyncRunner.newUnboundedTask(buildDummyFn(lck))) + Thread.sleep(1L) // (1) priority = 5 - 1KB = 4 - futures += executor.submit(AsyncRunner.newCpuTask(buildDummyFn(), - memoryBytes = 100 << 10, priority = 5L)) - // Guarantee the first task is executed first to avoid uncertain execution order. - Thread.sleep(1) + futures += executor.submit(AsyncRunner.newCpuTask(buildDummyFn(lck), + memoryBytes = 1 << 10, priority = 5L)) // (2) priority = 3 - 1KB = 2 - futures += executor.submit(AsyncRunner.newCpuTask(buildDummyFn(), + futures += executor.submit(AsyncRunner.newCpuTask(buildDummyFn(lck), memoryBytes = 1 << 10, priority = 3L)) // (3) priority = 4 - 50KB = -46 - futures += executor.submit(AsyncRunner.newCpuTask(buildDummyFn(), + futures += executor.submit(AsyncRunner.newCpuTask(buildDummyFn(lck), memoryBytes = 50 << 10, priority = 4L)) // (4) Unbounded task with the highest priority - futures += executor.submit(AsyncRunner.newUnboundedTask(buildDummyFn())) + futures += executor.submit(AsyncRunner.newUnboundedTask(buildDummyFn(lck))) // (5) priority = 7 - 10KB = -3 - futures += executor.submit(AsyncRunner.newCpuTask(buildDummyFn(), + futures += executor.submit(AsyncRunner.newCpuTask(buildDummyFn(lck), memoryBytes = 10 << 10, priority = 7L)) - // (6) Unbounded task with the highest priority - futures += executor.submit(AsyncRunner.newUnboundedTask(buildDummyFn())) - results = Array(1, 4, 6, 2, 5, 3).zip(futures).sortBy(_._1).map { + lck.unlock() // Allow tasks to start executing. + results = Seq(2, 3, 5, 1, 4).zip(futures).sortBy(_._1).map { case (_, fut) => fut.get().data } (0 until results.length - 1).foreach { i =>