Fix the flaky test in ThrottlingExecutorSuite (#12094)
The `test task metrics` test is flaky. This test verifies the max
throttle time metric in the `ThrottlingExecutor` while there are two
tasks in the executor, one running and another waiting. The bound it
uses to check the max throttle time is not quite right.
Here is a brief summary of what this test does:

- The main thread submits a task. This task runs until it is told to
stop.
- Another thread submits a task. This task waits until the running task
finishes.
- The main thread sleeps for a certain time.
- The main thread stops the running task. This will let the waiting task
run.
- The main thread verifies the max throttle time by comparing it to the
_sleep_ time.

The last verification can flake because the sleep time may not reflect
the actual wait time in the `ThrottlingExecutor`. Submitting the waiting
task can itself take some time, which shortens the actual wait in the
executor relative to the sleep. This PR fixes the test by validating the
metric against the actual wait time, measured around the submit call.
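
The timing relationship can be sketched in isolation as follows. This is
a minimal sketch, not the real test: `ToyThrottle` and
`ThrottleTimingSketch` are hypothetical stand-ins for the
`ThrottlingExecutor` and the suite, and the "running task" is modeled
simply as a permit that is already held. It only shows that a wait
measured around the blocking call bounds the recorded throttle time from
above, while the sleep time gives no such guarantee.

```scala
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for ThrottlingExecutor (an assumption, not its API):
// a gate whose single permit starts out held by the "running task".
class ToyThrottle {
  private val permit = new Semaphore(0)
  private val maxNs = new AtomicLong(0)

  // Longest time any caller has spent blocked in acquire().
  def maxThrottleTimeNs: Long = maxNs.get()

  def acquire(): Unit = {
    val start = System.nanoTime()
    permit.acquire() // blocks until release() is called
    val waited = System.nanoTime() - start
    // Keep the maximum observed wait.
    var cur = maxNs.get()
    while (waited > cur && !maxNs.compareAndSet(cur, waited)) {
      cur = maxNs.get()
    }
  }

  // Models the running task finishing.
  def release(): Unit = permit.release()
}

object ThrottleTimingSketch {
  // Returns (actualWaitNs, maxThrottleNs).
  def run(sleepTimeMs: Long): (Long, Long) = {
    val throttle = new ToyThrottle
    val actualWaitTimeNs = new AtomicLong(0)

    val submitter = new Thread(new Runnable {
      override def run(): Unit = {
        // Measure around the blocking call, as the fixed test does.
        val before = System.nanoTime()
        throttle.acquire()
        actualWaitTimeNs.set(System.nanoTime() - before)
      }
    })
    submitter.start()

    // The submitter may start late, so the time it actually spends
    // blocked can be shorter than this sleep.
    Thread.sleep(sleepTimeMs)
    throttle.release()
    submitter.join()

    (actualWaitTimeNs.get(), throttle.maxThrottleTimeNs)
  }
}
```

Calling `ThrottleTimingSketch.run(50)` returns a pair for which
`actualWaitNs >= maxThrottleNs` holds, because the outer measurement
encloses the inner one. No fixed relation to the 50 ms sleep is
guaranteed, which is why the sleep time is an unreliable bound.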

---------

Signed-off-by: Jihoon Son <[email protected]>
jihoonson authored Feb 11, 2025
1 parent 15baa74 commit fb13fb8
Showing 1 changed file with 5 additions and 1 deletion.
@@ -17,6 +17,7 @@
package com.nvidia.spark.rapids.io.async

import java.util.concurrent.{Callable, CountDownLatch, ExecutionException, Executors, Future, RejectedExecutionException, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong

import com.nvidia.spark.rapids.{GpuMetric, RapidsConf}
import org.apache.hadoop.conf.Configuration
@@ -174,10 +175,13 @@ class ThrottlingExecutorSuite extends AnyFunSuite with BeforeAndAfterEach {
val runnableSubmitted = new CountDownLatch(1)
// Latch indicating that waitingTask has been submitted to ThrottlingExecutor.
val waitingTaskSubmitted = new CountDownLatch(1)
+val actualWaitTimeNs = new AtomicLong(0)
exec.submit(new Runnable {
override def run(): Unit = {
runnableSubmitted.countDown()
+val before = System.nanoTime()
executor.submit(waitingTask, 100)
+actualWaitTimeNs.set(System.nanoTime() - before)
waitingTaskSubmitted.countDown()
}
})
@@ -195,7 +199,7 @@ class ThrottlingExecutorSuite extends AnyFunSuite with BeforeAndAfterEach {

// Skip the check on the min throttle time as the first task never waits.

-assert(TimeUnit.MILLISECONDS.toNanos(sleepTimeMs) <=
+assert(actualWaitTimeNs.get() >=
taskMetrics(GpuWriteJobStatsTracker.ASYNC_WRITE_MAX_THROTTLE_TIME_KEY).value
)
