diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle index d73a50b2e7c6..58edd4145602 100644 --- a/spark/v3.5/build.gradle +++ b/spark/v3.5/build.gradle @@ -101,6 +101,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts') testImplementation project(path: ':iceberg-data', configuration: 'testArtifacts') testImplementation libs.sqlite.jdbc + testImplementation libs.awaitility } test { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java index 3b350bc91e72..cdbc198b6369 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java @@ -18,11 +18,14 @@ */ package org.apache.iceberg.spark.source; +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.execution.ui.SQLAppStatusStore; import org.apache.spark.sql.execution.ui.SQLExecutionUIData; import org.apache.spark.sql.execution.ui.SQLPlanMetric; -import org.junit.Assert; +import org.awaitility.Awaitility; import scala.Option; public class SparkSQLExecutionHelper { @@ -42,28 +45,26 @@ public static String lastExecutedMetricValue(SparkSession spark, String metricNa SQLExecutionUIData lastExecution = statusStore.executionsList().last(); Option sqlPlanMetric = lastExecution.metrics().find(metric -> metric.name().equals(metricName)); - Assert.assertTrue( - String.format("Metric '%s' not found in last execution", metricName), - sqlPlanMetric.isDefined()); + assertThat(sqlPlanMetric.isDefined()) + .as(String.format("Metric '%s' not found in last execution", metricName)) + .isTrue(); long metricId = sqlPlanMetric.get().accumulatorId(); // Refresh metricValues, they will remain null until the execution is complete and metrics are // aggregated - int attempts = 3; - while (lastExecution.metricValues() == null && attempts > 0) { - try { - Thread.sleep(100); - attempts--; - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + Awaitility.await() + .atMost(Duration.ofMillis(500)) + .pollInterval(Duration.ofMillis(100)) + .untilAsserted( + () -> assertThat(statusStore.execution(lastExecution.executionId()).get()).isNotNull()); - lastExecution = statusStore.execution(lastExecution.executionId()).get(); - } + SQLExecutionUIData exec = statusStore.execution(lastExecution.executionId()).get(); - Assert.assertNotNull("Metric values were not finalized", lastExecution.metricValues()); - String metricValue = lastExecution.metricValues().get(metricId).getOrElse(null); - Assert.assertNotNull(String.format("Metric '%s' was not finalized", metricName), metricValue); + assertThat(exec.metricValues()).as("Metric values were not finalized").isNotNull(); + String metricValue = exec.metricValues().get(metricId).getOrElse(null); + assertThat(metricValue) + .as(String.format("Metric '%s' was not finalized", metricName)) + .isNotNull(); return metricValue; } }