Skip to content

Commit

Permalink
feat: await partition shutdown (#552)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Dec 13, 2023
1 parent 19171c0 commit 20fd237
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ object ElasticLogManager {
INSTANCE.get.newSegment(topicPartition, baseOffset, time, fileSuffix)
}

def shutdownNow(): Unit = {
def shutdown(): Unit = {
INSTANCE.foreach(_.shutdownNow())
}
}
2 changes: 0 additions & 2 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -602,8 +602,6 @@ class BrokerServer(
if (logManager != null)
CoreUtils.swallow(logManager.shutdown(), this)

CoreUtils.swallow(ElasticLogManager.shutdownNow(), this)

// log manager need clientToControllerChannelManager to send request to controller.
if (clientToControllerChannelManager != null)
CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2769,8 +2769,12 @@ class ReplicaManager(val config: KafkaConfig,
info(s"try force stop partitions ${partitionsToClose.keys}")
stopPartitions(partitionsToClose)
}
checkAllPartitionClosed()
while (!checkAllPartitionClosed() && (System.currentTimeMillis() - start) < 30000) {
info("still has opening partition, retry check later")
Thread.sleep(1000)
}
info("await all partitions closed")
CoreUtils.swallow(ElasticLogManager.shutdown(), this)
}

def checkAllPartitionClosed(): Boolean = {
Expand Down

0 comments on commit 20fd237

Please sign in to comment.