From 524b96dc67004577d921ceda55d4cd026b9606df Mon Sep 17 00:00:00 2001 From: Szymon Marcinkiewicz Date: Thu, 18 Jan 2024 13:59:00 +0100 Subject: [PATCH 1/3] Not saving all events to StateEventStore --- .../core/state/StateAggregator.kt | 6 +-- .../core/state/StateEvent.kt | 38 +++++++++++-------- .../core/state/StateEventStore.kt | 10 +++-- 3 files changed, 31 insertions(+), 23 deletions(-) diff --git a/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateAggregator.kt b/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateAggregator.kt index fb39e68..9746f81 100644 --- a/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateAggregator.kt +++ b/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateAggregator.kt @@ -41,14 +41,14 @@ internal object StateAggregator { private fun aggregateCollectionMigrationState( sourceToDestination: SourceToDestination, - events: List + events: Map ): CollectionState = CollectionState( sourceToDestination, eventsToSteps(events) ) - private fun eventsToSteps(events: List): List { - return events.fold(mutableMapOf()) { result, migrationEvent -> + private fun eventsToSteps(events: Map): List { + return events.values.fold(mutableMapOf()) { result, migrationEvent -> buildSteps(result, migrationEvent) }.values.toList().sortedBy { it.startDate } } diff --git a/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateEvent.kt b/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateEvent.kt index e78b37e..0194977 100644 --- a/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateEvent.kt +++ b/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateEvent.kt @@ -5,50 +5,56 @@ import java.time.Instant sealed class StateEvent( open val sourceToDestination: SourceToDestination, - val date: Instant = Instant.now() + val type: Type, + val date: Instant = Instant.now(), ) { + enum class Type { + START, SOURCE_TO_LOCAL_START, DUMP_START, DUMP_UPDATE, DUMP_FINISH, RESTORE_START, RESTORE_UPDATE, RESTORE_FINISH, + INDEX_REBUILD_START, INDEX_REBUILD_FINISH, LOCAL_TO_DESTINATION_START, STOP, PAUSE, RESUME, FAILED; + } + data class StartEvent(override val sourceToDestination: SourceToDestination) : - StateEvent(sourceToDestination) + StateEvent(sourceToDestination, Type.START) data class SourceToLocalStartEvent(override val sourceToDestination: SourceToDestination) : - StateEvent(sourceToDestination) + StateEvent(sourceToDestination, Type.SOURCE_TO_LOCAL_START) data class DumpStartEvent(override val sourceToDestination: SourceToDestination) : - StateEvent(sourceToDestination) + StateEvent(sourceToDestination, Type.DUMP_START) data class DumpUpdateEvent(override val sourceToDestination: SourceToDestination, val info: String) : - StateEvent(sourceToDestination) + StateEvent(sourceToDestination, Type.DUMP_UPDATE) data class DumpFinishEvent(override val sourceToDestination: SourceToDestination) : - StateEvent(sourceToDestination) + StateEvent(sourceToDestination, Type.DUMP_FINISH) data class RestoreStartEvent(override val sourceToDestination: SourceToDestination) : - StateEvent(sourceToDestination) + StateEvent(sourceToDestination, Type.RESTORE_START) data class RestoreUpdateEvent(override val sourceToDestination: SourceToDestination, val info: String) : - StateEvent(sourceToDestination) + StateEvent(sourceToDestination, Type.RESTORE_UPDATE) data class RestoreFinishEvent(override val sourceToDestination: SourceToDestination) : - StateEvent(sourceToDestination) + StateEvent(sourceToDestination, Type.RESTORE_FINISH) data class IndexRebuildStartEvent(override val sourceToDestination: SourceToDestination) : - StateEvent(sourceToDestination) + StateEvent(sourceToDestination, Type.INDEX_REBUILD_START) data class IndexRebuildFinishEvent(override val sourceToDestination: SourceToDestination) : - StateEvent(sourceToDestination) + StateEvent(sourceToDestination, Type.INDEX_REBUILD_FINISH) data class LocalToDestinationStartEvent(override val sourceToDestination: SourceToDestination) : - StateEvent(sourceToDestination) + StateEvent(sourceToDestination, Type.LOCAL_TO_DESTINATION_START) data class StopEvent(override val sourceToDestination: SourceToDestination) : - StateEvent(sourceToDestination) + StateEvent(sourceToDestination, Type.STOP) data class PauseEvent(override val sourceToDestination: SourceToDestination) : - StateEvent(sourceToDestination) + StateEvent(sourceToDestination, Type.PAUSE) data class ResumeEvent(override val sourceToDestination: SourceToDestination) : - StateEvent(sourceToDestination) + StateEvent(sourceToDestination, Type.RESUME) data class FailedEvent(override val sourceToDestination: SourceToDestination, val throwable: Throwable? = null) : - StateEvent(sourceToDestination) + StateEvent(sourceToDestination, Type.FAILED) } diff --git a/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateEventStore.kt b/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateEventStore.kt index 72a1b17..bbda5e7 100644 --- a/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateEventStore.kt +++ b/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateEventStore.kt @@ -4,12 +4,14 @@ import pl.allegro.tech.mongomigrationstream.core.mongo.SourceToDestination import java.util.concurrent.ConcurrentHashMap internal class StateEventStore { - private val events: ConcurrentHashMap> = ConcurrentHashMap() + private val events: ConcurrentHashMap> = ConcurrentHashMap() fun storeEvent(event: StateEvent) { - events[event.sourceToDestination] = events.getOrDefault(event.sourceToDestination, emptyList()) + listOf(event) + val eventsForCollection = events.getOrDefault(event.sourceToDestination, ConcurrentHashMap()) + eventsForCollection[event.type] = event + events[event.sourceToDestination] = eventsForCollection } - fun getEvents(sourceToDestination: SourceToDestination): List = events[sourceToDestination] ?: emptyList() - fun getAllEvents(): Map> = events + fun getEvents(sourceToDestination: SourceToDestination): Map = events[sourceToDestination] ?: emptyMap() + fun getAllEvents(): Map> = events } From d236f525115b7dc09a0decf2e2b0e0a0275b281e Mon Sep 17 00:00:00 2001 From: Szymon Marcinkiewicz Date: Fri, 19 Jan 2024 11:49:34 +0100 Subject: [PATCH 2/3] Fix aggregation after changing EventStore --- .../core/state/StateAggregator.kt | 21 +- .../core/state/StateAggregatorTest.kt | 459 ++++++++++++++++++ 2 files changed, 466 insertions(+), 14 deletions(-) create mode 100644 mongo-migration-stream-core/src/test/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateAggregatorTest.kt diff --git a/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateAggregator.kt b/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateAggregator.kt index 9746f81..4ebefd9 100644 --- a/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateAggregator.kt +++ b/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateAggregator.kt @@ -30,6 +30,7 @@ import pl.allegro.tech.mongomigrationstream.core.state.StateEvent.ResumeEvent import pl.allegro.tech.mongomigrationstream.core.state.StateEvent.SourceToLocalStartEvent import pl.allegro.tech.mongomigrationstream.core.state.StateEvent.StartEvent import pl.allegro.tech.mongomigrationstream.core.state.StateEvent.StopEvent +import pl.allegro.tech.mongomigrationstream.core.state.StateEvent.Type internal object StateAggregator { fun aggregateMigrationState(eventStore: StateEventStore): State = @@ -41,14 +42,14 @@ internal object StateAggregator { private fun aggregateCollectionMigrationState( sourceToDestination: SourceToDestination, - events: Map + events: Map ): CollectionState = CollectionState( sourceToDestination, eventsToSteps(events) ) - private fun eventsToSteps(events: Map): List { - return events.values.fold(mutableMapOf()) { result, migrationEvent -> + private fun eventsToSteps(events: Map): List { + return events.values.sortedBy { it.date }.fold(mutableMapOf()) { result, migrationEvent -> buildSteps(result, migrationEvent) }.values.toList().sortedBy { it.startDate } } @@ -62,20 +63,12 @@ internal object StateAggregator { is SourceToLocalStartEvent -> updateSteps(steps, CollectionStep(SOURCE_TO_LOCAL, event.date)) is DumpStartEvent -> updateSteps(steps, CollectionStep(DUMP, event.date)) is DumpFinishEvent -> steps[DUMP]?.copy(endDate = event.date)?.let { updateSteps(steps, it) } - is DumpUpdateEvent -> steps[DUMP]?.copy( - info = (steps[DUMP]?.info ?: emptyList()) + listOf(Info(event.date, event.info)) - )?.let { updateSteps(steps, it) } - + is DumpUpdateEvent -> steps[DUMP]?.copy(info = listOf(Info(event.date, event.info)))?.let { updateSteps(steps, it) } is RestoreStartEvent -> updateSteps(steps, CollectionStep(RESTORE, event.date)) - is RestoreUpdateEvent -> steps[RESTORE]?.copy( - info = (steps[RESTORE]?.info ?: emptyList()) + listOf(Info(event.date, event.info)) - )?.let { updateSteps(steps, it) } - + is RestoreUpdateEvent -> steps[RESTORE]?.copy(info = listOf(Info(event.date, event.info)))?.let { updateSteps(steps, it) } is RestoreFinishEvent -> steps[RESTORE]?.copy(endDate = event.date)?.let { updateSteps(steps, it) } is IndexRebuildStartEvent -> updateSteps(steps, CollectionStep(INDEX_REBUILD, event.date)) - is IndexRebuildFinishEvent -> steps[INDEX_REBUILD]?.copy(endDate = event.date) - ?.let { updateSteps(steps, it) } - + is IndexRebuildFinishEvent -> steps[INDEX_REBUILD]?.copy(endDate = event.date)?.let { updateSteps(steps, it) } is LocalToDestinationStartEvent -> updateSteps(steps, CollectionStep(LOCAL_TO_DESTINATION, event.date)) is StopEvent -> updateSteps(steps, CollectionStep(FINISHED, event.date)) is PauseEvent -> updateSteps(steps, CollectionStep(PAUSED, event.date)) diff --git a/mongo-migration-stream-core/src/test/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateAggregatorTest.kt b/mongo-migration-stream-core/src/test/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateAggregatorTest.kt new file mode 100644 index 0000000..5d60d1a --- /dev/null +++ b/mongo-migration-stream-core/src/test/kotlin/pl/allegro/tech/mongomigrationstream/core/state/StateAggregatorTest.kt @@ -0,0 +1,459 @@ +package pl.allegro.tech.mongomigrationstream.core.state + +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.collections.shouldBeEmpty +import io.kotest.matchers.nulls.shouldBeNull +import io.kotest.matchers.nulls.shouldNotBeNull +import io.kotest.matchers.shouldBe +import pl.allegro.tech.mongomigrationstream.core.mongo.DbCollection +import pl.allegro.tech.mongomigrationstream.core.mongo.SourceToDestination + +internal class StateAggregatorTest : ShouldSpec({ + + should("aggregate state from no events") { + // Given: + val eventStore = StateEventStore() + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates.shouldBeEmpty() + } + + should("aggregate state from [Start] event") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates.size.shouldBe(1) // Only one source to destination + result.collectionStates[0].sourceToDestination.shouldBe(sourceToDestination) + result.collectionStates[0].steps.size.shouldBe(1) // Only one step for a given source to destination + result.collectionStates[0].steps[0].type.shouldBe(State.StepType.NEW) + } + + should("aggregate state from [Start, SourceToLocalStart] events") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.SourceToLocalStartEvent(sourceToDestination)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates.size.shouldBe(1) + result.collectionStates[0].sourceToDestination.shouldBe(sourceToDestination) + result.collectionStates[0].steps.size.shouldBe(2) + result.collectionStates[0].steps[1].type.shouldBe(State.StepType.SOURCE_TO_LOCAL) + } + + should("aggregate state from [Start, SourceToLocalStart, DumpStart] events") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.SourceToLocalStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpStartEvent(sourceToDestination)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates.size.shouldBe(1) // Only one source to destination + result.collectionStates[0].sourceToDestination.shouldBe(sourceToDestination) + result.collectionStates[0].steps.size.shouldBe(3) + result.collectionStates[0].steps[2].type.shouldBe(State.StepType.DUMP) + } + + should("contain info about dump when update dump event is aggregated") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + val infoMessage = "Info message" + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.SourceToLocalStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpUpdateEvent(sourceToDestination, infoMessage)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates[0].steps[2].info.size.shouldBe(1) + result.collectionStates[0].steps[2].info.first().message.shouldBe(infoMessage) + } + + should("contain only last info about dump when multiple update dump events are aggregated") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + val newestMessage = "Newest message" + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.SourceToLocalStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpUpdateEvent(sourceToDestination, "")) + eventStore.storeEvent(StateEvent.DumpUpdateEvent(sourceToDestination, "")) + eventStore.storeEvent(StateEvent.DumpUpdateEvent(sourceToDestination, "")) + eventStore.storeEvent(StateEvent.DumpUpdateEvent(sourceToDestination, newestMessage)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates[0].steps[2].info.size.shouldBe(1) // Even though there are multiple DumpUpdateEvents, store only the last one + result.collectionStates[0].steps[2].info.first().message.shouldBe(newestMessage) + } + + should("aggregate state from [Start, SourceToLocalStart, DumpStart, DumpUpdate, DumpFinish] events") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + val dumpInfo = "dumpInfo" + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.SourceToLocalStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpUpdateEvent(sourceToDestination, dumpInfo)) + eventStore.storeEvent(StateEvent.DumpFinishEvent(sourceToDestination)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates.size.shouldBe(1) + result.collectionStates[0].sourceToDestination.shouldBe(sourceToDestination) + result.collectionStates[0].steps.size.shouldBe(3) + result.collectionStates[0].steps[2].type.shouldBe(State.StepType.DUMP) + + val dumpStep = result.collectionStates[0].steps[2] + dumpStep.startDate.shouldNotBeNull() + dumpStep.endDate.shouldNotBeNull() + dumpStep.info.first().message.shouldBe(dumpInfo) + } + + should("aggregate state from [Start, SourceToLocalStart, DumpStart, DumpUpdate, DumpFinish, RestoreStart] events") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.SourceToLocalStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpUpdateEvent(sourceToDestination, "")) + eventStore.storeEvent(StateEvent.DumpFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreStartEvent(sourceToDestination)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates.size.shouldBe(1) + result.collectionStates[0].sourceToDestination.shouldBe(sourceToDestination) + result.collectionStates[0].steps.size.shouldBe(4) + result.collectionStates[0].steps[3].type.shouldBe(State.StepType.RESTORE) + val restoreStep = result.collectionStates[0].steps[3] + restoreStep.startDate.shouldNotBeNull() + restoreStep.endDate.shouldBeNull() + restoreStep.info.shouldBeEmpty() + } + + should("contain info about restore when update restore event is aggregated") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + val restoreUpdateMessage = "restore update message" + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.SourceToLocalStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpUpdateEvent(sourceToDestination, "")) + eventStore.storeEvent(StateEvent.DumpFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreUpdateEvent(sourceToDestination, restoreUpdateMessage)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates[0].steps[3].type.shouldBe(State.StepType.RESTORE) + result.collectionStates[0].steps[3].info.size.shouldBe(1) + result.collectionStates[0].steps[3].info.first().message.shouldBe(restoreUpdateMessage) + } + + should("contain only last info about restore when multiple update restore events are aggregated") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + val newestMessage = "restore update message" + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.SourceToLocalStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpUpdateEvent(sourceToDestination, "")) + eventStore.storeEvent(StateEvent.DumpFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreUpdateEvent(sourceToDestination, "")) + eventStore.storeEvent(StateEvent.RestoreUpdateEvent(sourceToDestination, "")) + eventStore.storeEvent(StateEvent.RestoreUpdateEvent(sourceToDestination, "")) + eventStore.storeEvent(StateEvent.RestoreUpdateEvent(sourceToDestination, newestMessage)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates[0].steps[3].type.shouldBe(State.StepType.RESTORE) + result.collectionStates[0].steps[3].info.size.shouldBe(1) // despite multiple RestoreUpdate, only last one is stored + result.collectionStates[0].steps[3].info.first().message.shouldBe(newestMessage) + } + + should("aggregate state from [Start, SourceToLocalStart, DumpStart, DumpUpdate, DumpFinish, RestoreStart, RestoreFinish] events") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.SourceToLocalStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpUpdateEvent(sourceToDestination, "")) + eventStore.storeEvent(StateEvent.DumpFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreFinishEvent(sourceToDestination)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates.size.shouldBe(1) + result.collectionStates[0].sourceToDestination.shouldBe(sourceToDestination) + result.collectionStates[0].steps.size.shouldBe(4) + result.collectionStates[0].steps[3].type.shouldBe(State.StepType.RESTORE) + + val restoreStep = result.collectionStates[0].steps[3] + restoreStep.startDate.shouldNotBeNull() + restoreStep.endDate.shouldNotBeNull() // Restore step is finished + } + + should("aggregate state from [Start, SourceToLocalStart, DumpStart, DumpUpdate, DumpFinish, RestoreStart, RestoreFinish, IndexRebuildStart] events") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.SourceToLocalStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpUpdateEvent(sourceToDestination, "")) + eventStore.storeEvent(StateEvent.DumpFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.IndexRebuildStartEvent(sourceToDestination)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates.size.shouldBe(1) + result.collectionStates[0].sourceToDestination.shouldBe(sourceToDestination) + result.collectionStates[0].steps.size.shouldBe(5) + result.collectionStates[0].steps[4].type.shouldBe(State.StepType.INDEX_REBUILD) + + val indexRebuildStep = result.collectionStates[0].steps[4] + indexRebuildStep.startDate.shouldNotBeNull() + indexRebuildStep.endDate.shouldBeNull() + } + + should("aggregate state from [Start, SourceToLocalStart, DumpStart, DumpUpdate, DumpFinish, RestoreStart, RestoreFinish, IndexRebuildStart, IndexRebuildFinish] events") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.SourceToLocalStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpUpdateEvent(sourceToDestination, "")) + eventStore.storeEvent(StateEvent.DumpFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.IndexRebuildStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.IndexRebuildFinishEvent(sourceToDestination)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates.size.shouldBe(1) + result.collectionStates[0].sourceToDestination.shouldBe(sourceToDestination) + result.collectionStates[0].steps.size.shouldBe(5) + result.collectionStates[0].steps[4].type.shouldBe(State.StepType.INDEX_REBUILD) + + val indexRebuildStep = result.collectionStates[0].steps[4] + indexRebuildStep.startDate.shouldNotBeNull() + indexRebuildStep.endDate.shouldNotBeNull() // Index rebuilding is finished + } + + should("aggregate state from [Start, SourceToLocalStart, DumpStart, DumpUpdate, DumpFinish, RestoreStart, RestoreFinish, IndexRebuildStart, IndexRebuildFinish, LocalToDestinationStart] events") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.SourceToLocalStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpUpdateEvent(sourceToDestination, "")) + eventStore.storeEvent(StateEvent.DumpFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.IndexRebuildStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.LocalToDestinationStartEvent(sourceToDestination)) // Local to destination most of the time will occur earlier than index rebuild finish + eventStore.storeEvent(StateEvent.IndexRebuildFinishEvent(sourceToDestination)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates.size.shouldBe(1) + result.collectionStates[0].sourceToDestination.shouldBe(sourceToDestination) + result.collectionStates[0].steps.size.shouldBe(6) + result.collectionStates[0].steps[5].type.shouldBe(State.StepType.LOCAL_TO_DESTINATION) + + val localToDestination = result.collectionStates[0].steps[5] + localToDestination.startDate.shouldNotBeNull() + localToDestination.endDate.shouldBeNull() + } + + should("aggregate state from [Start, SourceToLocalStart, DumpStart, DumpUpdate, DumpFinish, RestoreStart, RestoreFinish, IndexRebuildStart, IndexRebuildFinish, LocalToDestinationStart, Stop] events") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.SourceToLocalStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpUpdateEvent(sourceToDestination, "")) + eventStore.storeEvent(StateEvent.DumpFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.IndexRebuildStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.LocalToDestinationStartEvent(sourceToDestination)) // Local to destination most of the time will occur earlier than index rebuild finish + eventStore.storeEvent(StateEvent.IndexRebuildFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.StopEvent(sourceToDestination)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates.size.shouldBe(1) + result.collectionStates[0].sourceToDestination.shouldBe(sourceToDestination) + result.collectionStates[0].steps.size.shouldBe(7) + result.collectionStates[0].steps[6].type.shouldBe(State.StepType.FINISHED) + + val finishStep = result.collectionStates[0].steps[6] + finishStep.startDate.shouldNotBeNull() + } + + should("aggregate state from [Start, SourceToLocalStart, DumpStart, DumpUpdate, DumpFinish, RestoreStart, RestoreFinish, IndexRebuildStart, IndexRebuildFinish, LocalToDestinationStart, Pause] events") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.SourceToLocalStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpUpdateEvent(sourceToDestination, "")) + eventStore.storeEvent(StateEvent.DumpFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.IndexRebuildStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.LocalToDestinationStartEvent(sourceToDestination)) // Local to destination most of the time will occur earlier than index rebuild finish + eventStore.storeEvent(StateEvent.IndexRebuildFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.PauseEvent(sourceToDestination)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates.size.shouldBe(1) + result.collectionStates[0].sourceToDestination.shouldBe(sourceToDestination) + result.collectionStates[0].steps.size.shouldBe(7) + result.collectionStates[0].steps[6].type.shouldBe(State.StepType.PAUSED) + + val pausedStep = result.collectionStates[0].steps[6] + pausedStep.startDate.shouldNotBeNull() + } + + should("aggregate state from [Start, SourceToLocalStart, DumpStart, DumpUpdate, DumpFinish, RestoreStart, RestoreFinish, IndexRebuildStart, IndexRebuildFinish, LocalToDestinationStart, Pause, Resume] events") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.SourceToLocalStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpUpdateEvent(sourceToDestination, "")) + eventStore.storeEvent(StateEvent.DumpFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.IndexRebuildStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.LocalToDestinationStartEvent(sourceToDestination)) // Local to destination most of the time will occur earlier than index rebuild finish + eventStore.storeEvent(StateEvent.IndexRebuildFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.PauseEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.ResumeEvent(sourceToDestination)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates.size.shouldBe(1) + result.collectionStates[0].sourceToDestination.shouldBe(sourceToDestination) + result.collectionStates[0].steps.size.shouldBe(8) + result.collectionStates[0].steps[7].type.shouldBe(State.StepType.RESUMED) + + val resumedStep = result.collectionStates[0].steps[7] + resumedStep.startDate.shouldNotBeNull() + } + + should("aggregate state from [Start, SourceToLocalStart, DumpStart, DumpUpdate, DumpFinish, RestoreStart, RestoreFinish, IndexRebuildStart, IndexRebuildFinish, LocalToDestinationStart, Pause, Resume, Failed] events") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.SourceToLocalStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpUpdateEvent(sourceToDestination, "")) + eventStore.storeEvent(StateEvent.DumpFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.IndexRebuildStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.LocalToDestinationStartEvent(sourceToDestination)) // Local to destination most of the time will occur earlier than index rebuild finish + eventStore.storeEvent(StateEvent.IndexRebuildFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.PauseEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.ResumeEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.FailedEvent(sourceToDestination)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates.size.shouldBe(1) + result.collectionStates[0].sourceToDestination.shouldBe(sourceToDestination) + result.collectionStates[0].steps.size.shouldBe(9) + result.collectionStates[0].steps[8].type.shouldBe(State.StepType.FAILED) + } + + should("aggregate failed events if FailedEvent occurred between other events") { + // Given: + val sourceToDestination = SourceToDestination(DbCollection("", ""), DbCollection("", "")) + val eventStore = StateEventStore() + eventStore.storeEvent(StateEvent.StartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.SourceToLocalStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpStartEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.FailedEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.DumpFinishEvent(sourceToDestination)) + eventStore.storeEvent(StateEvent.RestoreStartEvent(sourceToDestination)) + + // When: + val result = StateAggregator.aggregateMigrationState(eventStore) + + // Then: + result.collectionStates.size.shouldBe(1) + result.collectionStates[0].sourceToDestination.shouldBe(sourceToDestination) + result.collectionStates[0].steps.size.shouldBe(5) + result.collectionStates[0].steps[3].type.shouldBe(State.StepType.FAILED) // FailedEvent is not the last one + result.collectionStates[0].steps[4].type.shouldBe(State.StepType.RESTORE) + } +}) From f0dce21f8c60922114c56eca6cf75e2d7c9b58e8 Mon Sep 17 00:00:00 2001 From: Szymon Marcinkiewicz Date: Fri, 19 Jan 2024 12:04:15 +0100 Subject: [PATCH 3/3] Add Allegro Tech blog post to README.md --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index 937675e..c8cfe11 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,5 @@ java -jar mongo-migration-stream-cli.jar --config application.properties ## More about _mongo-migration-stream_ project You can find more information about _mongo-migration-stream_ in: - +- [Allegro Tech Blog post about the reasons why we've created mongo-migration-stream and how it works](https://blog.allegro.tech/2023/09/online-mongodb-migration.html), - TODO: Documentation -- TODO: Allegro Tech Blog post