Fixed error handling in IndexClonerImpl (#33)
pitagoras3 authored Nov 9, 2023
1 parent 86e0662 commit 2c080a1
Showing 1 changed file with 28 additions and 27 deletions.
@@ -1,6 +1,5 @@
 package pl.allegro.tech.mongomigrationstream.core.index
 
-import com.mongodb.MongoException
 import com.mongodb.client.MongoDatabase
 import io.github.oshai.kotlinlogging.KotlinLogging
 import org.bson.Document
@@ -10,7 +9,6 @@ import pl.allegro.tech.mongomigrationstream.core.performer.IndexCloner
 import pl.allegro.tech.mongomigrationstream.core.state.StateEvent.IndexRebuildFinishEvent
 import pl.allegro.tech.mongomigrationstream.core.state.StateEvent.IndexRebuildStartEvent
 import pl.allegro.tech.mongomigrationstream.core.state.StateInfo
-import java.util.concurrent.CompletableFuture
 
 private val logger = KotlinLogging.logger {}
 
@@ -23,43 +21,46 @@ internal class IndexClonerImpl(
     private val executor = MigrationExecutors.createIndexClonerExecutor(sourceToDestination.source)
 
     override fun cloneIndexes() {
-        logger.info { "Cloning all indexes for collection: [${sourceToDestination.source.collectionName}]" }
         stateInfo.notifyStateChange(IndexRebuildStartEvent(sourceToDestination))
-
         executor.execute {
-            getRawSourceIndexes(sourceToDestination).map {
-                try { // TODO: this try-catch should be in createIndexOnDestinationCollection method
-                    CompletableFuture.supplyAsync({ createIndexOnDestinationCollection(sourceToDestination, it) }, executor)
-                } catch (exception: MongoException) {
-                    logger.error(exception) { "Error when creating index [${it.toJson()}] - skipping this index creation for collection [${sourceToDestination.source.collectionName}]" }
-                    CompletableFuture.completedFuture(Unit)
-                }
-            }.map { it.join() }
-
-            stateInfo.notifyStateChange(IndexRebuildFinishEvent(sourceToDestination))
+            logger.info { "Cloning all indexes for collection: [${sourceToDestination.source.collectionName}]" }
+            getRawSourceIndexes(sourceToDestination).forEach { createIndexOnDestinationCollection(sourceToDestination, it) }
         }
+        stateInfo.notifyStateChange(IndexRebuildFinishEvent(sourceToDestination))
     }
 
     private fun createIndexOnDestinationCollection(
        sourceToDestination: SourceToDestination,
        indexDefinition: Document
     ) {
-        destinationDb.runCommand(
-            Document().append("createIndexes", sourceToDestination.destination.collectionName)
+        try {
+            logger.info { "Creating index [${indexDefinition.toJson()}] on destination collection ${sourceToDestination.destination.collectionName}" }
+            val createIndexesCommand: Document = Document()
+                .append("createIndexes", sourceToDestination.destination.collectionName)
                 .append("indexes", listOf(indexDefinition))
-        )
+            destinationDb.runCommand(createIndexesCommand)
+        } catch (t: Throwable) {
+            // Swallowing exception to allow other indexes to be created
+            logger.error(t) { "Error when creating index [${indexDefinition.toJson()}] - skipping this index creation for collection [${sourceToDestination.source.collectionName}]" }
+        }
     }
 
-    private fun getRawSourceIndexes(sourceToDestination: SourceToDestination): List<Document> =
-        sourceDb.getCollection(sourceToDestination.source.collectionName).listIndexes()
-            .toList()
-            .filterNot { it.get("key", Document::class.java) == Document().append("_id", 1) }
-            .map {
-                it.remove("ns")
-                it.remove("v")
-                it["background"] = true
-                it
-            }
+    private fun getRawSourceIndexes(sourceToDestination: SourceToDestination): List<Document> {
+        return try {
+            sourceDb.getCollection(sourceToDestination.source.collectionName).listIndexes()
+                .toList()
+                .filterNot { it.get("key", Document::class.java) == Document().append("_id", 1) }
+                .map {
+                    it.remove("ns")
+                    it.remove("v")
+                    it["background"] = true
+                    it
+                }
+        } catch (t: Throwable) {
+            logger.error(t) { "Can't get indexes for source collection [${sourceToDestination.source.collectionName}]" }
+            emptyList()
+        }
+    }
 
     override fun stop() {
         logger.info { "Trying to shut down IndexCloner gracefully..." }
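The new error handling boils down to "log and skip": listing the source indexes gets its own try/catch that degrades to an empty list, and every createIndexes command gets its own try/catch so one rejected index definition no longer aborts the rest. Below is a minimal standalone sketch of the same pattern against the MongoDB Java driver; the helper name cloneCollectionIndexes and the println logging are illustrative only, not part of mongo-migration-stream.

import com.mongodb.client.MongoDatabase
import org.bson.Document

// Illustrative helper (not the project's API): clones all non-_id indexes of a
// collection from sourceDb to destinationDb, logging and skipping failures so
// a single bad index cannot stop the remaining ones from being created.
fun cloneCollectionIndexes(sourceDb: MongoDatabase, destinationDb: MongoDatabase, collectionName: String) {
    val indexes: List<Document> = try {
        sourceDb.getCollection(collectionName).listIndexes()
            .toList()
            .filterNot { it.get("key", Document::class.java) == Document().append("_id", 1) } // destination already has the default _id index
            .onEach {
                it.remove("ns")         // strip listIndexes metadata, as the commit does
                it.remove("v")          // index version is chosen by the server
                it["background"] = true // the project also requests background builds
            }
    } catch (t: Throwable) {
        println("Can't list indexes for [$collectionName]: ${t.message}")
        emptyList() // degrade gracefully: nothing to clone
    }

    indexes.forEach { index ->
        try {
            destinationDb.runCommand(
                Document("createIndexes", collectionName).append("indexes", listOf(index))
            )
        } catch (t: Throwable) {
            // Swallow and log so the remaining indexes are still attempted.
            println("Skipping index [${index.toJson()}]: ${t.message}")
        }
    }
}

In the production code the command targets sourceToDestination.destination.collectionName instead of reusing the source collection name, but the control flow is the same.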
