diff --git a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/PersistenceTestKitPlugin.scala b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/PersistenceTestKitPlugin.scala index b659af89fad..b7cd867871b 100644 --- a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/PersistenceTestKitPlugin.scala +++ b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/PersistenceTestKitPlugin.scala @@ -141,9 +141,18 @@ object PersistenceTestKitPlugin { * Persistence testkit plugin for snapshots. */ @InternalApi -class PersistenceTestKitSnapshotPlugin extends SnapshotStore { +class PersistenceTestKitSnapshotPlugin( + // providing this parameter in first position as unused + // because Persistence extension that instantiates the plugins + // does not support constructors without it + @unused cfg: Config, + cfgPath: String +) extends SnapshotStore { - private final val storage = SnapshotStorageEmulatorExtension(context.system) + private final val storage = { + log.debug("Using snapshot storage emulator extension [{}] for test kit snapshot storage", cfgPath) + SnapshotStorageEmulatorExtension(context.system).storageFor(cfgPath) + } override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = Future.fromTry(Try(storage.tryRead(persistenceId, criteria))) diff --git a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/SnapshotStorageEmulatorExtension.scala b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/SnapshotStorageEmulatorExtension.scala index b6ce9f65bd2..b185557139a 100644 --- a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/SnapshotStorageEmulatorExtension.scala +++ b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/SnapshotStorageEmulatorExtension.scala @@ -13,6 +13,8 @@ package org.apache.pekko.persistence.testkit.internal +import java.util.concurrent.ConcurrentHashMap + import org.apache.pekko import pekko.actor.Extension import pekko.actor.{ ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider } @@ -24,17 +26,34 @@ import pekko.persistence.testkit.scaladsl.SnapshotTestKit * INTERNAL API */ @InternalApi -private[testkit] object SnapshotStorageEmulatorExtension extends ExtensionId[SnapshotStorage] with ExtensionIdProvider { +private[testkit] object SnapshotStorageEmulatorExtension extends ExtensionId[SnapshotStorageEmulatorExtension] + with ExtensionIdProvider { - override def get(system: ActorSystem): SnapshotStorage = super.get(system) + override def get(system: ActorSystem): SnapshotStorageEmulatorExtension = super.get(system) - override def createExtension(system: ExtendedActorSystem): SnapshotStorage = - if (SnapshotTestKit.Settings(system).serialize) { - new SerializedSnapshotStorageImpl(system) - } else { - new SimpleSnapshotStorageImpl - } + override def createExtension(system: ExtendedActorSystem): SnapshotStorageEmulatorExtension = + new SnapshotStorageEmulatorExtension(system) override def lookup: ExtensionId[_ <: Extension] = SnapshotStorageEmulatorExtension } + +/** + * INTERNAL API + */ +@InternalApi +final class SnapshotStorageEmulatorExtension(system: ExtendedActorSystem) extends Extension { + private val stores = new ConcurrentHashMap[String, SnapshotStorage]() + private lazy val shouldCreateSerializedSnapshotStorage = SnapshotTestKit.Settings(system).serialize + + def storageFor(key: String): SnapshotStorage = + stores.computeIfAbsent(key, + _ => { + // we don't really care about the key here, we just want separate instances + if (shouldCreateSerializedSnapshotStorage) { + new SerializedSnapshotStorageImpl(system) + } else { + new SimpleSnapshotStorageImpl + } + }) +} diff --git a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/scaladsl/PersistenceTestKit.scala b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/scaladsl/PersistenceTestKit.scala index 379ecd75a00..51aff3d7576 100644 --- a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/scaladsl/PersistenceTestKit.scala +++ b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/scaladsl/PersistenceTestKit.scala @@ -336,7 +336,8 @@ class SnapshotTestKit(system: ActorSystem) import SnapshotTestKit._ - override protected val storage: SnapshotStorage = SnapshotStorageEmulatorExtension(system) + override protected val storage: SnapshotStorage = + SnapshotStorageEmulatorExtension(system).storageFor(PersistenceTestKitSnapshotPlugin.PluginId) override def getItem(persistenceId: String, nextInd: Int): Option[Any] = { storage.firstInExpectNextQueue(persistenceId).map(reprToAny) diff --git a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/RuntimeJournalsSpec.scala b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/RuntimeJournalsSpec.scala new file mode 100644 index 00000000000..644df55d07f --- /dev/null +++ b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/RuntimeJournalsSpec.scala @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.persistence.testkit.scaladsl + +import com.typesafe.config.ConfigFactory +import org.apache.pekko +import pekko.Done +import pekko.actor.testkit.typed.scaladsl.LogCapturing +import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import pekko.actor.typed.ActorRef +import pekko.actor.typed.Behavior +import pekko.actor.typed.scaladsl.adapter._ +import pekko.persistence.JournalProtocol.RecoverySuccess +import pekko.persistence.JournalProtocol.ReplayMessages +import pekko.persistence.JournalProtocol.ReplayedMessage +import pekko.persistence.Persistence +import pekko.persistence.SelectedSnapshot +import pekko.persistence.SnapshotProtocol.LoadSnapshot +import pekko.persistence.SnapshotProtocol.LoadSnapshotResult +import pekko.persistence.SnapshotSelectionCriteria +import pekko.persistence.testkit.PersistenceTestKitPlugin +import pekko.persistence.testkit.PersistenceTestKitSnapshotPlugin +import pekko.persistence.typed.PersistenceId +import pekko.persistence.typed.scaladsl.Effect +import pekko.persistence.typed.scaladsl.EventSourcedBehavior +import pekko.persistence.typed.scaladsl.RetentionCriteria +import org.scalatest.Inside +import org.scalatest.wordspec.AnyWordSpecLike + +object RuntimeJournalsSpec { + + private object Actor { + sealed trait Command + case class Save(text: String, replyTo: ActorRef[Done]) extends Command + case class ShowMeWhatYouGot(replyTo: ActorRef[String]) extends Command + case object Stop extends Command + + def apply(persistenceId: String, journal: String): Behavior[Command] = + EventSourcedBehavior[Command, String, String]( + PersistenceId.ofUniqueId(persistenceId), + "", + (state, cmd) => + cmd match { + case Save(text, replyTo) => + Effect.persist(text).thenRun(_ => replyTo ! Done) + case ShowMeWhatYouGot(replyTo) => + replyTo ! state + Effect.none + case Stop => + Effect.stop() + }, + (state, evt) => Seq(state, evt).filter(_.nonEmpty).mkString("|")) + .withRetention(RetentionCriteria.snapshotEvery(1, Int.MaxValue)) + .withJournalPluginId(s"$journal.journal") + .withJournalPluginConfig(Some(config(journal))) + .withSnapshotPluginId(s"$journal.snapshot") + .withSnapshotPluginConfig(Some(config(journal))) + + } + + private def config(journal: String) = { + ConfigFactory.parseString(s""" + $journal { + journal.class = "${classOf[PersistenceTestKitPlugin].getName}" + snapshot.class = "${classOf[PersistenceTestKitSnapshotPlugin].getName}" + } + """) + } +} + +class RuntimeJournalsSpec + extends ScalaTestWithActorTestKit + with AnyWordSpecLike + with LogCapturing + with Inside { + + import RuntimeJournalsSpec._ + + "The testkit journal and snapshot store plugins" must { + + "be possible to configure at runtime and use in multiple isolated instances" in { + val probe = createTestProbe[Any]() + + { + // one actor in each journal with same id + val j1 = spawn(Actor("id1", "journal1")) + val j2 = spawn(Actor("id1", "journal2")) + j1 ! Actor.Save("j1m1", probe.ref) + probe.receiveMessage() + j2 ! Actor.Save("j2m1", probe.ref) + probe.receiveMessage() + } + + { + def assertJournal(journal: String, expectedEvent: String) = { + val ref = Persistence(system).journalFor(s"$journal.journal", config(journal)) + ref.tell(ReplayMessages(0, Long.MaxValue, Long.MaxValue, "id1", probe.ref.toClassic), probe.ref.toClassic) + inside(probe.receiveMessage()) { + case ReplayedMessage(persistentRepr) => + persistentRepr.persistenceId shouldBe "id1" + persistentRepr.payload shouldBe expectedEvent + } + probe.expectMessage(RecoverySuccess(1)) + } + + assertJournal("journal1", "j1m1") + assertJournal("journal2", "j2m1") + } + + { + def assertSnapshot(journal: String, expectedShapshot: String) = { + val ref = Persistence(system).snapshotStoreFor(s"$journal.snapshot", config(journal)) + ref.tell(LoadSnapshot("id1", SnapshotSelectionCriteria.Latest, Long.MaxValue), + probe.ref.toClassic) + inside(probe.receiveMessage()) { + case LoadSnapshotResult(Some(SelectedSnapshot(_, snapshot)), _) => + snapshot shouldBe expectedShapshot + } + } + + assertSnapshot("journal1", "j1m1") + assertSnapshot("journal2", "j2m1") + } + } + } +} diff --git a/persistence-typed/src/main/mima-filters/1.1.x.backwards.excludes/eventsourcedbehavior.excludes b/persistence-typed/src/main/mima-filters/1.1.x.backwards.excludes/eventsourcedbehavior.excludes new file mode 100644 index 00000000000..d3578cc3f1b --- /dev/null +++ b/persistence-typed/src/main/mima-filters/1.1.x.backwards.excludes/eventsourcedbehavior.excludes @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Add EventSourcedBehavior.withJournalPluginConfig +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.scaladsl.EventSourcedBehavior.withJournalPluginConfig") +# Add EventSourcedBehavior.withSnapshotPluginConfig +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.scaladsl.EventSourcedBehavior.withSnapshotPluginConfig") diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala index cf8a2ae87f7..101c04e5898 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala @@ -14,9 +14,11 @@ package org.apache.pekko.persistence.typed.internal import scala.concurrent.ExecutionContext +import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal - +import com.typesafe.config.ConfigFactory import org.apache.pekko +import pekko.util.Helpers.ConfigOps import pekko.actor.Cancellable import pekko.actor.typed.Signal import pekko.actor.typed.scaladsl.ActorContext @@ -74,8 +76,10 @@ private[pekko] final class BehaviorSetup[C, E, S]( val persistence: Persistence = Persistence(context.system.toClassic) - val journal: ClassicActorRef = persistence.journalFor(settings.journalPluginId) - val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId) + val journal: ClassicActorRef = persistence + .journalFor(settings.journalPluginId, settings.journalPluginConfig.getOrElse(ConfigFactory.empty)) + val snapshotStore: ClassicActorRef = persistence + .snapshotStoreFor(settings.snapshotPluginId, settings.snapshotPluginConfig.getOrElse(ConfigFactory.empty)) val isSnapshotOptional: Boolean = Persistence(context.system.classicSystem).configFor(snapshotStore).getBoolean("snapshot-is-optional") @@ -107,14 +111,18 @@ private[pekko] final class BehaviorSetup[C, E, S]( private var recoveryTimer: OptionVal[Cancellable] = OptionVal.None + val recoveryEventTimeout: FiniteDuration = persistence + .journalConfigFor(settings.journalPluginId, settings.journalPluginConfig.getOrElse(ConfigFactory.empty)) + .getMillisDuration("recovery-event-timeout") + def startRecoveryTimer(snapshot: Boolean): Unit = { cancelRecoveryTimer() implicit val ec: ExecutionContext = context.executionContext val timer = if (snapshot) - context.scheduleOnce(settings.recoveryEventTimeout, context.self, RecoveryTickEvent(snapshot = true)) + context.scheduleOnce(recoveryEventTimeout, context.self, RecoveryTickEvent(snapshot = true)) else - context.system.scheduler.scheduleWithFixedDelay(settings.recoveryEventTimeout, settings.recoveryEventTimeout) { + context.system.scheduler.scheduleWithFixedDelay(recoveryEventTimeout, recoveryEventTimeout) { () => context.self ! RecoveryTickEvent(snapshot = false) } diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala index 40a08328b8e..1baaeccbb63 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -17,6 +17,7 @@ import java.util.Optional import java.util.UUID import java.util.concurrent.atomic.AtomicInteger +import com.typesafe.config.Config import org.apache.pekko import pekko.actor.typed import pekko.actor.typed.ActorRef @@ -105,6 +106,8 @@ private[pekko] final case class EventSourcedBehaviorImpl[Command, Event, State]( loggerClass: Class[_], journalPluginId: Option[String] = None, snapshotPluginId: Option[String] = None, + journalPluginConfig: Option[Config] = None, + snapshotPluginConfig: Option[Config] = None, tagger: Event => Set[String] = (_: Event) => Set.empty[String], eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event], snapshotAdapter: SnapshotAdapter[State] = NoOpSnapshotAdapter.instance[State], @@ -132,7 +135,8 @@ private[pekko] final case class EventSourcedBehaviorImpl[Command, Event, State]( case _ => false } if (!hasCustomLoggerName) ctx.setLoggerName(loggerClass) - val settings = EventSourcedSettings(ctx.system, journalPluginId.getOrElse(""), snapshotPluginId.getOrElse("")) + val settings = EventSourcedSettings(ctx.system, journalPluginId.getOrElse(""), snapshotPluginId.getOrElse(""), + journalPluginConfig, snapshotPluginConfig) // stashState outside supervise because StashState should survive restarts due to persist failures val stashState = new StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings) @@ -261,6 +265,14 @@ private[pekko] final case class EventSourcedBehaviorImpl[Command, Event, State]( copy(snapshotPluginId = if (id != "") Some(id) else None) } + override def withJournalPluginConfig(config: Option[Config]): EventSourcedBehavior[Command, Event, State] = { + copy(journalPluginConfig = config) + } + + override def withSnapshotPluginConfig(config: Option[Config]): EventSourcedBehavior[Command, Event, State] = { + copy(snapshotPluginConfig = config) + } + override def withSnapshotSelectionCriteria( selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] = { copy(recovery = Recovery(selection.toClassic)) diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala index ea68d0f9280..23421997a72 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedSettings.scala @@ -13,26 +13,40 @@ package org.apache.pekko.persistence.typed.internal -import java.util.concurrent.TimeUnit - -import scala.concurrent.duration._ - import com.typesafe.config.Config import org.apache.pekko import pekko.actor.typed.ActorSystem import pekko.annotation.InternalApi -import pekko.persistence.Persistence /** * INTERNAL API */ @InternalApi private[pekko] object EventSourcedSettings { - def apply(system: ActorSystem[_], journalPluginId: String, snapshotPluginId: String): EventSourcedSettings = - apply(system.settings.config, journalPluginId, snapshotPluginId) - - def apply(config: Config, journalPluginId: String, snapshotPluginId: String): EventSourcedSettings = { + def apply( + system: ActorSystem[_], + journalPluginId: String, + snapshotPluginId: String + ): EventSourcedSettings = + apply(system.settings.config, journalPluginId, snapshotPluginId, None, None) + + def apply( + system: ActorSystem[_], + journalPluginId: String, + snapshotPluginId: String, + journalPluginConfig: Option[Config], + snapshotPluginConfig: Option[Config] + ): EventSourcedSettings = + apply(system.settings.config, journalPluginId, snapshotPluginId, journalPluginConfig, snapshotPluginConfig) + + def apply( + config: Config, + journalPluginId: String, + snapshotPluginId: String, + journalPluginConfig: Option[Config], + snapshotPluginConfig: Option[Config] + ): EventSourcedSettings = { val typedConfig = config.getConfig("pekko.persistence.typed") val stashOverflowStrategy = typedConfig.getString("stash-overflow-strategy").toLowerCase match { @@ -47,36 +61,18 @@ import pekko.persistence.Persistence val logOnStashing = typedConfig.getBoolean("log-stashing") - val journalConfig = journalConfigFor(config, journalPluginId) - val recoveryEventTimeout: FiniteDuration = - journalConfig.getDuration("recovery-event-timeout", TimeUnit.MILLISECONDS).millis - val useContextLoggerForInternalLogging = typedConfig.getBoolean("use-context-logger-for-internal-logging") - Persistence.verifyPluginConfigExists(config, snapshotPluginId, "Snapshot store") - EventSourcedSettings( stashCapacity = stashCapacity, stashOverflowStrategy, logOnStashing = logOnStashing, - recoveryEventTimeout, journalPluginId, snapshotPluginId, + journalPluginConfig, + snapshotPluginConfig, useContextLoggerForInternalLogging) } - - private def journalConfigFor(config: Config, journalPluginId: String): Config = { - def defaultJournalPluginId = { - val configPath = config.getString("pekko.persistence.journal.plugin") - Persistence.verifyPluginConfigIsDefined(configPath, "Default journal") - configPath - } - - val configPath = if (journalPluginId == "") defaultJournalPluginId else journalPluginId - Persistence.verifyPluginConfigExists(config, configPath, "Journal") - config.getConfig(configPath).withFallback(config.getConfig(Persistence.JournalFallbackConfigPath)) - } - } /** @@ -87,9 +83,10 @@ private[pekko] final case class EventSourcedSettings( stashCapacity: Int, stashOverflowStrategy: StashOverflowStrategy, logOnStashing: Boolean, - recoveryEventTimeout: FiniteDuration, journalPluginId: String, snapshotPluginId: String, + journalPluginConfig: Option[Config], + snapshotPluginConfig: Option[Config], useContextLoggerForInternalLogging: Boolean) { require(journalPluginId != null, "journal plugin id must not be null; use empty string for 'default' journal") diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala index 3149255a3c6..6e12eb3d37d 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala @@ -230,7 +230,7 @@ private[pekko] final class ReplayingEvents[C, E, S]( this } else { val msg = - s"Replay timed out, didn't get event within [${setup.settings.recoveryEventTimeout}], highest sequence number seen [${state.seqNr}]" + s"Replay timed out, didn't get event within [${setup.recoveryEventTimeout}], highest sequence number seen [${state.seqNr}]" onRecoveryFailure(new RecoveryTimedOut(msg), None) } } else { diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingSnapshot.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingSnapshot.scala index e28411d96d9..ff2f74a3619 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingSnapshot.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingSnapshot.scala @@ -127,7 +127,7 @@ private[pekko] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetu if (snapshot) { // we know we're in snapshotting mode; snapshot recovery timeout arrived val ex = new RecoveryTimedOut( - s"Recovery timed out, didn't get snapshot within ${setup.settings.recoveryEventTimeout}") + s"Recovery timed out, didn't get snapshot within ${setup.recoveryEventTimeout}") onRecoveryFailure(ex) } else Behaviors.same // ignore, since we received the snapshot already diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala index c133bcbdbdb..e2383f21ead 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala @@ -16,6 +16,7 @@ package org.apache.pekko.persistence.typed.javadsl import java.util.Collections import java.util.Optional +import com.typesafe.config.Config import org.apache.pekko import pekko.actor.typed import pekko.actor.typed.BackoffSupervisorStrategy @@ -26,6 +27,7 @@ import pekko.annotation.InternalApi import pekko.persistence.typed._ import pekko.persistence.typed.EventAdapter import pekko.persistence.typed.internal._ +import pekko.util.OptionConverters._ import pekko.util.unused abstract class EventSourcedBehavior[Command, Event, State] private[pekko] ( @@ -130,6 +132,16 @@ abstract class EventSourcedBehavior[Command, Event, State] private[pekko] ( */ def snapshotPluginId: String = "" + /** + * Override and define the journal plugin config that this actor should use instead of the default. + */ + def journalPluginConfig: Optional[Config] = Optional.empty() + + /** + * Override and define the snapshot store plugin config that this actor should use instead of the default. + */ + def snapshotPluginConfig: Optional[Config] = Optional.empty() + /** * Override and define the snapshot selection criteria used by this actor instead of the default. * By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events @@ -225,6 +237,8 @@ abstract class EventSourcedBehavior[Command, Event, State] private[pekko] ( .withJournalPluginId(journalPluginId) .withSnapshotPluginId(snapshotPluginId) .withRecovery(recovery.asScala) + .withJournalPluginConfig(journalPluginConfig.toScala) + .withSnapshotPluginConfig(snapshotPluginConfig.toScala) val handler = signalHandler() val behaviorWithSignalHandler = diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala index 4bdbc9225ea..57ab08d8932 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -13,6 +13,7 @@ package org.apache.pekko.persistence.typed.scaladsl +import com.typesafe.config.Config import org.apache.pekko import pekko.actor.typed.BackoffSupervisorStrategy import pekko.actor.typed.Behavior @@ -162,6 +163,20 @@ object EventSourcedBehavior { */ def withSnapshotPluginId(id: String): EventSourcedBehavior[Command, Event, State] + /** + * Change the journal plugin config that this actor should use. + * + * @since 1.1.3 + */ + def withJournalPluginConfig(id: Option[Config]): EventSourcedBehavior[Command, Event, State] + + /** + * Change the snapshot store plugin config that this actor should use. + * + * @since 1.1.3 + */ + def withSnapshotPluginConfig(id: Option[Config]): EventSourcedBehavior[Command, Event, State] + /** * Changes the snapshot selection criteria used by this behavior. * By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events diff --git a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/StashStateSpec.scala b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/StashStateSpec.scala index 7e13db514ea..b7180fb78d6 100644 --- a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/StashStateSpec.scala +++ b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/StashStateSpec.scala @@ -74,9 +74,10 @@ class StashStateSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with stashCapacity = capacity, stashOverflowStrategy = StashOverflowStrategy.Fail, logOnStashing = false, - recoveryEventTimeout = 3.seconds, journalPluginId = "", snapshotPluginId = "", + journalPluginConfig = None, + snapshotPluginConfig = None, useContextLoggerForInternalLogging = false) }