diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 4039bd1295123..3519f72448dd8 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1178,6 +1178,36 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+      // We invoke abortAndPauseCleaning here because the log cleaner runs asynchronously and replaceCurrentWithFutureLog
+      // invokes resumeCleaning, which requires the log cleaner's internal state to have a key for the given topic partition.
+      abortAndPauseCleaning(tp)
+
+      if (currentLog.isDefined)
+        info(s"Attempting to recover abandoned future log for $tp at $futureLog and removing ${currentLog.get}")
+      else
+        info(s"Attempting to recover abandoned future log for $tp at $futureLog")
+      replaceCurrentWithFutureLog(currentLog, futureLog)
+      info(s"Successfully recovered abandoned future log for $tp")
+    }
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[(UnifiedLog, Option[UnifiedLog])] = {
+    futureLogs.values.flatMap { futureLog =>
+      val topicId = futureLog.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $futureLog does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      val partitionId = futureLog.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId))
+        .filter(pr => directoryId(futureLog.parentDir).contains(pr.directory(brokerId)))
+        .map(_ => (futureLog, Option(currentLogs.get(futureLog.topicPartition)).filter(currentLog => currentLog.topicId.contains(topicId))))
+    }
+  }
+
   /**
    * Mark the partition directory in the source log directory for deletion and
    * rename the future log of this partition in the destination log directory to be the current log
@@ -1189,49 +1219,62 @@ class LogManager(logDirs: Seq[File],
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
 
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
+      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+      info(s"The current replica is successfully replaced with the future replica for $topicPartition")
+    }
+  }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+    val topicPartition = destLog.topicPartition
 
-      // Now that future replica has been successfully renamed to be the current replica
-      // Update the cached map and log cleaner as appropriate.
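
// A minimal, self-contained sketch of the matching rule in findAbandonedFutureLogs
// above. FutureLog, PartitionAssignment, and the directory ids are hypothetical
// stand-ins for Kafka's UnifiedLog/TopicsImage/PartitionRegistration types; only
// the predicate mirrors the patch: a future log is "abandoned" when the controller
// already assigns its partition to the very directory the future log lives in,
// i.e. the reassignment was recorded but the promotion rename never completed.
object AbandonedFutureLogSketch {
  final case class FutureLog(topicId: String, partition: Int, dirId: String)
  final case class PartitionAssignment(topicId: String, partition: Int, assignedDirId: String)

  def isAbandoned(futureLog: FutureLog, assignment: Option[PartitionAssignment]): Boolean =
    assignment.exists { a =>
      a.topicId == futureLog.topicId &&
        a.partition == futureLog.partition &&
        a.assignedDirId == futureLog.dirId
    }

  def main(args: Array[String]): Unit = {
    val future = FutureLog("topic-id-1", 0, "dir-A")
    // Controller already points the partition at dir-A, where the future log
    // lives: this is the promoted-but-never-renamed copy, so it is recovered.
    println(isAbandoned(future, Some(PartitionAssignment("topic-id-1", 0, "dir-A")))) // true
    // Controller still points at dir-B: an ordinary in-flight move, left alone.
    println(isAbandoned(future, Some(PartitionAssignment("topic-id-1", 0, "dir-B")))) // false
  }
}
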
-      futureLogs.remove(topicPartition)
-      currentLogs.put(topicPartition, destLog)
-      if (cleaner != null) {
-        cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
-        resumeCleaning(topicPartition)
+    destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
+    // the metric tags still contain "future", so we have to remove them.
+    // we will add the metrics back after sourceLog removes its metrics
+    destLog.removeLogMetrics()
+    if (updateHighWatermark && sourceLog.isDefined) {
+      destLog.updateHighWatermark(sourceLog.get.highWatermark)
+    }
+
+    // Now that the future replica has been successfully renamed to be the current replica,
+    // update the cached map and log cleaner as appropriate.
+    futureLogs.remove(topicPartition)
+    currentLogs.put(topicPartition, destLog)
+    if (cleaner != null) {
+      sourceLog.foreach { srcLog =>
+        cleaner.alterCheckpointDir(topicPartition, srcLog.parentDirFile, destLog.parentDirFile)
       }
+      resumeCleaning(topicPartition)
+    }
 
-      try {
-        sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true)
+    try {
+      sourceLog.foreach { srcLog =>
+        srcLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true)
         // Now that replica in source log directory has been successfully renamed for deletion.
         // Close the log, update checkpoint files, and enqueue this log to be deleted.
-        sourceLog.close()
-        val logDir = sourceLog.parentDirFile
+        srcLog.close()
+        val logDir = srcLog.parentDirFile
         val logsToCheckpoint = logsInDir(logDir)
         checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
         checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
-        sourceLog.removeLogMetrics()
-        destLog.newMetrics()
-        addLogToBeDeleted(sourceLog)
-      } catch {
-        case e: KafkaStorageException =>
-          // If sourceLog's log directory is offline, we need close its handlers here.
-          // handleLogDirFailure() will not close handlers of sourceLog because it has been removed from currentLogs map
-          sourceLog.closeHandlers()
-          sourceLog.removeLogMetrics()
-          throw e
+        srcLog.removeLogMetrics()
+        addLogToBeDeleted(srcLog)
       }
-
-      info(s"The current replica is successfully replaced with the future replica for $topicPartition")
+      destLog.newMetrics()
+    } catch {
+      case e: KafkaStorageException =>
+        // If sourceLog's log directory is offline, we need to close its handlers here.
+        // handleLogDirFailure() will not close the handlers of sourceLog because it has been removed from the currentLogs map.
+        sourceLog.foreach { srcLog =>
+          srcLog.closeHandlers()
+          srcLog.removeLogMetrics()
+        }
+        throw e
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 11e3ca7fddce7..048a665757b74 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -294,6 +294,14 @@ class BrokerMetadataPublisher(
         isStray = log => LogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
       )
 
+      // Rename all future replicas that are in the same directory as the
+      // one assigned by the controller. This can only happen due to a disk
+      // failure and broker shutdown after the directory assignment has been
+      // updated in the controller but before the future replica could be
+      // promoted.
+      // See KAFKA-16082 for details.
+      logManager.recoverAbandonedFutureLogs(brokerId, newImage.topics())
+
       // Make the LogCleaner available for reconfiguration. We can't do this prior to this
       // point because LogManager#startup creates the LogCleaner object, if
       // log.cleaner.enable is true. TODO: improve this (see KAFKA-13610)
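
// Why recoverAbandonedFutureLogs calls abortAndPauseCleaning before delegating to
// replaceCurrentWithFutureLog: the latter ends in resumeCleaning, which expects the
// partition to have been registered as paused. A toy model of that invariant
// (hypothetical names; this is not Kafka's LogCleanerManager API):
import scala.collection.mutable

object CleanerPauseSketch {
  private val pausedPartitions = mutable.Set[String]()

  def abortAndPauseCleaning(tp: String): Unit = pausedPartitions += tp

  def resumeCleaning(tp: String): Unit =
    require(pausedPartitions.remove(tp), s"Cleaning for $tp cannot be resumed since it was never paused")

  def main(args: Array[String]): Unit = {
    abortAndPauseCleaning("foo-0")
    resumeCleaning("foo-0") // succeeds because the pause registered the key first
    // A second resumeCleaning("foo-0") would now throw, which is the failure
    // mode the abortAndPauseCleaning call in the patch guards against.
  }
}
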
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 50be178f76035..8fc23a075eee4 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -17,10 +17,12 @@
 package kafka.server
 
+import kafka.log.UnifiedLog
 import kafka.network.SocketServer
 import kafka.server.IntegrationTestUtils.connectAndReceive
 import kafka.testkit.{BrokerNode, KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.TestUtils
+import org.apache.commons.io.FileUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
@@ -1402,6 +1404,129 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testAbandonedFutureReplicaRecovered_mainReplicaInOfflineLogDir(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2).
+        setBrokerNodes(3, 2).
+        setNumControllerNodes(1).build()).
+      build()
+    try {
+      cluster.format()
+      cluster.startup()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        val broker0 = cluster.brokers().get(0)
+        val broker1 = cluster.brokers().get(1)
+        val foo0 = new TopicPartition("foo", 0)
+
+        admin.createTopics(Arrays.asList(
+          new NewTopic("foo", 3, 3.toShort))).all().get()
+
+        // Wait until foo-0 is created on broker0.
+        TestUtils.retry(60000) {
+          assertTrue(broker0.logManager.getLog(foo0).isDefined)
+        }
+
+        // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2].
+        broker0.shutdown()
+        TestUtils.retry(60000) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
+        }
+
+        // Modify foo-0 so that it refers to a future replica.
+        // This is equivalent to a failure during the promotion of the future replica and a restart with the
+        // directory for the main replica being offline.
+        val log = broker0.logManager.getLog(foo0).get
+        log.renameDir(UnifiedLog.logFutureDirName(foo0), shouldReinitialize = false)
+
+        // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2].
+        broker0.startup()
+        TestUtils.retry(60000) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(0, 1, 2), info.get.isr().asScala.toSet)
+          assertTrue(broker0.logManager.getLog(foo0, isFuture = true).isEmpty)
+        }
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
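
// The renameDir call in the test above reproduces the crash window: on disk the
// partition is left only under a future-style name, with no current log at all.
// A rough, hypothetical illustration of that on-disk shape (the real suffix comes
// from UnifiedLog.logFutureDirName; the paths and the unique id here are made up):
import java.io.File
import java.nio.file.Files

object FutureDirShapeSketch {
  def main(args: Array[String]): Unit = {
    val logDir = Files.createTempDirectory("kraft-log-dir").toFile
    val current = new File(logDir, "foo-0")
    assert(current.mkdirs())
    // Simulate the abandoned state: the directory was already renamed to a
    // future-style name, but the broker died before promotion completed.
    val future = new File(logDir, "foo-0.0102abcdef-future")
    assert(current.renameTo(future))
    println(s"current exists=${current.exists()}, future exists=${future.exists()}")
    // On restart only the "-future" directory is present, so the broker loads it
    // as a future log with no corresponding current log: the currentLog = None
    // branch of recoverAbandonedFutureLogs.
  }
}
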
+
+  @Test
+  def testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2).
+        setBrokerNodes(3, 2).
+        setNumControllerNodes(1).build()).
+      build()
+    try {
+      cluster.format()
+      cluster.startup()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        val broker0 = cluster.brokers().get(0)
+        val broker1 = cluster.brokers().get(1)
+        val foo0 = new TopicPartition("foo", 0)
+
+        admin.createTopics(Arrays.asList(
+          new NewTopic("foo", 3, 3.toShort))).all().get()
+
+        // Wait until foo-0 is created on broker0.
+        TestUtils.retry(60000) {
+          assertTrue(broker0.logManager.getLog(foo0).isDefined)
+        }
+
+        // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2].
+        broker0.shutdown()
+        TestUtils.retry(60000) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
+        }
+
+        val log = broker0.logManager.getLog(foo0).get
+
+        // Copy foo-0 to targetParentDir.
+        // This is so that we can rename the main replica to a future log below.
+        val parentDir = log.parentDir
+        val targetParentDir = broker0.config.logDirs.filter(_ != parentDir).head
+        val targetDirFile = new File(targetParentDir, log.dir.getName)
+        FileUtils.copyDirectory(log.dir, targetDirFile)
+        assertTrue(targetDirFile.exists())
+
+        // Rename the original log to a future log.
+        // This is equivalent to a failure during the promotion of the future replica and a restart with the
+        // directory for the main replica being online.
+        val originalLogFile = log.dir
+        log.renameDir(UnifiedLog.logFutureDirName(foo0), shouldReinitialize = false)
+        assertFalse(originalLogFile.exists())
+
+        // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2].
+        broker0.startup()
+        TestUtils.retry(60000) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(0, 1, 2), info.get.isr().asScala.toSet)
+          assertTrue(broker0.logManager.getLog(foo0, isFuture = true).isEmpty)
+          assertFalse(targetDirFile.exists())
+          assertTrue(originalLogFile.exists())
+        }
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
 }
 
 class BadAuthorizer() extends Authorizer {
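
// The second test exercises the online-dir case: recovery finds both a stale
// current copy and the future copy, promotes the future one, and schedules the
// stale copy for deletion. A simplified, directory-only model of that promotion
// (names, suffixes, and the promote helper are illustrative, not the
// UnifiedLog/LogManager API; the real renames go through UnifiedLog.logDirName
// and UnifiedLog.logDeleteDirName, and deletion happens asynchronously via
// addLogToBeDeleted):
import java.io.File
import java.nio.file.Files

object PromotionSketch {
  // Promote futureDir to the canonical name; park any stale current copy under
  // a delete-style name so a background pass can remove it later.
  def promote(currentDir: Option[File], futureDir: File, canonicalName: String): File = {
    currentDir.foreach { dir =>
      assert(dir.renameTo(new File(dir.getParentFile, s"$canonicalName.0102abcdef-delete")))
    }
    val promoted = new File(futureDir.getParentFile, canonicalName)
    assert(futureDir.renameTo(promoted))
    promoted
  }

  def main(args: Array[String]): Unit = {
    val dirA = Files.createTempDirectory("dir-A").toFile // holds the stale current copy
    val dirB = Files.createTempDirectory("dir-B").toFile // holds the future copy
    val current = new File(dirA, "foo-0")
    assert(current.mkdirs())
    val future = new File(dirB, "foo-0.0102abcdef-future")
    assert(future.mkdirs())
    val promoted = promote(Some(current), future, "foo-0")
    println(s"promoted=${promoted.getPath}, staleCurrentGone=${!current.exists()}")
  }
}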