From a675b17723f50c4517eaf0c634b753a9d7b8be14 Mon Sep 17 00:00:00 2001 From: Flavio Brasil Date: Fri, 10 Jan 2025 20:47:09 -0800 Subject: [PATCH] [aeron] introduce integration --- build.sbt | 17 + kyo-aeron/jvm/src/main/scala/kyo/Topic.scala | 261 ++++++++++++++ kyo-aeron/jvm/src/test/scala/kyo/Test.scala | 22 ++ .../jvm/src/test/scala/kyo/TopicTest.scala | 319 ++++++++++++++++++ .../shared/src/main/scala/kyo/Retry.scala | 7 +- .../shared/src/test/scala/kyo/RetryTest.scala | 26 ++ kyo-data/shared/src/main/scala/kyo/Tag.scala | 3 + .../shared/src/test/scala/kyo/TagTest.scala | 4 + 8 files changed, 656 insertions(+), 3 deletions(-) create mode 100644 kyo-aeron/jvm/src/main/scala/kyo/Topic.scala create mode 100644 kyo-aeron/jvm/src/test/scala/kyo/Test.scala create mode 100644 kyo-aeron/jvm/src/test/scala/kyo/TopicTest.scala diff --git a/build.sbt b/build.sbt index cdd4109b5..30724af88 100644 --- a/build.sbt +++ b/build.sbt @@ -111,6 +111,7 @@ lazy val kyoJVM = project `kyo-stats-otel`.jvm, `kyo-cache`.jvm, `kyo-reactive-streams`.jvm, + `kyo-aeron`.jvm, `kyo-sttp`.jvm, `kyo-tapir`.jvm, `kyo-caliban`.jvm, @@ -423,6 +424,22 @@ lazy val `kyo-reactive-streams` = ) .jvmSettings(mimaCheck(false)) +lazy val `kyo-aeron` = + crossProject(JVMPlatform) + .withoutSuffixFor(JVMPlatform) + .crossType(CrossType.Full) + .in(file("kyo-aeron")) + .dependsOn(`kyo-core`) + .settings( + `kyo-settings`, + libraryDependencies ++= Seq( + "io.aeron" % "aeron-driver" % "1.46.7", + "io.aeron" % "aeron-client" % "1.46.7", + "com.lihaoyi" %% "upickle" % "4.1.0" + ) + ) + .jvmSettings(mimaCheck(false)) + lazy val `kyo-sttp` = crossProject(JSPlatform, JVMPlatform, NativePlatform) .withoutSuffixFor(JVMPlatform) diff --git a/kyo-aeron/jvm/src/main/scala/kyo/Topic.scala b/kyo-aeron/jvm/src/main/scala/kyo/Topic.scala new file mode 100644 index 000000000..2541a663a --- /dev/null +++ b/kyo-aeron/jvm/src/main/scala/kyo/Topic.scala @@ -0,0 +1,261 @@ +package kyo + +import io.aeron.Aeron +import io.aeron.FragmentAssembler +import io.aeron.Publication +import io.aeron.driver.MediaDriver +import io.aeron.logbuffer.BufferClaim +import io.aeron.logbuffer.Header +import org.agrona.DirectBuffer +import scala.annotation.implicitNotFound +import scala.annotation.targetName +import scala.compiletime.* +import upickle.default.* + +/** High-performance publish-subscribe messaging for local and distributed systems. + * + * Topic provides reliable, typed messaging built on Aeron's efficient transport protocol. It excels at ultra-low latency inter-process + * communication (IPC) on the same machine through shared memory, while also supporting efficient UDP multicast for message distribution + * and reliable UDP unicast between remote services. + * + * Messages are automatically serialized and deserialized using upickle, requiring only a ReadWriter type class instance (aliased as + * [[Topic.AsMessage]]). The transport layer handles message fragmentation and flow control automatically. + * + * Publishing messages is done through [[Topic.publish]], which handles backpressure and connection management automatically. Subscribers + * use [[Topic.stream]] to receive typed messages with automatic reassembly and connection handling. + * + * Type safety is enforced by using the message type's Tag to generate unique Aeron stream IDs - this means each exact type gets its own + * channel, with no subtype polymorphism. A stream of a parent type cannot receive messages published as a subtype, and vice versa. 
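+ *
+ * A minimal usage sketch, mirroring the patterns in TopicTest (the `Ping` type and the `aeron:ipc` URI below are illustrative, not part of this API):
+ * {{{
+ * // hypothetical message type; any type with a ReadWriter (Topic.AsMessage) instance works
+ * case class Ping(n: Int) derives Topic.AsMessage
+ *
+ * Topic.run {
+ *   for
+ *     // start the subscriber first so published messages are not missed
+ *     fiber    <- Async.run(Topic.stream[Ping]("aeron:ipc").take(2).run)
+ *     _        <- Async.run(Topic.publish("aeron:ipc")(Stream.init(Seq(Ping(1), Ping(2)))))
+ *     received <- fiber.get
+ *   yield received
+ * }
+ * }}}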
+ * + * @see + * [[https://aeron.io/]] for documentation on Aeron URIs and more. + * @see + * [[https://github.com/com-lihaoyi/upickle]] for documentation on serialization. + */ +opaque type Topic <: (Async & Env[Aeron]) = Async & Env[Aeron] + +object Topic: + + /** Exception indicating backpressure from the messaging system. + * + * Thrown when the system cannot immediately handle more messages and needs to apply backpressure for flow control. + */ + case class Backpressured()(using Frame) extends KyoException + + /** Type alias for upickle serialization. + * + * Messages must have a ReadWriter instance to be published or consumed. + */ + type AsMessage[A] = ReadWriter[A] + + /** Default retry schedule for handling backpressure scenarios. + */ + val defaultRetrySchedule = Schedule.linear(10.millis).min(Schedule.fixed(1.second)).jitter(0.2) + + /** Handles Topic with an embedded Aeron MediaDriver. + * + * Creates and manages the lifecycle of an embedded MediaDriver, ensuring proper cleanup through IO.ensure. + * + * @param v + * The computation requiring Topic capabilities + * @return + * The computation result within Async context + */ + def run[A: Flat, S](v: A < (Topic & S))(using Frame): A < (Async & S) = + IO { + val driver = MediaDriver.launchEmbedded() + IO.ensure(driver.close()) { + run(driver)(v) + } + } + + /** Handles Topic with a provided MediaDriver. + * + * Uses an existing MediaDriver instance, allowing for more control over the Aeron setup. The caller is responsible for closing the + * provided MediaDriver instance. + * + * @param driver + * The MediaDriver instance to use + * @param v + * The computation requiring Topic capabilities + * @return + * The computation result within Async context + */ + def run[A: Flat, S](driver: MediaDriver)(v: A < (Topic & S))(using Frame): A < (Async & S) = + IO { + val aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(driver.aeronDirectoryName())) + IO.ensure(aeron.close()) { + run(aeron)(v) + } + } + + /** Handles Topic with a provided Aeron instance. + * + * Directly uses an existing Aeron instance for maximum configuration flexibility. The caller is responsible for closing the provided + * Aeron instance. + * + * @param aeron + * The Aeron instance to use + * @param v + * The computation requiring Topic capabilities + * @return + * The computation result within Async context + */ + def run[A: Flat, S](aeron: Aeron)(v: A < (Topic & S))(using Frame): A < (Async & S) = + Env.run(aeron)(v) + + /** Publishes a stream of messages to a specified Aeron URI. + * + * Messages are published with automatic handling of backpressure and connection issues. The stream is typed and uses efficient binary + * serialization for message transport. + * + * @param uri + * The Aeron URI to publish to. 
Examples: + * - "aeron:ipc" for efficient inter-process communication on same machine + * - "aeron:udp?endpoint=localhost:40123" for UDP unicast + * - "aeron:udp?endpoint=224.1.1.1:40123|interface=192.168.1.1" for UDP multicast + * @param retrySchedule + * Schedule for retrying on backpressure + * @param stream + * The stream of messages to publish + * @tparam A + * The type of messages being published + * @tparam S + * Additional effects in the computation + * @return + * Unit wrapped in Topic effect with potential Closed or Backpressured aborts + */ + def publish[A: ReadWriter]( + aeronUri: String, + retrySchedule: Schedule = defaultRetrySchedule + )[S](stream: Stream[A, S])(using frame: Frame, tag: Tag[A]): Unit < (Topic & S & Abort[Closed | Backpressured]) = + Env.use[Aeron] { aeron => + IO { + // register the publication with Aeron using type's hash as stream ID + val publication = aeron.addPublication(aeronUri, tag.hash.abs) + + // reuse buffer claim to avoid allocations on hot path + val bufferClaim = new BufferClaim + + // cache backpressure failure for performance + val backpressured = Abort.fail(Backpressured()) + + // ensure publication is closed after use + IO.ensure(IO(publication.close())) { + stream.runForeachChunk { messages => + Retry[Backpressured](retrySchedule) { + IO { + if !publication.isConnected() then backpressured + else + // serialize messages with type tag for runtime verification + val bytes = writeBinary((tag.toString, messages)) + val result = publication.tryClaim(bytes.length, bufferClaim) + if result > 0 then + // write directly to claimed buffer region + val buffer = bufferClaim.buffer() + val offset = bufferClaim.offset() + buffer.putBytes(offset, bytes) + bufferClaim.commit() + else + result match + case Publication.BACK_PRESSURED => + // triggers a retry if the schedule allows + backpressured + case Publication.NOT_CONNECTED => + Abort.fail(Closed("Not connected", frame)) + case Publication.ADMIN_ACTION => + Abort.fail(Closed("Admin action", frame)) + case Publication.CLOSED => + Abort.fail(Closed("Publication closed", frame)) + case _ => + Abort.fail(Closed(s"Unknown error: $result", frame)) + end if + } + } + } + } + } + } + + /** Creates a stream of messages from a specified Aeron URI. + * + * Subscribes to messages with automatic handling of backpressure and connection issues. Messages are typed and automatically + * deserialized from binary format. The stream automatically reassembles fragmented messages, verifies message types match the expected + * type, handles connection issues with configurable retry behavior, and cleans up resources when closed. + * + * @param uri + * The Aeron URI to subscribe to. 
Examples: + * - "aeron:ipc" for efficient inter-process communication on same machine + * - "aeron:udp?endpoint=localhost:40123" for UDP unicast + * - "aeron:udp?endpoint=224.1.1.1:40123|interface=192.168.1.1" for UDP multicast + * @param retrySchedule + * Schedule for retrying on backpressure + * @tparam A + * The type of messages to receive + * @return + * A stream of messages within Topic effect with potential Backpressured aborts + */ + def stream[A: ReadWriter]( + aeronUri: String, + retrySchedule: Schedule = defaultRetrySchedule + )(using tag: Tag[A], frame: Frame): Stream[A, Topic & Abort[Backpressured]] = + Stream { + Env.use[Aeron] { aeron => + IO { + // register subscription with Aeron using type's hash as stream ID + val subscription = aeron.addSubscription(aeronUri, tag.hash.abs) + + // cache backpressure failure for performance + val backpressured = Abort.fail(Backpressured()) + + // temporary storage for reassembled message + var result: Maybe[(String, Chunk[A])] = Absent + + // handler that reassembles message fragments + val handler = + new FragmentAssembler((buffer: DirectBuffer, offset: Int, length: Int, header: Header) => + val bytes = new Array[Byte](length) + buffer.getBytes(offset, bytes) + result = Maybe(readBinary[(String, Chunk[A])](bytes)) + ) + + // ensure subscription is closed after use + IO.ensure(IO(subscription.close())) { + def loop(): Unit < (Emit[Chunk[A]] & Async & Abort[Backpressured]) = + Retry[Backpressured](retrySchedule) { + IO { + if !subscription.isConnected() then backpressured + else + // clear previous result before polling + result = Absent + val fragmentsRead = subscription.poll(handler, 1) + if fragmentsRead == 0 then + backpressured + else + result match + case Present((tag2, messages)) => + // verify message type matches expected type + if tag2 != tag.toString then + Abort.panic( + new IllegalStateException( + s"Expected messages of type ${tag.show} but got ${tag2}" + ) + ) + else + result = Absent + Emit.valueWith(messages)(loop()) + end if + case Absent => + Abort.panic(new IllegalStateException(s"No results")) + end match + end if + } + } + end loop + loop() + } + } + } + } + end stream +end Topic diff --git a/kyo-aeron/jvm/src/test/scala/kyo/Test.scala b/kyo-aeron/jvm/src/test/scala/kyo/Test.scala new file mode 100644 index 000000000..fb3b2ceae --- /dev/null +++ b/kyo-aeron/jvm/src/test/scala/kyo/Test.scala @@ -0,0 +1,22 @@ +package kyo + +import kyo.internal.BaseKyoCoreTest +import kyo.kernel.Platform +import org.scalatest.NonImplicitAssertions +import org.scalatest.Tag +import org.scalatest.freespec.AsyncFreeSpec +import scala.concurrent.ExecutionContext +import scala.concurrent.Future + +abstract class Test extends AsyncFreeSpec with NonImplicitAssertions with BaseKyoCoreTest: + + private def runWhen(cond: => Boolean) = if cond then "" else "org.scalatest.Ignore" + object jvmOnly extends Tag(runWhen(kyo.kernel.Platform.isJVM)) + object jsOnly extends Tag(runWhen(kyo.kernel.Platform.isJS)) + + type Assertion = org.scalatest.Assertion + def assertionSuccess = succeed + def assertionFailure(msg: String) = fail(msg) + + override given executionContext: ExecutionContext = Platform.executionContext +end Test diff --git a/kyo-aeron/jvm/src/test/scala/kyo/TopicTest.scala b/kyo-aeron/jvm/src/test/scala/kyo/TopicTest.scala new file mode 100644 index 000000000..f2de77748 --- /dev/null +++ b/kyo-aeron/jvm/src/test/scala/kyo/TopicTest.scala @@ -0,0 +1,319 @@ +package kyo + +import java.net.DatagramSocket +import java.net.InetSocketAddress 
+import kyo.debug.Debug + +class TopicTest extends Test: + + case class Message(value: Int) derives CanEqual, Topic.AsMessage + case class GenericMessage[A](value: A) derives CanEqual, Topic.AsMessage + case class ComplexMessage(id: Int, items: List[String]) derives CanEqual, Topic.AsMessage + + val failSchedule = Schedule.fixed(1.millis).take(3) + + def freePort() = + val socket = new DatagramSocket(null) + try + socket.bind(new InetSocketAddress("localhost", 0)) + socket.getLocalPort() + finally + socket.close() + end try + end freePort + + Seq( + "aeron:ipc", + s"aeron:udp?endpoint=127.0.0.1:${freePort()}" + ).foreach { uri => + s"with uri $uri" - { + + "basic publish/subscribe" - { + "single message type" in run { + val messages = Seq(Message(1), Message(2), Message(3)) + Topic.run { + for + started <- Latch.init(1) + fiber <- Async.run(started.release.andThen(Topic.stream[Message](uri).take(messages.size).run)) + _ <- started.await + _ <- Async.sleep(1.second) + _ <- Async.run(Topic.publish(uri)(Stream.init(messages))) + received <- fiber.get + yield assert(received == messages) + } + } + + "multiple message types" in run { + val strings = Seq("hello", "world") + val ints = Seq(42, 43) + Topic.run { + for + started <- Latch.init(2) + stringFiber <- Async.run(started.release.andThen(Topic.stream[String](uri).take(strings.size).run)) + intFiber <- Async.run(started.release.andThen(Topic.stream[Int](uri).take(ints.size).run)) + _ <- started.await + _ <- Async.run(Topic.publish(uri)(Stream.init(strings))) + _ <- Async.run(Topic.publish(uri)(Stream.init(ints))) + stringResults <- stringFiber.get + intResults <- intFiber.get + yield + assert(stringResults == strings) + assert(intResults == ints) + } + } + + "generic message types" in run { + val strMessages = Seq(GenericMessage("hello"), GenericMessage("world")) + val intMessages = Seq(GenericMessage(1), GenericMessage(2)) + + Topic.run { + for + started <- Latch.init(2) + strFiber <- + Async.run(started.release.andThen(Topic.stream[GenericMessage[String]](uri).take(strMessages.size).run)) + intFiber <- + Async.run(started.release.andThen(Topic.stream[GenericMessage[Int]](uri).take(intMessages.size).run)) + _ <- started.await + _ <- Async.run(Topic.publish(uri)(Stream.init(strMessages))) + _ <- Async.run(Topic.publish(uri)(Stream.init(intMessages))) + strResults <- strFiber.get + intResults <- intFiber.get + yield + assert(strResults == strMessages) + assert(intResults == intMessages) + } + } + + "complex message types" in run { + val messages = Seq( + ComplexMessage(1, List("a", "b")), + ComplexMessage(2, List("c", "d")) + ) + Topic.run { + for + started <- Latch.init(1) + fiber <- Async.run(started.release.andThen(Topic.stream[ComplexMessage](uri).take(messages.size).run)) + _ <- started.await + _ <- Async.run(Topic.publish(uri)(Stream.init(messages))) + received <- fiber.get + yield assert(received == messages) + } + } + } + + "multiple subscribers" - { + "fan-out to multiple subscribers" in run { + val messages = Seq(Message(1), Message(2), Message(3)) + + Topic.run { + for + started <- Latch.init(2) + fiber1 <- Async.run(started.release.andThen(Topic.stream[Message](uri).take(messages.size).run)) + fiber2 <- Async.run(started.release.andThen(Topic.stream[Message](uri).take(messages.size).run)) + _ <- started.await + _ <- Async.run(Topic.publish(uri)(Stream.init(messages))) + result1 <- fiber1.get + result2 <- fiber2.get + yield + assert(result1 == messages) + assert(result2 == messages) + } + } + + "subscribers with different 
consumption rates" in run { + val messages = Seq(Message(1), Message(2)) + + Topic.run { + for + started <- Latch.init(2) + slowFiber <- + Async.run(started.release.andThen( + Topic.stream[Message](uri) + .map(r => Async.delay(1.millis)(r)) + .take(messages.size) + .run + )) + fastFiber <- + Async.run(started.release.andThen( + Topic.stream[Message](uri) + .take(messages.size) + .run + )) + _ <- started.await + _ <- Async.run(Topic.publish(uri)(Stream.init(messages))) + slow <- slowFiber.get + fast <- fastFiber.get + yield + assert(slow == messages) + assert(fast == messages) + } + } + } + + "subtyping" - { + sealed trait Base derives Topic.AsMessage + case class Derived1(value: Int) extends Base derives CanEqual, Topic.AsMessage + case class Derived2(value: String) extends Base derives CanEqual, Topic.AsMessage + + "base type cannot receive subtypes" in run { + val messages = Seq(Derived1(1), Derived1(2)) + + Topic.run { + for + started <- Latch.init(1) + fiber <- Async.run(started.release.andThen(Topic.stream[Base](uri, failSchedule).run)) + _ <- started.await + result1 <- Abort.run(Topic.publish(uri, failSchedule)(Stream.init(messages))) + result2 <- fiber.getResult + yield assert(result1.isFailure && result2.isFailure) + } + } + + "subtype cannot receive base type" in run { + val messages = Seq[Base](Derived1(1), Derived1(2)) + + Topic.run { + for + started <- Latch.init(1) + fiber <- Async.run(started.release.andThen(Topic.stream[Derived1](uri, failSchedule).run)) + _ <- started.await + result1 <- Abort.run(Topic.publish(uri, failSchedule)(Stream.init(messages))) + result2 <- fiber.getResult + yield assert(result1.isFailure && result2.isFailure) + } + } + + "generic base type cannot receive generic subtype" in run { + sealed trait GenericBase[A] derives Topic.AsMessage + case class GenericDerived[A](value: A) extends GenericBase[A] derives CanEqual, Topic.AsMessage + + val messages = Seq(GenericDerived("test")) + + Topic.run { + for + started <- Latch.init(1) + fiber <- Async.run(started.release.andThen(Topic.stream[GenericBase[String]](uri, failSchedule).run)) + _ <- started.await + result1 <- Abort.run(Topic.publish(uri, failSchedule)(Stream.init(messages))) + result2 <- fiber.getResult + yield assert(result1.isFailure && result2.isFailure) + } + } + + "different subtypes maintain separation" in run { + val messages1 = Seq(Derived1(1), Derived1(2)) + val messages2 = Seq(Derived2("a"), Derived2("b")) + + Topic.run { + for + started <- Latch.init(2) + fiber1 <- Async.run(started.release.andThen(Topic.stream[Derived1](uri).take(messages1.size).run)) + fiber2 <- Async.run(started.release.andThen(Topic.stream[Derived2](uri).take(messages2.size).run)) + _ <- started.await + _ <- Async.run(Topic.publish(uri)(Stream.init(messages1))) + _ <- Async.run(Topic.publish(uri)(Stream.init(messages2))) + results1 <- fiber1.get + results2 <- fiber2.get + yield + assert(results1 == messages1) + assert(results2 == messages2) + } + } + } + + "maintains message order for large batches (pendingUntilFixed)" in run { + val count = 200 + val messages = Seq.tabulate(count)(Message(_)) + Topic.run { + for + started <- Latch.init(1) + fiber <- Async.run(started.release.andThen(Topic.stream[Message](uri).take(count).run)) + _ <- started.await + result <- Abort.run(Topic.publish[Message](uri)(Stream.init(messages))) + yield + if result.isPanic then + pending + else + fail("Pending test should have failed") + } + } + + "handles empty streams" in run { + Topic.run { + for + started <- Latch.init(1) + fiber <- 
Async.run(started.release.andThen(Topic.stream[Message](uri, failSchedule).take(1).run)) + _ <- started.await + _ <- Topic.publish[Message](uri, failSchedule)(Stream.empty) + result <- fiber.getResult + yield assert(result.isFailure) + } + } + + "partial subscriber failure" in run { + val messageCount = 10 + val messages = (0 until messageCount).map(Message(_)) + + Topic.run { + for + started <- Latch.init(2) + failingFiber <- Async.run( + started.release.andThen( + Topic.stream[Message](uri) + .map(_ => Abort.fail("Planned failure"): Message < Abort[String]) + .take(messageCount) + .run + ) + ) + normalFiber <- Async.run( + started.release.andThen( + Topic.stream[Message](uri) + .take(messageCount) + .run + ) + ) + _ <- started.await + _ <- Topic.publish(uri)(Stream.init(messages)) + failingResult <- failingFiber.getResult + normalResult <- normalFiber.get + yield + assert(failingResult.isFailure) + assert(normalResult == messages) + } + } + + "isolation" - { + "publisher without subscribers" in run { + Topic.run { + for + result <- Abort.run(Topic.publish(uri, failSchedule)(Stream.init(Seq(Message(1))))) + yield assert(result.isFailure) + } + } + + "subscriber without publisher" in run { + Topic.run { + for + fiber <- Async.run(Topic.stream[Message](uri, failSchedule).take(1).run) + result <- fiber.getResult + yield assert(result.isFailure) + } + } + + "subscriber starts after publisher" in run { + Topic.run { + for + started <- Latch.init(1) + done <- Latch.init(1) + result1 <- Abort.run(Topic.publish(uri, failSchedule)(Stream.init(Seq(Message(1))))) + result2 <- Abort.run(Topic.stream[Message](uri, failSchedule).take(1).run) + yield + assert(result1.isFailure) + assert(result2.isFailure) + } + } + } + } + } + +end TopicTest diff --git a/kyo-core/shared/src/main/scala/kyo/Retry.scala b/kyo-core/shared/src/main/scala/kyo/Retry.scala index b726d381d..8e890157a 100644 --- a/kyo-core/shared/src/main/scala/kyo/Retry.scala +++ b/kyo-core/shared/src/main/scala/kyo/Retry.scala @@ -42,15 +42,16 @@ object Retry: Frame ): A < (Async & Abort[E] & S) = Abort.run[E](v).map { - case Result.Success(r) => r - case error: Result.Error[?] 
=> + case Result.Success(value) => value + case result: Result.Failure[E] @unchecked => Clock.now.map { now => schedule.next(now).map { (delay, nextSchedule) => Async.delay(delay)(Retry[E](nextSchedule)(v)) }.getOrElse { - Abort.get(error) + Abort.get(result) } } + case Result.Panic(ex) => Abort.panic(ex) } end RetryOps diff --git a/kyo-core/shared/src/test/scala/kyo/RetryTest.scala b/kyo-core/shared/src/test/scala/kyo/RetryTest.scala index 9e62d0c3e..5d6d6fbc0 100644 --- a/kyo-core/shared/src/test/scala/kyo/RetryTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/RetryTest.scala @@ -86,4 +86,30 @@ class RetryTest extends Test: } } } + + "panics" - { + "should not retry on panic" in run { + var calls = 0 + Abort.run[Exception] { + Retry[Exception](Schedule.repeat(3)) { + calls += 1 + Abort.panic(new RuntimeException("panic")) + } + }.map { v => + assert(v.isPanic && calls == 1) + } + } + + "should not retry on panic with default schedule" in run { + var calls = 0 + Abort.run[Exception] { + Retry[Exception] { + calls += 1 + Abort.panic(new RuntimeException("panic")) + } + }.map { v => + assert(v.isPanic && calls == 1) + } + } + } end RetryTest diff --git a/kyo-data/shared/src/main/scala/kyo/Tag.scala b/kyo-data/shared/src/main/scala/kyo/Tag.scala index 1ced3145b..ab25aff2d 100644 --- a/kyo-data/shared/src/main/scala/kyo/Tag.scala +++ b/kyo-data/shared/src/main/scala/kyo/Tag.scala @@ -6,6 +6,7 @@ import scala.annotation.switch import scala.annotation.tailrec import scala.collection.immutable import scala.quoted.* +import scala.util.hashing.MurmurHash3 opaque type Tag[A] = String @@ -30,6 +31,8 @@ object Tag: val decoded = t1.drop(2).takeWhile(_ != ';') TagMacro.fromCompact.getOrElse(decoded, decoded) + def hash: Int = MurmurHash3.stringHash(t1) + private[kyo] def raw: String = t1 end extension diff --git a/kyo-data/shared/src/test/scala/kyo/TagTest.scala b/kyo-data/shared/src/test/scala/kyo/TagTest.scala index edb76e42f..d64cd0b78 100644 --- a/kyo-data/shared/src/test/scala/kyo/TagTest.scala +++ b/kyo-data/shared/src/test/scala/kyo/TagTest.scala @@ -788,4 +788,8 @@ class TagTest extends AsyncFreeSpec with NonImplicitAssertions: opaque type Inner[A] = List[A] opaque type Outer[B] = Inner[B] + "hash" in { + assert(Tag[Int].hash == Tag[Int].hash) + } + end TagTest