From adefac76fff0c57fed1841ddb509b0d22f7c4fa2 Mon Sep 17 00:00:00 2001 From: Ruud Welling Date: Tue, 19 Dec 2017 16:03:01 +0100 Subject: [PATCH] Journal sequence actor uses NumericRanges instead of a Set to keep track of missing elements (#158) * Added test to reproduce performance (memory) issue in journal sequence actor * JournalSequenceActor uses NumericRanges for missing elements to prevent performance and memory issues. fixes #155 * Small code simplification * Fixed test failure * Code simplification as suggested in PR comment --- .../jdbc/query/JournalSequenceActor.scala | 60 +++++++++---------- .../jdbc/query/JournalSequenceActorTest.scala | 31 ++++++++++ 2 files changed, 61 insertions(+), 30 deletions(-) diff --git a/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala b/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala index 7b4866513..e84620e69 100644 --- a/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala +++ b/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala @@ -1,4 +1,5 @@ -package akka.persistence.jdbc.query +package akka.persistence.jdbc +package query import akka.actor.{Actor, ActorLogging, Props, Status, Timers} import akka.persistence.jdbc.query.dao.ReadJournalDao @@ -7,6 +8,7 @@ import akka.persistence.jdbc.config.JournalSequenceRetrievalConfig import akka.stream.Materializer import akka.stream.scaladsl.Sink +import scala.collection.immutable.NumericRange import scala.concurrent.duration.FiniteDuration object JournalSequenceActor { @@ -25,6 +27,22 @@ object JournalSequenceActor { private case object AssumeMaxOrderingIdTimerKey private type OrderingId = Long + + /** + * Efficient representation of missing elements using NumericRanges. + * It can be seen as a collection of OrderingIds + */ + private case class MissingElements(elements: Seq[NumericRange[OrderingId]]) { + def addRange(from: OrderingId, until: OrderingId): MissingElements = { + val newRange = from.until(until) + MissingElements(elements :+ newRange) + } + def contains(id: OrderingId): Boolean = elements.exists(_.containsTyped(id)) + def isEmpty: Boolean = elements.forall(_.isEmpty) + } + private object MissingElements { + def empty: MissingElements = MissingElements(Vector.empty) + } } /** @@ -56,7 +74,7 @@ class JournalSequenceActor(readJournalDao: ReadJournalDao, config: JournalSequen * @param moduloCounter A counter which is incremented every time a new query have been executed, modulo `maxTries` * @param previousDelay The last used delay (may change in case failures occur) */ - def receive(currentMaxOrdering: OrderingId, missingByCounter: Map[Int, Set[OrderingId]], moduloCounter: Int, previousDelay: FiniteDuration = queryDelay): Receive = { + def receive(currentMaxOrdering: OrderingId, missingByCounter: Map[Int, MissingElements], moduloCounter: Int, previousDelay: FiniteDuration = queryDelay): Receive = { case ScheduleAssumeMaxOrderingId(max) => // All elements smaller than max can be assumed missing after this delay @@ -97,36 +115,18 @@ class JournalSequenceActor(readJournalDao: ReadJournalDao, config: JournalSequen /** * This method that implements the "find gaps" algo. It's the meat and main purpose of this actor. */ - def findGaps(elements: Seq[OrderingId], currentMaxOrdering: OrderingId, missingByCounter: Map[Int, Set[OrderingId]], moduloCounter: Int) = { + def findGaps(elements: Seq[OrderingId], currentMaxOrdering: OrderingId, missingByCounter: Map[Int, MissingElements], moduloCounter: Int): Unit = { // list of elements that will be considered as genuine gaps. // `givenUp` is either empty or is was filled on a previous iteration - val givenUp = missingByCounter.getOrElse(moduloCounter, Set.empty) + val givenUp = missingByCounter.getOrElse(moduloCounter, MissingElements.empty) val (nextMax, _, missingElems) = // using the ordering elements that were fetched, we verify if there are any gaps - elements.foldLeft[(OrderingId, OrderingId, Set[OrderingId])](currentMaxOrdering, currentMaxOrdering, Set.empty) { + elements.foldLeft[(OrderingId, OrderingId, MissingElements)](currentMaxOrdering, currentMaxOrdering, MissingElements.empty) { case ((currentMax, previousElement, missing), currentElement) => - // we accumulate in newMissing the gaps we detect on each iteration - val newMissing = currentElement match { - - // if current element is contiguous to previous, there is no gap - case e if e == previousElement + 1 => missing - - // if it's a gap and has been detected before on a previous iteration we give up - // that means that we consider it a genuine gap that will never be filled - case e if givenUp(e) => missing - - // any other case is a gap that we expect to be filled soon - case _ => - val currentlyMissing = previousElement + 1 until currentElement - // we don't want to declare it as missing if it has been already declared on a previous iterations - def alreadyMissing(e: Long) = missingByCounter.values.exists(_.contains(e)) - missing ++ currentlyMissing.filterNot(alreadyMissing) - } - // we must decide if we move the cursor forward val newMax = if ((currentMax + 1).until(currentElement).forall(givenUp.contains)) { @@ -136,18 +136,18 @@ class JournalSequenceActor(readJournalDao: ReadJournalDao, config: JournalSequen currentElement } else currentMax + // we accumulate in newMissing the gaps we detect on each iteration + val newMissing = + if (previousElement + 1 == currentElement || newMax == currentElement) missing + else missing.addRange(previousElement + 1, currentElement) + (newMax, currentElement, newMissing) } - val newMissingByCounter = - (missingByCounter + (moduloCounter -> missingElems)) - .map { - case (key, value) => - key -> value.filter(missingId => missingId > nextMax) - } + val newMissingByCounter = missingByCounter + (moduloCounter -> missingElems) // did we detect gaps in the current batch? - val noGapsFound = newMissingByCounter.values.forall(_.isEmpty) + val noGapsFound = missingElems.isEmpty // full batch means that we retrieved as much elements as the batchSize // that happens when we are not yet at the end of the stream diff --git a/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala b/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala index b6465db6a..17348632c 100644 --- a/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala +++ b/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala @@ -74,6 +74,37 @@ abstract class JournalSequenceActorTest(configFile: String, isOracle: Boolean) e } } + if (!isOracle) { + // Note this test case cannot be executed for oracle, because forceInsertAll is not supported in the oracle driver. + it should "recover after the specified max number if tries if the first event has a very high sequence number and lots of large gaps exist" in { + withActorSystem { implicit system: ActorSystem => + withDatabase { db => + implicit val materializer: ActorMaterializer = ActorMaterializer() + val numElements = 1000 + val gapSize = 10000 + val firstElement = 100000000 + val lastElement = firstElement + (numElements * gapSize) + Source.fromIterator(() => (firstElement to lastElement by gapSize).iterator) + .map(id => JournalRow(id, deleted = false, "id", id, Array(0.toByte))) + .grouped(10000) + .mapAsync(4) { rows => + db.run(JournalTable.forceInsertAll(rows)) + } + .runWith(Sink.ignore).futureValue + + withJournalSequenceActor(db, maxTries = 2) { actor => + // Should normally recover after `maxTries` seconds + val patienceConfig = PatienceConfig(10.seconds) + eventually { + val currentMax = actor.ask(GetMaxOrderingId).mapTo[MaxOrderingId].futureValue.maxOrdering + currentMax shouldBe lastElement + }(patienceConfig, implicitly) + } + } + } + } + } + it should s"assume that the max ordering id in the database on startup is the max after (queryDelay * maxTries)" in { withActorSystem { implicit system: ActorSystem => withDatabase { db =>