diff --git a/openeo-geotrellis/src/main/java/org/openeo/sparklisteners/BatchJobProgressListener.scala b/openeo-geotrellis/src/main/java/org/openeo/sparklisteners/BatchJobProgressListener.scala
index ffe09ff1..f39a0151 100644
--- a/openeo-geotrellis/src/main/java/org/openeo/sparklisteners/BatchJobProgressListener.scala
+++ b/openeo-geotrellis/src/main/java/org/openeo/sparklisteners/BatchJobProgressListener.scala
@@ -1,15 +1,12 @@
 package org.openeo.sparklisteners;
 
-import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.scheduler.SparkListener;
-import org.apache.spark.scheduler.SparkListenerStageCompleted;
-import org.apache.spark.scheduler.SparkListenerStageSubmitted;
-import org.apache.spark.util.AccumulatorV2;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.Traversable;
-import scala.collection.mutable.Map;
-
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerExecutorAdded, SparkListenerExecutorRemoved, SparkListenerStageCompleted, SparkListenerStageSubmitted}
+import org.apache.spark.util.AccumulatorV2
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable
 import java.time.Duration;
 
 object BatchJobProgressListener {
@@ -22,21 +19,28 @@ class BatchJobProgressListener extends SparkListener {
 
   import BatchJobProgressListener.logger
 
+  private val stagesInformation = new mutable.LinkedHashMap[String, mutable.Map[String, Any]]()
+  private val executorStartTimes = new mutable.LinkedHashMap[String, Long]()
+  private var totalExecutorMillis = 0L
+
   override def onStageSubmitted( stageSubmitted:SparkListenerStageSubmitted):Unit = {
     logger.info(s"Starting stage: ${stageSubmitted.stageInfo.stageId} - ${stageSubmitted.stageInfo.name}. \nStages may combine multiple processes." )
   }
 
   override def onStageCompleted( stageCompleted: SparkListenerStageCompleted):Unit = {
     val taskMetrics = stageCompleted.stageInfo.taskMetrics
-
+    val stageInformation = new mutable.LinkedHashMap[String, Any]()
+    var logs = List[(String, String)]()
     if(stageCompleted.stageInfo.failureReason.isDefined){
+      stageInformation += ("duration" -> Duration.ofMillis(taskMetrics.executorRunTime))
       val message =
         f"""A part of the process graph failed, and will be retried, the reason was: "${stageCompleted.stageInfo.failureReason.get}"
-           |Your job may still complete if the failure was caused by a transient error, but will take more time. A common cause of transient errors is too little executor memory (overhead). Too low executor-memory can be seen by a high 'garbage collection' time, which was: ${Duration.ofMillis(taskMetrics.jvmGCTime).toSeconds / 1000.0} seconds.
+           |Your job may still complete if the failure was caused by a transient error, but will take more time. A common cause of transient errors is too little executor memory (overhead). Too low executor memory often shows up as a high 'garbage collection' time, which was: ${taskMetrics.jvmGCTime / 1000.0} seconds.
            |""".stripMargin
-      logger.warn(message)
+      logs = ("warn", message) :: logs
     }else{
       val duration = Duration.ofMillis(taskMetrics.executorRunTime)
+      stageInformation += ("duration" -> duration)
       val timeString = if(duration.toSeconds>60) {
         duration.toMinutes + " minutes"
       } else {
@@ -45,19 +49,64 @@ class BatchJobProgressListener extends SparkListener {
       }
       val megabytes = taskMetrics.shuffleWriteMetrics.bytesWritten.toFloat/(1024.0*1024.0)
       val name = stageCompleted.stageInfo.name
-      logger.info(f"Stage ${stageCompleted.stageInfo.stageId} produced $megabytes%.2f MB in $timeString - ${name}.");
-
+      val message = f"Stage ${stageCompleted.stageInfo.stageId} produced $megabytes%.2f MB in $timeString - $name."
+      logs = ("info", message) :: logs
       val accumulators = stageCompleted.stageInfo.accumulables;
       val chunkCounts = accumulators.filter(_._2.name.get.startsWith("ChunkCount"));
       if (chunkCounts.nonEmpty) {
         val totalChunks = chunkCounts.head._2.value
         val megapixel = totalChunks.get.asInstanceOf[Long] * 256 * 256 / (1024 * 1024)
         if(taskMetrics.executorRunTime > 0) {
-          logger.info(f"load_collection: data was loaded with an average speed of: ${megapixel.toFloat/ duration.toSeconds.toFloat}%.3f Megapixel per second.")
+          val messageSpeed = f"load_collection: data was loaded with an average speed of: ${megapixel.toFloat / duration.toSeconds.toFloat}%.3f Megapixel per second."
+          logs = ("info", messageSpeed) :: logs
         };
       }
     }
-
+    // Keep the logs in emission order, together with the stage duration, for the end-of-job summary.
+    stageInformation += ("logs" -> logs.reverse)
+    stagesInformation += (stageCompleted.stageInfo.stageId.toString -> stageInformation)
   }
 
+  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
+    executorStartTimes += (executorAdded.executorId -> executorAdded.time)
+  }
+
+  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
+    executorStartTimes.remove(executorRemoved.executorId).foreach { startTime =>
+      totalExecutorMillis += executorRemoved.time - startTime
+    }
+  }
+
+  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+    def durationOf(stage: mutable.Map[String, Any]): Duration =
+      stage.getOrElse("duration", Duration.ZERO).asInstanceOf[Duration]
+
+    val totalStages = stagesInformation.size
+    val totalDuration = stagesInformation.values.foldLeft(Duration.ZERO)((acc, stage) => acc.plus(durationOf(stage)))
+    // Executors that are still running at application end are counted up to the end timestamp.
+    val executorTime = totalExecutorMillis + executorStartTimes.valuesIterator.map(applicationEnd.time - _).sum
+    // Longest stages first, so the summary leads with the most expensive work.
+    val ordered = stagesInformation.toSeq.sortBy(s => durationOf(s._2).toMillis)(Ordering[Long].reverse)
+    val timeString = if (totalDuration.toSeconds > 60) {
+      totalDuration.toMinutes + " minutes"
+    } else {
+      totalDuration.toMillis.toFloat / 1000.0 + " seconds"
+    }
+    logger.info(f"\nSummary of the executed stages:\nTotal number of stages: $totalStages\nTotal stage runtime: $timeString\nTotal executor allocation time: ${executorTime / 1000.0}%.1f seconds\nLogs of the longest stages:")
+    // Replay the logs of the longest stages until 80 percent of the total stage runtime is covered.
+    var coveredMillis = 0.0
+    var i = 0
+    while (coveredMillis < totalDuration.toMillis * 0.8 && i < ordered.size) {
+      val stageInfo = ordered(i)._2
+      val stageLogs = stageInfo.getOrElse("logs", Nil).asInstanceOf[List[(String, String)]]
+      for ((level, text) <- stageLogs) {
+        level match {
+          case "warn" => logger.warn(text)
+          case _ => logger.info(text)
+        }
+      }
+      coveredMillis += durationOf(stageInfo).toMillis
+      i += 1
+    }
+  }
 }
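The tests below attach the listener by hand with `addSparkListener`; in an actual batch job the same class can also be wired in through Spark's `spark.extraListeners` configuration, which instantiates it via the zero-argument constructor. A minimal driver sketch under that assumption (the `BatchJobDriverSketch` object and the app name are illustrative, not part of this change):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.openeo.sparklisteners.BatchJobProgressListener

object BatchJobDriverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("openeo-batch-job")
      // Spark instantiates the listener at context start-up, so events from
      // the very first stage are captured.
      .set("spark.extraListeners", classOf[BatchJobProgressListener].getName)
    val sc = new SparkContext(conf)
    try {
      // ... run the process graph ...
    } finally {
      // stop() fires SparkListenerApplicationEnd, which emits the stage summary.
      sc.stop()
    }
  }
}
```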
+// logger.info(f"Stage ${stageCompleted.stageInfo.stageId} produced $megabytes%.2f MB in $timeString - ${name}."); + logs = ("info",message) :: logs val accumulators = stageCompleted.stageInfo.accumulables; val chunkCounts = accumulators.filter(_._2.name.get.startsWith("ChunkCount")); if (chunkCounts.nonEmpty) { val totalChunks = chunkCounts.head._2.value val megapixel = totalChunks.get.asInstanceOf[Long] * 256 * 256 / (1024 * 1024) if(taskMetrics.executorRunTime > 0) { - logger.info(f"load_collection: data was loaded with an average speed of: ${megapixel.toFloat/ duration.toSeconds.toFloat}%.3f Megapixel per second.") + val messageSpeed = f"load_collection: data was loaded with an average speed of: ${megapixel.toFloat/ duration.toSeconds.toFloat}%.3f Megapixel per second." +// logger.info(f"load_collection: data was loaded with an average speed of: ${megapixel.toFloat/ duration.toSeconds.toFloat}%.3f Megapixel per second.") + logs = ("info",messageSpeed) :: logs }; } } - + stageInformation += ("logs" -> logs) + stagesInformation += (stageCompleted.stageInfo.stageId.toString -> stageInformation) } + override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { + val startTime = executorAdded.time + val executorId = executorAdded.executorId + executorInformation += (executorId -> startTime) + + } + override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + val executorId = executorRemoved.executorId + val executorTime = executorRemoved.time-executorInformation(executorId) + executorInformation += (executorId -> executorTime) + } + + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd):Unit={ + val (totalStages, totalDuration) = stagesInformation.foldLeft((0,Duration.ZERO)){ (x,y) => + val duration = y._2.getOrElse("duration",0) match { + case n:Duration => n + } + (x._1 + 1, x._2.plus(duration)) + } + val executorTime = executorInformation.foldLeft(0L)((x,y) => { + x + (y._2) + }) + val ordered = stagesInformation.toSeq.sortWith((a,b) =>{ + val DurationA = a._2.getOrElse("duration",0) match { + case n:Duration => n} + val DurationB = b._2.getOrElse("duration",0) match {case n:Duration => n} + DurationA.toMillis < DurationB.toMillis + }) + val timeString = if(totalDuration.toSeconds>60) { + totalDuration.toMinutes + " minutes" + } else { + totalDuration.toMillis.toFloat / 1000.0 + " seconds" + } + logger.info(f" \nSummary of the executed stages: \nTotal number of stages: $totalStages \nTotal stage runtime: $timeString\nTotal executor allocation time: $executorTime \nLogs of the longest stages:") + var tempDuration = 0.0 + var i = 0 + var maxDurationToLog = ordered.head._2.getOrElse("duration",0) match {case n:Duration => n} + while (tempDuration < totalDuration.toMillis * 0.8){ + val stageInfo = ordered(i)._2 + val logs = stageInfo.getOrElse("logs","") match {case s:List[(String, String)] => s} + for (log <- logs){ + log match { + case ("warn",s) => logger.warn(s) + case ("info",s) => logger.info(s) + } + } + val duration = stageInfo.getOrElse("duration",0.0) match {case v:Duration => v} + tempDuration += duration.toMillis + maxDurationToLog = duration + i += 1 + } + } } diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala index f743b92a..1d487d4d 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/FileLayerProviderTest.scala +++ 
b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/Sentinel2FileLayerProviderTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/Sentinel2FileLayerProviderTest.scala
index 3d0c8108..e72cccfa 100644
--- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/Sentinel2FileLayerProviderTest.scala
+++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/layers/Sentinel2FileLayerProviderTest.scala
@@ -31,10 +31,10 @@ import org.openeo.geotrellis.TestImplicits._
 import org.openeo.geotrellis.geotiff.{GTiffOptions, saveRDD}
 import org.openeo.geotrellis.netcdf.{NetCDFOptions, NetCDFRDDWriter}
 import org.openeo.geotrellis.{LayerFixtures, MergeCubesSpec, OpenEOProcessScriptBuilder, OpenEOProcesses, ProjectedPolygons, TestOpenEOProcessScriptBuilder}
-import org.openeo.geotrelliscommon.{BatchJobMetadataTracker, ConfigurableSpaceTimePartitioner, SparseSpaceTimePartitioner, DataCubeParameters, ResampledTile}
+import org.openeo.geotrelliscommon.{BatchJobMetadataTracker, ConfigurableSpaceTimePartitioner, DataCubeParameters, ResampledTile, SparseSpaceTimePartitioner}
 import org.openeo.opensearch.OpenSearchResponses.Link
 import org.openeo.opensearch.{OpenSearchClient, OpenSearchResponses}
-import org.openeo.sparklisteners.GetInfoSparkListener
+import org.openeo.sparklisteners.{BatchJobProgressListener, GetInfoSparkListener}
 
 import java.net.URI
 import java.time.LocalTime.MIDNIGHT
@@ -545,7 +545,8 @@ class Sentinel2FileLayerProviderTest extends RasterMatchers {
     val crs = CRS.fromEpsgCode(32631)
     val boundingBox = ProjectedExtent(Extent(640860, 5676170, 666460, 5701770), crs)
     val dataCubeParameters = new DataCubeParameters
-
+    val progressListener = new BatchJobProgressListener()
+    sc.addSparkListener(progressListener)
     val listener = new GetInfoSparkListener()
     SparkContext.getOrCreate().addSparkListener(listener)
     // dataCubeParameters.tileSize = 2048 (This requires increased spark.kryoserializer.buffer.max)
@@ -713,6 +714,8 @@ class Sentinel2FileLayerProviderTest extends RasterMatchers {
 
   @Test
   def testL1CMultibandTileMask(): Unit = {
+    val progressListener = new BatchJobProgressListener()
+    sc.addSparkListener(progressListener)
     val dilationDistance = 5
 
     val creoL1CLayerProvider = FileLayerProvider(
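A design note on the listener state: the per-stage `mutable.Map[String, Any]` forces `asInstanceOf` casts wherever the summary is assembled. A typed record is one possible follow-up shape; a sketch only, with `StageSummary` and `StageSummarySketch` as hypothetical names that are not part of this change:

```scala
import java.time.Duration
import scala.collection.mutable

// Hypothetical replacement for the per-stage Map[String, Any].
case class StageSummary(duration: Duration, logs: List[(String, String)])

object StageSummarySketch {
  // The listener state becomes fully typed ...
  private val stagesInformation = mutable.LinkedHashMap[String, StageSummary]()

  // ... and the summary can sort longest-first without casting.
  def longestFirst: Seq[(String, StageSummary)] =
    stagesInformation.toSeq.sortBy(-_._2.duration.toMillis)
}
```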