Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release scala-3 versions of kamon-akka, kamon-pekko and kamon-pekko-http #1295

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ lazy val `kamon-akka` = (project in file("instrumentation/kamon-akka"))
.enablePlugins(JavaAgent)
.disablePlugins(AssemblyPlugin)
.settings(instrumentationSettings: _*)
.settings(crossScalaVersions += `scala_3_version`)
.dependsOn(
`kamon-scala-future` % "compile,common,akka-2.5,akka-2.6",
`kamon-testkit` % "test,test-common,test-akka-2.5,test-akka-2.6"
Expand Down Expand Up @@ -498,6 +499,9 @@ lazy val `kamon-pekko` = (project in file("instrumentation/kamon-pekko"))
.settings(instrumentationSettings: _*)
.settings(Seq(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-actor" % pekkoHttpVersion % "provided"
)
))
.dependsOn(
`kamon-scala-future` % "compile",
Expand All @@ -511,7 +515,6 @@ lazy val `kamon-pekko-http` = (project in file("instrumentation/kamon-pekko-http
.disablePlugins(AssemblyPlugin)
.settings(instrumentationSettings)
.settings(Seq(
javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.10" % "test",
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
libraryDependencies ++= Seq(
kanelaAgent % "provided",
Expand Down
55 changes: 40 additions & 15 deletions instrumentation/kamon-akka/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Def.Initialize

val `Akka-2.4-version` = "2.4.20"
val `Akka-2.5-version` = "2.5.32"
val `Akka-2.6-version` = "2.6.20"
val `Akka-2.6-version` = "2.6.21"

/**
* Compile Configurations
Expand Down Expand Up @@ -31,7 +31,7 @@ configs(
// The Common configuration should always depend on the latest version of Akka. All code in the Common configuration
// should be source compatible with all Akka versions.
inConfig(Common)(Defaults.compileSettings ++ Seq(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`)
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version)
))

libraryDependencies ++= { if(scalaBinaryVersion.value == "2.11") Seq.empty else Seq(
Expand All @@ -50,7 +50,7 @@ libraryDependencies ++= { if(scalaBinaryVersion.value == "2.11") Seq.empty else


inConfig(`Compile-Akka-2.6`)(Defaults.compileSettings ++ Seq(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
sources := joinSources(Common, `Compile-Akka-2.6`).value
))

Expand All @@ -73,7 +73,7 @@ inConfig(`Compile-Akka-2.5`)(Defaults.compileSettings ++ Seq(
sources := joinSources(Common, `Compile-Akka-2.5`).value
))

libraryDependencies ++= Seq(
libraryDependencies ++= {if (scalaVersion.value startsWith "3") Seq.empty else Seq(
kanelaAgent % `Compile-Akka-2.5`,
scalatest % `Test-Akka-2.5`,
logbackClassic % `Test-Akka-2.5`,
Expand All @@ -85,21 +85,28 @@ libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster-sharding" % `Akka-2.5-version` % `Compile-Akka-2.5`,
"com.typesafe.akka" %% "akka-protobuf" % `Akka-2.5-version` % `Compile-Akka-2.5`,
"com.typesafe.akka" %% "akka-testkit" % `Akka-2.5-version` % `Test-Akka-2.5`
)
)}

// Ensure that the packaged artifact contains the instrumentation for all Akka versions.
Compile / packageBin / mappings := Def.taskDyn {
if(scalaBinaryVersion.value == "2.11")
if(scalaBinaryVersion.value == "2.11") {
Def.task {
joinProducts((`Compile-Akka-2.5` / products).value) ++
joinProducts((Common / unmanagedResourceDirectories).value)
}
else
} else if (scalaVersion.value startsWith "3") {
Def.task {
joinProducts((`Compile-Akka-2.6` / products).value) ++
joinProducts((Common / unmanagedResourceDirectories).value)
}
} else {
Def.task {
joinProducts(
(`Compile-Akka-2.5` / products).value ++
(`Compile-Akka-2.6` / products).value
) ++ joinProducts((Common / unmanagedResourceDirectories).value)}
) ++ joinProducts((Common / unmanagedResourceDirectories).value)
}
}
}.value

// Ensure that the packaged sources contains the instrumentation for all Akka versions.
Expand All @@ -108,26 +115,38 @@ Compile / packageSrc / mappings := Def.taskDyn {
Def.task {
(`Compile-Akka-2.5` / packageSrc / mappings).value ++
(Common / packageSrc / mappings).value
}
} else if (scalaVersion.value startsWith "3") {
Def.task {
(`Compile-Akka-2.6` / packageSrc / mappings).value ++
(Common / packageSrc / mappings).value
}
} else
} else {
Def.task {
(`Compile-Akka-2.5` / packageSrc / mappings).value ++
(`Compile-Akka-2.6` / packageSrc / mappings).value ++
(Common / packageSrc / mappings).value
}
}
}.value

// Compile will return the compile analysis for the Common configuration but will run on all Akka configurations.
Compile / compile := Def.taskDyn {
if(scalaBinaryVersion.value == "2.11")
if(scalaBinaryVersion.value == "2.11") {
Def.task {
(`Compile-Akka-2.5` / compile).value
}
else
} else if (scalaVersion.value startsWith "3"){

Def.task {
(`Compile-Akka-2.6` / compile).value
}
} else {
Def.task {
(`Compile-Akka-2.5` / compile).value
(`Compile-Akka-2.6` / compile).value
}
}
}.value

exportJars := true
Expand All @@ -145,7 +164,7 @@ lazy val baseTestSettings = Seq(
)

inConfig(TestCommon)(Defaults.testSettings ++ instrumentationSettings ++ baseTestSettings ++ Seq(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`)
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version)
))

inConfig(`Test-Akka-2.5`)(Defaults.testSettings ++ instrumentationSettings ++ baseTestSettings ++ Seq(
Expand All @@ -155,20 +174,26 @@ inConfig(`Test-Akka-2.5`)(Defaults.testSettings ++ instrumentationSettings ++ ba
))

inConfig(`Test-Akka-2.6`)(Defaults.testSettings ++ instrumentationSettings ++ baseTestSettings ++ Seq(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
sources := joinSources(TestCommon, `Test-Akka-2.6`).value,
unmanagedResourceDirectories ++= (Common / unmanagedResourceDirectories).value,
unmanagedResourceDirectories ++= (TestCommon / unmanagedResourceDirectories).value
))

Test / test := Def.taskDyn {
if(scalaBinaryVersion.value == "2.11")
if(scalaBinaryVersion.value == "2.11") {
Def.task {
(`Test-Akka-2.5` / test).value
}
else
} else if (scalaVersion.value startsWith "3") {
Def.task {
(`Test-Akka-2.6` / test).value
}
}
else {
Def.task {
(`Test-Akka-2.5` / test).value
(`Test-Akka-2.6` / test).value
}
}
}.value
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import kanela.agent.libs.net.bytebuddy.asm.Advice.{Argument, OnMethodEnter, OnMe

class ActorInstrumentation extends InstrumentationBuilder {

onType("akka.actor.dungeon.Dispatch")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I've caused a regression with Akka 2.5 with this one. It looks like this advice will need to be split between Akka versions

.advise(method("sendMessage").and(takesArguments(1)), SendMessageAdvice)

/**
* This is where most of the Actor processing magic happens. Handling of messages, errors and system messages.
*/
Expand All @@ -36,7 +39,6 @@ class ActorInstrumentation extends InstrumentationBuilder {
.advise(isConstructor, ActorCellConstructorAdvice)
.advise(method("invoke"), classOf[ActorCellInvokeAdvice])
.advise(method("handleInvokeFailure"), HandleInvokeFailureMethodAdvice)
.advise(method("sendMessage").and(takesArguments(1)), SendMessageAdvice)
.advise(method("terminate"), TerminateMethodAdvice)
.advise(method("swapMailbox"), ActorCellSwapMailboxAdvice)
.advise(method("invokeAll$1"), InvokeAllMethodInterceptor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package kamon.instrumentation.akka.instrumentations
import akka.actor.{ActorSystem, DeadLetter, UnhandledMessage}
import kamon.instrumentation.akka.AkkaMetrics
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.asm.Advice
import kanela.agent.libs.net.bytebuddy.asm.Advice.{Argument, OnMethodExit, This}

import scala.annotation.static

class EventStreamInstrumentation extends InstrumentationBuilder {

/**
Expand All @@ -32,19 +35,20 @@ class EventStreamInstrumentation extends InstrumentationBuilder {
.advise(method("publish").and(takesArguments(1)), PublishMethodAdvice)
}


class ConstructorAdvice
object ConstructorAdvice {

@OnMethodExit(suppress = classOf[Throwable])
def exit(@This eventStream: HasSystem, @Argument(0) system:ActorSystem): Unit = {
@static def exit(@Advice.This eventStream: HasSystem, @Argument(0) system:ActorSystem): Unit = {
eventStream.setSystem(system)
}
}

class PublishMethodAdvice
object PublishMethodAdvice {

@OnMethodExit(suppress = classOf[Throwable])
def exit(@This stream:HasSystem, @Argument(0) event: AnyRef):Unit = event match {
@static def exit(@This stream:HasSystem, @Argument(0) event: AnyRef):Unit = event match {
case _: DeadLetter => AkkaMetrics.forSystem(stream.system.name).deadLetters.increment()
case _: UnhandledMessage => AkkaMetrics.forSystem(stream.system.name).unhandledMessages.increment()
case _ => ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,21 @@ import akka.routing._
import akka.testkit.{ImplicitSender, TestKit}
import akka.util.Timeout
import kamon.Kamon
import kamon.testkit.{InitAndStopKamonAfterAll, MetricInspection}
import kamon.tag.Lookups._
import kamon.testkit.{InitAndStopKamonAfterAll, MetricInspection}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._


class ActorCellInstrumentationSpec extends TestKit(ActorSystem("ActorCellInstrumentationSpec")) with AnyWordSpecLike
with BeforeAndAfterAll with ImplicitSender with Eventually with MetricInspection.Syntax with Matchers with InitAndStopKamonAfterAll {
implicit lazy val executionContext = system.dispatcher
implicit lazy val executionContext: ExecutionContext = system.dispatcher
import ContextTesting._

"the message passing instrumentation" should {
Expand All @@ -59,7 +60,7 @@ class ActorCellInstrumentationSpec extends TestKit(ActorSystem("ActorCellInstrum
}

"propagate the current context when using the ask pattern" in new EchoActorFixture {
implicit val timeout = Timeout(1 seconds)
implicit val timeout: Timeout = Timeout(1 seconds)
Kamon.runWithContext(testContext("propagate-with-ask")) {
// The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it.
(contextEchoActor ? "test") pipeTo (testActor)
Expand Down Expand Up @@ -122,11 +123,11 @@ class ActorCellInstrumentationSpec extends TestKit(ActorSystem("ActorCellInstrum
}

trait EchoActorFixture {
val contextEchoActor = system.actorOf(Props[ContextStringEcho])
val contextEchoActor: ActorRef = system.actorOf(Props[ContextStringEcho])
}

trait EchoSimpleRouterFixture {
val router = {
val router: Router = {
val routees = Vector.fill(5) {
val r = system.actorOf(Props[ContextStringEcho])
ActorRefRoutee(r)
Expand All @@ -136,22 +137,22 @@ class ActorCellInstrumentationSpec extends TestKit(ActorSystem("ActorCellInstrum
}

trait EchoPoolRouterFixture {
val pool = system.actorOf(RoundRobinPool(nrOfInstances = 5).props(Props[ContextStringEcho]), "pool-router")
val pool: ActorRef = system.actorOf(RoundRobinPool(nrOfInstances = 5).props(Props[ContextStringEcho]), "pool-router")
}

trait EchoGroupRouterFixture {
val routees = Vector.fill(5) {
val routees: Vector[ActorRef] = Vector.fill(5) {
system.actorOf(Props[ContextStringEcho])
}

val group = system.actorOf(RoundRobinGroup(routees.map(_.path.toStringWithoutAddress)).props(), "group-router")
val group: ActorRef = system.actorOf(RoundRobinGroup(routees.map(_.path.toStringWithoutAddress)).props(), "group-router")
}
}

class ContextStringEcho extends Actor {
import ContextTesting._

def receive = {
def receive: Receive = {
case _: String =>
sender ! Kamon.currentContext().getTag(plain(TestKey))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ import kamon.instrumentation.akka.ContextTesting._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._

class AskPatternInstrumentationSpec extends TestKit(ActorSystem("AskPatternInstrumentationSpec")) with AnyWordSpecLike
with InitAndStopKamonAfterAll with ImplicitSender {

implicit lazy val ec = system.dispatcher
implicit val askTimeout = Timeout(10 millis)
implicit lazy val ec: ExecutionContextExecutor = system.dispatcher
implicit val askTimeout: Timeout = Timeout(10 millis)

// TODO: Make this work with ActorSelections

Expand Down Expand Up @@ -93,7 +94,7 @@ class AskPatternInstrumentationSpec extends TestKit(ActorSystem("AskPatternInstr
}

class NoReply extends Actor {
def receive = {
def receive: Receive = {
case _ =>
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Any
}

"not track Akka Streams actors" in {
implicit val timeout = Timeout(10 seconds)
implicit val timeout: Timeout = Timeout(10 seconds)
val actorWithMaterializer = system.actorOf(Props[ActorWithMaterializer])

val finishedStream = Kamon.runWithSpan(Kamon.serverSpanBuilder("wrapper", "test").start()) {
Expand Down Expand Up @@ -222,7 +222,7 @@ class TracingTestActor extends Actor {
}

class ActorWithMaterializer extends Actor {
implicit val mat = ActorMaterializer()
implicit val mat: Materializer = Materializer(context)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs reverting because of Akka 2.5...


override def receive: Receive = {
case "stream" =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ import kamon.instrumentation.akka.ContextTesting._
import kamon.tag.Lookups._
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.{AnyWordSpec, AnyWordSpecLike}
import org.scalatest.wordspec.AnyWordSpecLike

import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal

class SystemMessageInstrumentationSpec extends TestKit(ActorSystem("ActorSystemMessageInstrumentationSpec")) with AnyWordSpecLike with Matchers
with BeforeAndAfterAll with ImplicitSender {
implicit lazy val executionContext = system.dispatcher
implicit lazy val executionContext: ExecutionContext = system.dispatcher

"the system message passing instrumentation" should {
"capture and propagate the current context while processing the Create message in top level actors" in {
Expand Down
Loading
Loading