Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add EventsByTagQuery to JavaDSL PersistenceTestKitReadJournal #1763

Merged
merged 5 commits into from
Feb 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import pekko.persistence.query.javadsl.{
CurrentEventsByPersistenceIdQuery,
CurrentEventsByTagQuery,
EventsByPersistenceIdQuery,
EventsByTagQuery,
ReadJournal
}
import pekko.persistence.query.typed
Expand All @@ -38,7 +39,8 @@ final class PersistenceTestKitReadJournal(delegate: scaladsl.PersistenceTestKitR
with EventsByPersistenceIdQuery
with CurrentEventsByPersistenceIdQuery
with CurrentEventsByTagQuery
with CurrentEventsBySliceQuery {
with CurrentEventsBySliceQuery
with EventsByTagQuery {

override def eventsByPersistenceId(
persistenceId: String,
Expand All @@ -62,6 +64,9 @@ final class PersistenceTestKitReadJournal(delegate: scaladsl.PersistenceTestKitR
offset: Offset): Source[typed.EventEnvelope[Event], NotUsed] =
delegate.currentEventsBySlices(entityType, minSlice, maxSlice, offset).asJava

override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
delegate.eventsByTag(tag, offset).asJava

override def sliceForPersistenceId(persistenceId: String): Int =
delegate.sliceForPersistenceId(persistenceId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.pekko.persistence.testkit.query

import scala.collection.immutable.Seq
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.Done
Expand All @@ -27,7 +25,9 @@ import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorRef
import pekko.persistence.query.EventEnvelope
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.NoOffset
import pekko.persistence.testkit.PersistenceTestKitPlugin
import pekko.persistence.testkit.query.javadsl.{ PersistenceTestKitReadJournal => JavaPersistenceTestKitReadJournal }
import pekko.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.scaladsl.Effect
Expand All @@ -36,6 +36,9 @@ import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.scaladsl.TestSink
import org.scalatest.wordspec.AnyWordSpecLike

import scala.collection.immutable.Seq
import scala.concurrent.duration._

object EventsByTagSpec {
val config = PersistenceTestKitPlugin.config.withFallback(
ConfigFactory.parseString("""
Expand Down Expand Up @@ -74,8 +77,16 @@ class EventsByTagSpec

implicit val classic: pekko.actor.ActorSystem = system.classicSystem

val queries =
PersistenceQuery(system).readJournalFor[PersistenceTestKitReadJournal](PersistenceTestKitReadJournal.Identifier)
private val persistenceQuery = PersistenceQuery(system)

private val queries =
persistenceQuery.readJournalFor[PersistenceTestKitReadJournal](
PersistenceTestKitReadJournal.Identifier)

private val queriesJava =
persistenceQuery.getReadJournalFor(
classOf[JavaPersistenceTestKitReadJournal],
JavaPersistenceTestKitReadJournal.Identifier)

def setup(persistenceId: String, tags: Set[String]): ActorRef[Command] = {
val probe = createTestProbe[Done]()
Expand Down Expand Up @@ -115,6 +126,19 @@ class EventsByTagSpec
probe.expectNext("c-4")
}

"find new events (Java DSL)" in {
val ackProbe = createTestProbe[Done]()
val tag = "c-tag"
val ref = setup("c", Set(tag))
val src = queriesJava.eventsByTag(tag, NoOffset).asScala
val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(5).expectNext("c-1", "c-2", "c-3")

ref ! Command("c-4", ackProbe.ref)
ackProbe.expectMessage(Done)

probe.expectNext("c-4")
}

"find new events after batched setup" in {
val ackProbe = createTestProbe[Done]()
val tag = "d-tag"
Expand Down
Loading