[SPARK-16809] enable history server links in dispatcher UI

## What changes were proposed in this pull request?

Links the Spark Mesos Dispatcher UI to the history server UI

- adds spark.mesos.dispatcher.historyServer.url
- explicitly generates frameworkIDs for the launched drivers, so the dispatcher knows how to correlate drivers and frameworkIDs

## How was this patch tested?

manual testing

Author: Michael Gummelt <mgummelt@mesosphere.io>
Author: Sergiusz Urbaniak <sur@mesosphere.io>

Closes #14414 from mgummelt/history-server.
This commit is contained in:
Michael Gummelt 2016-08-09 10:55:33 +01:00 committed by Sean Owen
parent 2154345b6a
commit 62e6212441
7 changed files with 75 additions and 10 deletions

View file

@ -28,10 +28,17 @@ import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
import org.apache.spark.ui.{UIUtils, WebUIPage} import org.apache.spark.ui.{UIUtils, WebUIPage}
private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") { private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") {
private val historyServerURL = parent.conf.getOption("spark.mesos.dispatcher.historyServer.url")
def render(request: HttpServletRequest): Seq[Node] = { def render(request: HttpServletRequest): Seq[Node] = {
val state = parent.scheduler.getSchedulerState() val state = parent.scheduler.getSchedulerState()
val queuedHeaders = Seq("Driver ID", "Submit Date", "Main Class", "Driver Resources")
val driverHeaders = queuedHeaders ++ val driverHeader = Seq("Driver ID")
val historyHeader = historyServerURL.map(url => Seq("History")).getOrElse(Nil)
val submissionHeader = Seq("Submit Date", "Main Class", "Driver Resources")
val queuedHeaders = driverHeader ++ submissionHeader
val driverHeaders = driverHeader ++ historyHeader ++ submissionHeader ++
Seq("Start Date", "Mesos Slave ID", "State") Seq("Start Date", "Mesos Slave ID", "State")
val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++ val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++
Seq("Last Failed Status", "Next Retry Time", "Attempt Count") Seq("Last Failed Status", "Next Retry Time", "Attempt Count")
@ -68,8 +75,18 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = { private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
val id = state.driverDescription.submissionId val id = state.driverDescription.submissionId
val historyCol = if (historyServerURL.isDefined) {
<td>
<a href={s"${historyServerURL.get}/history/${state.frameworkId}"}>
{state.frameworkId}
</a>
</td>
} else Nil
<tr> <tr>
<td><a href={s"driver?id=$id"}>{id}</a></td> <td><a href={s"driver?id=$id"}>{id}</a></td>
{historyCol}
<td>{state.driverDescription.submissionDate}</td> <td>{state.driverDescription.submissionDate}</td>
<td>{state.driverDescription.command.mainClass}</td> <td>{state.driverDescription.command.mainClass}</td>
<td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td> <td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>

View file

@ -28,7 +28,7 @@ import org.apache.spark.ui.JettyUtils._
private[spark] class MesosClusterUI( private[spark] class MesosClusterUI(
securityManager: SecurityManager, securityManager: SecurityManager,
port: Int, port: Int,
conf: SparkConf, val conf: SparkConf,
dispatcherPublicAddress: String, dispatcherPublicAddress: String,
val scheduler: MesosClusterScheduler) val scheduler: MesosClusterScheduler)
extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) { extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) {

View file

@ -43,6 +43,8 @@ import org.apache.spark.util.Utils
* @param slaveId Slave ID that the task is assigned to * @param slaveId Slave ID that the task is assigned to
* @param mesosTaskStatus The last known task status update. * @param mesosTaskStatus The last known task status update.
* @param startDate The date the task was launched * @param startDate The date the task was launched
* @param finishDate The date the task finished
* @param frameworkId Mesos framework ID the task registers with
*/ */
private[spark] class MesosClusterSubmissionState( private[spark] class MesosClusterSubmissionState(
val driverDescription: MesosDriverDescription, val driverDescription: MesosDriverDescription,
@ -50,12 +52,13 @@ private[spark] class MesosClusterSubmissionState(
val slaveId: SlaveID, val slaveId: SlaveID,
var mesosTaskStatus: Option[TaskStatus], var mesosTaskStatus: Option[TaskStatus],
var startDate: Date, var startDate: Date,
var finishDate: Option[Date]) var finishDate: Option[Date],
val frameworkId: String)
extends Serializable { extends Serializable {
def copy(): MesosClusterSubmissionState = { def copy(): MesosClusterSubmissionState = {
new MesosClusterSubmissionState( new MesosClusterSubmissionState(
driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate) driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate, frameworkId)
} }
} }
@ -63,6 +66,7 @@ private[spark] class MesosClusterSubmissionState(
* Tracks the retry state of a driver, which includes the next time it should be scheduled * Tracks the retry state of a driver, which includes the next time it should be scheduled
* and necessary information to do exponential backoff. * and necessary information to do exponential backoff.
* This class is not thread-safe, and we expect the caller to handle synchronizing state. * This class is not thread-safe, and we expect the caller to handle synchronizing state.
*
* @param lastFailureStatus Last Task status when it failed. * @param lastFailureStatus Last Task status when it failed.
* @param retries Number of times it has been retried. * @param retries Number of times it has been retried.
* @param nextRetry Time at which it should be retried next * @param nextRetry Time at which it should be retried next
@ -80,6 +84,7 @@ private[spark] class MesosClusterRetryState(
/** /**
* The full state of the cluster scheduler, currently being used for displaying * The full state of the cluster scheduler, currently being used for displaying
* information on the UI. * information on the UI.
*
* @param frameworkId Mesos Framework id for the cluster scheduler. * @param frameworkId Mesos Framework id for the cluster scheduler.
* @param masterUrl The Mesos master url * @param masterUrl The Mesos master url
* @param queuedDrivers All drivers queued to be launched * @param queuedDrivers All drivers queued to be launched
@ -358,13 +363,25 @@ private[spark] class MesosClusterScheduler(
.orElse(desc.command.environment.get("SPARK_EXECUTOR_URI")) .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
} }
private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
m.updated(k, f(m.getOrElse(k, default)))
}
private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
s"${frameworkId}-${desc.submissionId}"
}
private def getDriverEnvironment(desc: MesosDriverDescription): Environment = { private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
val env = { val env = {
val executorOpts = desc.conf.getAll.map { case (k, v) => s"-D$k=$v" }.mkString(" ") val executorOpts = desc.conf.getAll.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts) val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")
driverEnv ++ executorEnv ++ desc.command.environment var commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
)
driverEnv ++ executorEnv ++ commandEnv
} }
val envBuilder = Environment.newBuilder() val envBuilder = Environment.newBuilder()
@ -552,7 +569,7 @@ private[spark] class MesosClusterScheduler(
logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " + logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
submission.submissionId) submission.submissionId)
val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId, val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
None, new Date(), None) None, new Date(), None, getDriverFrameworkID(submission))
launchedDrivers(submission.submissionId) = newState launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState) launchedDriversState.persist(submission.submissionId, newState)
afterLaunchCallback(submission.submissionId) afterLaunchCallback(submission.submissionId)

View file

@ -152,8 +152,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.sparkUser, sc.sparkUser,
sc.appName, sc.appName,
sc.conf, sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)) sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
None,
None,
sc.conf.getOption("spark.mesos.driver.frameworkId")
) )
unsetFrameworkID(sc)
startScheduler(driver) startScheduler(driver)
} }

View file

@ -77,8 +77,13 @@ private[spark] class MesosFineGrainedSchedulerBackend(
sc.sparkUser, sc.sparkUser,
sc.appName, sc.appName,
sc.conf, sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)) sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
Option.empty,
Option.empty,
sc.conf.getOption("spark.mesos.driver.frameworkId")
) )
unsetFrameworkID(sc)
startScheduler(driver) startScheduler(driver)
} }

View file

@ -357,4 +357,15 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s") sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
} }
/**
* spark.mesos.driver.frameworkId is set by the cluster dispatcher to correlate driver
* submissions with frameworkIDs. However, this causes issues when a driver process launches
* more than one framework (more than one SparkContext(, because they all try to register with
* the same frameworkID. To enforce that only the first driver registers with the configured
* framework ID, the driver calls this method after the first registration.
*/
def unsetFrameworkID(sc: SparkContext) {
sc.conf.remove("spark.mesos.driver.frameworkId")
System.clearProperty("spark.mesos.driver.frameworkId")
}
} }

View file

@ -468,6 +468,16 @@ See the [configuration page](configuration.html) for information on Spark config
If unset it will point to Spark's internal web UI. If unset it will point to Spark's internal web UI.
</td> </td>
</tr> </tr>
<tr>
<td><code>spark.mesos.dispatcher.historyServer.url</code></td>
<td><code>(none)</code></td>
<td>
Set the URL of the <a href="http://spark.apache.org/docs/latest/monitoring.html#viewing-after-the-fact">history
server</a>. The dispatcher will then link each driver to its entry
in the history server.
</td>
</tr>
</table> </table>
# Troubleshooting and Debugging # Troubleshooting and Debugging