[SPARK-22145][MESOS] fix supervise with checkpointing on mesos

## What changes were proposed in this pull request?

- Fixes the issue where the frameworkId recovered from checkpointed data overwrites the one sent by the dispatcher.
- Keeps the driver submission id as the only key for all data structures in the dispatcher.
- Allocates a distinct task id per driver retry to satisfy the Mesos requirements; check the relevant ticket for the details (an illustrative sketch of the resulting id mapping follows below).
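The id mapping is small enough to show in isolation. A minimal, self-contained sketch of how a per-retry task id is derived and mapped back to the submission id (the `RETRY_SEP` value and helper names mirror the patch below; the standalone object is purely illustrative):

```scala
object RetryTaskIds {
  // Separator between the submission id and the retry counter.
  private final val RETRY_SEP = "-retry-"

  // The first attempt reuses the submission id; retry N appends "-retry-N",
  // so every relaunch presents Mesos with a task id it has not seen before.
  def driverTaskId(submissionId: String, retries: Option[Int]): String =
    retries.map(r => s"$submissionId$RETRY_SEP$r").getOrElse(submissionId)

  // Recover the submission id (the single key used by the dispatcher's
  // data structures) from any task id, with or without a retry suffix.
  def submissionIdFromTaskId(taskId: String): String =
    taskId.split(RETRY_SEP).head
}

// Example: driverTaskId("driver-20170926223339-0001", Some(2))
//   == "driver-20170926223339-0001-retry-2"
// and submissionIdFromTaskId of that value maps back to the submission id.
```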
## How was this patch tested?

Manually tested this with DC/OS 1.10. Launched a streaming job with checkpointing to HDFS, made the driver fail several times, and observed the behavior:
![image](https://user-images.githubusercontent.com/7945591/30940500-f7d2a744-a3e9-11e7-8c56-f2ccbb271e80.png)

![image](https://user-images.githubusercontent.com/7945591/30940550-19bc15de-a3ea-11e7-8a11-f48abfe36720.png)

![image](https://user-images.githubusercontent.com/7945591/30940524-083ea308-a3ea-11e7-83ae-00d3fa17b928.png)

![image](https://user-images.githubusercontent.com/7945591/30940579-2f0fb242-a3ea-11e7-82f9-86179da28b8c.png)

![image](https://user-images.githubusercontent.com/7945591/30940591-3b561b0e-a3ea-11e7-9dbd-e71912bb2ef3.png)

![image](https://user-images.githubusercontent.com/7945591/30940605-49c810ca-a3ea-11e7-8af5-67930851fd38.png)

![image](https://user-images.githubusercontent.com/7945591/30940631-59f4a288-a3ea-11e7-88cb-c3741b72bb13.png)

![image](https://user-images.githubusercontent.com/7945591/30940642-62346c9e-a3ea-11e7-8935-82e494925f67.png)

![image](https://user-images.githubusercontent.com/7945591/30940653-6c46d53c-a3ea-11e7-8dd1-5840d484d28c.png)

Author: Stavros Kontopoulos <st.kontopoulos@gmail.com>

Closes #19374 from skonto/fix_retry.
Stavros Kontopoulos authored on 2017-11-02 13:25:48 +00:00, committed by Sean Owen
commit b2463fad71, parent 277b1924b4
3 changed files with 57 additions and 37 deletions

@@ -310,6 +310,7 @@ class SparkContext(config: SparkConf) extends Logging {
* (i.e.
* in case of local spark app something like 'local-1433865536131'
* in case of YARN something like 'application_1433865536131_34483'
* in case of MESOS something like 'driver-20170926223339-0001'
* )
*/
def applicationId: String = _applicationId
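The scaladoc addition above documents the id produced when the driver runs under the Mesos dispatcher. A minimal usage sketch (a local master is used only so the snippet runs standalone):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// applicationId reflects the cluster manager: "local-<timestamp>" locally,
// "application_<ts>_<n>" on YARN, and "driver-<ts>-<seq>" (the dispatcher's
// submission id) in Mesos cluster mode.
val sc = new SparkContext(new SparkConf().setAppName("app-id-demo").setMaster("local[*]"))
println(sc.applicationId)  // e.g. local-1433865536131
sc.stop()
```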

@@ -134,22 +134,24 @@ private[spark] class MesosClusterScheduler(
private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)
private val schedulerState = engineFactory.createEngine("scheduler")
private val stateLock = new Object()
// Keyed by submission id
private val finishedDrivers =
new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
private var frameworkId: String = null
// Holds all the launched drivers and current launch state, keyed by driver id.
// Holds all the launched drivers and current launch state, keyed by submission id.
private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]()
// Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation.
// All drivers that are loaded after failover are added here, as we need to get the latest
// state of the tasks from Mesos.
// state of the tasks from Mesos. Keyed by task Id.
private val pendingRecover = new mutable.HashMap[String, SlaveID]()
// Stores all the submitted drivers that haven't been launched.
// Stores all the submitted drivers that haven't been launched, keyed by submission id
private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
// All supervised drivers that are waiting to retry after termination.
// All supervised drivers that are waiting to retry after termination, keyed by submission id
private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
private val queuedDriversState = engineFactory.createEngine("driverQueue")
private val launchedDriversState = engineFactory.createEngine("launchedDrivers")
private val pendingRetryDriversState = engineFactory.createEngine("retryList")
private final val RETRY_SEP = "-retry-"
// Flag to mark if the scheduler is ready to be called, which is until the scheduler
// is registered with Mesos master.
@volatile protected var ready = false
@@ -192,8 +194,8 @@ private[spark] class MesosClusterScheduler(
// 3. Check if it's in the retry list.
// 4. Check if it has already completed.
if (launchedDrivers.contains(submissionId)) {
val task = launchedDrivers(submissionId)
schedulerDriver.killTask(task.taskId)
val state = launchedDrivers(submissionId)
schedulerDriver.killTask(state.taskId)
k.success = true
k.message = "Killing running driver"
} else if (removeFromQueuedDrivers(submissionId)) {
@@ -275,7 +277,7 @@ private[spark] class MesosClusterScheduler(
private def recoverState(): Unit = {
stateLock.synchronized {
launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
launchedDrivers(state.taskId.getValue) = state
launchedDrivers(state.driverDescription.submissionId) = state
pendingRecover(state.taskId.getValue) = state.slaveId
}
queuedDriversState.fetchAll[MesosDriverDescription]().foreach(d => queuedDrivers += d)
@@ -353,7 +355,8 @@ private[spark] class MesosClusterScheduler(
.setSlaveId(slaveId)
.setState(MesosTaskState.TASK_STAGING)
.build()
launchedDrivers.get(taskId).map(_.mesosTaskStatus.getOrElse(newStatus))
launchedDrivers.get(getSubmissionIdFromTaskId(taskId))
.map(_.mesosTaskStatus.getOrElse(newStatus))
.getOrElse(newStatus)
}
// TODO: Page the status updates to avoid trying to reconcile
@@ -369,10 +372,19 @@ private[spark] class MesosClusterScheduler(
}
private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
val retries = desc.retryState.map { d => s"-retry-${d.retries.toString}" }.getOrElse("")
val retries = desc.retryState.map { d => s"${RETRY_SEP}${d.retries.toString}" }.getOrElse("")
s"${frameworkId}-${desc.submissionId}${retries}"
}
private def getDriverTaskId(desc: MesosDriverDescription): String = {
val sId = desc.submissionId
desc.retryState.map(state => sId + s"${RETRY_SEP}${state.retries.toString}").getOrElse(sId)
}
private def getSubmissionIdFromTaskId(taskId: String): String = {
taskId.split(s"${RETRY_SEP}").head
}
private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
m.updated(k, f(m.getOrElse(k, default)))
}
@@ -551,7 +563,7 @@ private[spark] class MesosClusterScheduler(
}
private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo = {
val taskId = TaskID.newBuilder().setValue(desc.submissionId).build()
val taskId = TaskID.newBuilder().setValue(getDriverTaskId(desc)).build()
val (remainingResources, cpuResourcesToUse) =
partitionResources(offer.remainingResources, "cpus", desc.cores)
@@ -604,7 +616,7 @@ private[spark] class MesosClusterScheduler(
val task = createTaskInfo(submission, offer)
queuedTasks += task
logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
submission.submissionId)
submission.submissionId + s" with taskId: ${task.getTaskId.toString}")
val newState = new MesosClusterSubmissionState(
submission,
task.getTaskId,
@@ -718,31 +730,43 @@ private[spark] class MesosClusterScheduler(
logInfo(s"Received status update: taskId=${taskId}" +
s" state=${status.getState}" +
s" message=${status.getMessage}" +
s" reason=${status.getReason}");
s" reason=${status.getReason}")
stateLock.synchronized {
if (launchedDrivers.contains(taskId)) {
val subId = getSubmissionIdFromTaskId(taskId)
if (launchedDrivers.contains(subId)) {
if (status.getReason == Reason.REASON_RECONCILIATION &&
!pendingRecover.contains(taskId)) {
// Task has already received update and no longer requires reconciliation.
return
}
val state = launchedDrivers(taskId)
val state = launchedDrivers(subId)
// Check if the driver is supervise enabled and can be relaunched.
if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
removeFromLaunchedDrivers(taskId)
removeFromLaunchedDrivers(subId)
state.finishDate = Some(new Date())
val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
val (retries, waitTimeSec) = retryState
.map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
.getOrElse{ (1, 1) }
val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
val newDriverDescription = state.driverDescription.copy(
retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
addDriverToPending(newDriverDescription, taskId);
addDriverToPending(newDriverDescription, newDriverDescription.submissionId)
} else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
removeFromLaunchedDrivers(taskId)
retireDriver(subId, state)
}
state.mesosTaskStatus = Option(status)
} else {
logError(s"Unable to find driver with $taskId in status update")
}
}
}
private def retireDriver(
submissionId: String,
state: MesosClusterSubmissionState) = {
removeFromLaunchedDrivers(submissionId)
state.finishDate = Some(new Date())
if (finishedDrivers.size >= retainedDrivers) {
val toRemove = math.max(retainedDrivers / 10, 1)
@@ -750,12 +774,6 @@ private[spark] class MesosClusterScheduler(
}
finishedDrivers += state
}
state.mesosTaskStatus = Option(status)
} else {
logError(s"Unable to find driver $taskId in status update")
}
}
}
override def frameworkMessage(
driver: SchedulerDriver,
@@ -769,31 +787,31 @@ private[spark] class MesosClusterScheduler(
slaveId: SlaveID,
status: Int): Unit = {}
private def removeFromQueuedDrivers(id: String): Boolean = {
val index = queuedDrivers.indexWhere(_.submissionId.equals(id))
private def removeFromQueuedDrivers(subId: String): Boolean = {
val index = queuedDrivers.indexWhere(_.submissionId.equals(subId))
if (index != -1) {
queuedDrivers.remove(index)
queuedDriversState.expunge(id)
queuedDriversState.expunge(subId)
true
} else {
false
}
}
private def removeFromLaunchedDrivers(id: String): Boolean = {
if (launchedDrivers.remove(id).isDefined) {
launchedDriversState.expunge(id)
private def removeFromLaunchedDrivers(subId: String): Boolean = {
if (launchedDrivers.remove(subId).isDefined) {
launchedDriversState.expunge(subId)
true
} else {
false
}
}
private def removeFromPendingRetryDrivers(id: String): Boolean = {
val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(id))
private def removeFromPendingRetryDrivers(subId: String): Boolean = {
val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(subId))
if (index != -1) {
pendingRetryDrivers.remove(index)
pendingRetryDriversState.expunge(id)
pendingRetryDriversState.expunge(subId)
true
} else {
false
@@ -810,8 +828,8 @@ private[spark] class MesosClusterScheduler(
revive()
}
private def addDriverToPending(desc: MesosDriverDescription, taskId: String) = {
pendingRetryDriversState.persist(taskId, desc)
private def addDriverToPending(desc: MesosDriverDescription, subId: String) = {
pendingRetryDriversState.persist(subId, desc)
pendingRetryDrivers += desc
revive()
}
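For reference, the relaunch path above doubles the wait between retries and caps it at maxRetryWaitTime. A standalone sketch of just that arithmetic (the function name and tuple shape are illustrative, not the scheduler's API):

```scala
// (retries, waitTimeSec) for the next relaunch: the first failure waits 1 second,
// and each later failure doubles the wait up to the configured cap (in seconds).
def nextRetryState(previous: Option[(Int, Int)], maxRetryWaitTimeSec: Int): (Int, Int) =
  previous
    .map { case (retries, waitSec) => (retries + 1, math.min(maxRetryWaitTimeSec, waitSec * 2)) }
    .getOrElse((1, 1))

// With a 60 second cap the waits progress 1, 2, 4, ..., 32, 60, 60, ...
```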

@@ -58,7 +58,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
"spark.yarn.credentials.file",
"spark.yarn.credentials.renewalTime",
"spark.yarn.credentials.updateTime",
"spark.ui.filters")
"spark.ui.filters",
"spark.mesos.driver.frameworkId")
val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
.remove("spark.driver.host")
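The Checkpoint change above adds `spark.mesos.driver.frameworkId` to the properties that are never restored from checkpointed data, so a relaunched driver keeps the framework id handed to it by the dispatcher rather than the stale checkpointed value. A minimal sketch of that reload pattern, assuming the checkpointed key/value pairs are available (names are illustrative, not the exact Checkpoint internals):

```scala
import org.apache.spark.SparkConf

// Keys that must come from the relaunched driver's current environment,
// never from the checkpoint (mirrors the list extended by this patch).
val propertiesToReload = Seq(
  "spark.ui.filters",
  "spark.mesos.driver.frameworkId")

def recoverConf(checkpointedPairs: Seq[(String, String)]): SparkConf = {
  // Start from the checkpointed key/value pairs...
  val recovered = new SparkConf(loadDefaults = false).setAll(checkpointedPairs)
  // ...then overwrite the reloadable keys with whatever the current environment
  // (e.g. the dispatcher-supplied system properties) provides.
  val current = new SparkConf(loadDefaults = true)
  propertiesToReload.foreach { key =>
    current.getOption(key).foreach(value => recovered.set(key, value))
  }
  recovered
}
```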