From 2c080a1c93979ee7f4ffa91c3e524f61d4972ede Mon Sep 17 00:00:00 2001 From: Szymon Marcinkiewicz Date: Thu, 9 Nov 2023 13:37:56 +0100 Subject: [PATCH] Fixed error handling in IndexClonerImpl (#33) --- .../core/index/IndexClonerImpl.kt | 55 ++++++++++--------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/index/IndexClonerImpl.kt b/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/index/IndexClonerImpl.kt index 3b9267a..fe7b74c 100644 --- a/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/index/IndexClonerImpl.kt +++ b/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/index/IndexClonerImpl.kt @@ -1,6 +1,5 @@ package pl.allegro.tech.mongomigrationstream.core.index -import com.mongodb.MongoException import com.mongodb.client.MongoDatabase import io.github.oshai.kotlinlogging.KotlinLogging import org.bson.Document @@ -10,7 +9,6 @@ import pl.allegro.tech.mongomigrationstream.core.performer.IndexCloner import pl.allegro.tech.mongomigrationstream.core.state.StateEvent.IndexRebuildFinishEvent import pl.allegro.tech.mongomigrationstream.core.state.StateEvent.IndexRebuildStartEvent import pl.allegro.tech.mongomigrationstream.core.state.StateInfo -import java.util.concurrent.CompletableFuture private val logger = KotlinLogging.logger {} @@ -23,43 +21,46 @@ internal class IndexClonerImpl( private val executor = MigrationExecutors.createIndexClonerExecutor(sourceToDestination.source) override fun cloneIndexes() { - logger.info { "Cloning all indexes for collection: [${sourceToDestination.source.collectionName}]" } stateInfo.notifyStateChange(IndexRebuildStartEvent(sourceToDestination)) - executor.execute { - getRawSourceIndexes(sourceToDestination).map { - try { // TODO: this try-catch should be in createIndexOnDestinationCollection method - CompletableFuture.supplyAsync({ createIndexOnDestinationCollection(sourceToDestination, it) }, executor) - } catch (exception: MongoException) { - logger.error(exception) { "Error when creating index [${it.toJson()}] - skipping this index creation for collection [${sourceToDestination.source.collectionName}]" } - CompletableFuture.completedFuture(Unit) - } - }.map { it.join() } - - stateInfo.notifyStateChange(IndexRebuildFinishEvent(sourceToDestination)) + logger.info { "Cloning all indexes for collection: [${sourceToDestination.source.collectionName}]" } + getRawSourceIndexes(sourceToDestination).forEach { createIndexOnDestinationCollection(sourceToDestination, it) } } + stateInfo.notifyStateChange(IndexRebuildFinishEvent(sourceToDestination)) } private fun createIndexOnDestinationCollection( sourceToDestination: SourceToDestination, indexDefinition: Document ) { - destinationDb.runCommand( - Document().append("createIndexes", sourceToDestination.destination.collectionName) + try { + logger.info { "Creating index [${indexDefinition.toJson()}] on destination collection ${sourceToDestination.destination.collectionName}" } + val createIndexesCommand: Document = Document() + .append("createIndexes", sourceToDestination.destination.collectionName) .append("indexes", listOf(indexDefinition)) - ) + destinationDb.runCommand(createIndexesCommand) + } catch (t: Throwable) { + // Swallowing exception to allow other indexes to be created + logger.error(t) { "Error when creating index [${indexDefinition.toJson()}] - skipping this index creation for collection [${sourceToDestination.source.collectionName}]" } + } } - private fun getRawSourceIndexes(sourceToDestination: SourceToDestination): List = - sourceDb.getCollection(sourceToDestination.source.collectionName).listIndexes() - .toList() - .filterNot { it.get("key", Document::class.java) == Document().append("_id", 1) } - .map { - it.remove("ns") - it.remove("v") - it["background"] = true - it - } + private fun getRawSourceIndexes(sourceToDestination: SourceToDestination): List { + return try { + sourceDb.getCollection(sourceToDestination.source.collectionName).listIndexes() + .toList() + .filterNot { it.get("key", Document::class.java) == Document().append("_id", 1) } + .map { + it.remove("ns") + it.remove("v") + it["background"] = true + it + } + } catch (t: Throwable) { + logger.error(t) { "Can't get indexes for source collection [${sourceToDestination.source.collectionName}]" } + emptyList() + } + } override fun stop() { logger.info { "Trying to shut down IndexCloner gracefully..." }