From a0010a433740d5fec09f0301165927b3debf9afc Mon Sep 17 00:00:00 2001 From: gfieni Date: Thu, 1 Mar 2018 11:35:35 +0100 Subject: [PATCH 1/2] fix(sampling-cpu): Fix broken CPU sampling --- .../module/libpfm/LibpfmCoreProcessSensor.scala | 4 +--- .../org/powerapi/module/libpfm/LibpfmCoreSensor.scala | 4 +--- .../org/powerapi/module/libpfm/LibpfmFormula.scala | 8 +++----- .../module/libpfm/PerformanceCounterChannel.scala | 4 ++-- .../libpfm/cycles/LibpfmCoreCyclesFormula.scala | 11 ++++------- .../scala/org/powerapi/sampling/cpu/Sampling.scala | 10 ++++++---- 6 files changed, 17 insertions(+), 24 deletions(-) diff --git a/powerapi-core/src/main/scala/org/powerapi/module/libpfm/LibpfmCoreProcessSensor.scala b/powerapi-core/src/main/scala/org/powerapi/module/libpfm/LibpfmCoreProcessSensor.scala index 9692384..3d20522 100644 --- a/powerapi-core/src/main/scala/org/powerapi/module/libpfm/LibpfmCoreProcessSensor.scala +++ b/powerapi-core/src/main/scala/org/powerapi/module/libpfm/LibpfmCoreProcessSensor.scala @@ -123,9 +123,7 @@ class LibpfmCoreProcessSensor(eventBus: MessageBus, muid: UUID, target: Target, (core, event, Await.result(actor.?(msg.tick)(timeout), timeout.duration).asInstanceOf[HWCounter]) } - publishPCReport(muid, target, allValues.groupBy(tuple3 => (tuple3._1, tuple3._2)).map { - case ((core, event), values) => Map[Int, Map[String, Seq[HWCounter]]](core -> Map(event -> values.map(_._3).toSeq)) - }.foldLeft(Map[Int, Map[String, Seq[HWCounter]]]())((acc, elt) => acc ++ elt), msg.tick)(eventBus) + publishPCReport(muid, target, allValues.groupBy(_._1).mapValues(_.map { case (_, event, hpc) => Map(event -> hpc) }.reduce(_ ++ _)), msg.tick)(eventBus) context.become(sense(newIdentifiers) orElse sensorDefault) } diff --git a/powerapi-core/src/main/scala/org/powerapi/module/libpfm/LibpfmCoreSensor.scala b/powerapi-core/src/main/scala/org/powerapi/module/libpfm/LibpfmCoreSensor.scala index 8e9c455..5b043b2 100644 --- a/powerapi-core/src/main/scala/org/powerapi/module/libpfm/LibpfmCoreSensor.scala +++ b/powerapi-core/src/main/scala/org/powerapi/module/libpfm/LibpfmCoreSensor.scala @@ -87,8 +87,6 @@ class LibpfmCoreSensor(eventBus: MessageBus, muid: UUID, target: Target, libpfmH (core, event, Await.result(actor.?(msg.tick)(timeout), timeout.duration).asInstanceOf[HWCounter]) } - publishPCReport(muid, target, allValues.groupBy(tuple3 => (tuple3._1, tuple3._2)).map { - case ((core, event), values) => Map[Int, Map[String, Seq[HWCounter]]](core -> Map(event -> values.map(_._3).toSeq)) - }.foldLeft(Map[Int, Map[String, Seq[HWCounter]]]())((acc, elt) => acc ++ elt), msg.tick)(eventBus) + publishPCReport(muid, target, allValues.groupBy(_._1).mapValues(_.map { case (_, event, hpc) => Map(event -> hpc) }.reduce(_ ++ _)), msg.tick)(eventBus) } } diff --git a/powerapi-core/src/main/scala/org/powerapi/module/libpfm/LibpfmFormula.scala b/powerapi-core/src/main/scala/org/powerapi/module/libpfm/LibpfmFormula.scala index fe4474c..cbd66ca 100644 --- a/powerapi-core/src/main/scala/org/powerapi/module/libpfm/LibpfmFormula.scala +++ b/powerapi-core/src/main/scala/org/powerapi/module/libpfm/LibpfmFormula.scala @@ -25,15 +25,13 @@ package org.powerapi.module.libpfm import java.util.UUID import scala.concurrent.duration.FiniteDuration - import akka.actor.Actor - import org.powerapi.core.MessageBus import org.powerapi.core.power._ import org.powerapi.core.target.Target import org.powerapi.module.Formula import org.powerapi.module.PowerChannel.publishRawPowerReport -import org.powerapi.module.libpfm.PerformanceCounterChannel.{PCReport, subscribePCReport, unsubscribePCReport} +import org.powerapi.module.libpfm.PerformanceCounterChannel.{HWCounter, PCReport, subscribePCReport, unsubscribePCReport} /** * This formula is designed to fit a multivariate power model. @@ -57,8 +55,8 @@ class LibpfmFormula(eventBus: MessageBus, muid: UUID, target: Target, formula: M if (now - old <= 0) 0 else { val value = msg.values.values.flatten.collect { - case (ev, counters) if ev == event => counters.map(_.value) - }.foldLeft(Seq[Long]())((acc, value) => acc ++ value).sum + case (hpcEvent, hpcValue) if hpcEvent == event => hpcValue.value + }.sum coeff * math.round(value * (samplingInterval.toNanos / (now - old).toDouble)) } diff --git a/powerapi-core/src/main/scala/org/powerapi/module/libpfm/PerformanceCounterChannel.scala b/powerapi-core/src/main/scala/org/powerapi/module/libpfm/PerformanceCounterChannel.scala index 9cb335b..7de8bbd 100644 --- a/powerapi-core/src/main/scala/org/powerapi/module/libpfm/PerformanceCounterChannel.scala +++ b/powerapi-core/src/main/scala/org/powerapi/module/libpfm/PerformanceCounterChannel.scala @@ -43,7 +43,7 @@ object PerformanceCounterChannel extends Channel { /** * Publish a PerformanceCounterReport in the event bus. */ - def publishPCReport(muid: UUID, target: Target, values: Map[Int, Map[String, Seq[HWCounter]]], tick: Tick): MessageBus => Unit = { + def publishPCReport(muid: UUID, target: Target, values: Map[Int, Map[String, HWCounter]], tick: Tick): MessageBus => Unit = { publish(PCReport(pcReportToTopic(muid, target), muid, target, values, tick)) } @@ -109,7 +109,7 @@ object PerformanceCounterChannel extends Channel { case class PCReport(topic: String, muid: UUID, target: Target, - values: Map[Int, Map[String, Seq[HWCounter]]], + values: Map[Int, Map[String, HWCounter]], tick: Tick) extends Message /** diff --git a/powerapi-core/src/main/scala/org/powerapi/module/libpfm/cycles/LibpfmCoreCyclesFormula.scala b/powerapi-core/src/main/scala/org/powerapi/module/libpfm/cycles/LibpfmCoreCyclesFormula.scala index af0f280..3c95582 100644 --- a/powerapi-core/src/main/scala/org/powerapi/module/libpfm/cycles/LibpfmCoreCyclesFormula.scala +++ b/powerapi-core/src/main/scala/org/powerapi/module/libpfm/cycles/LibpfmCoreCyclesFormula.scala @@ -56,13 +56,10 @@ class LibpfmCoreCyclesFormula(eventBus: MessageBus, muid: UUID, target: Target, val now = System.nanoTime() val powers = for (value <- msg.values) yield { - val cycles = value._2.getOrElse(cyclesThreadName, Seq(HWCounter(0))) - val refs = value._2.getOrElse(cyclesRefName, Seq(HWCounter(0))) - val cyclesVal = cycles.map(_.value).sum - val scaledCycles = if (now - old <= 0) 0l else math.round(cyclesVal * (samplingInterval.toNanos / (now - old).toDouble)) - - val refsVal = refs.map(_.value).sum - val scaledRefs = if (now - old <= 0) 0l else math.round(refsVal * (samplingInterval.toNanos / (now - old).toDouble)) + val cycles = value._2.getOrElse(cyclesThreadName, HWCounter(0)) + val refs = value._2.getOrElse(cyclesRefName, HWCounter(0)) + val scaledCycles = if (now - old <= 0) 0l else math.round(cycles.value * (samplingInterval.toNanos / (now - old).toDouble)) + val scaledRefs = if (now - old <= 0) 0l else math.round(refs.value * (samplingInterval.toNanos / (now - old).toDouble)) var coefficient: Double = math.round(scaledCycles / scaledRefs.toDouble) diff --git a/powerapi-sampling-cpu/src/main/scala/org/powerapi/sampling/cpu/Sampling.scala b/powerapi-sampling-cpu/src/main/scala/org/powerapi/sampling/cpu/Sampling.scala index eeb551f..33f1e30 100644 --- a/powerapi-sampling-cpu/src/main/scala/org/powerapi/sampling/cpu/Sampling.scala +++ b/powerapi-sampling-cpu/src/main/scala/org/powerapi/sampling/cpu/Sampling.scala @@ -79,16 +79,18 @@ class CountersDisplay(basepath: String, events: Set[String]) extends Actor with } def receive: Actor.Receive = { - case msg: PCReport => report(msg) - case msg: String => append(msg) + case msg: PCReport => + report(msg) + case msg: String => + append(msg) } def report(msg: PCReport): Unit = { for (event <- events) { val counter = msg.values.values.flatten.collect { - case (ev, counters) if ev == event => counters.map(_.value) - }.foldLeft(Seq[Long]())((acc, value) => acc ++ value).sum + case (hpcEvent, hpcValue) if hpcEvent == event => hpcValue.value + }.sum outputs(event).append(s"$counter\n") outputs(event).flush() From ef760bffd109d994cccb46389f2d73e4bac8f16d Mon Sep 17 00:00:00 2001 From: gfieni Date: Thu, 1 Mar 2018 13:57:06 +0100 Subject: [PATCH 2/2] feat(sampling-cpu): Add messages before starting each phases --- .../src/main/scala/org/powerapi/sampling/cpu/Application.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/powerapi-sampling-cpu/src/main/scala/org/powerapi/sampling/cpu/Application.scala b/powerapi-sampling-cpu/src/main/scala/org/powerapi/sampling/cpu/Application.scala index 8c7347d..a156daf 100644 --- a/powerapi-sampling-cpu/src/main/scala/org/powerapi/sampling/cpu/Application.scala +++ b/powerapi-sampling-cpu/src/main/scala/org/powerapi/sampling/cpu/Application.scala @@ -122,14 +122,17 @@ object Application extends App { } if (samplingOption._1) { + println("Starting sampling phase...") Sampling(samplingOption._2, configuration, libpfmHelper).run() } if (processingOption._1) { + println("Starting processing phase...") Processing(samplingOption._2, processingOption._2, configuration).run() } if (computingOption._1) { + println("Starting computation phase...") PolynomialCyclesRegression(processingOption._2, computingOption._2, configuration).run() }