From b2463fad718d25f564d62c50d587610de3d0c5bd Mon Sep 17 00:00:00 2001
From: Stavros Kontopoulos
Date: Thu, 2 Nov 2017 13:25:48 +0000
Subject: [PATCH] [SPARK-22145][MESOS] fix supervise with checkpointing on mesos

## What changes were proposed in this pull request?

- Fixes the issue where the frameworkId recovered from checkpointed data overwrites the one sent by the dispatcher.
- Keeps the submission id as the only key for all data structures in the dispatcher, and allocates a distinct task id for each driver retry, since Mesos requires task ids not to be reused. See the ticket for details; the id scheme is sketched right after the diff stat below.

## How was this patch tested?

Manually tested with DC/OS 1.10: launched a streaming job with checkpointing to HDFS, made the driver fail several times, and observed the behavior:

![image](https://user-images.githubusercontent.com/7945591/30940500-f7d2a744-a3e9-11e7-8c56-f2ccbb271e80.png)
![image](https://user-images.githubusercontent.com/7945591/30940550-19bc15de-a3ea-11e7-8a11-f48abfe36720.png)
![image](https://user-images.githubusercontent.com/7945591/30940524-083ea308-a3ea-11e7-83ae-00d3fa17b928.png)
![image](https://user-images.githubusercontent.com/7945591/30940579-2f0fb242-a3ea-11e7-82f9-86179da28b8c.png)
![image](https://user-images.githubusercontent.com/7945591/30940591-3b561b0e-a3ea-11e7-9dbd-e71912bb2ef3.png)
![image](https://user-images.githubusercontent.com/7945591/30940605-49c810ca-a3ea-11e7-8af5-67930851fd38.png)
![image](https://user-images.githubusercontent.com/7945591/30940631-59f4a288-a3ea-11e7-88cb-c3741b72bb13.png)
![image](https://user-images.githubusercontent.com/7945591/30940642-62346c9e-a3ea-11e7-8935-82e494925f67.png)
![image](https://user-images.githubusercontent.com/7945591/30940653-6c46d53c-a3ea-11e7-8dd1-5840d484d28c.png)

Author: Stavros Kontopoulos

Closes #19374 from skonto/fix_retry.
---
 .../scala/org/apache/spark/SparkContext.scala |  1 +
 .../cluster/mesos/MesosClusterScheduler.scala | 90 +++++++++++--------
 .../apache/spark/streaming/Checkpoint.scala   |  3 +-
 3 files changed, 57 insertions(+), 37 deletions(-)
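The id scheme referenced in the description above, as a standalone sketch rather than the dispatcher code itself (the `DriverTaskIds` object and its `Option[Int]` retry parameter are illustrative only): the first launch of a driver uses the bare submission id as the Mesos task id, every supervised retry appends a `-retry-<n>` suffix so Mesos never sees a reused task id, and dispatcher bookkeeping recovers the submission id by stripping that suffix, mirroring the `getDriverTaskId` / `getSubmissionIdFromTaskId` helpers added in the diff below.

```scala
// Illustrative sketch only -- DriverTaskIds is not part of the patch.
object DriverTaskIds {
  // Same separator the patch introduces as RETRY_SEP in MesosClusterScheduler.
  private val RetrySep = "-retry-"

  // First launch uses the bare submission id; retry n gets a fresh task id
  // "<submissionId>-retry-<n>" so Mesos never sees the same task id twice.
  def taskId(submissionId: String, retry: Option[Int]): String =
    retry.map(n => s"$submissionId$RetrySep$n").getOrElse(submissionId)

  // Dispatcher state stays keyed by the submission id, recovered by stripping the suffix.
  def submissionId(taskId: String): String = taskId.split(RetrySep).head

  def main(args: Array[String]): Unit = {
    val first = taskId("driver-20170926223339-0001", None)
    val retried = taskId("driver-20170926223339-0001", Some(2))
    println(first)                   // driver-20170926223339-0001
    println(retried)                 // driver-20170926223339-0001-retry-2
    println(submissionId(retried))   // driver-20170926223339-0001
  }
}
```

In the actual patch the retry count comes from `MesosClusterRetryState.retries`; the `Option[Int]` above is a simplification.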
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6f25d346e6e54..c7dd635ad4c96 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -310,6 +310,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * (i.e.
    *  in case of local spark app something like 'local-1433865536131'
    *  in case of YARN something like 'application_1433865536131_34483'
+   *  in case of MESOS something like 'driver-20170926223339-0001'
    * )
    */
   def applicationId: String = _applicationId
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 82470264f2a4a..de846c85d53a6 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -134,22 +134,24 @@ private[spark] class MesosClusterScheduler(
   private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)
   private val schedulerState = engineFactory.createEngine("scheduler")
   private val stateLock = new Object()
+  // Keyed by submission id
   private val finishedDrivers =
     new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
   private var frameworkId: String = null
-  // Holds all the launched drivers and current launch state, keyed by driver id.
+  // Holds all the launched drivers and current launch state, keyed by submission id.
   private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]()
   // Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation.
   // All drivers that are loaded after failover are added here, as we need get the latest
-  // state of the tasks from Mesos.
+  // state of the tasks from Mesos. Keyed by task Id.
   private val pendingRecover = new mutable.HashMap[String, SlaveID]()
-  // Stores all the submitted drivers that hasn't been launched.
+  // Stores all the submitted drivers that hasn't been launched, keyed by submission id
   private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
-  // All supervised drivers that are waiting to retry after termination.
+  // All supervised drivers that are waiting to retry after termination, keyed by submission id
   private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
   private val queuedDriversState = engineFactory.createEngine("driverQueue")
   private val launchedDriversState = engineFactory.createEngine("launchedDrivers")
   private val pendingRetryDriversState = engineFactory.createEngine("retryList")
+  private final val RETRY_SEP = "-retry-"
   // Flag to mark if the scheduler is ready to be called, which is until the scheduler
   // is registered with Mesos master.
   @volatile protected var ready = false
@@ -192,8 +194,8 @@ private[spark] class MesosClusterScheduler(
       // 3. Check if it's in the retry list.
       // 4. Check if it has already completed.
       if (launchedDrivers.contains(submissionId)) {
-        val task = launchedDrivers(submissionId)
-        schedulerDriver.killTask(task.taskId)
+        val state = launchedDrivers(submissionId)
+        schedulerDriver.killTask(state.taskId)
         k.success = true
         k.message = "Killing running driver"
       } else if (removeFromQueuedDrivers(submissionId)) {
@@ -275,7 +277,7 @@ private[spark] class MesosClusterScheduler(
   private def recoverState(): Unit = {
     stateLock.synchronized {
       launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
-        launchedDrivers(state.taskId.getValue) = state
+        launchedDrivers(state.driverDescription.submissionId) = state
         pendingRecover(state.taskId.getValue) = state.slaveId
       }
       queuedDriversState.fetchAll[MesosDriverDescription]().foreach(d => queuedDrivers += d)
@@ -353,7 +355,8 @@ private[spark] class MesosClusterScheduler(
         .setSlaveId(slaveId)
         .setState(MesosTaskState.TASK_STAGING)
         .build()
-      launchedDrivers.get(taskId).map(_.mesosTaskStatus.getOrElse(newStatus))
+      launchedDrivers.get(getSubmissionIdFromTaskId(taskId))
+        .map(_.mesosTaskStatus.getOrElse(newStatus))
         .getOrElse(newStatus)
     }
     // TODO: Page the status updates to avoid trying to reconcile
@@ -369,10 +372,19 @@ private[spark] class MesosClusterScheduler(
   }
 
   private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
-    val retries = desc.retryState.map { d => s"-retry-${d.retries.toString}" }.getOrElse("")
+    val retries = desc.retryState.map { d => s"${RETRY_SEP}${d.retries.toString}" }.getOrElse("")
     s"${frameworkId}-${desc.submissionId}${retries}"
   }
 
+  private def getDriverTaskId(desc: MesosDriverDescription): String = {
+    val sId = desc.submissionId
+    desc.retryState.map(state => sId + s"${RETRY_SEP}${state.retries.toString}").getOrElse(sId)
+  }
+
+  private def getSubmissionIdFromTaskId(taskId: String): String = {
+    taskId.split(s"${RETRY_SEP}").head
+  }
+
   private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
     m.updated(k, f(m.getOrElse(k, default)))
   }
@@ -551,7 +563,7 @@ private[spark] class MesosClusterScheduler(
   }
 
   private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo = {
-    val taskId = TaskID.newBuilder().setValue(desc.submissionId).build()
+    val taskId = TaskID.newBuilder().setValue(getDriverTaskId(desc)).build()
 
     val (remainingResources, cpuResourcesToUse) =
       partitionResources(offer.remainingResources, "cpus", desc.cores)
@@ -604,7 +616,7 @@ private[spark] class MesosClusterScheduler(
         val task = createTaskInfo(submission, offer)
         queuedTasks += task
         logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
-          submission.submissionId)
+          submission.submissionId + s" with taskId: ${task.getTaskId.toString}")
         val newState = new MesosClusterSubmissionState(
           submission,
           task.getTaskId,
@@ -718,45 +730,51 @@ private[spark] class MesosClusterScheduler(
     logInfo(s"Received status update: taskId=${taskId}" +
       s" state=${status.getState}" +
       s" message=${status.getMessage}" +
-      s" reason=${status.getReason}");
+      s" reason=${status.getReason}")
 
     stateLock.synchronized {
-      if (launchedDrivers.contains(taskId)) {
+      val subId = getSubmissionIdFromTaskId(taskId)
+      if (launchedDrivers.contains(subId)) {
         if (status.getReason == Reason.REASON_RECONCILIATION &&
           !pendingRecover.contains(taskId)) {
           // Task has already received update and no longer requires reconciliation.
           return
         }
-        val state = launchedDrivers(taskId)
+        val state = launchedDrivers(subId)
         // Check if the driver is supervise enabled and can be relaunched.
         if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
-          removeFromLaunchedDrivers(taskId)
+          removeFromLaunchedDrivers(subId)
           state.finishDate = Some(new Date())
           val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
           val (retries, waitTimeSec) = retryState
             .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
             .getOrElse{ (1, 1) }
           val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
-
           val newDriverDescription = state.driverDescription.copy(
             retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
-          addDriverToPending(newDriverDescription, taskId);
+          addDriverToPending(newDriverDescription, newDriverDescription.submissionId)
         } else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
-          removeFromLaunchedDrivers(taskId)
-          state.finishDate = Some(new Date())
-          if (finishedDrivers.size >= retainedDrivers) {
-            val toRemove = math.max(retainedDrivers / 10, 1)
-            finishedDrivers.trimStart(toRemove)
-          }
-          finishedDrivers += state
+          retireDriver(subId, state)
         }
         state.mesosTaskStatus = Option(status)
       } else {
-        logError(s"Unable to find driver $taskId in status update")
+        logError(s"Unable to find driver with $taskId in status update")
      }
    }
  }
 
+  private def retireDriver(
+      submissionId: String,
+      state: MesosClusterSubmissionState) = {
+    removeFromLaunchedDrivers(submissionId)
+    state.finishDate = Some(new Date())
+    if (finishedDrivers.size >= retainedDrivers) {
+      val toRemove = math.max(retainedDrivers / 10, 1)
+      finishedDrivers.trimStart(toRemove)
+    }
+    finishedDrivers += state
+  }
+
   override def frameworkMessage(
       driver: SchedulerDriver,
       executorId: ExecutorID,
@@ -769,31 +787,31 @@ private[spark] class MesosClusterScheduler(
       slaveId: SlaveID,
       status: Int): Unit = {}
 
-  private def removeFromQueuedDrivers(id: String): Boolean = {
-    val index = queuedDrivers.indexWhere(_.submissionId.equals(id))
+  private def removeFromQueuedDrivers(subId: String): Boolean = {
+    val index = queuedDrivers.indexWhere(_.submissionId.equals(subId))
     if (index != -1) {
       queuedDrivers.remove(index)
-      queuedDriversState.expunge(id)
+      queuedDriversState.expunge(subId)
       true
     } else {
       false
     }
   }
 
-  private def removeFromLaunchedDrivers(id: String): Boolean = {
-    if (launchedDrivers.remove(id).isDefined) {
-      launchedDriversState.expunge(id)
+  private def removeFromLaunchedDrivers(subId: String): Boolean = {
+    if (launchedDrivers.remove(subId).isDefined) {
+      launchedDriversState.expunge(subId)
       true
     } else {
       false
     }
   }
 
-  private def removeFromPendingRetryDrivers(id: String): Boolean = {
-    val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(id))
+  private def removeFromPendingRetryDrivers(subId: String): Boolean = {
+    val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(subId))
     if (index != -1) {
       pendingRetryDrivers.remove(index)
-      pendingRetryDriversState.expunge(id)
+      pendingRetryDriversState.expunge(subId)
       true
     } else {
       false
@@ -810,8 +828,8 @@ private[spark] class MesosClusterScheduler(
     revive()
   }
 
-  private def addDriverToPending(desc: MesosDriverDescription, taskId: String) = {
-    pendingRetryDriversState.persist(taskId, desc)
+  private def addDriverToPending(desc: MesosDriverDescription, subId: String) = {
+    pendingRetryDriversState.persist(subId, desc)
     pendingRetryDrivers += desc
     revive()
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index b8c780db07c98..40a0b8e3a407d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -58,7 +58,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
     "spark.yarn.credentials.file",
     "spark.yarn.credentials.renewalTime",
     "spark.yarn.credentials.updateTime",
-    "spark.ui.filters")
+    "spark.ui.filters",
+    "spark.mesos.driver.frameworkId")
 
   val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
     .remove("spark.driver.host")
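A companion sketch of what the Checkpoint.scala change above accomplishes (the `CheckpointConfSketch` object and its trimmed key list are illustrative, not the actual `Checkpoint` class): properties that must describe the current launch, now including `spark.mesos.driver.frameworkId`, are filtered out when the `SparkConf` is rebuilt from checkpointed data, so the framework id the dispatcher injects for a relaunched driver is not overwritten by the stale checkpointed value.

```scala
import org.apache.spark.SparkConf

// Illustrative sketch only -- not the actual Checkpoint class; the key list is trimmed.
object CheckpointConfSketch {
  // Keys that must describe the current launch, so they are never taken from a checkpoint.
  private val propertiesToReload = Seq(
    "spark.master",
    "spark.ui.filters",
    "spark.mesos.driver.frameworkId")

  // Rebuild a conf from checkpointed (key, value) pairs, dropping the keys above.
  // The caller is expected to re-set those keys from the current environment, e.g.
  // the framework id passed by the dispatcher to the relaunched driver.
  def rebuildConf(checkpointedPairs: Seq[(String, String)]): SparkConf = {
    val kept = checkpointedPairs.filterNot { case (k, _) => propertiesToReload.contains(k) }
    new SparkConf(loadDefaults = false).setAll(kept)
  }
}
```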