[SPARK-21502][MESOS] fix --supervise for mesos in cluster mode

## What changes were proposed in this pull request?
With supervise enabled for a driver, re-launching it was failing because the driver had the same framework Id. This patch creates a new driver framework id every time we re-launch a driver, but we keep the driver submission id the same since that is the same with the task id the driver was launched with on mesos and retry state and other info within Dispatcher's data structures uses that as a key.
We append a "-retry-%4d" string as a suffix to the framework id passed by the dispatcher to the driver and the same value to the app_id created by each driver, except the first time where we dont need the retry suffix.
The previous format for the frameworkId was   'DispactherFId-DriverSubmissionId'.

We also detect the case where we have multiple spark contexts started from within the same driver and we do set proper names to their corresponding app-ids. The old practice was to unset the framework id passed from the dispatcher after the driver framework was started for the first time and let mesos decide the framework ID for subsequent spark contexts. The decided fId was passed as an appID.
This patch affects heavily the history server. Btw we dont have the issues of the standalone case where driver id must be different since the dispatcher will re-launch a driver(mesos task) only if it gets an update that it is dead and this is verified by mesos implicitly. We also dont fix the fine grained mode which is deprecated and of no use.

## How was this patch tested?

This task was manually tested on dc/os. Launched a driver, stoped its container and verified the expected behavior.

Initial retry of the driver, driver in pending state:

![image](https://user-images.githubusercontent.com/7945591/28473862-1088b736-6e4f-11e7-8d7d-7b785b1da6a6.png)

Driver re-launched:
![image](https://user-images.githubusercontent.com/7945591/28473885-26e02d16-6e4f-11e7-9eb8-6bf7bdb10cb8.png)

Another re-try:
![image](https://user-images.githubusercontent.com/7945591/28473897-35702318-6e4f-11e7-9585-fd295ad7c6b6.png)

The resulted entries in history server at the bottom:

![image](https://user-images.githubusercontent.com/7945591/28473910-4946dabc-6e4f-11e7-90a6-fa4f80893c61.png)

Regarding multiple spark contexts here is the end result regarding the spark history server, for the second spark context we add an increasing number as a suffix:

![image](https://user-images.githubusercontent.com/7945591/28474432-69cf8b06-6e51-11e7-93c7-e6c0b04dec93.png)

Author: Stavros Kontopoulos <st.kontopoulos@gmail.com>

Closes #18705 from skonto/fix_supervise_flag.
This commit is contained in:
Stavros Kontopoulos 2017-07-24 11:11:34 -07:00 committed by Marcelo Vanzin
parent 86664338f2
commit b09ec92a6b
2 changed files with 20 additions and 3 deletions

View file

@ -369,7 +369,8 @@ private[spark] class MesosClusterScheduler(
}
private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
s"${frameworkId}-${desc.submissionId}"
val retries = desc.retryState.map { d => s"-retry-${d.retries.toString}" }.getOrElse("")
s"${frameworkId}-${desc.submissionId}${retries}"
}
private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {

View file

@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster.mesos
import java.io.File
import java.util.{Collections, List => JList}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.locks.ReentrantLock
import scala.collection.JavaConverters._
@ -170,6 +171,15 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
override def start() {
super.start()
val startedBefore = IdHelper.startedBefore.getAndSet(true)
val suffix = if (startedBefore) {
f"-${IdHelper.nextSCNumber.incrementAndGet()}%04d"
} else {
""
}
val driver = createSchedulerDriver(
master,
MesosCoarseGrainedSchedulerBackend.this,
@ -179,10 +189,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
None,
Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)),
sc.conf.getOption("spark.mesos.driver.frameworkId")
sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
)
unsetFrameworkID(sc)
startScheduler(driver)
}
@ -271,6 +280,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
driver: org.apache.mesos.SchedulerDriver,
frameworkId: FrameworkID,
masterInfo: MasterInfo) {
this.appId = frameworkId.getValue
this.mesosExternalShuffleClient.foreach(_.init(appId))
this.schedulerDriver = driver
@ -672,3 +682,9 @@ private class Slave(val hostname: String) {
var taskFailures = 0
var shuffleRegistered = false
}
object IdHelper {
// Use atomic values since Spark contexts can be initialized in parallel
private[mesos] val nextSCNumber = new AtomicLong(0)
private[mesos] val startedBefore = new AtomicBoolean(false)
}