Skip to content

Commit

Permalink
Journal sequence actor uses NumericRanges instead of a Set to keep tr…
Browse files Browse the repository at this point in the history
…ack of missing elements (#158)

* Added test to reproduce performance (memory) issue in journal sequence actor

* JournalSequenceActor uses NumericRanges for missing elements to prevent performance and memory issues.
fixes #155

* Small code simplification

* Fixed test failure

* Code simplification as suggested in PR comment
  • Loading branch information
WellingR authored and octonato committed Dec 19, 2017
1 parent a1ebadc commit adefac7
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package akka.persistence.jdbc.query
package akka.persistence.jdbc
package query

import akka.actor.{Actor, ActorLogging, Props, Status, Timers}
import akka.persistence.jdbc.query.dao.ReadJournalDao
Expand All @@ -7,6 +8,7 @@ import akka.persistence.jdbc.config.JournalSequenceRetrievalConfig
import akka.stream.Materializer
import akka.stream.scaladsl.Sink

import scala.collection.immutable.NumericRange
import scala.concurrent.duration.FiniteDuration

object JournalSequenceActor {
Expand All @@ -25,6 +27,22 @@ object JournalSequenceActor {
private case object AssumeMaxOrderingIdTimerKey

private type OrderingId = Long

/**
 * Tracks missing OrderingIds as a sequence of NumericRanges instead of
 * listing every id individually; each added gap is stored as a single range.
 * Conceptually it behaves like a collection of OrderingIds.
 */
private case class MissingElements(elements: Seq[NumericRange[OrderingId]]) {

  /** Returns a new instance that additionally covers the ids from `from` (inclusive) up to `until` (exclusive). */
  def addRange(from: OrderingId, until: OrderingId): MissingElements =
    copy(elements = elements :+ from.until(until))

  /** True when at least one of the tracked ranges includes `id`. */
  def contains(id: OrderingId): Boolean =
    elements.exists(range => range.containsTyped(id))

  /** True when none of the tracked ranges holds an id (which includes tracking no ranges at all). */
  def isEmpty: Boolean =
    !elements.exists(range => range.nonEmpty)
}
private object MissingElements {
  /** An instance that tracks no ranges at all, i.e. nothing is recorded as missing. */
  def empty: MissingElements = MissingElements(elements = Vector.empty)
}
}

/**
Expand Down Expand Up @@ -56,7 +74,7 @@ class JournalSequenceActor(readJournalDao: ReadJournalDao, config: JournalSequen
* @param moduloCounter A counter which is incremented every time a new query has been executed, modulo `maxTries`
* @param previousDelay The last used delay (may change in case failures occur)
*/
def receive(currentMaxOrdering: OrderingId, missingByCounter: Map[Int, Set[OrderingId]], moduloCounter: Int, previousDelay: FiniteDuration = queryDelay): Receive = {
def receive(currentMaxOrdering: OrderingId, missingByCounter: Map[Int, MissingElements], moduloCounter: Int, previousDelay: FiniteDuration = queryDelay): Receive = {

case ScheduleAssumeMaxOrderingId(max) =>
// All elements smaller than max can be assumed missing after this delay
Expand Down Expand Up @@ -97,36 +115,18 @@ class JournalSequenceActor(readJournalDao: ReadJournalDao, config: JournalSequen
/**
* This method implements the "find gaps" algorithm. It's the meat and main purpose of this actor.
*/
def findGaps(elements: Seq[OrderingId], currentMaxOrdering: OrderingId, missingByCounter: Map[Int, Set[OrderingId]], moduloCounter: Int) = {
def findGaps(elements: Seq[OrderingId], currentMaxOrdering: OrderingId, missingByCounter: Map[Int, MissingElements], moduloCounter: Int): Unit = {

// list of elements that will be considered as genuine gaps.
// `givenUp` is either empty or was filled on a previous iteration
val givenUp = missingByCounter.getOrElse(moduloCounter, Set.empty)
val givenUp = missingByCounter.getOrElse(moduloCounter, MissingElements.empty)

val (nextMax, _, missingElems) =
// using the ordering elements that were fetched, we verify if there are any gaps
elements.foldLeft[(OrderingId, OrderingId, Set[OrderingId])](currentMaxOrdering, currentMaxOrdering, Set.empty) {
elements.foldLeft[(OrderingId, OrderingId, MissingElements)](currentMaxOrdering, currentMaxOrdering, MissingElements.empty) {

case ((currentMax, previousElement, missing), currentElement) =>

// we accumulate in newMissing the gaps we detect on each iteration
val newMissing = currentElement match {

// if current element is contiguous to previous, there is no gap
case e if e == previousElement + 1 => missing

// if it's a gap and has been detected before on a previous iteration we give up
// that means that we consider it a genuine gap that will never be filled
case e if givenUp(e) => missing

// any other case is a gap that we expect to be filled soon
case _ =>
val currentlyMissing = previousElement + 1 until currentElement
// we don't want to declare it as missing if it has been already declared on a previous iterations
def alreadyMissing(e: Long) = missingByCounter.values.exists(_.contains(e))
missing ++ currentlyMissing.filterNot(alreadyMissing)
}

// we must decide if we move the cursor forward
val newMax =
if ((currentMax + 1).until(currentElement).forall(givenUp.contains)) {
Expand All @@ -136,18 +136,18 @@ class JournalSequenceActor(readJournalDao: ReadJournalDao, config: JournalSequen
currentElement
} else currentMax

// we accumulate in newMissing the gaps we detect on each iteration
val newMissing =
if (previousElement + 1 == currentElement || newMax == currentElement) missing
else missing.addRange(previousElement + 1, currentElement)

(newMax, currentElement, newMissing)
}

val newMissingByCounter =
(missingByCounter + (moduloCounter -> missingElems))
.map {
case (key, value) =>
key -> value.filter(missingId => missingId > nextMax)
}
val newMissingByCounter = missingByCounter + (moduloCounter -> missingElems)

// did we detect gaps in the current batch?
val noGapsFound = newMissingByCounter.values.forall(_.isEmpty)
val noGapsFound = missingElems.isEmpty

// full batch means that we retrieved as many elements as the batchSize
// that happens when we are not yet at the end of the stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,37 @@ abstract class JournalSequenceActorTest(configFile: String, isOracle: Boolean) e
}
}

if (!isOracle) {
  // Note this test case cannot be executed for oracle, because forceInsertAll is not supported in the oracle driver.
  //
  // Scenario: the journal starts at a very high ordering id and every following row is
  // `gapSize` ids further on, so a large run of ids is absent between each pair of rows.
  // The actor is expected to eventually report the highest inserted id as the max ordering.
  it should "recover after the specified max number of tries if the first event has a very high sequence number and lots of large gaps exist" in {
    withActorSystem { implicit system: ActorSystem =>
      withDatabase { db =>
        implicit val materializer: ActorMaterializer = ActorMaterializer()
        val numElements = 1000
        val gapSize = 10000
        val firstElement = 100000000
        val lastElement = firstElement + (numElements * gapSize)

        // The range is inclusive on both ends, so numElements + 1 rows are inserted,
        // the last one having ordering id `lastElement`.
        Source.fromIterator(() => (firstElement to lastElement by gapSize).iterator)
          .map(id => JournalRow(id, deleted = false, "id", id, Array(0.toByte)))
          // insert in batches, running up to 4 insert batches concurrently
          .grouped(10000)
          .mapAsync(4) { rows =>
            db.run(JournalTable.forceInsertAll(rows))
          }
          .runWith(Sink.ignore).futureValue

        withJournalSequenceActor(db, maxTries = 2) { actor =>
          // Should normally recover after `maxTries` seconds
          // NOTE(review): presumably the configured query delay is one second — confirm against the test config.
          val patienceConfig = PatienceConfig(10.seconds)
          eventually {
            val currentMax = actor.ask(GetMaxOrderingId).mapTo[MaxOrderingId].futureValue.maxOrdering
            currentMax shouldBe lastElement
          }(patienceConfig, implicitly)
        }
      }
    }
  }
}

it should s"assume that the max ordering id in the database on startup is the max after (queryDelay * maxTries)" in {
withActorSystem { implicit system: ActorSystem =>
withDatabase { db =>
Expand Down

0 comments on commit adefac7

Please sign in to comment.