forked from powerapi-ng/powerapi-scala
-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
merge(feature/reporter): Merge the feature/reporter into develop
Reference: powerapi-ng#33
- Loading branch information
Showing
31 changed files
with
1,382 additions
and
151 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,47 +24,56 @@ package org.powerapi.core | |
|
||
import java.util.UUID | ||
import akka.actor.SupervisorStrategy.{Directive, Resume} | ||
import akka.actor.{Actor, PoisonPill, Props} | ||
import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props} | ||
import akka.event.LoggingReceive | ||
import org.powerapi.core.ClockChannel.ClockTick | ||
import org.powerapi.core.power.Power | ||
import org.powerapi.module.PowerChannel.PowerReport | ||
import org.powerapi.core.target.Target | ||
import org.powerapi.reporter.ReporterComponent | ||
import scala.concurrent.duration.FiniteDuration | ||
|
||
/** | ||
* One child represents one monitor. | ||
* Allows to publish messages in the right topics depending of the targets. | ||
* | ||
* @author <a href="mailto:[email protected]">Maxime Colmant</a> | ||
* @author <a href="mailto:[email protected]">Loïc Huertas</a> | ||
*/ | ||
class MonitorChild(eventBus: MessageBus, | ||
muid: UUID, | ||
frequency: FiniteDuration, | ||
targets: List[Target]) extends ActorComponent { | ||
targets: List[Target], | ||
aggFunction: Seq[Power] => Power) extends ActorComponent { | ||
|
||
import org.powerapi.core.ClockChannel.{startClock, stopClock, subscribeClockTick, unsubscribeClockTick} | ||
import org.powerapi.core.MonitorChannel.{MonitorStart, MonitorStop, MonitorStopAll, publishMonitorTick} | ||
import org.powerapi.module.PowerChannel.{ AggregateReport, render, subscribePowerReport, unsubscribePowerReport } | ||
|
||
def receive: PartialFunction[Any, Unit] = LoggingReceive { | ||
case MonitorStart(_, id, freq, targs) if muid == id && frequency == freq && targets == targs => start() | ||
case MonitorStart(_, id, freq, targs, _) if muid == id && frequency == freq && targets == targs => start() | ||
} orElse default | ||
|
||
/** | ||
* Running state. | ||
*/ | ||
def running: Actor.Receive = LoggingReceive { | ||
def running(aggR: AggregateReport): Actor.Receive = LoggingReceive { | ||
case tick: ClockTick => produceMessages(tick) | ||
case powerReport: PowerReport => aggregate(aggR, powerReport) | ||
case MonitorStop(_, id) if muid == id => stop() | ||
case _: MonitorStopAll => stop() | ||
} orElse default | ||
|
||
/** | ||
* Start the clock, subscribe on the associated topic for receiving tick messages. | ||
* Start the clock, subscribe on the associated topic for receiving tick messages | ||
* and power reports. | ||
*/ | ||
def start(): Unit = { | ||
startClock(frequency)(eventBus) | ||
subscribeClockTick(frequency)(eventBus)(self) | ||
subscribePowerReport(muid)(eventBus)(self) | ||
log.info("monitor is started, muid: {}", muid) | ||
context.become(running) | ||
context.become(running(AggregateReport(muid, aggFunction))) | ||
} | ||
|
||
/** | ||
|
@@ -73,14 +82,29 @@ class MonitorChild(eventBus: MessageBus, | |
def produceMessages(tick: ClockTick): Unit = { | ||
targets.foreach(target => publishMonitorTick(muid, target, tick)(eventBus)) | ||
} | ||
|
||
/** | ||
* Wait to retrieve power reports of all targets from a same monitor to aggregate them | ||
* into once power report. | ||
*/ | ||
def aggregate(aggR: AggregateReport, powerReport: PowerReport): Unit = { | ||
aggR += powerReport | ||
if (aggR.size == targets.size) { | ||
render(aggR)(eventBus) | ||
context.become(running(AggregateReport(muid, aggFunction))) | ||
} | ||
else | ||
context.become(running(aggR)) | ||
} | ||
|
||
/** | ||
* Publish a request for stopping the clock which is responsible to produce the ticks at this frequency, | ||
* stop to listen ticks and kill the monitor actor. | ||
* stop to listen ticks and power reports and kill the monitor actor. | ||
*/ | ||
def stop(): Unit = { | ||
stopClock(frequency)(eventBus) | ||
unsubscribeClockTick(frequency)(eventBus)(self) | ||
unsubscribePowerReport(muid)(eventBus)(self) | ||
log.info("monitor is stopped, muid: {}", muid) | ||
self ! PoisonPill | ||
} | ||
|
@@ -90,17 +114,22 @@ class MonitorChild(eventBus: MessageBus, | |
* This actor listens the bus on a given topic and reacts on the received messages. | ||
* It is responsible to handle a pool of child actors which represent all monitors. | ||
* | ||
* @author <a href="mailto:[email protected]">Maxime Colmant</a> | ||
* @author Maxime Colmant <[email protected]> | ||
*/ | ||
class Monitors(eventBus: MessageBus) extends Supervisor { | ||
import org.powerapi.core.MonitorChannel.{MonitorStart, MonitorStop, MonitorStopAll, formatMonitorChildName, subscribeMonitorsChannel} | ||
import org.powerapi.core.MonitorChannel.{MonitorStart, MonitorStop, MonitorStopAll, formatMonitorChildName, stopAllMonitor, subscribeMonitorsChannel} | ||
import org.powerapi.module.SensorChannel.{monitorAllStopped, monitorStopped} | ||
|
||
override def preStart(): Unit = { | ||
subscribeMonitorsChannel(eventBus)(self) | ||
super.preStart() | ||
} | ||
|
||
override def postStop(): Unit = { | ||
context.actorSelection("*") ! stopAllMonitor | ||
super.postStop() | ||
} | ||
|
||
/** | ||
* MonitorChild actors can only launch exception if the message received is not handled. | ||
*/ | ||
|
@@ -128,7 +157,7 @@ class Monitors(eventBus: MessageBus) extends Supervisor { | |
*/ | ||
def start(msg: MonitorStart): Unit = { | ||
val name = formatMonitorChildName(msg.muid) | ||
val child = context.actorOf(Props(classOf[MonitorChild], eventBus, msg.muid, msg.frequency, msg.targets), name) | ||
val child = context.actorOf(Props(classOf[MonitorChild], eventBus, msg.muid, msg.frequency, msg.targets, msg.aggFunction), name) | ||
child ! msg | ||
context.become(running) | ||
} | ||
|
@@ -159,9 +188,20 @@ class Monitors(eventBus: MessageBus) extends Supervisor { | |
/** | ||
* This class is an interface for interacting directly with a MonitorChild actor. | ||
*/ | ||
class Monitor(eventBus: MessageBus, targets: List[Target]) { | ||
class Monitor(eventBus: MessageBus, _system: ActorSystem) { | ||
val muid = UUID.randomUUID() | ||
|
||
|
||
def reportTo(reporterComponent: Class[_ <: ReporterComponent], args: List[Any] = List()): Monitor = { | ||
reportTo(_system.actorOf(Props(reporterComponent, args: _*))) | ||
} | ||
|
||
def reportTo(reporter: ActorRef): Monitor = { | ||
import org.powerapi.module.PowerChannel.subscribeAggPowerReport | ||
|
||
subscribeAggPowerReport(muid)(eventBus)(reporter) | ||
this | ||
} | ||
|
||
def cancel(): Unit = { | ||
import org.powerapi.core.MonitorChannel.stopMonitor | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,17 +23,20 @@ | |
package org.powerapi.core | ||
|
||
import java.util.UUID | ||
import akka.actor.ActorRef | ||
import org.powerapi.core.ClockChannel.ClockTick | ||
import org.powerapi.core.target.Target | ||
import scala.concurrent.duration.FiniteDuration | ||
|
||
import akka.actor.ActorRef | ||
|
||
/** | ||
* Monitor channel and messages. | ||
* | ||
* @author <a href="mailto:[email protected]">Maxime Colmant</a> | ||
* @author <a href="mailto:[email protected]">Loïc Huertas</a> | ||
*/ | ||
object MonitorChannel extends Channel { | ||
import org.powerapi.core.ClockChannel.ClockTick | ||
import org.powerapi.core.power.Power | ||
|
||
type M = MonitorMessage | ||
|
||
|
@@ -59,11 +62,13 @@ object MonitorChannel extends Channel { | |
* @param muid: monitor unique identifier (MUID), which is at the origin of the report flow. | ||
* @param frequency: clock frequency. | ||
* @param targets: monitor targets. | ||
* @param aggFunction: aggregate power estimation for a specific sample of power reports. | ||
*/ | ||
case class MonitorStart(topic: String, | ||
muid: UUID, | ||
frequency: FiniteDuration, | ||
targets: List[Target]) extends MonitorMessage | ||
targets: List[Target], | ||
aggFunction: Seq[Power] => Power) extends MonitorMessage | ||
|
||
/** | ||
* MonitorStop is represented as a dedicated type of message. | ||
|
@@ -100,8 +105,8 @@ object MonitorChannel extends Channel { | |
/** | ||
* External Methods used by the API (or a Monitor object) for interacting with the bus. | ||
*/ | ||
def startMonitor(muid: UUID, frequency: FiniteDuration, targets: List[Target]): MessageBus => Unit = { | ||
publish(MonitorStart(topic, muid, frequency, targets)) | ||
def startMonitor(muid: UUID, frequency: FiniteDuration, targets: List[Target], aggFunction: Seq[Power] => Power): MessageBus => Unit = { | ||
publish(MonitorStart(topic, muid, frequency, targets, aggFunction)) | ||
} | ||
|
||
def stopMonitor(muid: UUID): MessageBus => Unit = { | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
/* | ||
* This software is licensed under the GNU Affero General Public License, quoted below. | ||
* | ||
* This file is a part of PowerAPI. | ||
* | ||
* Copyright (C) 2011-2014 Inria, University of Lille 1. | ||
* | ||
* PowerAPI is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU Affero General Public License as | ||
* published by the Free Software Foundation, either version 3 of | ||
* the License, or (at your option) any later version. | ||
* | ||
* PowerAPI is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU Affero General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU Affero General Public License | ||
* along with PowerAPI. | ||
* | ||
* If not, please consult http://www.gnu.org/licenses/agpl-3.0.html. | ||
*/ | ||
package org.powerapi.core.power | ||
|
||
object Power { | ||
def apply(value: Double, unit: PowerUnit): Power = new RawPower(value, unit) | ||
def apply(value: Double, unit: String): Power = new RawPower(value, PowerUnitSystem(unit)) | ||
|
||
/** | ||
* The natural ordering of powers matches the natural ordering for Double. | ||
*/ | ||
implicit object PowerIsOrdered extends Ordering[Power] { | ||
def compare(a: Power, b: Power) = a compare b | ||
} | ||
} | ||
|
||
trait Power extends Ordered[Power] { | ||
def value: Double | ||
def unit: PowerUnit | ||
def toMilliWatts: Double | ||
def toWatts: Double | ||
def toKiloWatts: Double | ||
def toMegaWatts: Double | ||
def toUnit(unit: PowerUnit): Double | ||
def +(other: Power): Power | ||
def -(other: Power): Power | ||
def *(factor: Double): Power | ||
def /(divisor: Double): Power | ||
def min(other: Power): Power = if (this < other) this else other | ||
def max(other: Power): Power = if (this > other) this else other | ||
|
||
// Java API | ||
def div(divisor: Double) = this / divisor | ||
def gt(other: Power) = this > other | ||
def gteq(other: Power) = this >= other | ||
def lt(other: Power) = this < other | ||
def lteq(other: Power) = this <= other | ||
def minus(other: Power) = this - other | ||
def mul(factor: Double) = this * factor | ||
def plus(other: Power) = this + other | ||
} | ||
|
||
object RawPower { | ||
|
||
implicit object RawPowerIsOrdered extends Ordering[RawPower] { | ||
def compare(a: RawPower, b: RawPower) = a compare b | ||
} | ||
|
||
def apply(value: Double, unit: PowerUnit) = new RawPower(value, unit) | ||
def apply(value: Double, unit: String) = new RawPower(value, PowerUnitSystem(unit)) | ||
|
||
// limit on abs. value of powers in their units | ||
private final val max_mw = Double.MaxValue | ||
private final val max_w = max_mw / 1000.0 | ||
private final val max_kw = max_w / 1000.0 | ||
private final val max_Mw = max_kw / 1000.0 | ||
} | ||
|
||
/** | ||
* Defines a power value. | ||
* | ||
* @author Loïc Huertas <[email protected]> | ||
* @author Romain Rouvoy <[email protected]> | ||
*/ | ||
final class RawPower(val value: Double, val unit: PowerUnit) extends Power { | ||
import org.apache.logging.log4j.LogManager | ||
import RawPower._ | ||
|
||
private val log = LogManager.getLogger | ||
|
||
private[this] def bounded(max: Double) = 0.0 <= value && value <= max | ||
|
||
require(unit match { | ||
case MILLIWATTS => bounded(max_mw) | ||
case WATTS => bounded(max_w) | ||
case KILOWATTS => bounded(max_kw) | ||
case MEGAWATTS => bounded(max_Mw) | ||
case _ => | ||
val v = MEGAWATTS.convert(value, unit) | ||
0.0 <= v && v <= max_Mw | ||
}, "Power value is limited to 1.79e308 mW and cannot be negative") | ||
|
||
def toMilliWatts = unit.toMilliWatts(value) | ||
def toWatts = unit.toWatts(value) | ||
def toKiloWatts = unit.toKiloWatts(value) | ||
def toMegaWatts = unit.toMegaWatts(value) | ||
def toUnit(u: PowerUnit) = toMilliWatts / MILLIWATTS.convert(1, u) | ||
|
||
override def toString() = s"$value $unit" | ||
|
||
def compare(other: Power) = toMilliWatts compare other.toMilliWatts | ||
|
||
private[this] def safeAdd(a: Double, b: Double): Double = { | ||
if ((b > 0.0) && (a > Double.MaxValue - b)) throw new IllegalArgumentException("double overflow") | ||
if ((b < 0.0) && (a < -b)) throw new IllegalArgumentException("negative power cannot exists") | ||
a + b | ||
} | ||
private[this] def add(otherValue: Double, otherUnit: PowerUnit): Power = { | ||
val commonUnit = if (otherUnit.convert(1, unit) < 1.0) unit else otherUnit | ||
val resultValue = safeAdd(commonUnit.convert(value, unit), commonUnit.convert(otherValue, otherUnit)) | ||
new RawPower(resultValue, commonUnit) | ||
} | ||
|
||
def +(other: Power) = add(other.value, other.unit) | ||
def -(other: Power) = add(-other.value, other.unit) | ||
|
||
private[this] def safeMul(a: Double): Double = { | ||
if (a.isInfinite) throw new IllegalArgumentException("multiplication's result is an infinite value") | ||
if (a.isNaN) throw new IllegalArgumentException("multiplication's result is an undefined value") | ||
if (a > Double.MaxValue) throw new IllegalArgumentException("double overflow") | ||
if (a < 0.0) throw new IllegalArgumentException("negative power cannot exists") | ||
a | ||
} | ||
|
||
def *(factor: Double) = new RawPower({ | ||
if (factor.isInfinite || factor.isNaN) throw new IllegalArgumentException("factor must be a finite and defined value") | ||
else safeMul(value * factor) | ||
}, unit | ||
) | ||
def /(divisor: Double) = new RawPower({ | ||
if (divisor.isInfinite || divisor.isNaN) throw new IllegalArgumentException("divisor must be a finite and defined value") | ||
else safeMul(value / divisor) | ||
}, unit | ||
) | ||
|
||
override def equals(other: Any) = other match { | ||
case x: RawPower => toMilliWatts == x.toMilliWatts | ||
case _ => super.equals(other) | ||
} | ||
} | ||
|
Oops, something went wrong.