From 60b82a32c877bd6aebc9cad3bf65ba963d926fba Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Tue, 21 May 2019 17:59:07 +0530 Subject: [PATCH 1/9] Added driver metrics: maxHeapCommited, maxHeapUsed, heapMax, totalCPUTime, totalGCTime, totalGCCount --- .../qubole/sparklens/QuboleJobListener.scala | 38 +++++++++++++++++++ .../sparklens/common/AggregateMetrics.scala | 14 ++++++- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala index 52c5bf0..af305fd 100644 --- a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala +++ b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala @@ -17,7 +17,9 @@ package com.qubole.sparklens +import java.lang.management.ManagementFactory import java.net.URI +import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} import com.qubole.sparklens.analyzer._ import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo} @@ -49,6 +51,17 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { protected val failedStages = new ListBuffer[String] protected val appMetrics = new AggregateMetrics() + private var threadExecutor: ScheduledExecutorService = _ + + private val updateDriverMemMetrics = new Runnable { + def run() = { + val memUsage = java.lang.management.ManagementFactory.getMemoryMXBean.getHeapMemoryUsage + appMetrics.updateMetric(AggregateMetrics.driverHeapMax, memUsage.getMax) + appMetrics.updateMetric(AggregateMetrics.driverHeapMax, memUsage.getCommitted) + appMetrics.updateMetric(AggregateMetrics.driverHeapMax, memUsage.getUsed) + } + } + private def hostCount():Int = hostMap.size private def executorCount(): Int = executorMap.size @@ -91,6 +104,25 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { Some(jobTime/totalTime) } + private def collectDriverMetrics(): Unit = { + 
appMetrics.updateMetric(AggregateMetrics.driverCPUTime, + ManagementFactory.getThreadMXBean.getCurrentThreadCpuTime) + + var gcCount: Long = 0 + var gcTime: Long = 0 + val iter = ManagementFactory.getGarbageCollectorMXBeans.iterator() + while (iter.hasNext) { + val current = iter.next() + gcCount += current.getCollectionCount + gcTime += current.getCollectionTime + } + appMetrics.updateMetric(AggregateMetrics.driverGCTime, gcCount) + appMetrics.updateMetric(AggregateMetrics.driverGCCount, gcTime) + + // The thread should be killed automatically. Do we need this? + threadExecutor.shutdown() + } + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { val taskMetrics = taskEnd.taskMetrics val taskInfo = taskEnd.taskInfo @@ -144,6 +176,10 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { //println(s"Application ${applicationStart.appId} started at ${applicationStart.time}") appInfo.applicationID = applicationStart.appId.getOrElse("NA") appInfo.startTime = applicationStart.time + + // Start a thread to collect the driver JVM memory stats every 10 seconds + threadExecutor = Executors.newSingleThreadScheduledExecutor + threadExecutor.scheduleAtFixedRate(updateDriverMemMetrics, 0, 10, TimeUnit.SECONDS) } override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { @@ -169,6 +205,8 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { AppAnalyzer.startAnalyzers(appContext) } } + + collectDriverMetrics() } override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { val executorTimeSpan = executorMap.get(executorAdded.executorId) diff --git a/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala b/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala index 99a2a79..3caaa04 100644 --- a/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala +++ b/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala @@ -100,6 +100,12 @@ class 
AggregateMetrics() { formatterMap(AggregateMetrics.diskBytesSpilled)= formatBytes formatterMap(AggregateMetrics.peakExecutionMemory)= formatBytes formatterMap(AggregateMetrics.taskDuration)= formatMillisTime + formatterMap(AggregateMetrics.driverHeapMax)= formatMillisTime + formatterMap(AggregateMetrics.driverMaxHeapCommitted)= formatMillisTime + formatterMap(AggregateMetrics.driverMaxHeapUsed)= formatMillisTime + formatterMap(AggregateMetrics.driverCPUTime)= formatMillisTime + formatterMap(AggregateMetrics.driverGCTime)= formatMillisTime + formatterMap(AggregateMetrics.driverGCCount)= formatMillisTime @transient val numberFormatter = java.text.NumberFormat.getIntegerInstance @@ -238,7 +244,13 @@ object AggregateMetrics extends Enumeration { memoryBytesSpilled, diskBytesSpilled, peakExecutionMemory, - taskDuration + taskDuration, + driverHeapMax, + driverMaxHeapCommitted, + driverMaxHeapUsed, + driverCPUTime, + driverGCCount, + driverGCTime = Value def getAggregateMetrics(json: JValue): AggregateMetrics = { From 3f9f68a580ed703c5c88e3369181b4f5eb13cab6 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Wed, 22 May 2019 23:54:12 +0530 Subject: [PATCH 2/9] Moved Driver Metrics to separate class --- .../qubole/sparklens/QuboleJobListener.scala | 26 ++-- .../sparklens/QuboleNotebookListener.scala | 1 + .../analyzer/SimpleAppAnalyzer.scala | 1 + .../sparklens/common/AggregateMetrics.scala | 109 +---------------- .../sparklens/common/AggregateValue.scala | 56 +++++++++ .../qubole/sparklens/common/AppContext.scala | 4 + .../sparklens/common/DriverMetrics.scala | 111 ++++++++++++++++++ .../sparklens/common/MetricsHelper.scala | 63 ++++++++++ .../analyzer/JobOverlapAnalyzerSuite.scala | 3 +- .../PQParallelStageSchedulerSuite.scala | 3 +- 10 files changed, 259 insertions(+), 118 deletions(-) create mode 100644 src/main/scala/com/qubole/sparklens/common/AggregateValue.scala create mode 100644 src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala create mode 100644 
src/main/scala/com/qubole/sparklens/common/MetricsHelper.scala diff --git a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala index af305fd..9899294 100644 --- a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala +++ b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala @@ -22,7 +22,7 @@ import java.net.URI import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} import com.qubole.sparklens.analyzer._ -import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo} +import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo, DriverMetrics, MetricsHelper} import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan, JobTimeSpan, StageTimeSpan} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -50,15 +50,16 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { protected val stageIDToJobID = new mutable.HashMap[Int, Long] protected val failedStages = new ListBuffer[String] protected val appMetrics = new AggregateMetrics() + protected val driverMetrics = new DriverMetrics() private var threadExecutor: ScheduledExecutorService = _ private val updateDriverMemMetrics = new Runnable { def run() = { val memUsage = java.lang.management.ManagementFactory.getMemoryMXBean.getHeapMemoryUsage - appMetrics.updateMetric(AggregateMetrics.driverHeapMax, memUsage.getMax) - appMetrics.updateMetric(AggregateMetrics.driverHeapMax, memUsage.getCommitted) - appMetrics.updateMetric(AggregateMetrics.driverHeapMax, memUsage.getUsed) + driverMetrics.updateMetric(DriverMetrics.driverHeapMax, memUsage.getMax) + driverMetrics.updateMetric(DriverMetrics.driverMaxHeapCommitted, memUsage.getCommitted) + driverMetrics.updateMetric(DriverMetrics.driverMaxHeapUsed, memUsage.getUsed) } } @@ -105,7 +106,8 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { } private 
def collectDriverMetrics(): Unit = { - appMetrics.updateMetric(AggregateMetrics.driverCPUTime, + println("Collecting the driver metrics") + driverMetrics.updateMetric(DriverMetrics.driverCPUTime, ManagementFactory.getThreadMXBean.getCurrentThreadCpuTime) var gcCount: Long = 0 @@ -116,10 +118,10 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { gcCount += current.getCollectionCount gcTime += current.getCollectionTime } - appMetrics.updateMetric(AggregateMetrics.driverGCTime, gcCount) - appMetrics.updateMetric(AggregateMetrics.driverGCCount, gcTime) - - // The thread should be killed automatically. Do we need this? + println(s"gcCount: ${gcCount}") + println(s"gcTime: ${gcTime}") + driverMetrics.updateMetric(DriverMetrics.driverGCTime, gcCount) + driverMetrics.updateMetric(DriverMetrics.driverGCCount, gcTime) threadExecutor.shutdown() } @@ -179,15 +181,17 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { // Start a thread to collect the driver JVM memory stats every 10 seconds threadExecutor = Executors.newSingleThreadScheduledExecutor - threadExecutor.scheduleAtFixedRate(updateDriverMemMetrics, 0, 10, TimeUnit.SECONDS) + threadExecutor.scheduleAtFixedRate(updateDriverMemMetrics, 0, 10, TimeUnit.MILLISECONDS) } override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { //println(s"Application ${appInfo.applicationID} ended at ${applicationEnd.time}") appInfo.endTime = applicationEnd.time + collectDriverMetrics() val appContext = new AppContext(appInfo, appMetrics, + driverMetrics, hostMap, executorMap, jobMap, @@ -205,8 +209,6 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { AppAnalyzer.startAnalyzers(appContext) } } - - collectDriverMetrics() } override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { val executorTimeSpan = executorMap.get(executorAdded.executorId) diff --git a/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala 
b/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala index b7a2fc4..d801f9f 100644 --- a/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala +++ b/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala @@ -120,6 +120,7 @@ class QuboleNotebookListener(sparkConf: SparkConf) extends QuboleJobListener(spa val appContext = new AppContext(appInfo, appMetrics, + driverMetrics, hostMap, executorMap, jobMap, diff --git a/src/main/scala/com/qubole/sparklens/analyzer/SimpleAppAnalyzer.scala b/src/main/scala/com/qubole/sparklens/analyzer/SimpleAppAnalyzer.scala index 56f7584..7d41c76 100644 --- a/src/main/scala/com/qubole/sparklens/analyzer/SimpleAppAnalyzer.scala +++ b/src/main/scala/com/qubole/sparklens/analyzer/SimpleAppAnalyzer.scala @@ -34,6 +34,7 @@ class SimpleAppAnalyzer extends AppAnalyzer { "task-level granularity and aggregated across the app (all tasks, stages, and jobs).\n") ac.appMetrics.print("Application Metrics", out) out.println("\n") + ac.driverMetrics.print("Driver Metrics", out) out.toString() } } diff --git a/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala b/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala index 3caaa04..7f72807 100644 --- a/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala +++ b/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala @@ -21,62 +21,14 @@ import java.util.Locale import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.TaskInfo + +import com.qubole.sparklens.common.MetricsHelper._ + import org.json4s.DefaultFormats import org.json4s.JsonAST.JValue import scala.collection.mutable -/* -Keeps track of min max sum mean and variance for any metric at any level -Reference to incremental updates of variance: -https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_Online_algorithm - */ - -class AggregateValue { - var value: Long = 0L - var min: Long = Long.MaxValue - var max: Long = 
Long.MinValue - var mean: Double = 0.0 - var variance: Double = 0.0 - var m2: Double = 0.0 - - override def toString(): String = { - s"""{ - | "value": ${value}, - | "min": ${min}, - | "max": ${max}, - | "mean": ${mean}, - | "m2": ${m2} - | "variance": ${variance} - }""".stripMargin - } - - def getMap(): Map[String, Any] = { - Map("value" -> value, - "min" -> min, - "max" -> max, - "mean" -> mean, - "m2" -> m2, - "variance" -> variance) - } -} - -object AggregateValue { - def getValue(json: JValue): AggregateValue = { - implicit val formats = DefaultFormats - - val value = new AggregateValue - value.value = (json \ "value").extract[Long] - value.min = (json \ "min").extract[Long] - value.max = (json \ "max").extract[Long] - value.mean = (json \ "mean").extract[Double] - value.variance = (json \ "variance").extract[Double] - //making it optional for backward compatibility with sparklens.json files - value.m2 = (json \ "m2").extractOrElse[Double](0.0) - value - } -} - class AggregateMetrics() { var count = 0L val map = new mutable.HashMap[AggregateMetrics.Metric, AggregateValue]() @@ -100,54 +52,9 @@ class AggregateMetrics() { formatterMap(AggregateMetrics.diskBytesSpilled)= formatBytes formatterMap(AggregateMetrics.peakExecutionMemory)= formatBytes formatterMap(AggregateMetrics.taskDuration)= formatMillisTime - formatterMap(AggregateMetrics.driverHeapMax)= formatMillisTime - formatterMap(AggregateMetrics.driverMaxHeapCommitted)= formatMillisTime - formatterMap(AggregateMetrics.driverMaxHeapUsed)= formatMillisTime - formatterMap(AggregateMetrics.driverCPUTime)= formatMillisTime - formatterMap(AggregateMetrics.driverGCTime)= formatMillisTime - formatterMap(AggregateMetrics.driverGCCount)= formatMillisTime @transient val numberFormatter = java.text.NumberFormat.getIntegerInstance - def bytesToString(size: Long): String = { - val TB = 1L << 40 - val GB = 1L << 30 - val MB = 1L << 20 - val KB = 1L << 10 - - val (value, unit) = { - if (Math.abs(size) >= 1*TB) { - 
(size.asInstanceOf[Double] / TB, "TB") - } else if (Math.abs(size) >= 1*GB) { - (size.asInstanceOf[Double] / GB, "GB") - } else if (Math.abs(size) >= 1*MB) { - (size.asInstanceOf[Double] / MB, "MB") - } else { - (size.asInstanceOf[Double] / KB, "KB") - } - } - "%.1f %s".formatLocal(Locale.US, value, unit) - } - - def toMillis(size:Long): String = { - val MS = 1000000L - val SEC = 1000 * MS - val MT = 60 * SEC - val HR = 60 * MT - - val (value, unit) = { - if (size >= 1*HR) { - (size.asInstanceOf[Double] / HR, "hh") - } else if (size >= 1*MT) { - (size.asInstanceOf[Double] / MT, "mm") - } else if (size >= 1*SEC) { - (size.asInstanceOf[Double] / SEC, "ss") - } else { - (size.asInstanceOf[Double] / MS, "ms") - } - } - "%.1f %s".formatLocal(Locale.US, value, unit) - } def formatNanoTime(x: (AggregateMetrics.Metric, AggregateValue), sb: mutable.StringBuilder): Unit = { sb.append(f" ${x._1}%-30s${toMillis(x._2.value)}%20s${toMillis(x._2.min)}%15s${toMillis(x._2.max)}%15s${toMillis(x._2.mean.toLong)}%20s") @@ -209,7 +116,7 @@ class AggregateMetrics() { } def print(caption: String, sb: mutable.StringBuilder):Unit = { - sb.append(s" AggregateMetrics (${caption}) total measurements ${count} ") + sb.append(s" AggregateMetrics (${caption}) total measurements ${count} ") .append("\n") sb.append(f" NAME SUM MIN MAX MEAN ") .append("\n") @@ -244,13 +151,7 @@ object AggregateMetrics extends Enumeration { memoryBytesSpilled, diskBytesSpilled, peakExecutionMemory, - taskDuration, - driverHeapMax, - driverMaxHeapCommitted, - driverMaxHeapUsed, - driverCPUTime, - driverGCCount, - driverGCTime + taskDuration = Value def getAggregateMetrics(json: JValue): AggregateMetrics = { diff --git a/src/main/scala/com/qubole/sparklens/common/AggregateValue.scala b/src/main/scala/com/qubole/sparklens/common/AggregateValue.scala new file mode 100644 index 0000000..5d85290 --- /dev/null +++ b/src/main/scala/com/qubole/sparklens/common/AggregateValue.scala @@ -0,0 +1,56 @@ +package 
com.qubole.sparklens.common + +import org.json4s.DefaultFormats +import org.json4s.JsonAST.JValue + +/* +Keeps track of min max sum mean and variance for any metric at any level +Reference to incremental updates of variance: +https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_Online_algorithm + */ + +class AggregateValue { + var value: Long = 0L + var min: Long = Long.MaxValue + var max: Long = Long.MinValue + var mean: Double = 0.0 + var variance: Double = 0.0 + var m2: Double = 0.0 + + override def toString(): String = { + s"""{ + | "value": ${value}, + | "min": ${min}, + | "max": ${max}, + | "mean": ${mean}, + | "m2": ${m2} + | "variance": ${variance} + }""".stripMargin + } + + def getMap(): Map[String, Any] = { + Map("value" -> value, + "min" -> min, + "max" -> max, + "mean" -> mean, + "m2" -> m2, + "variance" -> variance) + } +} + +object AggregateValue { + def getValue(json: JValue): AggregateValue = { + implicit val formats = DefaultFormats + + val value = new AggregateValue + value.value = (json \ "value").extract[Long] + value.min = (json \ "min").extract[Long] + value.max = (json \ "max").extract[Long] + value.mean = (json \ "mean").extract[Double] + value.variance = (json \ "variance").extract[Double] + //making it optional for backward compatibility with sparklens.json files + value.m2 = (json \ "m2").extractOrElse[Double](0.0) + value + } +} + diff --git a/src/main/scala/com/qubole/sparklens/common/AppContext.scala b/src/main/scala/com/qubole/sparklens/common/AppContext.scala index d5263b8..f92de77 100644 --- a/src/main/scala/com/qubole/sparklens/common/AppContext.scala +++ b/src/main/scala/com/qubole/sparklens/common/AppContext.scala @@ -26,6 +26,7 @@ import scala.collection.mutable case class AppContext(appInfo: ApplicationInfo, appMetrics: AggregateMetrics, + driverMetrics: DriverMetrics, hostMap: mutable.HashMap[String, HostTimeSpan], executorMap: mutable.HashMap[String, ExecutorTimeSpan], jobMap: mutable.HashMap[Long, 
JobTimeSpan], @@ -36,6 +37,7 @@ case class AppContext(appInfo: ApplicationInfo, def filterByStartAndEndTime(startTime: Long, endTime: Long): AppContext = { new AppContext(appInfo, appMetrics, + driverMetrics, hostMap, executorMap .filter(x => x._2.endTime == 0 || //still running @@ -56,6 +58,7 @@ case class AppContext(appInfo: ApplicationInfo, val map = Map( "appInfo" -> appInfo.getMap(), "appMetrics" -> appMetrics.getMap(), + "driverMetrics" -> driverMetrics.getMap(), "hostMap" -> AppContext.getMap(hostMap), "executorMap" -> AppContext.getMap(executorMap), "jobMap" -> AppContext.getMap(jobMap), @@ -129,6 +132,7 @@ object AppContext { new AppContext( ApplicationInfo.getObject((json \ "appInfo").extract[JValue]), AggregateMetrics.getAggregateMetrics((json \ "appMetrics").extract[JValue]), + DriverMetrics.getDriverMetrics((json \ "driverMetrics").extract[JValue]), HostTimeSpan.getTimeSpan((json \ "hostMap").extract[Map[String, JValue]]), ExecutorTimeSpan.getTimeSpan((json \ "executorMap").extract[Map[String, JValue]]), JobTimeSpan.getTimeSpan((json \ "jobMap").extract[Map[String, JValue]]), diff --git a/src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala b/src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala new file mode 100644 index 0000000..69c1dec --- /dev/null +++ b/src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala @@ -0,0 +1,111 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package com.qubole.sparklens.common + +import com.qubole.sparklens.common.MetricsHelper._ + +import scala.collection.mutable + +class DriverMetrics { + + var count = 0L + val map = new mutable.HashMap[DriverMetrics.Metric, AggregateValue]() + @transient val formatterMap = new mutable.HashMap[DriverMetrics.Metric, ((DriverMetrics + .Metric, AggregateValue), mutable.StringBuilder) => Unit]() + + formatterMap(DriverMetrics.driverHeapMax) = formatStaticBytes + formatterMap(DriverMetrics.driverMaxHeapCommitted) = formatStaticBytes + formatterMap(DriverMetrics.driverMaxHeapUsed) = formatStaticBytes + formatterMap(DriverMetrics.driverCPUTime) = formatStaticMillisTime + formatterMap(DriverMetrics.driverGCTime) = formatStaticMillisTime + formatterMap(DriverMetrics.driverGCCount) = formatStaticMillisTime + + def updateMetric(metric: DriverMetrics.Metric, newValue: Long): Unit = { + val aggregateValue = map.getOrElse(metric, new AggregateValue) + if (count == 0) { + map(metric) = aggregateValue + } + aggregateValue.value = math.max(aggregateValue.max, newValue) + count += 1 + } + + def getMap(): Map[String, Any] = { + Map("count" -> count, "map" -> map.keys.map(key => (key.toString, map.get(key).get.getMap())).toMap) + } + + def formatStaticMillisTime(x: (DriverMetrics.Metric, AggregateValue), sb: mutable.StringBuilder): Unit = { + def addUnits(x: Long): String = { + toMillis(x * 1000000) + } + sb.append(f" ${x._1}%-30s${addUnits(x._2.value)}%20s") + .append("\n") + } + + def formatStaticBytes(x: (DriverMetrics.Metric, AggregateValue), sb: mutable.StringBuilder): 
Unit = { + sb.append(f" ${x._1}%-30s${bytesToString(x._2.value)}%20s") + .append("\n") + } + + def print(sb: mutable.StringBuilder): Unit = { + map.toBuffer.sortWith((a, b) => a._1.toString < b._1.toString).foreach(x => { + formatterMap(x._1)(x, sb) + }) + } + + def print(caption: String, sb: mutable.StringBuilder):Unit = { + sb.append(s" DriverMetrics (${caption}) total measurements ${count} ") + .append("\n") + sb.append(f" NAME Value ") + .append("\n") + print(sb) + } +} + +object DriverMetrics extends Enumeration { + import org.json4s._ + + type Metric = Value + + val driverHeapMax, + driverMaxHeapCommitted, + driverMaxHeapUsed, + driverCPUTime, + driverGCCount, + driverGCTime + = Value + + def getDriverMetrics(json: JValue): DriverMetrics = { + try { + implicit val formats = DefaultFormats + + val metrics = new DriverMetrics() + metrics.count = (json \ "count").extract[Int] + val map = (json \ "map").extract[Map[String, JValue]] + + map.keys.foreach(key => metrics.map.put(withName(key), + AggregateValue.getValue(map.get(key).get))) + + metrics + } catch { + case e: MappingException => + new DriverMetrics() + case e: Exception => + throw(e) + } + } +} diff --git a/src/main/scala/com/qubole/sparklens/common/MetricsHelper.scala b/src/main/scala/com/qubole/sparklens/common/MetricsHelper.scala new file mode 100644 index 0000000..0c7c87f --- /dev/null +++ b/src/main/scala/com/qubole/sparklens/common/MetricsHelper.scala @@ -0,0 +1,63 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package com.qubole.sparklens.common + +import java.util.Locale + +object MetricsHelper { + + def bytesToString(size: Long): String = { + val TB = 1L << 40 + val GB = 1L << 30 + val MB = 1L << 20 + val KB = 1L << 10 + + val (value, unit) = { + if (Math.abs(size) >= 1*TB) { + (size.asInstanceOf[Double] / TB, "TB") + } else if (Math.abs(size) >= 1*GB) { + (size.asInstanceOf[Double] / GB, "GB") + } else if (Math.abs(size) >= 1*MB) { + (size.asInstanceOf[Double] / MB, "MB") + } else { + (size.asInstanceOf[Double] / KB, "KB") + } + } + "%.1f %s".formatLocal(Locale.US, value, unit) + } + + def toMillis(size:Long): String = { + val MS = 1000000L + val SEC = 1000 * MS + val MT = 60 * SEC + val HR = 60 * MT + + val (value, unit) = { + if (size >= 1*HR) { + (size.asInstanceOf[Double] / HR, "hh") + } else if (size >= 1*MT) { + (size.asInstanceOf[Double] / MT, "mm") + } else if (size >= 1*SEC) { + (size.asInstanceOf[Double] / SEC, "ss") + } else { + (size.asInstanceOf[Double] / MS, "ms") + } + } + "%.1f %s".formatLocal(Locale.US, value, unit) + } +} \ No newline at end of file diff --git a/src/test/scala/com/qubole/sparklens/analyzer/JobOverlapAnalyzerSuite.scala b/src/test/scala/com/qubole/sparklens/analyzer/JobOverlapAnalyzerSuite.scala index f129413..6442931 100644 --- a/src/test/scala/com/qubole/sparklens/analyzer/JobOverlapAnalyzerSuite.scala +++ b/src/test/scala/com/qubole/sparklens/analyzer/JobOverlapAnalyzerSuite.scala @@ -18,7 +18,7 @@ package com.qubole.sparklens.analyzer -import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo} 
+import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo, DriverMetrics} import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan, JobTimeSpan, StageTimeSpan} import com.qubole.sparklens.helper.JobOverlapHelper @@ -61,6 +61,7 @@ class JobOverlapAnalyzerSuite extends FunSuite { new AppContext(new ApplicationInfo(), new AggregateMetrics(), + new DriverMetrics(), mutable.HashMap[String, HostTimeSpan](), mutable.HashMap[String, ExecutorTimeSpan](), jobMap, diff --git a/src/test/scala/com/qubole/sparklens/scheduler/PQParallelStageSchedulerSuite.scala b/src/test/scala/com/qubole/sparklens/scheduler/PQParallelStageSchedulerSuite.scala index 86fbe77..e6f3a77 100644 --- a/src/test/scala/com/qubole/sparklens/scheduler/PQParallelStageSchedulerSuite.scala +++ b/src/test/scala/com/qubole/sparklens/scheduler/PQParallelStageSchedulerSuite.scala @@ -17,7 +17,7 @@ package com.qubole.sparklens.scheduler -import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo} +import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo, DriverMetrics} import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan, JobTimeSpan, StageTimeSpan} import org.scalatest.FunSuite @@ -261,6 +261,7 @@ class PQParallelStageSchedulerSuite extends FunSuite { val ac = new AppContext(new ApplicationInfo(), new AggregateMetrics(), + new DriverMetrics(), mutable.HashMap[String, HostTimeSpan](), mutable.HashMap[String, ExecutorTimeSpan](), jobMap, From b882c3acc42ce06a08775be77d5d04b03a0c9785 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Wed, 22 May 2019 23:58:35 +0530 Subject: [PATCH 3/9] nit --- src/main/scala/com/qubole/sparklens/QuboleJobListener.scala | 6 ++---- .../scala/com/qubole/sparklens/common/MetricsHelper.scala | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala index 
9899294..8803551 100644 --- a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala +++ b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala @@ -22,7 +22,7 @@ import java.net.URI import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} import com.qubole.sparklens.analyzer._ -import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo, DriverMetrics, MetricsHelper} +import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo, DriverMetrics} import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan, JobTimeSpan, StageTimeSpan} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -118,11 +118,8 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { gcCount += current.getCollectionCount gcTime += current.getCollectionTime } - println(s"gcCount: ${gcCount}") - println(s"gcTime: ${gcTime}") driverMetrics.updateMetric(DriverMetrics.driverGCTime, gcCount) driverMetrics.updateMetric(DriverMetrics.driverGCCount, gcTime) - threadExecutor.shutdown() } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { @@ -188,6 +185,7 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { //println(s"Application ${appInfo.applicationID} ended at ${applicationEnd.time}") appInfo.endTime = applicationEnd.time collectDriverMetrics() + threadExecutor.shutdown() val appContext = new AppContext(appInfo, appMetrics, diff --git a/src/main/scala/com/qubole/sparklens/common/MetricsHelper.scala b/src/main/scala/com/qubole/sparklens/common/MetricsHelper.scala index 0c7c87f..0c1f730 100644 --- a/src/main/scala/com/qubole/sparklens/common/MetricsHelper.scala +++ b/src/main/scala/com/qubole/sparklens/common/MetricsHelper.scala @@ -60,4 +60,4 @@ object MetricsHelper { } "%.1f %s".formatLocal(Locale.US, value, unit) } -} \ No newline at end of file +} From ac626f9b77fe40f323eda8fcdd0c856291bb4424 Mon Sep 17 00:00:00 2001 From: Mayur 
Bhosale Date: Thu, 23 May 2019 08:25:50 +0530 Subject: [PATCH 4/9] styling, import fixes --- .../com/qubole/sparklens/common/AggregateMetrics.scala | 2 +- .../com/qubole/sparklens/common/DriverMetrics.scala | 10 +++------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala b/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala index 7f72807..8cf7a17 100644 --- a/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala +++ b/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala @@ -115,7 +115,7 @@ class AggregateMetrics() { count += 1 } - def print(caption: String, sb: mutable.StringBuilder):Unit = { + def print(caption: String, sb: mutable.StringBuilder): Unit = { sb.append(s" AggregateMetrics (${caption}) total measurements ${count} ") .append("\n") sb.append(f" NAME SUM MIN MAX MEAN ") diff --git a/src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala b/src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala index 69c1dec..0d48eb8 100644 --- a/src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala +++ b/src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala @@ -61,18 +61,14 @@ class DriverMetrics { .append("\n") } - def print(sb: mutable.StringBuilder): Unit = { - map.toBuffer.sortWith((a, b) => a._1.toString < b._1.toString).foreach(x => { - formatterMap(x._1)(x, sb) - }) - } - def print(caption: String, sb: mutable.StringBuilder):Unit = { sb.append(s" DriverMetrics (${caption}) total measurements ${count} ") .append("\n") sb.append(f" NAME Value ") .append("\n") - print(sb) + map.toBuffer.sortWith((a, b) => a._1.toString < b._1.toString).foreach(x => { + formatterMap(x._1)(x, sb) + }) } } From 66b73565f91d69814ba3ce881df4c9b390f4b335 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Thu, 23 May 2019 09:51:52 +0530 Subject: [PATCH 5/9] fixes in formatting --- .../qubole/sparklens/QuboleJobListener.scala | 3 +-- 
.../qubole/sparklens/common/DriverMetrics.scala | 17 ++++++++++------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala index 8803551..d74ba96 100644 --- a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala +++ b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala @@ -106,7 +106,6 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { } private def collectDriverMetrics(): Unit = { - println("Collecting the driver metrics") driverMetrics.updateMetric(DriverMetrics.driverCPUTime, ManagementFactory.getThreadMXBean.getCurrentThreadCpuTime) @@ -178,7 +177,7 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { // Start a thread to collect the driver JVM memory stats every 10 seconds threadExecutor = Executors.newSingleThreadScheduledExecutor - threadExecutor.scheduleAtFixedRate(updateDriverMemMetrics, 0, 10, TimeUnit.MILLISECONDS) + threadExecutor.scheduleAtFixedRate(updateDriverMemMetrics, 0, 500, TimeUnit.MILLISECONDS) } override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { diff --git a/src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala b/src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala index 0d48eb8..dd0e28d 100644 --- a/src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala +++ b/src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala @@ -23,7 +23,6 @@ import scala.collection.mutable class DriverMetrics { - var count = 0L val map = new mutable.HashMap[DriverMetrics.Metric, AggregateValue]() @transient val formatterMap = new mutable.HashMap[DriverMetrics.Metric, ((DriverMetrics .Metric, AggregateValue), mutable.StringBuilder) => Unit]() @@ -33,19 +32,18 @@ class DriverMetrics { formatterMap(DriverMetrics.driverMaxHeapUsed) = formatStaticBytes formatterMap(DriverMetrics.driverCPUTime) = formatStaticMillisTime 
formatterMap(DriverMetrics.driverGCTime) = formatStaticMillisTime - formatterMap(DriverMetrics.driverGCCount) = formatStaticMillisTime + formatterMap(DriverMetrics.driverGCCount) = formatCount def updateMetric(metric: DriverMetrics.Metric, newValue: Long): Unit = { val aggregateValue = map.getOrElse(metric, new AggregateValue) - if (count == 0) { + if (!map.contains(metric)) { map(metric) = aggregateValue } aggregateValue.value = math.max(aggregateValue.max, newValue) - count += 1 } def getMap(): Map[String, Any] = { - Map("count" -> count, "map" -> map.keys.map(key => (key.toString, map.get(key).get.getMap())).toMap) + Map("map" -> map.keys.map(key => (key.toString, map.get(key).get.getMap())).toMap) } def formatStaticMillisTime(x: (DriverMetrics.Metric, AggregateValue), sb: mutable.StringBuilder): Unit = { @@ -61,11 +59,17 @@ class DriverMetrics { .append("\n") } + def formatCount(x: (DriverMetrics.Metric, AggregateValue), sb: mutable.StringBuilder): Unit = { + sb.append(f" ${x._1}%-30s${x._2.value}%20s") + .append("\n") + } + def print(caption: String, sb: mutable.StringBuilder):Unit = { - sb.append(s" DriverMetrics (${caption}) total measurements ${count} ") + sb.append(s" DriverMetrics (${caption}) ") .append("\n") sb.append(f" NAME Value ") .append("\n") + map.toBuffer.sortWith((a, b) => a._1.toString < b._1.toString).foreach(x => { formatterMap(x._1)(x, sb) }) @@ -90,7 +94,6 @@ object DriverMetrics extends Enumeration { implicit val formats = DefaultFormats val metrics = new DriverMetrics() - metrics.count = (json \ "count").extract[Int] val map = (json \ "map").extract[Map[String, JValue]] map.keys.foreach(key => metrics.map.put(withName(key), From f9fc58d890c3d894cee200b6af1cec09e9882e8b Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Thu, 23 May 2019 11:13:31 +0530 Subject: [PATCH 6/9] fixed wrong assignment to the metrics --- .../scala/com/qubole/sparklens/QuboleJobListener.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git 
a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala index d74ba96..17cc32f 100644 --- a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala +++ b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala @@ -105,7 +105,7 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { Some(jobTime/totalTime) } - private def collectDriverMetrics(): Unit = { + private def collectDriverGCMetrics(): Unit = { driverMetrics.updateMetric(DriverMetrics.driverCPUTime, ManagementFactory.getThreadMXBean.getCurrentThreadCpuTime) @@ -117,8 +117,8 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { gcCount += current.getCollectionCount gcTime += current.getCollectionTime } - driverMetrics.updateMetric(DriverMetrics.driverGCTime, gcCount) - driverMetrics.updateMetric(DriverMetrics.driverGCCount, gcTime) + driverMetrics.updateMetric(DriverMetrics.driverGCTime, gcTime) + driverMetrics.updateMetric(DriverMetrics.driverGCCount, gcCount) } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { @@ -183,7 +183,7 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { //println(s"Application ${appInfo.applicationID} ended at ${applicationEnd.time}") appInfo.endTime = applicationEnd.time - collectDriverMetrics() + collectDriverGCMetrics() threadExecutor.shutdown() val appContext = new AppContext(appInfo, From e691d50fe8c9db7b78f1ce7e193f1cc681dcb19f Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Thu, 27 Jun 2019 16:30:45 +0530 Subject: [PATCH 7/9] Commit 2 --- .../qubole/sparklens/QuboleJobListener.scala | 38 ++--------------- .../sparklens/common/DriverMetrics.scala | 42 +++++++++++++++++++ 2 files changed, 45 insertions(+), 35 deletions(-) diff --git a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala 
b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala index 17cc32f..7ca2c6b 100644 --- a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala +++ b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala @@ -17,9 +17,7 @@ package com.qubole.sparklens -import java.lang.management.ManagementFactory import java.net.URI -import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} import com.qubole.sparklens.analyzer._ import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo, DriverMetrics} @@ -52,17 +50,6 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { protected val appMetrics = new AggregateMetrics() protected val driverMetrics = new DriverMetrics() - private var threadExecutor: ScheduledExecutorService = _ - - private val updateDriverMemMetrics = new Runnable { - def run() = { - val memUsage = java.lang.management.ManagementFactory.getMemoryMXBean.getHeapMemoryUsage - driverMetrics.updateMetric(DriverMetrics.driverHeapMax, memUsage.getMax) - driverMetrics.updateMetric(DriverMetrics.driverMaxHeapCommitted, memUsage.getCommitted) - driverMetrics.updateMetric(DriverMetrics.driverMaxHeapUsed, memUsage.getUsed) - } - } - private def hostCount():Int = hostMap.size private def executorCount(): Int = executorMap.size @@ -105,22 +92,6 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { Some(jobTime/totalTime) } - private def collectDriverGCMetrics(): Unit = { - driverMetrics.updateMetric(DriverMetrics.driverCPUTime, - ManagementFactory.getThreadMXBean.getCurrentThreadCpuTime) - - var gcCount: Long = 0 - var gcTime: Long = 0 - val iter = ManagementFactory.getGarbageCollectorMXBeans.iterator() - while (iter.hasNext) { - val current = iter.next() - gcCount += current.getCollectionCount - gcTime += current.getCollectionTime - } - driverMetrics.updateMetric(DriverMetrics.driverGCTime, gcTime) - driverMetrics.updateMetric(DriverMetrics.driverGCCount, gcCount) - } - override 
def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { val taskMetrics = taskEnd.taskMetrics val taskInfo = taskEnd.taskInfo @@ -174,17 +145,14 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { //println(s"Application ${applicationStart.appId} started at ${applicationStart.time}") appInfo.applicationID = applicationStart.appId.getOrElse("NA") appInfo.startTime = applicationStart.time - - // Start a thread to collect the driver JVM memory stats every 10 seconds - threadExecutor = Executors.newSingleThreadScheduledExecutor - threadExecutor.scheduleAtFixedRate(updateDriverMemMetrics, 0, 500, TimeUnit.MILLISECONDS) + driverMetrics.scheduleMetricsCollection() } override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { //println(s"Application ${appInfo.applicationID} ended at ${applicationEnd.time}") appInfo.endTime = applicationEnd.time - collectDriverGCMetrics() - threadExecutor.shutdown() + driverMetrics.collectGCMetrics() + driverMetrics.terminateMetricsCollection() val appContext = new AppContext(appInfo, appMetrics, diff --git a/src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala b/src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala index dd0e28d..fcb391a 100644 --- a/src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala +++ b/src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala @@ -17,6 +17,9 @@ package com.qubole.sparklens.common +import java.lang.management.ManagementFactory +import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} + import com.qubole.sparklens.common.MetricsHelper._ import scala.collection.mutable @@ -34,6 +37,45 @@ class DriverMetrics { formatterMap(DriverMetrics.driverGCTime) = formatStaticMillisTime formatterMap(DriverMetrics.driverGCCount) = formatCount + + private val threadExecutor = Executors.newSingleThreadScheduledExecutor + threadExecutor + + val updateDriverMemMetrics = new Runnable { + def run() = { + val memUsage = 
java.lang.management.ManagementFactory.getMemoryMXBean.getHeapMemoryUsage + updateMetric(DriverMetrics.driverHeapMax, memUsage.getMax) + updateMetric(DriverMetrics.driverMaxHeapCommitted, memUsage.getCommitted) + updateMetric(DriverMetrics.driverMaxHeapUsed, memUsage.getUsed) + } + } + + def collectGCMetrics(): Unit = { + updateMetric(DriverMetrics.driverCPUTime, + ManagementFactory.getThreadMXBean.getCurrentThreadCpuTime) + + var gcCount: Long = 0 + var gcTime: Long = 0 + val iter = ManagementFactory.getGarbageCollectorMXBeans.iterator() + while (iter.hasNext) { + val current = iter.next() + gcCount += current.getCollectionCount + gcTime += current.getCollectionTime + } + updateMetric(DriverMetrics.driverGCTime, gcTime) + updateMetric(DriverMetrics.driverGCCount, gcCount) + } + + // Start a thread to collect the driver JVM memory stats every 10 seconds + def scheduleMetricsCollection(): Unit = { + + threadExecutor.scheduleAtFixedRate(updateDriverMemMetrics, 0, 10, TimeUnit.SECONDS) + } + + def terminateMetricsCollection(): Unit = { + threadExecutor.shutdown() + } + def updateMetric(metric: DriverMetrics.Metric, newValue: Long): Unit = { val aggregateValue = map.getOrElse(metric, new AggregateValue) if (!map.contains(metric)) { From a062a244041ec9ed45e40b35c902d89a26d45c50 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Thu, 25 Jul 2019 17:33:32 +0530 Subject: [PATCH 8/9] Added a generic framework to make it easier to add new metrices to sparklens --- .../qubole/sparklens/QuboleJobListener.scala | 20 ++++-- .../sparklens/QuboleNotebookListener.scala | 4 +- .../analyzer/SimpleAppAnalyzer.scala | 7 +- .../qubole/sparklens/common/AppContext.scala | 17 ++--- .../pluggable/ComplimentaryMetrics.scala | 70 +++++++++++++++++++ .../{common => pluggable}/DriverMetrics.scala | 27 ++++--- .../analyzer/JobOverlapAnalyzerSuite.scala | 6 +- .../PQParallelStageSchedulerSuite.scala | 5 +- 8 files changed, 126 insertions(+), 30 deletions(-) create mode 100644 
src/main/scala/com/qubole/sparklens/pluggable/ComplimentaryMetrics.scala rename src/main/scala/com/qubole/sparklens/{common => pluggable}/DriverMetrics.scala (84%) diff --git a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala index 7ca2c6b..f078861 100644 --- a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala +++ b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala @@ -20,8 +20,9 @@ package com.qubole.sparklens import java.net.URI import com.qubole.sparklens.analyzer._ -import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo, DriverMetrics} +import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo} import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan, JobTimeSpan, StageTimeSpan} +import com.qubole.sparklens.pluggable.{ComplimentaryMetrics, DriverMetrics} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.SparkConf @@ -48,7 +49,8 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { protected val stageIDToJobID = new mutable.HashMap[Int, Long] protected val failedStages = new ListBuffer[String] protected val appMetrics = new AggregateMetrics() - protected val driverMetrics = new DriverMetrics() + val pluggableMetricsMap = new mutable.HashMap[String, ComplimentaryMetrics]() + private def hostCount():Int = hostMap.size @@ -145,24 +147,28 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { //println(s"Application ${applicationStart.appId} started at ${applicationStart.time}") appInfo.applicationID = applicationStart.appId.getOrElse("NA") appInfo.startTime = applicationStart.time - driverMetrics.scheduleMetricsCollection() + pluggableMetricsMap("driverMetrics") = new DriverMetrics() + pluggableMetricsMap.foreach(x => + x._2.onApplicationStart(applicationStart) + ) } override def onApplicationEnd(applicationEnd: 
SparkListenerApplicationEnd): Unit = { //println(s"Application ${appInfo.applicationID} ended at ${applicationEnd.time}") appInfo.endTime = applicationEnd.time - driverMetrics.collectGCMetrics() - driverMetrics.terminateMetricsCollection() + pluggableMetricsMap.foreach(x => + x._2.onApplicationEnd(applicationEnd) + ) val appContext = new AppContext(appInfo, appMetrics, - driverMetrics, hostMap, executorMap, jobMap, jobSQLExecIDMap, stageMap, - stageIDToJobID) + stageIDToJobID, + pluggableMetricsMap) asyncReportingEnabled(sparkConf) match { case true => { diff --git a/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala b/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala index d801f9f..61a3900 100644 --- a/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala +++ b/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala @@ -120,13 +120,13 @@ class QuboleNotebookListener(sparkConf: SparkConf) extends QuboleJobListener(spa val appContext = new AppContext(appInfo, appMetrics, - driverMetrics, hostMap, executorMap, jobMap, jobSQLExecIDMap, stageMap, - stageIDToJobID) + stageIDToJobID, + pluggableMetricsMap) val out = new mutable.StringBuilder() diff --git a/src/main/scala/com/qubole/sparklens/analyzer/SimpleAppAnalyzer.scala b/src/main/scala/com/qubole/sparklens/analyzer/SimpleAppAnalyzer.scala index 7d41c76..25c70a3 100644 --- a/src/main/scala/com/qubole/sparklens/analyzer/SimpleAppAnalyzer.scala +++ b/src/main/scala/com/qubole/sparklens/analyzer/SimpleAppAnalyzer.scala @@ -34,7 +34,12 @@ class SimpleAppAnalyzer extends AppAnalyzer { "task-level granularity and aggregated across the app (all tasks, stages, and jobs).\n") ac.appMetrics.print("Application Metrics", out) out.println("\n") - ac.driverMetrics.print("Driver Metrics", out) + + ac.pluggableMetricsMap.foreach(x => { + x._2.print(x._1, out) + out.println("\n") + }) + out.println("\n") out.toString() } } diff --git 
a/src/main/scala/com/qubole/sparklens/common/AppContext.scala b/src/main/scala/com/qubole/sparklens/common/AppContext.scala index f92de77..22d7e92 100644 --- a/src/main/scala/com/qubole/sparklens/common/AppContext.scala +++ b/src/main/scala/com/qubole/sparklens/common/AppContext.scala @@ -17,6 +17,7 @@ package com.qubole.sparklens.common import com.qubole.sparklens.timespan._ +import com.qubole.sparklens.pluggable.ComplimentaryMetrics import org.json4s.DefaultFormats import org.json4s.JsonAST.JValue import org.json4s.jackson.Serialization @@ -26,18 +27,17 @@ import scala.collection.mutable case class AppContext(appInfo: ApplicationInfo, appMetrics: AggregateMetrics, - driverMetrics: DriverMetrics, hostMap: mutable.HashMap[String, HostTimeSpan], executorMap: mutable.HashMap[String, ExecutorTimeSpan], jobMap: mutable.HashMap[Long, JobTimeSpan], jobSQLExecIdMap:mutable.HashMap[Long, Long], stageMap: mutable.HashMap[Int, StageTimeSpan], - stageIDToJobID: mutable.HashMap[Int, Long]) { + stageIDToJobID: mutable.HashMap[Int, Long], + pluggableMetricsMap: mutable.HashMap[String, ComplimentaryMetrics]) { def filterByStartAndEndTime(startTime: Long, endTime: Long): AppContext = { new AppContext(appInfo, appMetrics, - driverMetrics, hostMap, executorMap .filter(x => x._2.endTime == 0 || //still running @@ -50,7 +50,8 @@ case class AppContext(appInfo: ApplicationInfo, stageMap .filter(x => x._2.startTime >= startTime && x._2.endTime <= endTime), - stageIDToJobID) + stageIDToJobID, + pluggableMetricsMap) } override def toString(): String = { @@ -58,13 +59,13 @@ case class AppContext(appInfo: ApplicationInfo, val map = Map( "appInfo" -> appInfo.getMap(), "appMetrics" -> appMetrics.getMap(), - "driverMetrics" -> driverMetrics.getMap(), "hostMap" -> AppContext.getMap(hostMap), "executorMap" -> AppContext.getMap(executorMap), "jobMap" -> AppContext.getMap(jobMap), "jobSQLExecIdMap" -> jobSQLExecIdMap, "stageMap" -> AppContext.getMap(stageMap), - "stageIDToJobID" -> stageIDToJobID + 
"stageIDToJobID" -> stageIDToJobID, + "pluggableMetricsMap" -> ComplimentaryMetrics.getMap(pluggableMetricsMap) ) Serialization.writePretty(map) } @@ -132,13 +133,13 @@ object AppContext { new AppContext( ApplicationInfo.getObject((json \ "appInfo").extract[JValue]), AggregateMetrics.getAggregateMetrics((json \ "appMetrics").extract[JValue]), - DriverMetrics.getDriverMetrics((json \ "driverMetrics").extract[JValue]), HostTimeSpan.getTimeSpan((json \ "hostMap").extract[Map[String, JValue]]), ExecutorTimeSpan.getTimeSpan((json \ "executorMap").extract[Map[String, JValue]]), JobTimeSpan.getTimeSpan((json \ "jobMap").extract[Map[String, JValue]]), getJobSQLExecIdMap(json, new mutable.HashMap[Long, Long]), StageTimeSpan.getTimeSpan((json \ "stageMap").extract[Map[String, JValue]]), - getJobToStageMap((json \ "stageIDToJobID").extract[Map[Int, JValue]]) + getJobToStageMap((json \ "stageIDToJobID").extract[Map[Int, JValue]]), + ComplimentaryMetrics.getMetricsMap((json \ "pluggableMetricsMap").extract[Map[String, JValue]]) ) } diff --git a/src/main/scala/com/qubole/sparklens/pluggable/ComplimentaryMetrics.scala b/src/main/scala/com/qubole/sparklens/pluggable/ComplimentaryMetrics.scala new file mode 100644 index 0000000..d250495 --- /dev/null +++ b/src/main/scala/com/qubole/sparklens/pluggable/ComplimentaryMetrics.scala @@ -0,0 +1,70 @@ +package com.qubole.sparklens.pluggable + +import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListenerApplicationStart} +import org.json4s.{DefaultFormats, MappingException} +import org.json4s.JsonAST.JValue + +import scala.collection.mutable + +trait ComplimentaryMetrics { + def getMap(): Map[String, _ <: Any] = { + throw new NotImplementedError(s"getMap() method is not implemented.") + } + + def getObject(json: JValue): ComplimentaryMetrics = { + throw new NotImplementedError(s"getObject() method is not implemented.") + } + + def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { + throw new 
NotImplementedError(s"onApplicationStart() method is not implemented.") + } + + def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + throw new NotImplementedError(s"onApplicationEnd() method is not implemented.") + } + + def print(caption: String, sb: mutable.StringBuilder): Unit = { + throw new NotImplementedError(s"print() method is not implemented.") + } +} + +object ComplimentaryMetrics { + + /** + * Returns object which extends [[ComplimentaryMetrics]] by matching input string + */ + def fromString(value: String): ComplimentaryMetrics = { + value.toLowerCase match { + case "drivermetrics" => DriverMetrics + case _ => throw new Exception(s"Object ${value} not found.") + } + } + + /** + * Used for extracting the pluggableMetricsMap in [[com.qubole.sparklens.QuboleJobListener]] + * to construct [[com.qubole.sparklens.common.AppContext]] from the JSON. + */ + def getMetricsMap(json: Map[String, JValue]): mutable.HashMap[String, ComplimentaryMetrics] = { + val metricsMap = new mutable.HashMap[String, ComplimentaryMetrics] + try { + implicit val formats = DefaultFormats + // fix(review): removed inner re-declaration that shadowed metricsMap and made this method return an empty map + json.keys.foreach(key => { + val value = json.get(key).get + metricsMap.put(key, fromString(key).getObject(value)) + }) + } catch { + case e: Exception if !e.isInstanceOf[MappingException] => + throw(e) + } + metricsMap + } + + /** + * Used for converting the pluggableMetricsMap in [[com.qubole.sparklens.QuboleJobListener]] + * to a formatted map which is then dumped in the JSON file/printed on console. 
+ */ + def getMap(metricsMap: mutable.HashMap[String, _ <: ComplimentaryMetrics]): Map[String, Any] = { + metricsMap.keys.map(key => (key.toString, metricsMap(key).getMap)).toMap + } +} \ No newline at end of file diff --git a/src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala b/src/main/scala/com/qubole/sparklens/pluggable/DriverMetrics.scala similarity index 84% rename from src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala rename to src/main/scala/com/qubole/sparklens/pluggable/DriverMetrics.scala index fcb391a..4f244e9 100644 --- a/src/main/scala/com/qubole/sparklens/common/DriverMetrics.scala +++ b/src/main/scala/com/qubole/sparklens/pluggable/DriverMetrics.scala @@ -15,16 +15,19 @@ * limitations under the License. */ -package com.qubole.sparklens.common +package com.qubole.sparklens.pluggable import java.lang.management.ManagementFactory import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} +import com.qubole.sparklens.common.AggregateValue import com.qubole.sparklens.common.MetricsHelper._ +import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListenerApplicationStart} +import org.json4s import scala.collection.mutable -class DriverMetrics { +class DriverMetrics extends ComplimentaryMetrics { val map = new mutable.HashMap[DriverMetrics.Metric, AggregateValue]() @transient val formatterMap = new mutable.HashMap[DriverMetrics.Metric, ((DriverMetrics @@ -66,9 +69,17 @@ class DriverMetrics { updateMetric(DriverMetrics.driverGCCount, gcCount) } + override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { + scheduleMetricsCollection() + } + + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + collectGCMetrics() + terminateMetricsCollection() + } + // Start a thread to collect the driver JVM memory stats every 10 seconds def scheduleMetricsCollection(): Unit = { - threadExecutor.scheduleAtFixedRate(updateDriverMemMetrics, 0, 10, 
TimeUnit.SECONDS) } @@ -84,7 +95,7 @@ class DriverMetrics { aggregateValue.value = math.max(aggregateValue.max, newValue) } - def getMap(): Map[String, Any] = { + override def getMap(): Map[String, Any] = { Map("map" -> map.keys.map(key => (key.toString, map.get(key).get.getMap())).toMap) } @@ -97,7 +108,7 @@ class DriverMetrics { } def formatStaticBytes(x: (DriverMetrics.Metric, AggregateValue), sb: mutable.StringBuilder): Unit = { - sb.append(f" ${x._1}%-30s${bytesToString(x._2.value)}%20s") + sb.append(f" ${x._1}%-30s${bytesToString(x._2.value)}%20s") .append("\n") } @@ -106,7 +117,7 @@ class DriverMetrics { .append("\n") } - def print(caption: String, sb: mutable.StringBuilder):Unit = { + override def print(caption: String, sb: mutable.StringBuilder):Unit = { sb.append(s" DriverMetrics (${caption}) ") .append("\n") sb.append(f" NAME Value ") @@ -118,7 +129,7 @@ class DriverMetrics { } } -object DriverMetrics extends Enumeration { +object DriverMetrics extends Enumeration with ComplimentaryMetrics { import org.json4s._ type Metric = Value @@ -131,7 +142,7 @@ object DriverMetrics extends Enumeration { driverGCTime = Value - def getDriverMetrics(json: JValue): DriverMetrics = { + override def getObject(json: json4s.JValue): ComplimentaryMetrics = { try { implicit val formats = DefaultFormats diff --git a/src/test/scala/com/qubole/sparklens/analyzer/JobOverlapAnalyzerSuite.scala b/src/test/scala/com/qubole/sparklens/analyzer/JobOverlapAnalyzerSuite.scala index 6442931..c017723 100644 --- a/src/test/scala/com/qubole/sparklens/analyzer/JobOverlapAnalyzerSuite.scala +++ b/src/test/scala/com/qubole/sparklens/analyzer/JobOverlapAnalyzerSuite.scala @@ -21,6 +21,7 @@ package com.qubole.sparklens.analyzer import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo, DriverMetrics} import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan, JobTimeSpan, StageTimeSpan} import com.qubole.sparklens.helper.JobOverlapHelper +import 
com.qubole.sparklens.pluggable.ComplimentaryMetrics import org.scalatest.FunSuite @@ -61,13 +62,14 @@ class JobOverlapAnalyzerSuite extends FunSuite { new AppContext(new ApplicationInfo(), new AggregateMetrics(), - new DriverMetrics(), mutable.HashMap[String, HostTimeSpan](), mutable.HashMap[String, ExecutorTimeSpan](), jobMap, jobSQLExecIDMap, mutable.HashMap[Int, StageTimeSpan](), - mutable.HashMap[Int, Long]()) + mutable.HashMap[Int, Long](), + mutable.HashMap[String, ComplimentaryMetrics]() + ) } test("JobOverlapAnalyzerTest: Jobs running in parallel should be considered while computing " + diff --git a/src/test/scala/com/qubole/sparklens/scheduler/PQParallelStageSchedulerSuite.scala b/src/test/scala/com/qubole/sparklens/scheduler/PQParallelStageSchedulerSuite.scala index e6f3a77..8005aa2 100644 --- a/src/test/scala/com/qubole/sparklens/scheduler/PQParallelStageSchedulerSuite.scala +++ b/src/test/scala/com/qubole/sparklens/scheduler/PQParallelStageSchedulerSuite.scala @@ -19,6 +19,7 @@ package com.qubole.sparklens.scheduler import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo, DriverMetrics} import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan, JobTimeSpan, StageTimeSpan} +import com.qubole.sparklens.pluggable.ComplimentaryMetrics import org.scalatest.FunSuite import scala.collection.mutable @@ -261,13 +262,13 @@ class PQParallelStageSchedulerSuite extends FunSuite { val ac = new AppContext(new ApplicationInfo(), new AggregateMetrics(), - new DriverMetrics(), mutable.HashMap[String, HostTimeSpan](), mutable.HashMap[String, ExecutorTimeSpan](), jobMap, jobSQLExecIDMap, mutable.HashMap[Int, StageTimeSpan](), - mutable.HashMap[Int, Long]()) + mutable.HashMap[Int, Long](), + mutable.HashMap[String, ComplimentaryMetrics]()) val time = CompletionEstimator.estimateAppWallClockTimeWithJobLists(ac, 1, 1, 3) assert(time === 3, s"Test failed") From 6f2d0e287b1d484d71ee84177886abf357a8398e Mon Sep 17 00:00:00 2001 From: Mayur 
Bhosale Date: Thu, 25 Jul 2019 18:03:55 +0530 Subject: [PATCH 9/9] Fix the driver CPU time collection calculation --- .../com/qubole/sparklens/pluggable/DriverMetrics.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/qubole/sparklens/pluggable/DriverMetrics.scala b/src/main/scala/com/qubole/sparklens/pluggable/DriverMetrics.scala index 4f244e9..4036c22 100644 --- a/src/main/scala/com/qubole/sparklens/pluggable/DriverMetrics.scala +++ b/src/main/scala/com/qubole/sparklens/pluggable/DriverMetrics.scala @@ -22,6 +22,7 @@ import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} import com.qubole.sparklens.common.AggregateValue import com.qubole.sparklens.common.MetricsHelper._ +import javax.management.{Attribute, ObjectName} import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListenerApplicationStart} import org.json4s @@ -29,6 +30,8 @@ import scala.collection.mutable class DriverMetrics extends ComplimentaryMetrics { + private val ProcessCpuTime = "ProcessCpuTime" + val map = new mutable.HashMap[DriverMetrics.Metric, AggregateValue]() @transient val formatterMap = new mutable.HashMap[DriverMetrics.Metric, ((DriverMetrics .Metric, AggregateValue), mutable.StringBuilder) => Unit]() @@ -54,8 +57,11 @@ class DriverMetrics extends ComplimentaryMetrics { } def collectGCMetrics(): Unit = { + val operatingSystemObjectName = ObjectName.getInstance("java.lang:type=OperatingSystem") updateMetric(DriverMetrics.driverCPUTime, - ManagementFactory.getThreadMXBean.getCurrentThreadCpuTime) + ManagementFactory.getPlatformMBeanServer + .getAttribute(operatingSystemObjectName, ProcessCpuTime).asInstanceOf[Attribute] + .getValue.asInstanceOf[Long]) var gcCount: Long = 0 var gcTime: Long = 0