Skip to content


[SPARK-22145][MESOS] fix supervise with checkpointing on mesos
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

- Fixes the issue where the frameworkId recovered from checkpointed data overwrites the one sent by the dispatcher.
- Keeps submission driver id as the only index for all data structures in the dispatcher.
- Allocates a different task id per driver retry to satisfy the Mesos requirements. See the relevant ticket (SPARK-22145) for the details.
## How was this patch tested?

Manually tested this with DC/OS 1.10. Launched a streaming job with checkpointing to hdfs, made the driver fail several times and observed behavior:









Author: Stavros Kontopoulos <[email protected]>

Closes apache#19374 from skonto/fix_retry.
  • Loading branch information
skonto authored and srowen committed Nov 2, 2017
1 parent 277b192 commit b2463fa
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 37 deletions.
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ class SparkContext(config: SparkConf) extends Logging {
* (i.e.
* in case of local spark app something like 'local-1433865536131'
* in case of YARN something like 'application_1433865536131_34483'
* in case of MESOS something like 'driver-20170926223339-0001'
* )
def applicationId: String = _applicationId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,22 +134,24 @@ private[spark] class MesosClusterScheduler(
private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)
private val schedulerState = engineFactory.createEngine("scheduler")
private val stateLock = new Object()
// Keyed by submission id
private val finishedDrivers =
new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
private var frameworkId: String = null
// Holds all the launched drivers and current launch state, keyed by driver id.
// Holds all the launched drivers and current launch state, keyed by submission id.
private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]()
// Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation.
// All drivers that are loaded after failover are added here, as we need get the latest
// state of the tasks from Mesos.
// state of the tasks from Mesos. Keyed by task Id.
private val pendingRecover = new mutable.HashMap[String, SlaveID]()
// Stores all the submitted drivers that hasn't been launched.
// Stores all the submitted drivers that hasn't been launched, keyed by submission id
private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
// All supervised drivers that are waiting to retry after termination.
// All supervised drivers that are waiting to retry after termination, keyed by submission id
private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
private val queuedDriversState = engineFactory.createEngine("driverQueue")
private val launchedDriversState = engineFactory.createEngine("launchedDrivers")
private val pendingRetryDriversState = engineFactory.createEngine("retryList")
private final val RETRY_SEP = "-retry-"
// Flag to mark if the scheduler is ready to be called, which is until the scheduler
// is registered with Mesos master.
@volatile protected var ready = false
Expand Down Expand Up @@ -192,8 +194,8 @@ private[spark] class MesosClusterScheduler(
// 3. Check if it's in the retry list.
// 4. Check if it has already completed.
if (launchedDrivers.contains(submissionId)) {
val task = launchedDrivers(submissionId)
val state = launchedDrivers(submissionId)
k.success = true
k.message = "Killing running driver"
} else if (removeFromQueuedDrivers(submissionId)) {
Expand Down Expand Up @@ -275,7 +277,7 @@ private[spark] class MesosClusterScheduler(
private def recoverState(): Unit = {
stateLock.synchronized {
launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
launchedDrivers(state.taskId.getValue) = state
launchedDrivers(state.driverDescription.submissionId) = state
pendingRecover(state.taskId.getValue) = state.slaveId
queuedDriversState.fetchAll[MesosDriverDescription]().foreach(d => queuedDrivers += d)
Expand Down Expand Up @@ -353,7 +355,8 @@ private[spark] class MesosClusterScheduler(
// TODO: Page the status updates to avoid trying to reconcile
Expand All @@ -369,10 +372,19 @@ private[spark] class MesosClusterScheduler(

private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
val retries = { d => s"-retry-${d.retries.toString}" }.getOrElse("")
val retries = { d => s"${RETRY_SEP}${d.retries.toString}" }.getOrElse("")

private def getDriverTaskId(desc: MesosDriverDescription): String = {
val sId = desc.submissionId => sId + s"${RETRY_SEP}${state.retries.toString}").getOrElse(sId)

private def getSubmissionIdFromTaskId(taskId: String): String = {

private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
m.updated(k, f(m.getOrElse(k, default)))
Expand Down Expand Up @@ -551,7 +563,7 @@ private[spark] class MesosClusterScheduler(

private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo = {
val taskId = TaskID.newBuilder().setValue(desc.submissionId).build()
val taskId = TaskID.newBuilder().setValue(getDriverTaskId(desc)).build()

val (remainingResources, cpuResourcesToUse) =
partitionResources(offer.remainingResources, "cpus", desc.cores)
Expand Down Expand Up @@ -604,7 +616,7 @@ private[spark] class MesosClusterScheduler(
val task = createTaskInfo(submission, offer)
queuedTasks += task
logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
submission.submissionId + s" with taskId: ${task.getTaskId.toString}")
val newState = new MesosClusterSubmissionState(
Expand Down Expand Up @@ -718,45 +730,51 @@ private[spark] class MesosClusterScheduler(
logInfo(s"Received status update: taskId=${taskId}" +
s" state=${status.getState}" +
s" message=${status.getMessage}" +
s" reason=${status.getReason}");
s" reason=${status.getReason}")

stateLock.synchronized {
if (launchedDrivers.contains(taskId)) {
val subId = getSubmissionIdFromTaskId(taskId)
if (launchedDrivers.contains(subId)) {
if (status.getReason == Reason.REASON_RECONCILIATION &&
!pendingRecover.contains(taskId)) {
// Task has already received update and no longer requires reconciliation.
val state = launchedDrivers(taskId)
val state = launchedDrivers(subId)
// Check if the driver is supervise enabled and can be relaunched.
if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
state.finishDate = Some(new Date())
val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
val (retries, waitTimeSec) = retryState
.map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
.getOrElse{ (1, 1) }
val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)

val newDriverDescription = state.driverDescription.copy(
retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
addDriverToPending(newDriverDescription, taskId);
addDriverToPending(newDriverDescription, newDriverDescription.submissionId)
} else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
state.finishDate = Some(new Date())
if (finishedDrivers.size >= retainedDrivers) {
val toRemove = math.max(retainedDrivers / 10, 1)
finishedDrivers += state
retireDriver(subId, state)
state.mesosTaskStatus = Option(status)
} else {
logError(s"Unable to find driver $taskId in status update")
logError(s"Unable to find driver with $taskId in status update")

private def retireDriver(
submissionId: String,
state: MesosClusterSubmissionState) = {
state.finishDate = Some(new Date())
if (finishedDrivers.size >= retainedDrivers) {
val toRemove = math.max(retainedDrivers / 10, 1)
finishedDrivers += state

override def frameworkMessage(
driver: SchedulerDriver,
executorId: ExecutorID,
Expand All @@ -769,31 +787,31 @@ private[spark] class MesosClusterScheduler(
slaveId: SlaveID,
status: Int): Unit = {}

private def removeFromQueuedDrivers(id: String): Boolean = {
val index = queuedDrivers.indexWhere(_.submissionId.equals(id))
private def removeFromQueuedDrivers(subId: String): Boolean = {
val index = queuedDrivers.indexWhere(_.submissionId.equals(subId))
if (index != -1) {
} else {

private def removeFromLaunchedDrivers(id: String): Boolean = {
if (launchedDrivers.remove(id).isDefined) {
private def removeFromLaunchedDrivers(subId: String): Boolean = {
if (launchedDrivers.remove(subId).isDefined) {
} else {

private def removeFromPendingRetryDrivers(id: String): Boolean = {
val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(id))
private def removeFromPendingRetryDrivers(subId: String): Boolean = {
val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(subId))
if (index != -1) {
} else {
Expand All @@ -810,8 +828,8 @@ private[spark] class MesosClusterScheduler(

private def addDriverToPending(desc: MesosDriverDescription, taskId: String) = {
pendingRetryDriversState.persist(taskId, desc)
private def addDriverToPending(desc: MesosDriverDescription, subId: String) = {
pendingRetryDriversState.persist(subId, desc)
pendingRetryDrivers += desc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)

val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
Expand Down

0 comments on commit b2463fa

Please sign in to comment.