Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix number of events stored in EventStore and adjust aggregation methods #46

Merged
merged 3 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,5 @@ java -jar mongo-migration-stream-cli.jar --config application.properties
## More about _mongo-migration-stream_ project

You can find more information about _mongo-migration-stream_ in:

- [Allegro Tech Blog post about the reasons why we've created mongo-migration-stream and how it works](https://blog.allegro.tech/2023/09/online-mongodb-migration.html),
- TODO: Documentation
- TODO: Allegro Tech Blog post
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import pl.allegro.tech.mongomigrationstream.core.state.StateEvent.ResumeEvent
import pl.allegro.tech.mongomigrationstream.core.state.StateEvent.SourceToLocalStartEvent
import pl.allegro.tech.mongomigrationstream.core.state.StateEvent.StartEvent
import pl.allegro.tech.mongomigrationstream.core.state.StateEvent.StopEvent
import pl.allegro.tech.mongomigrationstream.core.state.StateEvent.Type

internal object StateAggregator {
fun aggregateMigrationState(eventStore: StateEventStore): State =
Expand All @@ -41,14 +42,14 @@ internal object StateAggregator {

private fun aggregateCollectionMigrationState(
sourceToDestination: SourceToDestination,
events: List<StateEvent>
events: Map<Type, StateEvent>
): CollectionState = CollectionState(
sourceToDestination,
eventsToSteps(events)
)

private fun eventsToSteps(events: List<StateEvent>): List<CollectionStep> {
return events.fold(mutableMapOf<StepType, CollectionStep>()) { result, migrationEvent ->
private fun eventsToSteps(events: Map<Type, StateEvent>): List<CollectionStep> {
return events.values.sortedBy { it.date }.fold(mutableMapOf<StepType, CollectionStep>()) { result, migrationEvent ->
buildSteps(result, migrationEvent)
}.values.toList().sortedBy { it.startDate }
}
Expand All @@ -62,20 +63,12 @@ internal object StateAggregator {
is SourceToLocalStartEvent -> updateSteps(steps, CollectionStep(SOURCE_TO_LOCAL, event.date))
is DumpStartEvent -> updateSteps(steps, CollectionStep(DUMP, event.date))
is DumpFinishEvent -> steps[DUMP]?.copy(endDate = event.date)?.let { updateSteps(steps, it) }
is DumpUpdateEvent -> steps[DUMP]?.copy(
info = (steps[DUMP]?.info ?: emptyList()) + listOf(Info(event.date, event.info))
)?.let { updateSteps(steps, it) }

is DumpUpdateEvent -> steps[DUMP]?.copy(info = listOf(Info(event.date, event.info)))?.let { updateSteps(steps, it) }
is RestoreStartEvent -> updateSteps(steps, CollectionStep(RESTORE, event.date))
is RestoreUpdateEvent -> steps[RESTORE]?.copy(
info = (steps[RESTORE]?.info ?: emptyList()) + listOf(Info(event.date, event.info))
)?.let { updateSteps(steps, it) }

is RestoreUpdateEvent -> steps[RESTORE]?.copy(info = listOf(Info(event.date, event.info)))?.let { updateSteps(steps, it) }
is RestoreFinishEvent -> steps[RESTORE]?.copy(endDate = event.date)?.let { updateSteps(steps, it) }
is IndexRebuildStartEvent -> updateSteps(steps, CollectionStep(INDEX_REBUILD, event.date))
is IndexRebuildFinishEvent -> steps[INDEX_REBUILD]?.copy(endDate = event.date)
?.let { updateSteps(steps, it) }

is IndexRebuildFinishEvent -> steps[INDEX_REBUILD]?.copy(endDate = event.date)?.let { updateSteps(steps, it) }
is LocalToDestinationStartEvent -> updateSteps(steps, CollectionStep(LOCAL_TO_DESTINATION, event.date))
is StopEvent -> updateSteps(steps, CollectionStep(FINISHED, event.date))
is PauseEvent -> updateSteps(steps, CollectionStep(PAUSED, event.date))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,56 @@ import java.time.Instant

sealed class StateEvent(
open val sourceToDestination: SourceToDestination,
val date: Instant = Instant.now()
val type: Type,
val date: Instant = Instant.now(),
) {
enum class Type {
START, SOURCE_TO_LOCAL_START, DUMP_START, DUMP_UPDATE, DUMP_FINISH, RESTORE_START, RESTORE_UPDATE, RESTORE_FINISH,
INDEX_REBUILD_START, INDEX_REBUILD_FINISH, LOCAL_TO_DESTINATION_START, STOP, PAUSE, RESUME, FAILED;
}

data class StartEvent(override val sourceToDestination: SourceToDestination) :
StateEvent(sourceToDestination)
StateEvent(sourceToDestination, Type.START)

data class SourceToLocalStartEvent(override val sourceToDestination: SourceToDestination) :
StateEvent(sourceToDestination)
StateEvent(sourceToDestination, Type.SOURCE_TO_LOCAL_START)

data class DumpStartEvent(override val sourceToDestination: SourceToDestination) :
StateEvent(sourceToDestination)
StateEvent(sourceToDestination, Type.DUMP_START)

data class DumpUpdateEvent(override val sourceToDestination: SourceToDestination, val info: String) :
StateEvent(sourceToDestination)
StateEvent(sourceToDestination, Type.DUMP_UPDATE)

data class DumpFinishEvent(override val sourceToDestination: SourceToDestination) :
StateEvent(sourceToDestination)
StateEvent(sourceToDestination, Type.DUMP_FINISH)

data class RestoreStartEvent(override val sourceToDestination: SourceToDestination) :
StateEvent(sourceToDestination)
StateEvent(sourceToDestination, Type.RESTORE_START)

data class RestoreUpdateEvent(override val sourceToDestination: SourceToDestination, val info: String) :
StateEvent(sourceToDestination)
StateEvent(sourceToDestination, Type.RESTORE_UPDATE)

data class RestoreFinishEvent(override val sourceToDestination: SourceToDestination) :
StateEvent(sourceToDestination)
StateEvent(sourceToDestination, Type.RESTORE_FINISH)

data class IndexRebuildStartEvent(override val sourceToDestination: SourceToDestination) :
StateEvent(sourceToDestination)
StateEvent(sourceToDestination, Type.INDEX_REBUILD_START)

data class IndexRebuildFinishEvent(override val sourceToDestination: SourceToDestination) :
StateEvent(sourceToDestination)
StateEvent(sourceToDestination, Type.INDEX_REBUILD_FINISH)

data class LocalToDestinationStartEvent(override val sourceToDestination: SourceToDestination) :
StateEvent(sourceToDestination)
StateEvent(sourceToDestination, Type.LOCAL_TO_DESTINATION_START)

data class StopEvent(override val sourceToDestination: SourceToDestination) :
StateEvent(sourceToDestination)
StateEvent(sourceToDestination, Type.STOP)

data class PauseEvent(override val sourceToDestination: SourceToDestination) :
StateEvent(sourceToDestination)
StateEvent(sourceToDestination, Type.PAUSE)

data class ResumeEvent(override val sourceToDestination: SourceToDestination) :
StateEvent(sourceToDestination)
StateEvent(sourceToDestination, Type.RESUME)

data class FailedEvent(override val sourceToDestination: SourceToDestination, val throwable: Throwable? = null) :
StateEvent(sourceToDestination)
StateEvent(sourceToDestination, Type.FAILED)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import pl.allegro.tech.mongomigrationstream.core.mongo.SourceToDestination
import java.util.concurrent.ConcurrentHashMap

internal class StateEventStore {
private val events: ConcurrentHashMap<SourceToDestination, List<StateEvent>> = ConcurrentHashMap()
private val events: ConcurrentHashMap<SourceToDestination, ConcurrentHashMap<StateEvent.Type, StateEvent>> = ConcurrentHashMap()

fun storeEvent(event: StateEvent) {
events[event.sourceToDestination] = events.getOrDefault(event.sourceToDestination, emptyList()) + listOf(event)
val eventsForCollection = events.getOrDefault(event.sourceToDestination, ConcurrentHashMap())
eventsForCollection[event.type] = event
events[event.sourceToDestination] = eventsForCollection
}

fun getEvents(sourceToDestination: SourceToDestination): List<StateEvent> = events[sourceToDestination] ?: emptyList()
fun getAllEvents(): Map<SourceToDestination, List<StateEvent>> = events
fun getEvents(sourceToDestination: SourceToDestination): Map<StateEvent.Type, StateEvent> = events[sourceToDestination] ?: emptyMap()
fun getAllEvents(): Map<SourceToDestination, Map<StateEvent.Type, StateEvent>> = events
}
Loading