Skip to content

Commit

Permalink
boostrap green score extension
Browse files Browse the repository at this point in the history
  • Loading branch information
mathieuancelin committed Aug 4, 2023
1 parent 1fe6bea commit f32dc2c
Show file tree
Hide file tree
Showing 15 changed files with 799 additions and 63 deletions.
6 changes: 3 additions & 3 deletions otoroshi/app/gateway/errors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ object Errors {
duration = duration,
overhead = overhead,
cbDuration = cbDuration,
overheadWoCb = overhead - cbDuration,
overheadWoCb = Math.abs(overhead - cbDuration),
callAttempts = callAttempts,
url = url,
method = req.method,
Expand Down Expand Up @@ -204,7 +204,7 @@ object Errors {
duration = duration,
overhead = overhead,
cbDuration = cbDuration,
overheadWoCb = overhead - cbDuration,
overheadWoCb = Math.abs(overhead - cbDuration),
callAttempts = callAttempts,
url = url,
method = req.method,
Expand Down Expand Up @@ -260,7 +260,7 @@ object Errors {
duration = duration,
overhead = overhead,
cbDuration = cbDuration,
overheadWoCb = overhead - cbDuration,
overheadWoCb = Math.abs(overhead - cbDuration),
callAttempts = callAttempts,
url = s"${req.theProtocol}://${req.theHost}${req.relativeUri}",
method = req.method,
Expand Down
2 changes: 1 addition & 1 deletion otoroshi/app/gateway/http.scala
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ class HttpHandler()(implicit env: Env) {
duration = duration,
overhead = overhead,
cbDuration = cbDuration,
overheadWoCb = overhead - cbDuration,
overheadWoCb = Math.abs(overhead - cbDuration),
callAttempts = callAttempts,
url = url,
method = req.method,
Expand Down
2 changes: 1 addition & 1 deletion otoroshi/app/gateway/websockets.scala
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ class WebSocketHandler()(implicit env: Env) {
duration = duration,
overhead = overhead,
cbDuration = cbDuration,
overheadWoCb = overhead - cbDuration,
overheadWoCb = Math.abs(overhead - cbDuration),
callAttempts = callAttempts,
url = url,
method = req.method,
Expand Down
117 changes: 117 additions & 0 deletions otoroshi/app/greenscore/ecometrics.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package otoroshi.greenscore

import com.codahale.metrics.UniformReservoir
import otoroshi.env.Env
import otoroshi.utils.cache.types.UnboundedTrieMap
import play.api.Logger
import play.api.libs.json.JsValue

import java.util.{Timer => _}
import scala.collection.concurrent.TrieMap

class GlobalScore {
private val backendsScore: UnboundedTrieMap[String, BackendScore] = TrieMap.empty
private val routesScore: UnboundedTrieMap[String, RouteScore] = TrieMap.empty

def updateBackend(backendId: String, dataIn: Long,
dataOut: Long,
headers: Long,
headersOut: Long) = {
backendsScore.getOrElseUpdate(backendId, new BackendScore()).update(dataIn, dataOut, headers, headersOut)
}

def updateRoute(routeId: String, overhead: Long,
overheadWithoutCircuitBreaker: Long,
circuitBreakerDuration: Long,
duration: Long,
plugins: Int) = {
routesScore.getOrElseUpdate(routeId, new RouteScore())
.update(overhead, overheadWithoutCircuitBreaker, circuitBreakerDuration, duration, plugins)
}

def compute(): Double = {
backendsScore.values.foldLeft(0.0) { case (acc, item) => acc + item.compute() } +
routesScore.values.foldLeft(0.0) { case (acc, item) => acc + item.compute() }
}
}

class RouteScore {
private val overheadReservoir: UniformReservoir = new UniformReservoir()
private val overheadWithoutCircuitBreakerReservoir: UniformReservoir = new UniformReservoir()
private val circuitBreakerDurationReservoir: UniformReservoir = new UniformReservoir()
private val durationReservoir: UniformReservoir = new UniformReservoir()
private val pluginsReservoir: UniformReservoir = new UniformReservoir()

def update(overhead: Long,
overheadWithoutCircuitBreaker: Long,
circuitBreakerDuration: Long,
duration: Long,
plugins: Int) = {
overheadReservoir.update(overhead)
overheadWithoutCircuitBreakerReservoir.update(overheadWithoutCircuitBreaker)
circuitBreakerDurationReservoir.update(circuitBreakerDuration)
durationReservoir.update(duration)
pluginsReservoir.update(plugins)
}

def compute(): Double = {
overheadReservoir.getSnapshot.getMean +
overheadWithoutCircuitBreakerReservoir.getSnapshot.getMean +
circuitBreakerDurationReservoir.getSnapshot.getMean +
durationReservoir.getSnapshot.getMean +
pluginsReservoir.getSnapshot.getMean
}
}

class BackendScore {
private val dataInReservoir: UniformReservoir = new UniformReservoir()
private val headersOutReservoir: UniformReservoir = new UniformReservoir()
private val dataOutReservoir: UniformReservoir = new UniformReservoir()
private val headersReservoir: UniformReservoir = new UniformReservoir()

def update(dataIn: Long,
dataOut: Long,
headers: Long,
headersOut: Long) = {
dataInReservoir.update(dataIn)
dataOutReservoir.update(dataOut)
headersReservoir.update(headers)
headersOutReservoir.update(headersOut)
}

def compute(): Double = {
dataInReservoir.getSnapshot.getMean +
dataOutReservoir.getSnapshot.getMean +
headersReservoir.getSnapshot.getMean +
headersOutReservoir.getSnapshot.getMean
}
}

class EcoMetrics(env: Env) {

private implicit val ev = env
private implicit val ec = env.otoroshiExecutionContext

private val logger = Logger("otoroshi-eco-metrics")

private val registry = new GlobalScore()

def compute() = registry.compute()

def updateBackend(backendId: String,
dataIn: Long,
dataOut: Long,
headers: Long,
headersOut: Long) = {
registry.updateBackend(backendId, dataIn, dataOut, headers, headersOut)
}

def updateRoute(routeId: String,
overhead: Long,
overheadWoCb: Long,
cbDuration: Long,
duration: Long,
plugins: Int) = {
registry.updateRoute(routeId, overhead, overheadWoCb, cbDuration, duration, plugins)
}
}
183 changes: 183 additions & 0 deletions otoroshi/app/greenscore/extension.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package otoroshi.greenscore

import akka.actor.{Actor, ActorRef, Props}
import otoroshi.api.{GenericResourceAccessApiWithState, Resource, ResourceVersion}
import otoroshi.env.Env
import otoroshi.events.{GatewayEvent, OtoroshiEvent}
import otoroshi.models.{EntityLocation, EntityLocationSupport}
import otoroshi.next.extensions.{AdminExtension, AdminExtensionEntity, AdminExtensionId}
import otoroshi.storage.{BasicStore, RedisLike, RedisLikeStore}
import otoroshi.utils.cache.types.UnboundedTrieMap
import otoroshi.utils.syntax.implicits._
import play.api.Logger
import play.api.libs.json._
import play.api.mvc.Results

import scala.concurrent.Future
import scala.util._

object OtoroshiEventListener {
def props(ext: GreenScoreExtension, env: Env) = Props(new OtoroshiEventListener(ext, env))
}

class OtoroshiEventListener(ext: GreenScoreExtension, env: Env) extends Actor {
override def receive: Receive = {
case evt: GatewayEvent => {
val routeId = evt.route.map(_.id).getOrElse(evt.`@serviceId`)
ext.ecoMetrics.updateRoute(
routeId = routeId,
overhead = evt.overhead,
overheadWoCb = evt.overheadWoCb,
cbDuration = evt.cbDuration,
duration = evt.duration,
plugins = evt.route.map(_.plugins.slots.size).getOrElse(0),
)
ext.ecoMetrics.updateBackend(
backendId = evt.target.scheme + evt.target.host + evt.target.uri,
dataIn = evt.data.dataIn,
dataOut = evt.data.dataOut,
headers = evt.headers.foldLeft(0L) { case (acc, item) =>
acc + item.key.byteString.size + item.value.byteString.size + 3 // 3 = ->
} + evt.method.byteString.size + evt.url.byteString.size + evt.protocol.byteString.size + 2,
headersOut = evt.headersOut.foldLeft(0L) { case (acc, item) =>
acc + item.key.byteString.size + item.value.byteString.size + 3 // 3 = ->
} + evt.protocol.byteString.size + 1 + 3 + Results.Status(evt.status).header.reasonPhrase.map(_.byteString.size).getOrElse(0)
)
ext.logger.debug(s"global score for ${routeId}: ${ext.ecoMetrics.compute()}")
}
case _ =>
}
}

case class GreenScoreEntity(
location: EntityLocation,
id: String,
name: String,
description: String,
tags: Seq[String],
metadata: Map[String, String],
routes: Seq[String],
config: GreenScoreConfig,
) extends EntityLocationSupport {
override def internalId: String = id
override def json: JsValue = GreenScoreEntity.format.writes(this)
override def theName: String = name
override def theDescription: String = description
override def theTags: Seq[String] = tags
override def theMetadata: Map[String, String] = metadata
}

object GreenScoreEntity {
val format = new Format[GreenScoreEntity] {
override def writes(o: GreenScoreEntity): JsValue = o.location.jsonWithKey ++ Json.obj(
"id" -> o.id,
"name" -> o.name,
"description" -> o.description,
"metadata" -> o.metadata,
"tags" -> JsArray(o.tags.map(JsString.apply)),
"routes" -> JsArray(o.routes.map(JsString.apply)),
"config" -> o.config.json,
)

override def reads(json: JsValue): JsResult[GreenScoreEntity] = Try {
GreenScoreEntity(
location = otoroshi.models.EntityLocation.readFromKey(json),
id = (json \ "id").as[String],
name = (json \ "name").as[String],
description = (json \ "description").as[String],
metadata = (json \ "metadata").asOpt[Map[String, String]].getOrElse(Map.empty),
tags = (json \ "tags").asOpt[Seq[String]].getOrElse(Seq.empty[String]),
routes = json.select("routes").asOpt[Seq[String]].getOrElse(Seq.empty),
config = json.select("config").asOpt[JsObject].map(v => GreenScoreConfig.format.reads(v).get).get,
)
} match {
case Failure(ex) => JsError(ex.getMessage)
case Success(value) => JsSuccess(value)
}
}
}

trait GreenScoreDataStore extends BasicStore[GreenScoreEntity]

class KvGreenScoreDataStore(extensionId: AdminExtensionId, redisCli: RedisLike, _env: Env)
extends GreenScoreDataStore
with RedisLikeStore[GreenScoreEntity] {
override def fmt: Format[GreenScoreEntity] = GreenScoreEntity.format
override def redisLike(implicit env: Env): RedisLike = redisCli
override def key(id: String): String = s"${_env.storageRoot}:extensions:${extensionId.cleanup}:greenscores:$id"
override def extractId(value: GreenScoreEntity): String = value.id
}

class GreenScoreAdminExtensionDatastores(env: Env, extensionId: AdminExtensionId) {
val greenscoresDatastore: GreenScoreDataStore = new KvGreenScoreDataStore(extensionId, env.datastores.redis, env)
}

class GreenScoreAdminExtensionState(env: Env) {

private val greenScores = new UnboundedTrieMap[String, GreenScoreEntity]()

def greenScore(id: String): Option[GreenScoreEntity] = greenScores.get(id)
def allGreenScores(): Seq[GreenScoreEntity] = greenScores.values.toSeq

private[greenscore] def updateGreenScores(values: Seq[GreenScoreEntity]): Unit = {
greenScores.addAll(values.map(v => (v.id, v))).remAll(greenScores.keySet.toSeq.diff(values.map(_.id)))
}
}

class GreenScoreExtension(val env: Env) extends AdminExtension {

private[greenscore] val logger = Logger("otoroshi-extension-green-score")
private[greenscore] val ecoMetrics = new EcoMetrics(env)
private val listener: ActorRef = env.analyticsActorSystem.actorOf(OtoroshiEventListener.props(this, env))
private lazy val datastores = new GreenScoreAdminExtensionDatastores(env, id)
private lazy val states = new GreenScoreAdminExtensionState(env)

override def id: AdminExtensionId = AdminExtensionId("otoroshi.extensions.GreenScore")

override def enabled: Boolean = env.isDev || configuration.getOptional[Boolean]("enabled").getOrElse(false)

override def name: String = "Green Score"

override def description: Option[String] = None

override def start(): Unit = {
env.analyticsActorSystem.eventStream.subscribe(listener, classOf[OtoroshiEvent])
}

override def stop(): Unit = {
env.analyticsActorSystem.eventStream.unsubscribe(listener)
}

override def syncStates(): Future[Unit] = {
implicit val ec = env.otoroshiExecutionContext
implicit val ev = env
for {
scores <- datastores.greenscoresDatastore.findAll()
} yield {
states.updateGreenScores(scores)
()
}
}

override def entities(): Seq[AdminExtensionEntity[EntityLocationSupport]] = {
Seq(
AdminExtensionEntity(
Resource(
"GreenScore",
"green-scores",
"green-score",
"green-score.extensions.otoroshi.io",
ResourceVersion("v1", true, false, true),
GenericResourceAccessApiWithState[GreenScoreEntity](
GreenScoreEntity.format,
id => datastores.greenscoresDatastore.key(id),
c => datastores.greenscoresDatastore.extractId(c),
stateAll = () => states.allGreenScores(),
stateOne = id => states.greenScore(id),
stateUpdate = values => states.updateGreenScores(values),
)
)
)
)
}
}
Loading

0 comments on commit f32dc2c

Please sign in to comment.