Skip to content

Commit

Permalink
shutdown revert
Browse files Browse the repository at this point in the history
  • Loading branch information
zaynt4606 committed Dec 17, 2024
1 parent feba418 commit af0a543
Showing 1 changed file with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ trait MiniClusterFeature extends Logging {
val workerInfos = new mutable.HashMap[Worker, Thread]()
val indToWorkerInfos = new mutable.HashMap[Int, (Worker, Thread)]()
var workerConfForAdding: Map[String, String] = _
var testKillWorker: Int = -1
var testKillWorker: (Int, Thread) = (-1, null)

class RunnerWrap[T](code: => T) extends Thread {

Expand Down Expand Up @@ -200,7 +200,7 @@ trait MiniClusterFeature extends Logging {
worker.initialize()
} catch {
case ex: Exception =>
if (testKillWorker != (i - 1)) {
if (testKillWorker._1 != (i - 1)) {
if (workers(i - 1) != null) {
workers(i - 1).shutdownGracefully()
}
Expand Down Expand Up @@ -269,17 +269,20 @@ trait MiniClusterFeature extends Logging {
workerInfo = indToWorkerInfos.get(ind)
}
if (workerInfo.nonEmpty && ind < workerNum) {
testKillWorker = ind
testKillWorker = (ind, null)
}
val killerThread = new RunnerWrap({
Thread.sleep(sleepTime)
if (testKillWorker != -1) {
if (testKillWorker._1 != -1) {
workerInfo.get._1.stop(CelebornExitKind.EXIT_IMMEDIATELY)
workerInfo.get._1.rpcEnv.shutdown()
workerInfo.get._2.interrupt()
}
})
killerThread.start()
if (testKillWorker._1 != -1) {
testKillWorker = (ind, killerThread)
}
}

def shutdownMiniCluster(): Unit = {
Expand All @@ -301,6 +304,10 @@ trait MiniClusterFeature extends Logging {
worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
thread.interrupt()
}
if (testKillWorker._1 != -1) {
testKillWorker._2.interrupt()
testKillWorker = (-1, null)
}
workerInfos.clear()
masterInfo._2.interrupt()
MemoryManager.reset()
Expand Down

0 comments on commit af0a543

Please sign in to comment.