[SPARK-5338] [MESOS] Add cluster mode support for Mesos

This patch adds cluster mode support for running Spark on Mesos.
It introduces a new Mesos framework dedicated to launching apps/drivers. It is used by invoking the spark-submit script with the --master flag pointed at the cluster mode REST interface instead of the Mesos master.

Example:
./bin/spark-submit --deploy-mode cluster --class org.apache.spark.examples.SparkPi --master mesos://10.0.0.206:8077 --executor-memory 1G --total-executor-cores 100 examples/target/spark-examples_2.10-1.3.0-SNAPSHOT.jar 30

This patch also abstracts the StandaloneRestServer so that different implementations of the REST endpoints are possible.

Features of the cluster mode in this PR:
- Supports supervise mode, where the scheduler keeps trying to reschedule a driver that has exited.
- Adds a new UI for the cluster mode scheduler that shows running jobs, finished jobs, and supervised jobs waiting to be retried.
- Supports state persistence to ZooKeeper, so that when the cluster scheduler fails over it can pick up all queued and running jobs.
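
For illustration, here is a minimal sketch of the client-side REST interaction that the new --kill and --status paths drive, using the RestSubmissionClient introduced in this patch. The dispatcher address is taken from the example above, the submission ID is a placeholder, and the class is package-private, so this is illustrative rather than a public API:

import org.apache.spark.deploy.rest.RestSubmissionClient

// Dispatcher REST endpoint from the example above; the submission ID is a
// placeholder returned by an earlier create request.
val master = "mesos://10.0.0.206:8077"
val submissionId = "driver-20150428000000-0000"

val client = new RestSubmissionClient
// Ask the dispatcher for the driver's current state, then request that it be killed.
val statusResponse = client.requestSubmissionStatus(master, submissionId)
val killResponse = client.killSubmission(master, submissionId)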

Author: Timothy Chen <tnachen@gmail.com>
Author: Luc Bourlier <luc.bourlier@typesafe.com>

Closes #5144 from tnachen/mesos_cluster_mode and squashes the following commits:

069e946 [Timothy Chen] Fix rebase.
e24b512 [Timothy Chen] Persist submitted driver.
390c491 [Timothy Chen] Fix zk conf key for mesos zk engine.
e324ac1 [Timothy Chen] Fix merge.
fd5259d [Timothy Chen] Address review comments.
1553230 [Timothy Chen] Address review comments.
c6c6b73 [Timothy Chen] Pass spark properties to mesos cluster tasks.
f7d8046 [Timothy Chen] Change app name to spark cluster.
17f93a2 [Timothy Chen] Fix head of line blocking in scheduling drivers.
6ff8e5c [Timothy Chen] Address comments and add logging.
df355cd [Timothy Chen] Add metrics to mesos cluster scheduler.
20f7284 [Timothy Chen] Address review comments
7252612 [Timothy Chen] Fix tests.
a46ad66 [Timothy Chen] Allow zk cli param override.
920fc4b [Timothy Chen] Fix scala style issues.
862b5b5 [Timothy Chen] Support asking driver status when it's retrying.
7f214c2 [Timothy Chen] Fix RetryState visibility
e0f33f7 [Timothy Chen] Add supervise support and persist retries.
371ce65 [Timothy Chen] Handle cluster mode recovery and state persistence.
3d4dfa1 [Luc Bourlier] Adds support to kill submissions
febfaba [Timothy Chen] Bound the finished drivers in memory
543a98d [Timothy Chen] Schedule multiple jobs
6887e5e [Timothy Chen] Support looking at SPARK_EXECUTOR_URI env variable in schedulers
8ec76bc [Timothy Chen] Fix Mesos dispatcher UI.
d57d77d [Timothy Chen] Add documentation
825afa0 [Luc Bourlier] Supports more spark-submit parameters
b8e7181 [Luc Bourlier] Adds a shutdown latch to keep the daemon running
0fa7780 [Luc Bourlier] Launch task through the mesos scheduler
5b7a12b [Timothy Chen] WIP: Making a cluster mode a mesos framework.
4b2f5ef [Timothy Chen] Specify user jar in command to be replaced with local.
e775001 [Timothy Chen] Support fetching remote uris in driver runner.
7179495 [Timothy Chen] Change Driver page output and add logging
880bc27 [Timothy Chen] Add Mesos Cluster UI to display driver results
9986731 [Timothy Chen] Kill drivers when shutdown
67cbc18 [Timothy Chen] Rename StandaloneRestClient to RestClient and add sbin scripts
e3facdd [Timothy Chen] Add Mesos Cluster dispatcher
Timothy Chen authored on 2015-04-28 13:31:08 -07:00; committed by Andrew Or
parent 80098109d9
commit 53befacced
30 changed files with 2146 additions and 492 deletions

View file

@ -32,7 +32,7 @@ import org.json4s._
import org.json4s.jackson.JsonMethods
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
import org.apache.spark.deploy.master.RecoveryState
import org.apache.spark.util.Utils
/**

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.deploy.master
package org.apache.spark.deploy
import scala.collection.JavaConversions._
@ -25,15 +25,17 @@ import org.apache.zookeeper.KeeperException
import org.apache.spark.{Logging, SparkConf}
private[deploy] object SparkCuratorUtil extends Logging {
private[spark] object SparkCuratorUtil extends Logging {
private val ZK_CONNECTION_TIMEOUT_MILLIS = 15000
private val ZK_SESSION_TIMEOUT_MILLIS = 60000
private val RETRY_WAIT_MILLIS = 5000
private val MAX_RECONNECT_ATTEMPTS = 3
def newClient(conf: SparkConf): CuratorFramework = {
val ZK_URL = conf.get("spark.deploy.zookeeper.url")
def newClient(
conf: SparkConf,
zkUrlConf: String = "spark.deploy.zookeeper.url"): CuratorFramework = {
val ZK_URL = conf.get(zkUrlConf)
val zk = CuratorFrameworkFactory.newClient(ZK_URL,
ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS,
new ExponentialBackoffRetry(RETRY_WAIT_MILLIS, MAX_RECONNECT_ATTEMPTS))
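
A minimal usage sketch of the new zkUrlConf parameter, assuming the Mesos-specific key that the dispatcher sets later in this patch (the ZooKeeper address is a placeholder):

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkCuratorUtil

// Read the ZooKeeper URL from the Mesos-specific key instead of the
// standalone default "spark.deploy.zookeeper.url".
val conf = new SparkConf().set("spark.mesos.deploy.zookeeper.url", "10.0.0.1:2181")
val zk = SparkCuratorUtil.newClient(conf, "spark.mesos.deploy.zookeeper.url")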

View file

@ -36,11 +36,11 @@ import org.apache.ivy.core.retrieve.RetrieveOptions
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
import org.apache.spark.SPARK_VERSION
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
/**
* Whether to submit, kill, or request the status of an application.
* The latter two operations are currently supported only for standalone cluster mode.
@ -114,18 +114,20 @@ object SparkSubmit {
}
}
/** Kill an existing submission using the REST protocol. Standalone cluster mode only. */
/**
* Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
*/
private def kill(args: SparkSubmitArguments): Unit = {
new StandaloneRestClient()
new RestSubmissionClient()
.killSubmission(args.master, args.submissionToKill)
}
/**
* Request the status of an existing submission using the REST protocol.
* Standalone cluster mode only.
* Standalone and Mesos cluster mode only.
*/
private def requestStatus(args: SparkSubmitArguments): Unit = {
new StandaloneRestClient()
new RestSubmissionClient()
.requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
}
@ -252,6 +254,7 @@ object SparkSubmit {
}
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
@ -294,8 +297,9 @@ object SparkSubmit {
// The following modes are not supported or applicable
(clusterManager, deployMode) match {
case (MESOS, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
case (MESOS, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on Mesos clusters.")
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
@ -377,15 +381,6 @@ object SparkSubmit {
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.driver.extraLibraryPath"),
// Standalone cluster only
// Do not set CL arguments here because there are multiple possibilities for the main class
OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, sysProp = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, sysProp = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER,
sysProp = "spark.driver.supervise"),
// Yarn client only
OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
@ -413,7 +408,15 @@ object SparkSubmit {
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.files")
sysProp = "spark.files"),
OptionAssigner(args.jars, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.supervise"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
)
// In client mode, launch the application main class directly
@ -452,7 +455,7 @@ object SparkSubmit {
// All Spark parameters are expected to be passed to the client through system properties.
if (args.isStandaloneCluster) {
if (args.useRest) {
childMainClass = "org.apache.spark.deploy.rest.StandaloneRestClient"
childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
childArgs += (args.primaryResource, args.mainClass)
} else {
// In legacy standalone cluster mode, use Client as a wrapper around the user class
@ -496,6 +499,15 @@ object SparkSubmit {
}
}
if (isMesosCluster) {
assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
childArgs += (args.primaryResource, args.mainClass)
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
}
// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sysProps.getOrElseUpdate(k, v)

View file

@ -241,8 +241,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}
private def validateKillArguments(): Unit = {
if (!master.startsWith("spark://")) {
SparkSubmit.printErrorAndExit("Killing submissions is only supported in standalone mode!")
if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
SparkSubmit.printErrorAndExit(
"Killing submissions is only supported in standalone or Mesos mode!")
}
if (submissionToKill == null) {
SparkSubmit.printErrorAndExit("Please specify a submission to kill.")
@ -250,9 +251,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}
private def validateStatusRequestArguments(): Unit = {
if (!master.startsWith("spark://")) {
if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
SparkSubmit.printErrorAndExit(
"Requesting submission statuses is only supported in standalone mode!")
"Requesting submission statuses is only supported in standalone or Mesos mode!")
}
if (submissionToRequestStatusFor == null) {
SparkSubmit.printErrorAndExit("Please specify a submission to request status for.")
@ -485,6 +486,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
|
| Spark standalone with cluster deploy mode only:
| --driver-cores NUM Cores for driver (Default: 1).
|
| Spark standalone or Mesos with cluster deploy mode only:
| --supervise If given, restarts the driver on failure.
| --kill SUBMISSION_ID If given, kills the driver specified.
| --status SUBMISSION_ID If given, requests the status of the driver specified.

View file

@ -130,7 +130,7 @@ private[master] class Master(
private val restServer =
if (restServerEnabled) {
val port = conf.getInt("spark.master.rest.port", 6066)
Some(new StandaloneRestServer(host, port, self, masterUrl, conf))
Some(new StandaloneRestServer(host, port, conf, self, masterUrl))
} else {
None
}

View file

@ -23,6 +23,7 @@ import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}
import org.apache.spark.deploy.SparkCuratorUtil
private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {

View file

@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.CreateMode
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkCuratorUtil
private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serialization: Serialization)

View file

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.mesos
import java.util.concurrent.CountDownLatch
import org.apache.spark.deploy.mesos.ui.MesosClusterUI
import org.apache.spark.deploy.rest.mesos.MesosRestServer
import org.apache.spark.scheduler.cluster.mesos._
import org.apache.spark.util.SignalLogger
import org.apache.spark.{Logging, SecurityManager, SparkConf}
/*
* A dispatcher that is responsible for managing and launching drivers, and is intended to be
* used for Mesos cluster mode. The dispatcher is a long-running process started by the user in
* the cluster independently of Spark applications.
* It contains a [[MesosRestServer]] that listens for requests to submit drivers and a
* [[MesosClusterScheduler]] that processes these requests by negotiating with the Mesos master
* for resources.
*
* A typical new driver lifecycle is the following:
* - Driver submitted via spark-submit talking to the [[MesosRestServer]]
* - [[MesosRestServer]] queues the driver request to [[MesosClusterScheduler]]
* - [[MesosClusterScheduler]] gets resource offers and launches the drivers that are in queue
*
* This dispatcher supports both Mesos fine-grained and coarse-grained modes, since the mode is
* configurable per driver launched.
* This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as
* a daemon to launch drivers as Mesos frameworks upon request. The dispatcher is also started and
* stopped by sbin/start-mesos-dispatcher and sbin/stop-mesos-dispatcher respectively.
*/
private[mesos] class MesosClusterDispatcher(
args: MesosClusterDispatcherArguments,
conf: SparkConf)
extends Logging {
private val publicAddress = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host)
private val recoveryMode = conf.get("spark.mesos.deploy.recoveryMode", "NONE").toUpperCase()
logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode)
private val engineFactory = recoveryMode match {
case "NONE" => new BlackHoleMesosClusterPersistenceEngineFactory
case "ZOOKEEPER" => new ZookeeperMesosClusterPersistenceEngineFactory(conf)
case _ => throw new IllegalArgumentException("Unsupported recovery mode: " + recoveryMode)
}
private val scheduler = new MesosClusterScheduler(engineFactory, conf)
private val server = new MesosRestServer(args.host, args.port, conf, scheduler)
private val webUi = new MesosClusterUI(
new SecurityManager(conf),
args.webUiPort,
conf,
publicAddress,
scheduler)
private val shutdownLatch = new CountDownLatch(1)
def start(): Unit = {
webUi.bind()
scheduler.frameworkUrl = webUi.activeWebUiUrl
scheduler.start()
server.start()
}
def awaitShutdown(): Unit = {
shutdownLatch.await()
}
def stop(): Unit = {
webUi.stop()
server.stop()
scheduler.stop()
shutdownLatch.countDown()
}
}
private[mesos] object MesosClusterDispatcher extends Logging {
def main(args: Array[String]) {
SignalLogger.register(log)
val conf = new SparkConf
val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
conf.setMaster(dispatcherArgs.masterUrl)
conf.setAppName(dispatcherArgs.name)
dispatcherArgs.zookeeperUrl.foreach { z =>
conf.set("spark.mesos.deploy.recoveryMode", "ZOOKEEPER")
conf.set("spark.mesos.deploy.zookeeper.url", z)
}
val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
dispatcher.start()
val shutdownHook = new Thread() {
override def run() {
logInfo("Shutdown hook is shutting down dispatcher")
dispatcher.stop()
dispatcher.awaitShutdown()
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
dispatcher.awaitShutdown()
}
}
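
A hedged sketch of standing up a dispatcher programmatically, mirroring main() above. The master and ZooKeeper addresses are placeholders; in practice sbin/start-mesos-dispatcher does this, and the classes are package-private, so the snippet assumes it runs inside org.apache.spark.deploy.mesos:

import org.apache.spark.SparkConf

// Hypothetical command line: Mesos master, REST port, and ZooKeeper for recovery.
val conf = new SparkConf
val args = new MesosClusterDispatcherArguments(
  Array("--master", "mesos://10.0.0.206:5050", "--port", "8077", "--zk", "10.0.0.1:2181"), conf)
conf.setMaster(args.masterUrl)
conf.setAppName(args.name)
args.zookeeperUrl.foreach { z =>
  conf.set("spark.mesos.deploy.recoveryMode", "ZOOKEEPER")
  conf.set("spark.mesos.deploy.zookeeper.url", z)
}
val dispatcher = new MesosClusterDispatcher(args, conf)
dispatcher.start()         // binds the web UI, starts the scheduler and the REST server
dispatcher.awaitShutdown()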

View file

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.mesos
import org.apache.spark.SparkConf
import org.apache.spark.util.{IntParam, Utils}
private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) {
var host = Utils.localHostName()
var port = 7077
var name = "Spark Cluster"
var webUiPort = 8081
var masterUrl: String = _
var zookeeperUrl: Option[String] = None
var propertiesFile: String = _
parse(args.toList)
propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
private def parse(args: List[String]): Unit = args match {
case ("--host" | "-h") :: value :: tail =>
Utils.checkHost(value, "Please use hostname " + value)
host = value
parse(tail)
case ("--port" | "-p") :: IntParam(value) :: tail =>
port = value
parse(tail)
case ("--webui-port" | "-p") :: IntParam(value) :: tail =>
webUiPort = value
parse(tail)
case ("--zk" | "-z") :: value :: tail =>
zookeeperUrl = Some(value)
parse(tail)
case ("--master" | "-m") :: value :: tail =>
if (!value.startsWith("mesos://")) {
System.err.println("Cluster dispatcher only supports mesos (uri begins with mesos://)")
System.exit(1)
}
masterUrl = value.stripPrefix("mesos://")
parse(tail)
case ("--name") :: value :: tail =>
name = value
parse(tail)
case ("--properties-file") :: value :: tail =>
propertiesFile = value
parse(tail)
case ("--help") :: tail =>
printUsageAndExit(0)
case Nil => {
if (masterUrl == null) {
System.err.println("--master is required")
printUsageAndExit(1)
}
}
case _ =>
printUsageAndExit(1)
}
private def printUsageAndExit(exitCode: Int): Unit = {
System.err.println(
"Usage: MesosClusterDispatcher [options]\n" +
"\n" +
"Options:\n" +
" -h HOST, --host HOST Hostname to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: 7077)\n" +
" --webui-port WEBUI_PORT WebUI Port to listen on (default: 8081)\n" +
" --name NAME Framework name to show in Mesos UI\n" +
" -m --master MASTER URI for connecting to Mesos master\n" +
" -z --zk ZOOKEEPER Comma delimited URLs for connecting to \n" +
" Zookeeper for persistence\n" +
" --properties-file FILE Path to a custom Spark properties file.\n" +
" Default is conf/spark-defaults.conf.")
System.exit(exitCode)
}
}

View file

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.mesos
import java.util.Date
import org.apache.spark.deploy.Command
import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState
/**
* Describes a Spark driver that is submitted from the
* [[org.apache.spark.deploy.rest.mesos.MesosRestServer]], to be launched by
* [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]].
* @param jarUrl URL to the application jar
* @param mem Amount of memory for the driver
* @param cores Number of cores for the driver
* @param supervise Whether to restart the driver on failure (supervise mode for long-running apps)
* @param command The command to launch the driver.
* @param schedulerProperties Extra properties to pass to the Mesos scheduler
*/
private[spark] class MesosDriverDescription(
val name: String,
val jarUrl: String,
val mem: Int,
val cores: Double,
val supervise: Boolean,
val command: Command,
val schedulerProperties: Map[String, String],
val submissionId: String,
val submissionDate: Date,
val retryState: Option[MesosClusterRetryState] = None)
extends Serializable {
def copy(
name: String = name,
jarUrl: String = jarUrl,
mem: Int = mem,
cores: Double = cores,
supervise: Boolean = supervise,
command: Command = command,
schedulerProperties: Map[String, String] = schedulerProperties,
submissionId: String = submissionId,
submissionDate: Date = submissionDate,
retryState: Option[MesosClusterRetryState] = retryState): MesosDriverDescription = {
new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, schedulerProperties,
submissionId, submissionDate, retryState)
}
override def toString: String = s"MesosDriverDescription (${command.mainClass})"
}
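
A hedged sketch of constructing a description by hand, assuming the standard org.apache.spark.deploy.Command case class (main class, arguments, environment, classpath entries, library path entries, java opts). In the real flow the MesosRestServer builds this from a CreateSubmissionRequest; every value below is a placeholder:

import java.util.Date

import org.apache.spark.deploy.Command

// Placeholder command and resources for illustration only.
val command = Command(
  "org.apache.spark.examples.SparkPi",        // main class
  Seq("30"),                                  // application arguments
  Map.empty, Seq.empty, Seq.empty, Seq.empty) // env, classpath, library paths, java opts
val desc = new MesosDriverDescription(
  name = "SparkPi",
  jarUrl = "http://example.com/spark-examples.jar",
  mem = 1024,
  cores = 1.0,
  supervise = false,
  command = command,
  schedulerProperties = Map("spark.executor.memory" -> "1g"),
  submissionId = "driver-20150428000000-0000",
  submissionDate = new Date())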

View file

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.mesos.ui
import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.mesos.Protos.TaskStatus
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
import org.apache.spark.ui.{UIUtils, WebUIPage}
private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") {
def render(request: HttpServletRequest): Seq[Node] = {
val state = parent.scheduler.getSchedulerState()
val queuedHeaders = Seq("Driver ID", "Submit Date", "Main Class", "Driver Resources")
val driverHeaders = queuedHeaders ++
Seq("Start Date", "Mesos Slave ID", "State")
val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++
Seq("Last Failed Status", "Next Retry Time", "Attempt Count")
val queuedTable = UIUtils.listingTable(queuedHeaders, queuedRow, state.queuedDrivers)
val launchedTable = UIUtils.listingTable(driverHeaders, driverRow, state.launchedDrivers)
val finishedTable = UIUtils.listingTable(driverHeaders, driverRow, state.finishedDrivers)
val retryTable = UIUtils.listingTable(retryHeaders, retryRow, state.pendingRetryDrivers)
val content =
<p>Mesos Framework ID: {state.frameworkId}</p>
<div class="row-fluid">
<div class="span12">
<h4>Queued Drivers:</h4>
{queuedTable}
<h4>Launched Drivers:</h4>
{launchedTable}
<h4>Finished Drivers:</h4>
{finishedTable}
<h4>Supervise drivers waiting for retry:</h4>
{retryTable}
</div>
</div>;
UIUtils.basicSparkPage(content, "Spark Drivers for Mesos cluster")
}
private def queuedRow(submission: MesosDriverDescription): Seq[Node] = {
<tr>
<td>{submission.submissionId}</td>
<td>{submission.submissionDate}</td>
<td>{submission.command.mainClass}</td>
<td>cpus: {submission.cores}, mem: {submission.mem}</td>
</tr>
}
private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
<tr>
<td>{state.driverDescription.submissionId}</td>
<td>{state.driverDescription.submissionDate}</td>
<td>{state.driverDescription.command.mainClass}</td>
<td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>
<td>{state.startDate}</td>
<td>{state.slaveId.getValue}</td>
<td>{stateString(state.mesosTaskStatus)}</td>
</tr>
}
private def retryRow(submission: MesosDriverDescription): Seq[Node] = {
<tr>
<td>{submission.submissionId}</td>
<td>{submission.submissionDate}</td>
<td>{submission.command.mainClass}</td>
<td>{submission.retryState.get.lastFailureStatus}</td>
<td>{submission.retryState.get.nextRetry}</td>
<td>{submission.retryState.get.retries}</td>
</tr>
}
private def stateString(status: Option[TaskStatus]): String = {
if (status.isEmpty) {
return ""
}
val sb = new StringBuilder
val s = status.get
sb.append(s"State: ${s.getState}")
if (status.get.hasMessage) {
sb.append(s", Message: ${s.getMessage}")
}
if (status.get.hasHealthy) {
sb.append(s", Healthy: ${s.getHealthy}")
}
if (status.get.hasSource) {
sb.append(s", Source: ${s.getSource}")
}
if (status.get.hasReason) {
sb.append(s", Reason: ${s.getReason}")
}
if (status.get.hasTimestamp) {
sb.append(s", Time: ${s.getTimestamp}")
}
sb.toString()
}
}

View file

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.mesos.ui
import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.{SparkUI, WebUI}
/**
* UI that displays driver results from the [[org.apache.spark.deploy.mesos.MesosClusterDispatcher]]
*/
private[spark] class MesosClusterUI(
securityManager: SecurityManager,
port: Int,
conf: SparkConf,
dispatcherPublicAddress: String,
val scheduler: MesosClusterScheduler)
extends WebUI(securityManager, port, conf) {
initialize()
def activeWebUiUrl: String = "http://" + dispatcherPublicAddress + ":" + boundPort
override def initialize() {
attachPage(new MesosClusterPage(this))
attachHandler(createStaticHandler(MesosClusterUI.STATIC_RESOURCE_DIR, "/static"))
}
}
private object MesosClusterUI {
val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
}

View file

@ -30,9 +30,7 @@ import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
import org.apache.spark.util.Utils
/**
* A client that submits applications to the standalone Master using a REST protocol.
* This client is intended to communicate with the [[StandaloneRestServer]] and is
* currently used for cluster mode only.
* A client that submits applications to a [[RestSubmissionServer]].
*
* In protocol version v1, the REST URL takes the form http://[host:port]/v1/submissions/[action],
* where [action] can be one of create, kill, or status. Each type of request is represented in
@ -53,8 +51,10 @@ import org.apache.spark.util.Utils
* implementation of this client can use that information to retry using the version specified
* by the server.
*/
private[deploy] class StandaloneRestClient extends Logging {
import StandaloneRestClient._
private[spark] class RestSubmissionClient extends Logging {
import RestSubmissionClient._
private val supportedMasterPrefixes = Seq("spark://", "mesos://")
/**
* Submit an application specified by the parameters in the provided request.
@ -62,7 +62,7 @@ private[deploy] class StandaloneRestClient extends Logging {
* If the submission was successful, poll the status of the submission and report
* it to the user. Otherwise, report the error message provided by the server.
*/
private[rest] def createSubmission(
def createSubmission(
master: String,
request: CreateSubmissionRequest): SubmitRestProtocolResponse = {
logInfo(s"Submitting a request to launch an application in $master.")
@ -107,7 +107,7 @@ private[deploy] class StandaloneRestClient extends Logging {
}
/** Construct a message that captures the specified parameters for submitting an application. */
private[rest] def constructSubmitRequest(
def constructSubmitRequest(
appResource: String,
mainClass: String,
appArgs: Array[String],
@ -219,14 +219,23 @@ private[deploy] class StandaloneRestClient extends Logging {
/** Return the base URL for communicating with the server, including the protocol version. */
private def getBaseUrl(master: String): String = {
val masterUrl = master.stripPrefix("spark://").stripSuffix("/")
var masterUrl = master
supportedMasterPrefixes.foreach { prefix =>
if (master.startsWith(prefix)) {
masterUrl = master.stripPrefix(prefix)
}
}
masterUrl = masterUrl.stripSuffix("/")
s"http://$masterUrl/$PROTOCOL_VERSION/submissions"
}
/** Throw an exception if the master URL is not one this REST client supports. */
private def validateMaster(master: String): Unit = {
if (!master.startsWith("spark://")) {
throw new IllegalArgumentException("This REST client is only supported in standalone mode.")
val valid = supportedMasterPrefixes.exists { prefix => master.startsWith(prefix) }
if (!valid) {
throw new IllegalArgumentException(
"This REST client only supports master URLs that start with " +
"one of the following: " + supportedMasterPrefixes.mkString(","))
}
}
@ -295,7 +304,7 @@ private[deploy] class StandaloneRestClient extends Logging {
}
}
private[rest] object StandaloneRestClient {
private[spark] object RestSubmissionClient {
private val REPORT_DRIVER_STATUS_INTERVAL = 1000
private val REPORT_DRIVER_STATUS_MAX_TRIES = 10
val PROTOCOL_VERSION = "v1"
@ -315,7 +324,7 @@ private[rest] object StandaloneRestClient {
}
val sparkProperties = conf.getAll.toMap
val environmentVariables = env.filter { case (k, _) => k.startsWith("SPARK_") }
val client = new StandaloneRestClient
val client = new RestSubmissionClient
val submitRequest = client.constructSubmitRequest(
appResource, mainClass, appArgs, sparkProperties, environmentVariables)
client.createSubmission(master, submitRequest)
@ -323,7 +332,7 @@ private[rest] object StandaloneRestClient {
def main(args: Array[String]): Unit = {
if (args.size < 2) {
sys.error("Usage: StandaloneRestClient [app resource] [main class] [app args*]")
sys.error("Usage: RestSubmissionClient [app resource] [main class] [app args*]")
sys.exit(1)
}
val appResource = args(0)
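
A hedged sketch of the create path that main() above drives; the jar and master come from the commit message example, and the argument list of constructSubmitRequest is inferred from the main() call above (the client is package-private, so this is illustrative only):

import org.apache.spark.SparkConf

val master = "mesos://10.0.0.206:8077"
// Spark properties forwarded to the driver (placeholder values).
val conf = new SparkConf().set("spark.master", master).set("spark.app.name", "SparkPi")

val client = new RestSubmissionClient
val request = client.constructSubmitRequest(
  "examples/target/spark-examples_2.10-1.3.0-SNAPSHOT.jar", // app resource
  "org.apache.spark.examples.SparkPi",                      // main class
  Array("30"),                                              // app args
  conf.getAll.toMap,                                        // Spark properties
  Map.empty)                                                // SPARK_* environment variables
val response = client.createSubmission(master, request)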

View file

@ -0,0 +1,318 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.rest
import java.net.InetSocketAddress
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
import scala.io.Source
import com.fasterxml.jackson.core.JsonProcessingException
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.{ServletHolder, ServletContextHandler}
import org.eclipse.jetty.util.thread.QueuedThreadPool
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
import org.apache.spark.util.Utils
/**
* A server that responds to requests submitted by the [[RestSubmissionClient]].
*
* This server responds with different HTTP codes depending on the situation:
* 200 OK - Request was processed successfully
* 400 BAD REQUEST - Request was malformed, not successfully validated, or of unexpected type
* 468 UNKNOWN PROTOCOL VERSION - Request specified a protocol this server does not understand
* 500 INTERNAL SERVER ERROR - Server throws an exception internally while processing the request
*
* The server always includes a JSON representation of the relevant [[SubmitRestProtocolResponse]]
* in the HTTP body. If an error occurs, however, the server will include an [[ErrorResponse]]
* instead of the one expected by the client. If the construction of this error response itself
* fails, the response will consist of an empty body with a response code that indicates internal
* server error.
*/
private[spark] abstract class RestSubmissionServer(
val host: String,
val requestedPort: Int,
val masterConf: SparkConf) extends Logging {
protected val submitRequestServlet: SubmitRequestServlet
protected val killRequestServlet: KillRequestServlet
protected val statusRequestServlet: StatusRequestServlet
private var _server: Option[Server] = None
// A mapping from URL prefixes to servlets that serve them. Exposed for testing.
protected val baseContext = s"/${RestSubmissionServer.PROTOCOL_VERSION}/submissions"
protected lazy val contextToServlet = Map[String, RestServlet](
s"$baseContext/create/*" -> submitRequestServlet,
s"$baseContext/kill/*" -> killRequestServlet,
s"$baseContext/status/*" -> statusRequestServlet,
"/*" -> new ErrorServlet // default handler
)
/** Start the server and return the bound port. */
def start(): Int = {
val (server, boundPort) = Utils.startServiceOnPort[Server](requestedPort, doStart, masterConf)
_server = Some(server)
logInfo(s"Started REST server for submitting applications on port $boundPort")
boundPort
}
/**
* Map the servlets to their corresponding contexts and attach them to a server.
* Return a 2-tuple of the started server and the bound port.
*/
private def doStart(startPort: Int): (Server, Int) = {
val server = new Server(new InetSocketAddress(host, startPort))
val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
server.setThreadPool(threadPool)
val mainHandler = new ServletContextHandler
mainHandler.setContextPath("/")
contextToServlet.foreach { case (prefix, servlet) =>
mainHandler.addServlet(new ServletHolder(servlet), prefix)
}
server.setHandler(mainHandler)
server.start()
val boundPort = server.getConnectors()(0).getLocalPort
(server, boundPort)
}
def stop(): Unit = {
_server.foreach(_.stop())
}
}
private[rest] object RestSubmissionServer {
val PROTOCOL_VERSION = RestSubmissionClient.PROTOCOL_VERSION
val SC_UNKNOWN_PROTOCOL_VERSION = 468
}
/**
* An abstract servlet for handling requests passed to the [[RestSubmissionServer]].
*/
private[rest] abstract class RestServlet extends HttpServlet with Logging {
/**
* Serialize the given response message to JSON and send it through the response servlet.
* This validates the response before sending it to ensure it is properly constructed.
*/
protected def sendResponse(
responseMessage: SubmitRestProtocolResponse,
responseServlet: HttpServletResponse): Unit = {
val message = validateResponse(responseMessage, responseServlet)
responseServlet.setContentType("application/json")
responseServlet.setCharacterEncoding("utf-8")
responseServlet.getWriter.write(message.toJson)
}
/**
* Return any fields in the client request message that the server does not know about.
*
* The mechanism for this is to reconstruct the JSON on the server side and compare the
* diff between this JSON and the one generated on the client side. Any fields that are
* only in the client JSON are treated as unexpected.
*/
protected def findUnknownFields(
requestJson: String,
requestMessage: SubmitRestProtocolMessage): Array[String] = {
val clientSideJson = parse(requestJson)
val serverSideJson = parse(requestMessage.toJson)
val Diff(_, _, unknown) = clientSideJson.diff(serverSideJson)
unknown match {
case j: JObject => j.obj.map { case (k, _) => k }.toArray
case _ => Array.empty[String] // No difference
}
}
/** Return a human readable String representation of the exception. */
protected def formatException(e: Throwable): String = {
val stackTraceString = e.getStackTrace.map { "\t" + _ }.mkString("\n")
s"$e\n$stackTraceString"
}
/** Construct an error message to signal the fact that an exception has been thrown. */
protected def handleError(message: String): ErrorResponse = {
val e = new ErrorResponse
e.serverSparkVersion = sparkVersion
e.message = message
e
}
/**
* Parse a submission ID from the relative path, assuming it is the first part of the path.
* For instance, we expect the path to take the form /[submission ID]/maybe/something/else.
* The returned submission ID cannot be empty. If the path is unexpected, return None.
*/
protected def parseSubmissionId(path: String): Option[String] = {
if (path == null || path.isEmpty) {
None
} else {
path.stripPrefix("/").split("/").headOption.filter(_.nonEmpty)
}
}
/**
* Validate the response to ensure that it is correctly constructed.
*
* If it is, simply return the message as is. Otherwise, return an error response instead
* to propagate the exception back to the client and set the appropriate error code.
*/
private def validateResponse(
responseMessage: SubmitRestProtocolResponse,
responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
try {
responseMessage.validate()
responseMessage
} catch {
case e: Exception =>
responseServlet.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
handleError("Internal server error: " + formatException(e))
}
}
}
/**
* A servlet for handling kill requests passed to the [[RestSubmissionServer]].
*/
private[rest] abstract class KillRequestServlet extends RestServlet {
/**
* If a submission ID is specified in the URL, have the Master kill the corresponding
* driver and return an appropriate response to the client. Otherwise, return error.
*/
protected override def doPost(
request: HttpServletRequest,
response: HttpServletResponse): Unit = {
val submissionId = parseSubmissionId(request.getPathInfo)
val responseMessage = submissionId.map(handleKill).getOrElse {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
handleError("Submission ID is missing in kill request.")
}
sendResponse(responseMessage, response)
}
protected def handleKill(submissionId: String): KillSubmissionResponse
}
/**
* A servlet for handling status requests passed to the [[RestSubmissionServer]].
*/
private[rest] abstract class StatusRequestServlet extends RestServlet {
/**
* If a submission ID is specified in the URL, request the status of the corresponding
* driver from the Master and include it in the response. Otherwise, return error.
*/
protected override def doGet(
request: HttpServletRequest,
response: HttpServletResponse): Unit = {
val submissionId = parseSubmissionId(request.getPathInfo)
val responseMessage = submissionId.map(handleStatus).getOrElse {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
handleError("Submission ID is missing in status request.")
}
sendResponse(responseMessage, response)
}
protected def handleStatus(submissionId: String): SubmissionStatusResponse
}
/**
* A servlet for handling submit requests passed to the [[RestSubmissionServer]].
*/
private[rest] abstract class SubmitRequestServlet extends RestServlet {
/**
* Submit an application to the Master with parameters specified in the request.
*
* The request is assumed to be a [[SubmitRestProtocolRequest]] in the form of JSON.
* If the request is successfully processed, return an appropriate response to the
* client indicating so. Otherwise, return error instead.
*/
protected override def doPost(
requestServlet: HttpServletRequest,
responseServlet: HttpServletResponse): Unit = {
val responseMessage =
try {
val requestMessageJson = Source.fromInputStream(requestServlet.getInputStream).mkString
val requestMessage = SubmitRestProtocolMessage.fromJson(requestMessageJson)
// The response should have already been validated on the client.
// In case this is not true, validate it ourselves to avoid potential NPEs.
requestMessage.validate()
handleSubmit(requestMessageJson, requestMessage, responseServlet)
} catch {
// The client failed to provide a valid JSON, so this is not our fault
case e @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
handleError("Malformed request: " + formatException(e))
}
sendResponse(responseMessage, responseServlet)
}
protected def handleSubmit(
requestMessageJson: String,
requestMessage: SubmitRestProtocolMessage,
responseServlet: HttpServletResponse): SubmitRestProtocolResponse
}
/**
* A default servlet that handles error cases that are not captured by other servlets.
*/
private class ErrorServlet extends RestServlet {
private val serverVersion = RestSubmissionServer.PROTOCOL_VERSION
/** Service a faulty request by returning an appropriate error message to the client. */
protected override def service(
request: HttpServletRequest,
response: HttpServletResponse): Unit = {
val path = request.getPathInfo
val parts = path.stripPrefix("/").split("/").filter(_.nonEmpty).toList
var versionMismatch = false
var msg =
parts match {
case Nil =>
// http://host:port/
"Missing protocol version."
case `serverVersion` :: Nil =>
// http://host:port/correct-version
"Missing the /submissions prefix."
case `serverVersion` :: "submissions" :: tail =>
// http://host:port/correct-version/submissions/*
"Missing an action: please specify one of /create, /kill, or /status."
case unknownVersion :: tail =>
// http://host:port/unknown-version/*
versionMismatch = true
s"Unknown protocol version '$unknownVersion'."
case _ =>
// never reached
s"Malformed path $path."
}
msg += s" Please submit requests through http://[host]:[port]/$serverVersion/submissions/..."
val error = handleError(msg)
// If there is a version mismatch, include the highest protocol version that
// this server supports in case the client wants to retry with our version
if (versionMismatch) {
error.highestProtocolVersion = serverVersion
response.setStatus(RestSubmissionServer.SC_UNKNOWN_PROTOCOL_VERSION)
} else {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
}
sendResponse(error, response)
}
}
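
As a rough sketch of how a concrete server plugs into this abstraction, a subclass only supplies the three servlets; Jetty setup, routing, validation, and error handling are inherited. The class below is a hypothetical stand-in, and the response fields follow the protocol classes shown further down in this commit; the real implementations are StandaloneRestServer (below) and MesosRestServer:

package org.apache.spark.deploy.rest

import javax.servlet.http.HttpServletResponse

import org.apache.spark.{SparkConf, SPARK_VERSION => sparkVersion}

// Hypothetical server used only to show the shape of the abstraction.
private[spark] class ExampleRestServer(host: String, port: Int, conf: SparkConf)
  extends RestSubmissionServer(host, port, conf) {

  protected override val submitRequestServlet = new SubmitRequestServlet {
    protected override def handleSubmit(
        requestMessageJson: String,
        requestMessage: SubmitRestProtocolMessage,
        responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
      // Placeholder: a real server hands the request to its scheduler here.
      val r = new CreateSubmissionResponse
      r.serverSparkVersion = sparkVersion
      r.success = true
      r.submissionId = "driver-example-0000" // hypothetical ID
      r.message = "Driver queued."
      r
    }
  }

  protected override val killRequestServlet = new KillRequestServlet {
    protected override def handleKill(submissionId: String): KillSubmissionResponse = {
      val r = new KillSubmissionResponse
      r.serverSparkVersion = sparkVersion
      r.success = true
      r.submissionId = submissionId
      r.message = s"Kill request for $submissionId acknowledged."
      r
    }
  }

  protected override val statusRequestServlet = new StatusRequestServlet {
    protected override def handleStatus(submissionId: String): SubmissionStatusResponse = {
      val r = new SubmissionStatusResponse
      r.serverSparkVersion = sparkVersion
      r.success = true
      r.submissionId = submissionId
      r
    }
  }
}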

View file

@ -18,26 +18,16 @@
package org.apache.spark.deploy.rest
import java.io.File
import java.net.InetSocketAddress
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
import scala.io.Source
import javax.servlet.http.HttpServletResponse
import akka.actor.ActorRef
import com.fasterxml.jackson.core.JsonProcessingException
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.{ServletHolder, ServletContextHandler}
import org.eclipse.jetty.util.thread.QueuedThreadPool
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
import org.apache.spark.deploy.ClientArguments._
import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
/**
* A server that responds to requests submitted by the [[StandaloneRestClient]].
* A server that responds to requests submitted by the [[RestSubmissionClient]].
* This is intended to be embedded in the standalone Master and used in cluster mode only.
*
* This server responds with different HTTP codes depending on the situation:
@ -54,173 +44,31 @@ import org.apache.spark.deploy.ClientArguments._
*
* @param host the address this server should bind to
* @param requestedPort the port this server will attempt to bind to
* @param masterConf the conf used by the Master
* @param masterActor reference to the Master actor to which requests can be sent
* @param masterUrl the URL of the Master new drivers will attempt to connect to
* @param masterConf the conf used by the Master
*/
private[deploy] class StandaloneRestServer(
host: String,
requestedPort: Int,
masterConf: SparkConf,
masterActor: ActorRef,
masterUrl: String,
masterConf: SparkConf)
extends Logging {
masterUrl: String)
extends RestSubmissionServer(host, requestedPort, masterConf) {
import StandaloneRestServer._
private var _server: Option[Server] = None
// A mapping from URL prefixes to servlets that serve them. Exposed for testing.
protected val baseContext = s"/$PROTOCOL_VERSION/submissions"
protected val contextToServlet = Map[String, StandaloneRestServlet](
s"$baseContext/create/*" -> new SubmitRequestServlet(masterActor, masterUrl, masterConf),
s"$baseContext/kill/*" -> new KillRequestServlet(masterActor, masterConf),
s"$baseContext/status/*" -> new StatusRequestServlet(masterActor, masterConf),
"/*" -> new ErrorServlet // default handler
)
/** Start the server and return the bound port. */
def start(): Int = {
val (server, boundPort) = Utils.startServiceOnPort[Server](requestedPort, doStart, masterConf)
_server = Some(server)
logInfo(s"Started REST server for submitting applications on port $boundPort")
boundPort
}
/**
* Map the servlets to their corresponding contexts and attach them to a server.
* Return a 2-tuple of the started server and the bound port.
*/
private def doStart(startPort: Int): (Server, Int) = {
val server = new Server(new InetSocketAddress(host, startPort))
val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
server.setThreadPool(threadPool)
val mainHandler = new ServletContextHandler
mainHandler.setContextPath("/")
contextToServlet.foreach { case (prefix, servlet) =>
mainHandler.addServlet(new ServletHolder(servlet), prefix)
}
server.setHandler(mainHandler)
server.start()
val boundPort = server.getConnectors()(0).getLocalPort
(server, boundPort)
}
def stop(): Unit = {
_server.foreach(_.stop())
}
}
private[rest] object StandaloneRestServer {
val PROTOCOL_VERSION = StandaloneRestClient.PROTOCOL_VERSION
val SC_UNKNOWN_PROTOCOL_VERSION = 468
}
/**
* An abstract servlet for handling requests passed to the [[StandaloneRestServer]].
*/
private[rest] abstract class StandaloneRestServlet extends HttpServlet with Logging {
/**
* Serialize the given response message to JSON and send it through the response servlet.
* This validates the response before sending it to ensure it is properly constructed.
*/
protected def sendResponse(
responseMessage: SubmitRestProtocolResponse,
responseServlet: HttpServletResponse): Unit = {
val message = validateResponse(responseMessage, responseServlet)
responseServlet.setContentType("application/json")
responseServlet.setCharacterEncoding("utf-8")
responseServlet.getWriter.write(message.toJson)
}
/**
* Return any fields in the client request message that the server does not know about.
*
* The mechanism for this is to reconstruct the JSON on the server side and compare the
* diff between this JSON and the one generated on the client side. Any fields that are
* only in the client JSON are treated as unexpected.
*/
protected def findUnknownFields(
requestJson: String,
requestMessage: SubmitRestProtocolMessage): Array[String] = {
val clientSideJson = parse(requestJson)
val serverSideJson = parse(requestMessage.toJson)
val Diff(_, _, unknown) = clientSideJson.diff(serverSideJson)
unknown match {
case j: JObject => j.obj.map { case (k, _) => k }.toArray
case _ => Array.empty[String] // No difference
}
}
/** Return a human readable String representation of the exception. */
protected def formatException(e: Throwable): String = {
val stackTraceString = e.getStackTrace.map { "\t" + _ }.mkString("\n")
s"$e\n$stackTraceString"
}
/** Construct an error message to signal the fact that an exception has been thrown. */
protected def handleError(message: String): ErrorResponse = {
val e = new ErrorResponse
e.serverSparkVersion = sparkVersion
e.message = message
e
}
/**
* Parse a submission ID from the relative path, assuming it is the first part of the path.
* For instance, we expect the path to take the form /[submission ID]/maybe/something/else.
* The returned submission ID cannot be empty. If the path is unexpected, return None.
*/
protected def parseSubmissionId(path: String): Option[String] = {
if (path == null || path.isEmpty) {
None
} else {
path.stripPrefix("/").split("/").headOption.filter(_.nonEmpty)
}
}
/**
* Validate the response to ensure that it is correctly constructed.
*
* If it is, simply return the message as is. Otherwise, return an error response instead
* to propagate the exception back to the client and set the appropriate error code.
*/
private def validateResponse(
responseMessage: SubmitRestProtocolResponse,
responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
try {
responseMessage.validate()
responseMessage
} catch {
case e: Exception =>
responseServlet.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
handleError("Internal server error: " + formatException(e))
}
}
protected override val submitRequestServlet =
new StandaloneSubmitRequestServlet(masterActor, masterUrl, masterConf)
protected override val killRequestServlet =
new StandaloneKillRequestServlet(masterActor, masterConf)
protected override val statusRequestServlet =
new StandaloneStatusRequestServlet(masterActor, masterConf)
}
/**
* A servlet for handling kill requests passed to the [[StandaloneRestServer]].
*/
private[rest] class KillRequestServlet(masterActor: ActorRef, conf: SparkConf)
extends StandaloneRestServlet {
/**
* If a submission ID is specified in the URL, have the Master kill the corresponding
* driver and return an appropriate response to the client. Otherwise, return error.
*/
protected override def doPost(
request: HttpServletRequest,
response: HttpServletResponse): Unit = {
val submissionId = parseSubmissionId(request.getPathInfo)
val responseMessage = submissionId.map(handleKill).getOrElse {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
handleError("Submission ID is missing in kill request.")
}
sendResponse(responseMessage, response)
}
private[rest] class StandaloneKillRequestServlet(masterActor: ActorRef, conf: SparkConf)
extends KillRequestServlet {
protected def handleKill(submissionId: String): KillSubmissionResponse = {
val askTimeout = RpcUtils.askTimeout(conf)
@ -238,23 +86,8 @@ private[rest] class KillRequestServlet(masterActor: ActorRef, conf: SparkConf)
/**
* A servlet for handling status requests passed to the [[StandaloneRestServer]].
*/
private[rest] class StatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
extends StandaloneRestServlet {
/**
* If a submission ID is specified in the URL, request the status of the corresponding
* driver from the Master and include it in the response. Otherwise, return error.
*/
protected override def doGet(
request: HttpServletRequest,
response: HttpServletResponse): Unit = {
val submissionId = parseSubmissionId(request.getPathInfo)
val responseMessage = submissionId.map(handleStatus).getOrElse {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
handleError("Submission ID is missing in status request.")
}
sendResponse(responseMessage, response)
}
private[rest] class StandaloneStatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
extends StatusRequestServlet {
protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
val askTimeout = RpcUtils.askTimeout(conf)
@ -276,71 +109,11 @@ private[rest] class StatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
/**
* A servlet for handling submit requests passed to the [[StandaloneRestServer]].
*/
private[rest] class SubmitRequestServlet(
private[rest] class StandaloneSubmitRequestServlet(
masterActor: ActorRef,
masterUrl: String,
conf: SparkConf)
extends StandaloneRestServlet {
/**
* Submit an application to the Master with parameters specified in the request.
*
* The request is assumed to be a [[SubmitRestProtocolRequest]] in the form of JSON.
* If the request is successfully processed, return an appropriate response to the
* client indicating so. Otherwise, return error instead.
*/
protected override def doPost(
requestServlet: HttpServletRequest,
responseServlet: HttpServletResponse): Unit = {
val responseMessage =
try {
val requestMessageJson = Source.fromInputStream(requestServlet.getInputStream).mkString
val requestMessage = SubmitRestProtocolMessage.fromJson(requestMessageJson)
// The response should have already been validated on the client.
// In case this is not true, validate it ourselves to avoid potential NPEs.
requestMessage.validate()
handleSubmit(requestMessageJson, requestMessage, responseServlet)
} catch {
// The client failed to provide a valid JSON, so this is not our fault
case e @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
handleError("Malformed request: " + formatException(e))
}
sendResponse(responseMessage, responseServlet)
}
/**
* Handle the submit request and construct an appropriate response to return to the client.
*
* This assumes that the request message is already successfully validated.
* If the request message is not of the expected type, return error to the client.
*/
private def handleSubmit(
requestMessageJson: String,
requestMessage: SubmitRestProtocolMessage,
responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
requestMessage match {
case submitRequest: CreateSubmissionRequest =>
val askTimeout = RpcUtils.askTimeout(conf)
val driverDescription = buildDriverDescription(submitRequest)
val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)
val submitResponse = new CreateSubmissionResponse
submitResponse.serverSparkVersion = sparkVersion
submitResponse.message = response.message
submitResponse.success = response.success
submitResponse.submissionId = response.driverId.orNull
val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
if (unknownFields.nonEmpty) {
// If there are fields that the server does not know about, warn the client
submitResponse.unknownFields = unknownFields
}
submitResponse
case unexpected =>
responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
handleError(s"Received message of unexpected type ${unexpected.messageType}.")
}
}
extends SubmitRequestServlet {
/**
* Build a driver description from the fields specified in the submit request.
@ -389,50 +162,37 @@ private[rest] class SubmitRequestServlet(
new DriverDescription(
appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command)
}
}
/**
* A default servlet that handles error cases that are not captured by other servlets.
*/
private class ErrorServlet extends StandaloneRestServlet {
private val serverVersion = StandaloneRestServer.PROTOCOL_VERSION
/** Service a faulty request by returning an appropriate error message to the client. */
protected override def service(
request: HttpServletRequest,
response: HttpServletResponse): Unit = {
val path = request.getPathInfo
val parts = path.stripPrefix("/").split("/").filter(_.nonEmpty).toList
var versionMismatch = false
var msg =
parts match {
case Nil =>
// http://host:port/
"Missing protocol version."
case `serverVersion` :: Nil =>
// http://host:port/correct-version
"Missing the /submissions prefix."
case `serverVersion` :: "submissions" :: tail =>
// http://host:port/correct-version/submissions/*
"Missing an action: please specify one of /create, /kill, or /status."
case unknownVersion :: tail =>
// http://host:port/unknown-version/*
versionMismatch = true
s"Unknown protocol version '$unknownVersion'."
case _ =>
// never reached
s"Malformed path $path."
}
msg += s" Please submit requests through http://[host]:[port]/$serverVersion/submissions/..."
val error = handleError(msg)
// If there is a version mismatch, include the highest protocol version that
// this server supports in case the client wants to retry with our version
if (versionMismatch) {
error.highestProtocolVersion = serverVersion
response.setStatus(StandaloneRestServer.SC_UNKNOWN_PROTOCOL_VERSION)
} else {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
/**
* Handle the submit request and construct an appropriate response to return to the client.
*
* This assumes that the request message is already successfully validated.
 * If the request message is not of the expected type, return an error to the client.
*/
protected override def handleSubmit(
requestMessageJson: String,
requestMessage: SubmitRestProtocolMessage,
responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
requestMessage match {
case submitRequest: CreateSubmissionRequest =>
val askTimeout = RpcUtils.askTimeout(conf)
val driverDescription = buildDriverDescription(submitRequest)
val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)
val submitResponse = new CreateSubmissionResponse
submitResponse.serverSparkVersion = sparkVersion
submitResponse.message = response.message
submitResponse.success = response.success
submitResponse.submissionId = response.driverId.orNull
val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
if (unknownFields.nonEmpty) {
// If there are fields that the server does not know about, warn the client
submitResponse.unknownFields = unknownFields
}
submitResponse
case unexpected =>
responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
handleError(s"Received message of unexpected type ${unexpected.messageType}.")
}
sendResponse(error, response)
}
}


@ -61,7 +61,7 @@ private[rest] class CreateSubmissionRequest extends SubmitRestProtocolRequest {
assertProperty[Boolean](key, "boolean", _.toBoolean)
private def assertPropertyIsNumeric(key: String): Unit =
assertProperty[Int](key, "numeric", _.toInt)
assertProperty[Double](key, "numeric", _.toDouble)
private def assertPropertyIsMemory(key: String): Unit =
assertProperty[Int](key, "memory", Utils.memoryStringToMb)


@ -35,7 +35,7 @@ private[rest] abstract class SubmitRestProtocolResponse extends SubmitRestProtoc
/**
* A response to a [[CreateSubmissionRequest]] in the REST application submission protocol.
*/
private[rest] class CreateSubmissionResponse extends SubmitRestProtocolResponse {
private[spark] class CreateSubmissionResponse extends SubmitRestProtocolResponse {
var submissionId: String = null
protected override def doValidate(): Unit = {
super.doValidate()
@ -46,7 +46,7 @@ private[rest] class CreateSubmissionResponse extends SubmitRestProtocolResponse
/**
* A response to a kill request in the REST application submission protocol.
*/
private[rest] class KillSubmissionResponse extends SubmitRestProtocolResponse {
private[spark] class KillSubmissionResponse extends SubmitRestProtocolResponse {
var submissionId: String = null
protected override def doValidate(): Unit = {
super.doValidate()
@ -58,7 +58,7 @@ private[rest] class KillSubmissionResponse extends SubmitRestProtocolResponse {
/**
* A response to a status request in the REST application submission protocol.
*/
private[rest] class SubmissionStatusResponse extends SubmitRestProtocolResponse {
private[spark] class SubmissionStatusResponse extends SubmitRestProtocolResponse {
var submissionId: String = null
var driverState: String = null
var workerId: String = null


@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.rest.mesos
import java.io.File
import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.atomic.AtomicLong
import javax.servlet.http.HttpServletResponse
import org.apache.spark.deploy.Command
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.rest._
import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
import org.apache.spark.util.Utils
import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
/**
* A server that responds to requests submitted by the [[RestSubmissionClient]].
* All requests are forwarded to
* [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]].
* This is intended to be used in Mesos cluster mode only.
 * For more details about the REST submission protocol, refer to the [[RestSubmissionServer]] javadocs.
*/
private[spark] class MesosRestServer(
host: String,
requestedPort: Int,
masterConf: SparkConf,
scheduler: MesosClusterScheduler)
extends RestSubmissionServer(host, requestedPort, masterConf) {
protected override val submitRequestServlet =
new MesosSubmitRequestServlet(scheduler, masterConf)
protected override val killRequestServlet =
new MesosKillRequestServlet(scheduler, masterConf)
protected override val statusRequestServlet =
new MesosStatusRequestServlet(scheduler, masterConf)
}
private[deploy] class MesosSubmitRequestServlet(
scheduler: MesosClusterScheduler,
conf: SparkConf)
extends SubmitRequestServlet {
private val DEFAULT_SUPERVISE = false
private val DEFAULT_MEMORY = 512 // mb
private val DEFAULT_CORES = 1.0
private val nextDriverNumber = new AtomicLong(0)
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
private def newDriverId(submitDate: Date): String = {
"driver-%s-%04d".format(
createDateFormat.format(submitDate), nextDriverNumber.incrementAndGet())
}
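// For example (illustrative only), a submission accepted at 2015-04-28 13:31:08 would receive an
// id such as "driver-20150428133108-0001", with the counter increasing per submission.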
/**
* Build a driver description from the fields specified in the submit request.
*
* This involves constructing a command that launches a mesos framework for the job.
* This does not currently consider fields used by python applications since python
* is not supported in mesos cluster mode yet.
*/
private def buildDriverDescription(request: CreateSubmissionRequest): MesosDriverDescription = {
// Required fields, including the main class because python is not yet supported
val appResource = Option(request.appResource).getOrElse {
throw new SubmitRestMissingFieldException("Application jar is missing.")
}
val mainClass = Option(request.mainClass).getOrElse {
throw new SubmitRestMissingFieldException("Main class is missing.")
}
// Optional fields
val sparkProperties = request.sparkProperties
val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions")
val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath")
val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
val superviseDriver = sparkProperties.get("spark.driver.supervise")
val driverMemory = sparkProperties.get("spark.driver.memory")
val driverCores = sparkProperties.get("spark.driver.cores")
val appArgs = request.appArgs
val environmentVariables = request.environmentVariables
val name = request.sparkProperties.get("spark.app.name").getOrElse(mainClass)
// Construct driver description
val conf = new SparkConf(false).setAll(sparkProperties)
val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = new Command(
mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
val submitDate = new Date()
val submissionId = newDriverId(submitDate)
new MesosDriverDescription(
name, appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver,
command, request.sparkProperties, submissionId, submitDate)
}
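// A minimal sketch (not part of this commit) of the fields this servlet reads from a submit
// request; the values are hypothetical:
//
//   val req = new CreateSubmissionRequest
//   req.appResource = "hdfs://host/spark-examples.jar"
//   req.mainClass = "org.apache.spark.examples.SparkPi"
//   req.appArgs = Array("30")
//   req.environmentVariables = Map.empty
//   req.sparkProperties = Map(
//     "spark.app.name" -> "SparkPi",
//     "spark.driver.memory" -> "1g",
//     "spark.driver.cores" -> "1.0",
//     "spark.driver.supervise" -> "false")
//
// buildDriverDescription(req) would then yield a MesosDriverDescription with 1024 MB of memory,
// 1.0 cores, supervise disabled, and a generated "driver-..." submission id.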
protected override def handleSubmit(
requestMessageJson: String,
requestMessage: SubmitRestProtocolMessage,
responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
requestMessage match {
case submitRequest: CreateSubmissionRequest =>
val driverDescription = buildDriverDescription(submitRequest)
val s = scheduler.submitDriver(driverDescription)
s.serverSparkVersion = sparkVersion
val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
if (unknownFields.nonEmpty) {
// If there are fields that the server does not know about, warn the client
s.unknownFields = unknownFields
}
s
case unexpected =>
responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
handleError(s"Received message of unexpected type ${unexpected.messageType}.")
}
}
}
private[deploy] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
extends KillRequestServlet {
protected override def handleKill(submissionId: String): KillSubmissionResponse = {
val k = scheduler.killDriver(submissionId)
k.serverSparkVersion = sparkVersion
k
}
}
private[deploy] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
extends StatusRequestServlet {
protected override def handleStatus(submissionId: String): SubmissionStatusResponse = {
val d = scheduler.getDriverStatus(submissionId)
d.serverSparkVersion = sparkVersion
d
}
}
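// An illustrative end-to-end sketch (host and port hypothetical): spark-submit, or any HTTP
// client, POSTs the JSON-serialized CreateSubmissionRequest to
// http://<dispatcher-host>:<port>/<protocol-version>/submissions/create, while kill and status
// requests go to .../submissions/kill/<submissionId> and .../submissions/status/<submissionId>,
// mirroring the paths handled by RestSubmissionServer.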


@ -18,20 +18,17 @@
package org.apache.spark.scheduler.cluster.mesos
import java.io.File
import java.util.{List => JList}
import java.util.Collections
import java.util.{Collections, List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException, TaskState}
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@ -49,17 +46,10 @@ private[spark] class CoarseMesosSchedulerBackend(
master: String)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
with MScheduler
with Logging {
with MesosSchedulerUtils {
val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
// Lock used to wait for scheduler to be registered
var isRegistered = false
val registeredLock = new Object()
// Driver for talking to Mesos
var driver: SchedulerDriver = null
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
@ -87,26 +77,8 @@ private[spark] class CoarseMesosSchedulerBackend(
override def start() {
super.start()
synchronized {
new Thread("CoarseMesosSchedulerBackend driver") {
setDaemon(true)
override def run() {
val scheduler = CoarseMesosSchedulerBackend.this
val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
try { {
val ret = driver.run()
logInfo("driver.run() returned with code " + ret)
}
} catch {
case e: Exception => logError("driver.run() failed", e)
}
}
}.start()
waitForRegister()
}
val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
startScheduler(master, CoarseMesosSchedulerBackend.this, fwInfo)
}
def createCommand(offer: Offer, numCores: Int): CommandInfo = {
@ -150,8 +122,10 @@ private[spark] class CoarseMesosSchedulerBackend(
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
val uri = conf.get("spark.executor.uri", null)
if (uri == null) {
val uri = conf.getOption("spark.executor.uri")
.orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
if (uri.isEmpty) {
val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
command.setValue(
"%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
@ -164,7 +138,7 @@ private[spark] class CoarseMesosSchedulerBackend(
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
val basename = uri.get.split('/').last.split('.').head
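// Illustrative example: a uri of "hdfs://host/spark-1.3.0.tgz" (hypothetical) yields the
// basename "spark-1", so the command below starts with "cd spark-1*; ...".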
command.setValue(
s"cd $basename*; $prefixEnv " +
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
@ -173,7 +147,7 @@ private[spark] class CoarseMesosSchedulerBackend(
s" --hostname ${offer.getHostname}" +
s" --cores $numCores" +
s" --app-id $appId")
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
}
command.build()
}
@ -183,18 +157,7 @@ private[spark] class CoarseMesosSchedulerBackend(
override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
appId = frameworkId.getValue
logInfo("Registered as framework ID " + appId)
registeredLock.synchronized {
isRegistered = true
registeredLock.notifyAll()
}
}
def waitForRegister() {
registeredLock.synchronized {
while (!isRegistered) {
registeredLock.wait()
}
}
markRegistered()
}
override def disconnected(d: SchedulerDriver) {}
@ -245,14 +208,6 @@ private[spark] class CoarseMesosSchedulerBackend(
}
}
/** Helper function to pull out a resource from a Mesos Resources protobuf */
private def getResource(res: JList[Resource], name: String): Double = {
for (r <- res if r.getName == name) {
return r.getScalar.getValue
}
0
}
/** Build a Mesos resource protobuf object */
private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
Resource.newBuilder()
@ -284,7 +239,8 @@ private[spark] class CoarseMesosSchedulerBackend(
"is Spark installed on it?")
}
}
driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
// In case we'd rejected everything before but have now lost a node
mesosDriver.reviveOffers()
}
}
}
@ -296,8 +252,8 @@ private[spark] class CoarseMesosSchedulerBackend(
override def stop() {
super.stop()
if (driver != null) {
driver.stop()
if (mesosDriver != null) {
mesosDriver.stop()
}
}


@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.mesos
import scala.collection.JavaConversions._
import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.CreateMode
import org.apache.zookeeper.KeeperException.NoNodeException
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkCuratorUtil
import org.apache.spark.util.Utils
/**
* Persistence engine factory that is responsible for creating new persistence engines
* to store Mesos cluster mode state.
*/
private[spark] abstract class MesosClusterPersistenceEngineFactory(conf: SparkConf) {
def createEngine(path: String): MesosClusterPersistenceEngine
}
/**
* Mesos cluster persistence engine is responsible for persisting Mesos cluster mode
* specific state, so that on failover all the state can be recovered and the scheduler
* can resume managing the drivers.
*/
private[spark] trait MesosClusterPersistenceEngine {
def persist(name: String, obj: Object): Unit
def expunge(name: String): Unit
def fetch[T](name: String): Option[T]
def fetchAll[T](): Iterable[T]
}
/**
* Zookeeper backed persistence engine factory.
 * All Zk engines created from this factory share the same Zookeeper client, so
 * all of them reuse the same connection pool.
*/
private[spark] class ZookeeperMesosClusterPersistenceEngineFactory(conf: SparkConf)
extends MesosClusterPersistenceEngineFactory(conf) {
lazy val zk = SparkCuratorUtil.newClient(conf, "spark.mesos.deploy.zookeeper.url")
def createEngine(path: String): MesosClusterPersistenceEngine = {
new ZookeeperMesosClusterPersistenceEngine(path, zk, conf)
}
}
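// A minimal usage sketch (not part of this commit; the ZooKeeper URL is hypothetical):
//
//   val conf = new SparkConf().set("spark.mesos.deploy.zookeeper.url", "zk1:2181,zk2:2181")
//   val factory = new ZookeeperMesosClusterPersistenceEngineFactory(conf)
//   val engine = factory.createEngine("driverQueue")
//   engine.persist("driver-20150428133108-0001", driverDescription)
//   val queued = engine.fetchAll[MesosDriverDescription]()
//   engine.expunge("driver-20150428133108-0001")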
/**
* Black hole persistence engine factory that creates black hole
 * persistence engines, which store nothing.
*/
private[spark] class BlackHoleMesosClusterPersistenceEngineFactory
extends MesosClusterPersistenceEngineFactory(null) {
def createEngine(path: String): MesosClusterPersistenceEngine = {
new BlackHoleMesosClusterPersistenceEngine
}
}
/**
* Black hole persistence engine that stores nothing.
*/
private[spark] class BlackHoleMesosClusterPersistenceEngine extends MesosClusterPersistenceEngine {
override def persist(name: String, obj: Object): Unit = {}
override def fetch[T](name: String): Option[T] = None
override def expunge(name: String): Unit = {}
override def fetchAll[T](): Iterable[T] = Iterable.empty[T]
}
/**
 * Zookeeper-based Mesos cluster persistence engine that stores cluster mode state
 * in Zookeeper. Each engine object operates under one folder in Zookeeper, but
 * reuses a shared Zookeeper client.
*/
private[spark] class ZookeeperMesosClusterPersistenceEngine(
baseDir: String,
zk: CuratorFramework,
conf: SparkConf)
extends MesosClusterPersistenceEngine with Logging {
private val WORKING_DIR =
conf.get("spark.deploy.zookeeper.dir", "/spark_mesos_dispatcher") + "/" + baseDir
SparkCuratorUtil.mkdir(zk, WORKING_DIR)
def path(name: String): String = {
WORKING_DIR + "/" + name
}
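// With the default "spark.deploy.zookeeper.dir" and a baseDir of "driverQueue", for example,
// path("driver-20150428133108-0001") resolves to
// "/spark_mesos_dispatcher/driverQueue/driver-20150428133108-0001".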
override def expunge(name: String): Unit = {
zk.delete().forPath(path(name))
}
override def persist(name: String, obj: Object): Unit = {
val serialized = Utils.serialize(obj)
val zkPath = path(name)
zk.create().withMode(CreateMode.PERSISTENT).forPath(zkPath, serialized)
}
override def fetch[T](name: String): Option[T] = {
val zkPath = path(name)
try {
val fileData = zk.getData().forPath(zkPath)
Some(Utils.deserialize[T](fileData))
} catch {
case e: NoNodeException => None
case e: Exception => {
logWarning("Exception while reading persisted file, deleting", e)
zk.delete().forPath(zkPath)
None
}
}
}
override def fetchAll[T](): Iterable[T] = {
zk.getChildren.forPath(WORKING_DIR).map(fetch[T]).flatten
}
}


@ -0,0 +1,608 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.mesos
import java.io.File
import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, Date, List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.mesos.Protos.Environment.Variable
import org.apache.mesos.Protos.TaskStatus.Reason
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.{Scheduler, SchedulerDriver}
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.Utils
import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
/**
* Tracks the current state of a Mesos Task that runs a Spark driver.
* @param driverDescription Submitted driver description from
* [[org.apache.spark.deploy.rest.mesos.MesosRestServer]]
* @param taskId Mesos TaskID generated for the task
* @param slaveId Slave ID that the task is assigned to
* @param mesosTaskStatus The last known task status update.
* @param startDate The date the task was launched
*/
private[spark] class MesosClusterSubmissionState(
val driverDescription: MesosDriverDescription,
val taskId: TaskID,
val slaveId: SlaveID,
var mesosTaskStatus: Option[TaskStatus],
var startDate: Date)
extends Serializable {
def copy(): MesosClusterSubmissionState = {
new MesosClusterSubmissionState(
driverDescription, taskId, slaveId, mesosTaskStatus, startDate)
}
}
/**
* Tracks the retry state of a driver, which includes the next time it should be scheduled
* and necessary information to do exponential backoff.
* This class is not thread-safe, and we expect the caller to handle synchronizing state.
* @param lastFailureStatus Last Task status when it failed.
* @param retries Number of times it has been retried.
* @param nextRetry Time at which it should be retried next
 * @param waitTime The amount of time the driver is scheduled to wait until the next retry.
*/
private[spark] class MesosClusterRetryState(
val lastFailureStatus: TaskStatus,
val retries: Int,
val nextRetry: Date,
val waitTime: Int) extends Serializable {
def copy(): MesosClusterRetryState =
new MesosClusterRetryState(lastFailureStatus, retries, nextRetry, waitTime)
}
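// Backoff sketch, based on the statusUpdate logic further below: the first retry waits 1 second,
// and each subsequent failure doubles the wait (1, 2, 4, 8, ... seconds), capped by
// spark.mesos.cluster.retry.wait.max (60 seconds by default).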
/**
* The full state of the cluster scheduler, currently being used for displaying
* information on the UI.
* @param frameworkId Mesos Framework id for the cluster scheduler.
* @param masterUrl The Mesos master url
* @param queuedDrivers All drivers queued to be launched
* @param launchedDrivers All launched or running drivers
* @param finishedDrivers All terminated drivers
* @param pendingRetryDrivers All drivers pending to be retried
*/
private[spark] class MesosClusterSchedulerState(
val frameworkId: String,
val masterUrl: Option[String],
val queuedDrivers: Iterable[MesosDriverDescription],
val launchedDrivers: Iterable[MesosClusterSubmissionState],
val finishedDrivers: Iterable[MesosClusterSubmissionState],
val pendingRetryDrivers: Iterable[MesosDriverDescription])
/**
* A Mesos scheduler that is responsible for launching submitted Spark drivers in cluster mode
* as Mesos tasks in a Mesos cluster.
 * All drivers are launched asynchronously by the framework and will eventually be run by one
 * of the slaves in the cluster. The results of each driver are stored in the slave's task
 * sandbox, which is accessible through the Mesos UI.
 * This scheduler supports recovery by persisting all of its state and performing task
 * reconciliation on recovery, which fetches the latest state for all drivers from the Mesos master.
*/
private[spark] class MesosClusterScheduler(
engineFactory: MesosClusterPersistenceEngineFactory,
conf: SparkConf)
extends Scheduler with MesosSchedulerUtils {
var frameworkUrl: String = _
private val metricsSystem =
MetricsSystem.createMetricsSystem("mesos_cluster", conf, new SecurityManager(conf))
private val master = conf.get("spark.master")
private val appName = conf.get("spark.app.name")
private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
private val schedulerState = engineFactory.createEngine("scheduler")
private val stateLock = new ReentrantLock()
private val finishedDrivers =
new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
private var frameworkId: String = null
// Holds all the launched drivers and current launch state, keyed by driver id.
private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]()
// Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation.
// All drivers that are loaded after failover are added here, as we need to get the latest
// state of the tasks from Mesos.
private val pendingRecover = new mutable.HashMap[String, SlaveID]()
// Stores all the submitted drivers that haven't been launched.
private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
// All supervised drivers that are waiting to retry after termination.
private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
private val queuedDriversState = engineFactory.createEngine("driverQueue")
private val launchedDriversState = engineFactory.createEngine("launchedDrivers")
private val pendingRetryDriversState = engineFactory.createEngine("retryList")
// Flag to mark whether the scheduler is ready to accept requests; it is only set once the
// scheduler is registered with the Mesos master.
@volatile protected var ready = false
private var masterInfo: Option[MasterInfo] = None
def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse = {
val c = new CreateSubmissionResponse
if (!ready) {
c.success = false
c.message = "Scheduler is not ready to take requests"
return c
}
stateLock.synchronized {
if (isQueueFull()) {
c.success = false
c.message = "Already reached maximum submission size"
return c
}
c.submissionId = desc.submissionId
queuedDriversState.persist(desc.submissionId, desc)
queuedDrivers += desc
c.success = true
}
c
}
def killDriver(submissionId: String): KillSubmissionResponse = {
val k = new KillSubmissionResponse
if (!ready) {
k.success = false
k.message = "Scheduler is not ready to take requests"
return k
}
k.submissionId = submissionId
stateLock.synchronized {
// We look for the requested driver in the following places:
// 1. Check if submission is running or launched.
// 2. Check if it's still queued.
// 3. Check if it's in the retry list.
// 4. Check if it has already completed.
if (launchedDrivers.contains(submissionId)) {
val task = launchedDrivers(submissionId)
mesosDriver.killTask(task.taskId)
k.success = true
k.message = "Killing running driver"
} else if (removeFromQueuedDrivers(submissionId)) {
k.success = true
k.message = "Removed driver while it's still pending"
} else if (removeFromPendingRetryDrivers(submissionId)) {
k.success = true
k.message = "Removed driver while it's being retried"
} else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
k.success = false
k.message = "Driver already terminated"
} else {
k.success = false
k.message = "Cannot find driver"
}
}
k
}
def getDriverStatus(submissionId: String): SubmissionStatusResponse = {
val s = new SubmissionStatusResponse
if (!ready) {
s.success = false
s.message = "Scheduler is not ready to take requests"
return s
}
s.submissionId = submissionId
stateLock.synchronized {
if (queuedDrivers.exists(_.submissionId.equals(submissionId))) {
s.success = true
s.driverState = "QUEUED"
} else if (launchedDrivers.contains(submissionId)) {
s.success = true
s.driverState = "RUNNING"
launchedDrivers(submissionId).mesosTaskStatus.foreach(state => s.message = state.toString)
} else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
s.success = true
s.driverState = "FINISHED"
finishedDrivers
.find(d => d.driverDescription.submissionId.equals(submissionId)).get.mesosTaskStatus
.foreach(state => s.message = state.toString)
} else if (pendingRetryDrivers.exists(_.submissionId.equals(submissionId))) {
val status = pendingRetryDrivers.find(_.submissionId.equals(submissionId))
.get.retryState.get.lastFailureStatus
s.success = true
s.driverState = "RETRYING"
s.message = status.toString
} else {
s.success = false
s.driverState = "NOT_FOUND"
}
}
s
}
private def isQueueFull(): Boolean = launchedDrivers.size >= queuedCapacity
/**
* Recover scheduler state that is persisted.
 * We still need to do task reconciliation to stay up to date with the latest task states,
 * as they might have changed while the scheduler was failing over.
*/
private def recoverState(): Unit = {
stateLock.synchronized {
launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
launchedDrivers(state.taskId.getValue) = state
pendingRecover(state.taskId.getValue) = state.slaveId
}
queuedDriversState.fetchAll[MesosDriverDescription]().foreach(d => queuedDrivers += d)
// There is a potential timing issue where a queued driver might have been launched
// but the scheduler shut down before the queued driver could be removed
// from the queue. We try to mitigate this issue by walking through all queued drivers
// and removing those that are already launched.
queuedDrivers
.filter(d => launchedDrivers.contains(d.submissionId))
.foreach(d => removeFromQueuedDrivers(d.submissionId))
pendingRetryDriversState.fetchAll[MesosDriverDescription]()
.foreach(s => pendingRetryDrivers += s)
// TODO: Consider storing finished drivers so we can show them on the UI after
// failover. For now we clear the history on each recovery.
finishedDrivers.clear()
}
}
/**
 * Starts the cluster scheduler and waits until the scheduler is registered.
* This also marks the scheduler to be ready for requests.
*/
def start(): Unit = {
// TODO: Implement leader election to make sure only one framework running in the cluster.
val fwId = schedulerState.fetch[String]("frameworkId")
val builder = FrameworkInfo.newBuilder()
.setUser(Utils.getCurrentUserName())
.setName(appName)
.setWebuiUrl(frameworkUrl)
.setCheckpoint(true)
.setFailoverTimeout(Integer.MAX_VALUE) // Setting to max so tasks keep running on crash
fwId.foreach { id =>
builder.setId(FrameworkID.newBuilder().setValue(id).build())
frameworkId = id
}
recoverState()
metricsSystem.registerSource(new MesosClusterSchedulerSource(this))
metricsSystem.start()
startScheduler(master, MesosClusterScheduler.this, builder.build())
ready = true
}
def stop(): Unit = {
ready = false
metricsSystem.report()
metricsSystem.stop()
mesosDriver.stop(true)
}
override def registered(
driver: SchedulerDriver,
newFrameworkId: FrameworkID,
masterInfo: MasterInfo): Unit = {
logInfo("Registered as framework ID " + newFrameworkId.getValue)
if (newFrameworkId.getValue != frameworkId) {
frameworkId = newFrameworkId.getValue
schedulerState.persist("frameworkId", frameworkId)
}
markRegistered()
stateLock.synchronized {
this.masterInfo = Some(masterInfo)
if (!pendingRecover.isEmpty) {
// Start task reconciliation if we need to recover.
val statuses = pendingRecover.collect {
case (taskId, slaveId) =>
val newStatus = TaskStatus.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId).build())
.setSlaveId(slaveId)
.setState(MesosTaskState.TASK_STAGING)
.build()
launchedDrivers.get(taskId).map(_.mesosTaskStatus.getOrElse(newStatus))
.getOrElse(newStatus)
}
// TODO: Page the status updates to avoid trying to reconcile
// a large number of tasks at once.
driver.reconcileTasks(statuses)
}
}
}
private def buildDriverCommand(desc: MesosDriverDescription): CommandInfo = {
val appJar = CommandInfo.URI.newBuilder()
.setValue(desc.jarUrl.stripPrefix("file:").stripPrefix("local:")).build()
val builder = CommandInfo.newBuilder().addUris(appJar)
val entries =
(conf.getOption("spark.executor.extraLibraryPath").toList ++
desc.command.libraryPathEntries)
val prefixEnv = if (!entries.isEmpty) {
Utils.libraryPathEnvPrefix(entries)
} else {
""
}
val envBuilder = Environment.newBuilder()
desc.command.environment.foreach { case (k, v) =>
envBuilder.addVariables(Variable.newBuilder().setName(k).setValue(v).build())
}
// Pass all spark properties to executor.
val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
envBuilder.addVariables(
Variable.newBuilder().setName("SPARK_EXECUTOR_OPTS").setValue(executorOpts))
val cmdOptions = generateCmdOption(desc)
val executorUri = desc.schedulerProperties.get("spark.executor.uri")
.orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
val appArguments = desc.command.arguments.mkString(" ")
val cmd = if (executorUri.isDefined) {
builder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri.get).build())
val folderBasename = executorUri.get.split('/').last.split('.').head
val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit"
val cmdJar = s"../${desc.jarUrl.split("/").last}"
s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar $appArguments"
} else {
val executorSparkHome = desc.schedulerProperties.get("spark.mesos.executor.home")
.orElse(conf.getOption("spark.home"))
.orElse(Option(System.getenv("SPARK_HOME")))
.getOrElse {
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
}
val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getCanonicalPath
val cmdJar = desc.jarUrl.split("/").last
s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar $appArguments"
}
builder.setValue(cmd)
builder.setEnvironment(envBuilder.build())
builder.build()
}
private def generateCmdOption(desc: MesosDriverDescription): Seq[String] = {
var options = Seq(
"--name", desc.schedulerProperties("spark.app.name"),
"--class", desc.command.mainClass,
"--master", s"mesos://${conf.get("spark.master")}",
"--driver-cores", desc.cores.toString,
"--driver-memory", s"${desc.mem}M")
desc.schedulerProperties.get("spark.executor.memory").map { v =>
options ++= Seq("--executor-memory", v)
}
desc.schedulerProperties.get("spark.cores.max").map { v =>
options ++= Seq("--total-executor-cores", v)
}
options
}
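// Illustrative example of the resulting launch command (values hypothetical): with an executor
// uri ending in "spark-1.3.0.tgz" and a jarUrl ending in "spark-examples.jar", the driver task
// would run roughly:
//
//   cd spark-1*; bin/spark-submit --name SparkPi --class org.apache.spark.examples.SparkPi \
//     --master mesos://<mesos-master> --driver-cores 1.0 --driver-memory 512M \
//     ../spark-examples.jar 30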
private class ResourceOffer(val offer: Offer, var cpu: Double, var mem: Double) {
override def toString(): String = {
s"Offer id: ${offer.getId.getValue}, cpu: $cpu, mem: $mem"
}
}
/**
 * This method takes all the possible candidates and attempts to schedule them with the Mesos offers.
 * Every time a new task is scheduled, the afterLaunchCallback is called to perform post-scheduling
 * logic on each task.
*/
private def scheduleTasks(
candidates: Seq[MesosDriverDescription],
afterLaunchCallback: (String) => Boolean,
currentOffers: List[ResourceOffer],
tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = {
for (submission <- candidates) {
val driverCpu = submission.cores
val driverMem = submission.mem
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
val offerOption = currentOffers.find { o =>
o.cpu >= driverCpu && o.mem >= driverMem
}
if (offerOption.isEmpty) {
logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
s"cpu: $driverCpu, mem: $driverMem")
} else {
val offer = offerOption.get
offer.cpu -= driverCpu
offer.mem -= driverMem
val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
val cpuResource = Resource.newBuilder()
.setName("cpus").setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(driverCpu)).build()
val memResource = Resource.newBuilder()
.setName("mem").setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(driverMem)).build()
val commandInfo = buildDriverCommand(submission)
val appName = submission.schedulerProperties("spark.app.name")
val taskInfo = TaskInfo.newBuilder()
.setTaskId(taskId)
.setName(s"Driver for $appName")
.setSlaveId(offer.offer.getSlaveId)
.setCommand(commandInfo)
.addResources(cpuResource)
.addResources(memResource)
.build()
val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo])
queuedTasks += taskInfo
logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
submission.submissionId)
val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId,
None, new Date())
launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState)
afterLaunchCallback(submission.submissionId)
}
}
}
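// Scheduling sketch: this is effectively first-fit. For example, given offers with (cpu, mem)
// of (2.0, 2048) and (8.0, 8192) and a driver needing (1.0, 512), the driver is placed on the
// first offer, whose remaining capacity drops to (1.0, 1536) for the candidates that follow.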
override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
val currentOffers = offers.map { o =>
new ResourceOffer(
o, getResource(o.getResourcesList, "cpus"), getResource(o.getResourcesList, "mem"))
}.toList
logTrace(s"Received offers from Mesos: \n${currentOffers.mkString("\n")}")
val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
val currentTime = new Date()
stateLock.synchronized {
// We first schedule all the supervised drivers that are ready to retry.
// This list will be empty if none of the drivers are marked as supervise.
val driversToRetry = pendingRetryDrivers.filter { d =>
d.retryState.get.nextRetry.before(currentTime)
}
scheduleTasks(
driversToRetry,
removeFromPendingRetryDrivers,
currentOffers,
tasks)
// Then we walk through the queued drivers and try to schedule them.
scheduleTasks(
queuedDrivers,
removeFromQueuedDrivers,
currentOffers,
tasks)
}
tasks.foreach { case (offerId, tasks) =>
driver.launchTasks(Collections.singleton(offerId), tasks)
}
offers
.filter(o => !tasks.keySet.contains(o.getId))
.foreach(o => driver.declineOffer(o.getId))
}
def getSchedulerState(): MesosClusterSchedulerState = {
def copyBuffer(
buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
buffer.copyToBuffer(newBuffer)
newBuffer
}
stateLock.synchronized {
new MesosClusterSchedulerState(
frameworkId,
masterInfo.map(m => s"http://${m.getIp}:${m.getPort}"),
copyBuffer(queuedDrivers),
launchedDrivers.values.map(_.copy()).toList,
finishedDrivers.map(_.copy()).toList,
copyBuffer(pendingRetryDrivers))
}
}
override def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = {}
override def disconnected(driver: SchedulerDriver): Unit = {}
override def reregistered(driver: SchedulerDriver, masterInfo: MasterInfo): Unit = {
logInfo(s"Framework re-registered with master ${masterInfo.getId}")
}
override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit = {}
override def error(driver: SchedulerDriver, error: String): Unit = {
logError("Error received: " + error)
}
/**
 * Check whether the task state is a recoverable state from which we can relaunch the task.
 * A task state like TASK_ERROR is not relaunchable since the task could not
 * be validated by Mesos.
*/
private def shouldRelaunch(state: MesosTaskState): Boolean = {
state == MesosTaskState.TASK_FAILED ||
state == MesosTaskState.TASK_KILLED ||
state == MesosTaskState.TASK_LOST
}
override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
val taskId = status.getTaskId.getValue
stateLock.synchronized {
if (launchedDrivers.contains(taskId)) {
if (status.getReason == Reason.REASON_RECONCILIATION &&
!pendingRecover.contains(taskId)) {
// Task has already received update and no longer requires reconciliation.
return
}
val state = launchedDrivers(taskId)
// Check if the driver is supervise enabled and can be relaunched.
if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
removeFromLaunchedDrivers(taskId)
val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
val (retries, waitTimeSec) = retryState
.map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
.getOrElse{ (1, 1) }
val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
val newDriverDescription = state.driverDescription.copy(
retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
pendingRetryDrivers += newDriverDescription
pendingRetryDriversState.persist(taskId, newDriverDescription)
} else if (TaskState.isFinished(TaskState.fromMesos(status.getState))) {
removeFromLaunchedDrivers(taskId)
if (finishedDrivers.size >= retainedDrivers) {
val toRemove = math.max(retainedDrivers / 10, 1)
finishedDrivers.trimStart(toRemove)
}
finishedDrivers += state
}
state.mesosTaskStatus = Option(status)
} else {
logError(s"Unable to find driver $taskId in status update")
}
}
}
override def frameworkMessage(
driver: SchedulerDriver,
executorId: ExecutorID,
slaveId: SlaveID,
message: Array[Byte]): Unit = {}
override def executorLost(
driver: SchedulerDriver,
executorId: ExecutorID,
slaveId: SlaveID,
status: Int): Unit = {}
private def removeFromQueuedDrivers(id: String): Boolean = {
val index = queuedDrivers.indexWhere(_.submissionId.equals(id))
if (index != -1) {
queuedDrivers.remove(index)
queuedDriversState.expunge(id)
true
} else {
false
}
}
private def removeFromLaunchedDrivers(id: String): Boolean = {
if (launchedDrivers.remove(id).isDefined) {
launchedDriversState.expunge(id)
true
} else {
false
}
}
private def removeFromPendingRetryDrivers(id: String): Boolean = {
val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(id))
if (index != -1) {
pendingRetryDrivers.remove(index)
pendingRetryDriversState.expunge(id)
true
} else {
false
}
}
def getQueuedDriversSize: Int = queuedDrivers.size
def getLaunchedDriversSize: Int = launchedDrivers.size
def getPendingRetryDriversSize: Int = pendingRetryDrivers.size
}


@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.mesos
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source
private[mesos] class MesosClusterSchedulerSource(scheduler: MesosClusterScheduler)
extends Source {
override def sourceName: String = "mesos_cluster"
override def metricRegistry: MetricRegistry = new MetricRegistry()
metricRegistry.register(MetricRegistry.name("waitingDrivers"), new Gauge[Int] {
override def getValue: Int = scheduler.getQueuedDriversSize
})
metricRegistry.register(MetricRegistry.name("launchedDrivers"), new Gauge[Int] {
override def getValue: Int = scheduler.getLaunchedDriversSize
})
metricRegistry.register(MetricRegistry.name("retryDrivers"), new Gauge[Int] {
override def getValue: Int = scheduler.getPendingRetryDriversSize
})
}
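// These gauges are registered under the "mesos_cluster" source name, so a configured metrics
// sink would typically see them as mesos_cluster.waitingDrivers, mesos_cluster.launchedDrivers
// and mesos_cluster.retryDrivers; the exact names depend on the MetricsSystem configuration.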


@ -18,23 +18,19 @@
package org.apache.spark.scheduler.cluster.mesos
import java.io.File
import java.util.{ArrayList => JArrayList, List => JList}
import java.util.Collections
import java.util.{ArrayList => JArrayList, Collections, List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState,
ExecutorInfo => MesosExecutorInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.spark.executor.MesosExecutorBackend
import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils
import org.apache.spark.{SparkContext, SparkException, TaskState}
/**
* A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
@ -47,14 +43,7 @@ private[spark] class MesosSchedulerBackend(
master: String)
extends SchedulerBackend
with MScheduler
with Logging {
// Lock used to wait for scheduler to be registered
var isRegistered = false
val registeredLock = new Object()
// Driver for talking to Mesos
var driver: SchedulerDriver = null
with MesosSchedulerUtils {
// Which slave IDs we have executors on
val slaveIdsWithExecutors = new HashSet[String]
@ -73,26 +62,9 @@ private[spark] class MesosSchedulerBackend(
@volatile var appId: String = _
override def start() {
synchronized {
classLoader = Thread.currentThread.getContextClassLoader
new Thread("MesosSchedulerBackend driver") {
setDaemon(true)
override def run() {
val scheduler = MesosSchedulerBackend.this
val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
try {
val ret = driver.run()
logInfo("driver.run() returned with code " + ret)
} catch {
case e: Exception => logError("driver.run() failed", e)
}
}
}.start()
waitForRegister()
}
val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
classLoader = Thread.currentThread.getContextClassLoader
startScheduler(master, MesosSchedulerBackend.this, fwInfo)
}
def createExecutorInfo(execId: String): MesosExecutorInfo = {
@ -125,17 +97,19 @@ private[spark] class MesosSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
val uri = sc.conf.get("spark.executor.uri", null)
val uri = sc.conf.getOption("spark.executor.uri")
.orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
val executorBackendName = classOf[MesosExecutorBackend].getName
if (uri == null) {
if (uri.isEmpty) {
val executorPath = new File(executorSparkHome, "/bin/spark-class").getCanonicalPath
command.setValue(s"$prefixEnv $executorPath $executorBackendName")
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
val basename = uri.get.split('/').last.split('.').head
command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
}
val cpus = Resource.newBuilder()
.setName("cpus")
@ -181,18 +155,7 @@ private[spark] class MesosSchedulerBackend(
inClassLoader() {
appId = frameworkId.getValue
logInfo("Registered as framework ID " + appId)
registeredLock.synchronized {
isRegistered = true
registeredLock.notifyAll()
}
}
}
def waitForRegister() {
registeredLock.synchronized {
while (!isRegistered) {
registeredLock.wait()
}
markRegistered()
}
}
@ -287,14 +250,6 @@ private[spark] class MesosSchedulerBackend(
}
}
/** Helper function to pull out a resource from a Mesos Resources protobuf */
def getResource(res: JList[Resource], name: String): Double = {
for (r <- res if r.getName == name) {
return r.getScalar.getValue
}
0
}
/** Turn a Spark TaskDescription into a Mesos task */
def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
@ -339,13 +294,13 @@ private[spark] class MesosSchedulerBackend(
}
override def stop() {
if (driver != null) {
driver.stop()
if (mesosDriver != null) {
mesosDriver.stop()
}
}
override def reviveOffers() {
driver.reviveOffers()
mesosDriver.reviveOffers()
}
override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
@ -380,7 +335,7 @@ private[spark] class MesosSchedulerBackend(
}
override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
driver.killTask(
mesosDriver.killTask(
TaskID.newBuilder()
.setValue(taskId.toString).build()
)


@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.mesos
import java.util.List
import java.util.concurrent.CountDownLatch
import scala.collection.JavaConversions._
import org.apache.mesos.Protos.{FrameworkInfo, Resource, Status}
import org.apache.mesos.{MesosSchedulerDriver, Scheduler}
import org.apache.spark.Logging
import org.apache.spark.util.Utils
/**
 * Shared trait for implementing a Mesos scheduler. This holds common state and helper
 * methods that Mesos schedulers will use.
*/
private[mesos] trait MesosSchedulerUtils extends Logging {
// Lock used to wait for scheduler to be registered
private final val registerLatch = new CountDownLatch(1)
// Driver for talking to Mesos
protected var mesosDriver: MesosSchedulerDriver = null
/**
* Starts the MesosSchedulerDriver with the provided information. This method returns
* only after the scheduler has registered with Mesos.
* @param masterUrl Mesos master connection URL
* @param scheduler Scheduler object
* @param fwInfo FrameworkInfo to pass to the Mesos master
*/
def startScheduler(masterUrl: String, scheduler: Scheduler, fwInfo: FrameworkInfo): Unit = {
synchronized {
if (mesosDriver != null) {
registerLatch.await()
return
}
new Thread(Utils.getFormattedClassName(this) + "-mesos-driver") {
setDaemon(true)
override def run() {
mesosDriver = new MesosSchedulerDriver(scheduler, fwInfo, masterUrl)
try {
val ret = mesosDriver.run()
logInfo("driver.run() returned with code " + ret)
if (ret.equals(Status.DRIVER_ABORTED)) {
System.exit(1)
}
} catch {
case e: Exception => {
logError("driver.run() failed", e)
System.exit(1)
}
}
}
}.start()
registerLatch.await()
}
}
/**
* Signal that the scheduler has registered with Mesos.
*/
protected def markRegistered(): Unit = {
registerLatch.countDown()
}
/**
* Get the amount of resources for the specified type from the resource list
*/
protected def getResource(res: List[Resource], name: String): Double = {
for (r <- res if r.getName == name) {
return r.getScalar.getValue
}
0.0
}
}
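// Usage sketch (mirroring the schedulers above; the class and value names are assumed):
//
//   class MyMesosScheduler(masterUrl: String) extends Scheduler with MesosSchedulerUtils {
//     def start(): Unit = {
//       val fwInfo = FrameworkInfo.newBuilder().setUser("spark").setName("my-app").build()
//       startScheduler(masterUrl, this, fwInfo)   // blocks until registered() fires
//     }
//     override def registered(d: SchedulerDriver, id: FrameworkID, m: MasterInfo): Unit = {
//       markRegistered()                          // releases startScheduler()
//     }
//     // ... remaining Scheduler callbacks elided ...
//   }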


@ -231,7 +231,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
val childArgsStr = childArgs.mkString(" ")
if (useRest) {
childArgsStr should endWith ("thejar.jar org.SomeClass arg1 arg2")
mainClass should be ("org.apache.spark.deploy.rest.StandaloneRestClient")
mainClass should be ("org.apache.spark.deploy.rest.RestSubmissionClient")
} else {
childArgsStr should startWith ("--supervise --memory 4g --cores 5")
childArgsStr should include regex "launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2"


@ -39,9 +39,9 @@ import org.apache.spark.deploy.master.DriverState._
* Tests for the REST application submission protocol used in standalone cluster mode.
*/
class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
private val client = new StandaloneRestClient
private val client = new RestSubmissionClient
private var actorSystem: Option[ActorSystem] = None
private var server: Option[StandaloneRestServer] = None
private var server: Option[RestSubmissionServer] = None
override def afterEach() {
actorSystem.foreach(_.shutdown())
@ -89,7 +89,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
conf.set("spark.app.name", "dreamer")
val appArgs = Array("one", "two", "six")
// main method calls this
val response = StandaloneRestClient.run("app-resource", "main-class", appArgs, conf)
val response = RestSubmissionClient.run("app-resource", "main-class", appArgs, conf)
val submitResponse = getSubmitResponse(response)
assert(submitResponse.action === Utils.getFormattedClassName(submitResponse))
assert(submitResponse.serverSparkVersion === SPARK_VERSION)
@ -208,7 +208,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("good request paths") {
val masterUrl = startSmartServer()
val httpUrl = masterUrl.replace("spark://", "http://")
val v = StandaloneRestServer.PROTOCOL_VERSION
val v = RestSubmissionServer.PROTOCOL_VERSION
val json = constructSubmitRequest(masterUrl).toJson
val submitRequestPath = s"$httpUrl/$v/submissions/create"
val killRequestPath = s"$httpUrl/$v/submissions/kill"
@ -238,7 +238,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("good request paths, bad requests") {
val masterUrl = startSmartServer()
val httpUrl = masterUrl.replace("spark://", "http://")
val v = StandaloneRestServer.PROTOCOL_VERSION
val v = RestSubmissionServer.PROTOCOL_VERSION
val submitRequestPath = s"$httpUrl/$v/submissions/create"
val killRequestPath = s"$httpUrl/$v/submissions/kill"
val statusRequestPath = s"$httpUrl/$v/submissions/status"
@ -276,7 +276,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("bad request paths") {
val masterUrl = startSmartServer()
val httpUrl = masterUrl.replace("spark://", "http://")
val v = StandaloneRestServer.PROTOCOL_VERSION
val v = RestSubmissionServer.PROTOCOL_VERSION
val (response1, code1) = sendHttpRequestWithResponse(httpUrl, "GET")
val (response2, code2) = sendHttpRequestWithResponse(s"$httpUrl/", "GET")
val (response3, code3) = sendHttpRequestWithResponse(s"$httpUrl/$v", "GET")
@ -292,7 +292,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
assert(code5 === HttpServletResponse.SC_BAD_REQUEST)
assert(code6 === HttpServletResponse.SC_BAD_REQUEST)
assert(code7 === HttpServletResponse.SC_BAD_REQUEST)
assert(code8 === StandaloneRestServer.SC_UNKNOWN_PROTOCOL_VERSION)
assert(code8 === RestSubmissionServer.SC_UNKNOWN_PROTOCOL_VERSION)
// all responses should be error responses
val errorResponse1 = getErrorResponse(response1)
val errorResponse2 = getErrorResponse(response2)
@ -310,13 +310,13 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
assert(errorResponse5.highestProtocolVersion === null)
assert(errorResponse6.highestProtocolVersion === null)
assert(errorResponse7.highestProtocolVersion === null)
assert(errorResponse8.highestProtocolVersion === StandaloneRestServer.PROTOCOL_VERSION)
assert(errorResponse8.highestProtocolVersion === RestSubmissionServer.PROTOCOL_VERSION)
}
test("server returns unknown fields") {
val masterUrl = startSmartServer()
val httpUrl = masterUrl.replace("spark://", "http://")
val v = StandaloneRestServer.PROTOCOL_VERSION
val v = RestSubmissionServer.PROTOCOL_VERSION
val submitRequestPath = s"$httpUrl/$v/submissions/create"
val oldJson = constructSubmitRequest(masterUrl).toJson
val oldFields = parse(oldJson).asInstanceOf[JObject].obj
@ -340,7 +340,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
test("client handles faulty server") {
val masterUrl = startFaultyServer()
val httpUrl = masterUrl.replace("spark://", "http://")
val v = StandaloneRestServer.PROTOCOL_VERSION
val v = RestSubmissionServer.PROTOCOL_VERSION
val submitRequestPath = s"$httpUrl/$v/submissions/create"
val killRequestPath = s"$httpUrl/$v/submissions/kill/anything"
val statusRequestPath = s"$httpUrl/$v/submissions/status/anything"
@ -400,9 +400,9 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
val fakeMasterRef = _actorSystem.actorOf(Props(makeFakeMaster))
val _server =
if (faulty) {
new FaultyStandaloneRestServer(localhost, 0, fakeMasterRef, "spark://fake:7077", conf)
new FaultyStandaloneRestServer(localhost, 0, conf, fakeMasterRef, "spark://fake:7077")
} else {
new StandaloneRestServer(localhost, 0, fakeMasterRef, "spark://fake:7077", conf)
new StandaloneRestServer(localhost, 0, conf, fakeMasterRef, "spark://fake:7077")
}
val port = _server.start()
// set these to clean them up after every test
@ -563,20 +563,18 @@ private class SmarterMaster extends Actor {
private class FaultyStandaloneRestServer(
host: String,
requestedPort: Int,
masterConf: SparkConf,
masterActor: ActorRef,
masterUrl: String,
masterConf: SparkConf)
extends StandaloneRestServer(host, requestedPort, masterActor, masterUrl, masterConf) {
masterUrl: String)
extends RestSubmissionServer(host, requestedPort, masterConf) {
protected override val contextToServlet = Map[String, StandaloneRestServlet](
s"$baseContext/create/*" -> new MalformedSubmitServlet,
s"$baseContext/kill/*" -> new InvalidKillServlet,
s"$baseContext/status/*" -> new ExplodingStatusServlet,
"/*" -> new ErrorServlet
)
protected override val submitRequestServlet = new MalformedSubmitServlet
protected override val killRequestServlet = new InvalidKillServlet
protected override val statusRequestServlet = new ExplodingStatusServlet
/** A faulty servlet that produces malformed responses. */
class MalformedSubmitServlet extends SubmitRequestServlet(masterActor, masterUrl, masterConf) {
class MalformedSubmitServlet
extends StandaloneSubmitRequestServlet(masterActor, masterUrl, masterConf) {
protected override def sendResponse(
responseMessage: SubmitRestProtocolResponse,
responseServlet: HttpServletResponse): Unit = {
@ -586,7 +584,7 @@ private class FaultyStandaloneRestServer(
}
/** A faulty servlet that produces invalid responses. */
class InvalidKillServlet extends KillRequestServlet(masterActor, masterConf) {
class InvalidKillServlet extends StandaloneKillRequestServlet(masterActor, masterConf) {
protected override def handleKill(submissionId: String): KillSubmissionResponse = {
val k = super.handleKill(submissionId)
k.submissionId = null
@ -595,7 +593,7 @@ private class FaultyStandaloneRestServer(
}
/** A faulty status servlet that explodes. */
class ExplodingStatusServlet extends StatusRequestServlet(masterActor, masterConf) {
class ExplodingStatusServlet extends StandaloneStatusRequestServlet(masterActor, masterConf) {
private def explode: Int = 1 / 0
protected override def handleStatus(submissionId: String): SubmissionStatusResponse = {
val s = super.handleStatus(submissionId)


@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.mesos
import java.util.Date
import org.scalatest.FunSuite
import org.scalatest.mock.MockitoSugar
import org.apache.spark.deploy.Command
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.scheduler.cluster.mesos._
import org.apache.spark.{LocalSparkContext, SparkConf}
class MesosClusterSchedulerSuite extends FunSuite with LocalSparkContext with MockitoSugar {
private val command = new Command("mainClass", Seq("arg"), null, null, null, null)
test("can queue drivers") {
val conf = new SparkConf()
conf.setMaster("mesos://localhost:5050")
conf.setAppName("spark mesos")
val scheduler = new MesosClusterScheduler(
new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
override def start(): Unit = { ready = true }
}
scheduler.start()
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 1000, 1, true,
command, Map[String, String](), "s1", new Date()))
assert(response.success)
val response2 =
scheduler.submitDriver(new MesosDriverDescription(
"d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date()))
assert(response2.success)
val state = scheduler.getSchedulerState()
val queuedDrivers = state.queuedDrivers.toList
assert(queuedDrivers(0).submissionId == response.submissionId)
assert(queuedDrivers(1).submissionId == response2.submissionId)
}
test("can kill queued drivers") {
val conf = new SparkConf()
conf.setMaster("mesos://localhost:5050")
conf.setAppName("spark mesos")
val scheduler = new MesosClusterScheduler(
new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
override def start(): Unit = { ready = true }
}
scheduler.start()
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 1000, 1, true,
command, Map[String, String](), "s1", new Date()))
assert(response.success)
val killResponse = scheduler.killDriver(response.submissionId)
assert(killResponse.success)
val state = scheduler.getSchedulerState()
assert(state.queuedDrivers.isEmpty)
}
}


@ -78,6 +78,9 @@ To verify that the Mesos cluster is ready for Spark, navigate to the Mesos maste
To use Mesos from Spark, you need a Spark binary package available in a place accessible by Mesos, and
a Spark driver program configured to connect to Mesos.
Alternatively, you can install Spark in the same location on all the Mesos slaves, and configure
`spark.mesos.executor.home` (defaults to `SPARK_HOME`) to point to that location.
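For example, a minimal sketch of the corresponding entry in `conf/spark-defaults.conf`, assuming Spark is unpacked at `/opt/spark` on every slave (the path is illustrative):
{% highlight bash %}
# conf/spark-defaults.conf
# /opt/spark is an assumed install location; substitute the real path used on your slaves.
spark.mesos.executor.home /opt/spark
{% endhighlight %}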
## Uploading Spark Package
When Mesos runs a task on a Mesos slave for the first time, that slave must have a Spark binary
@ -107,7 +110,11 @@ the `make-distribution.sh` script included in a Spark source tarball/checkout.
The Master URLs for Mesos are in the form `mesos://host:5050` for a single-master Mesos
cluster, or `mesos://zk://host:2181` for a multi-master Mesos cluster using ZooKeeper.
The driver also needs some configuration in `spark-env.sh` to interact properly with Mesos:
## Client Mode
In client mode, a Spark Mesos framework is launched directly on the client machine and waits for the driver output.
The driver needs some configuration in `spark-env.sh` to interact properly with Mesos:
1. In `spark-env.sh` set some environment variables:
* `export MESOS_NATIVE_JAVA_LIBRARY=<path to libmesos.so>`. This path is typically
@ -129,8 +136,7 @@ val sc = new SparkContext(conf)
{% endhighlight %}
(You can also use [`spark-submit`](submitting-applications.html) and configure `spark.executor.uri`
in the [conf/spark-defaults.conf](configuration.html#loading-default-configurations) file. Note
that `spark-submit` currently only supports deploying the Spark driver in `client` mode for Mesos.)
in the [conf/spark-defaults.conf](configuration.html#loading-default-configurations) file.)
When running a shell, the `spark.executor.uri` parameter is inherited from `SPARK_EXECUTOR_URI`, so
it does not need to be redundantly passed in as a system property.
@ -139,6 +145,17 @@ it does not need to be redundantly passed in as a system property.
./bin/spark-shell --master mesos://host:5050
{% endhighlight %}
## Cluster Mode
Spark on Mesos also supports cluster mode, where the driver is launched in the cluster and the client
can retrieve the driver's results from the Mesos Web UI.
To use cluster mode, you must start the `MesosClusterDispatcher` in your cluster via the `sbin/start-mesos-dispatcher.sh` script,
passing in the Mesos master URL (e.g. `mesos://host:5050`).
From the client, you can submit a job to the Mesos cluster by running `spark-submit` and specifying the master URL
as the URL of the `MesosClusterDispatcher` (e.g. `mesos://dispatcher:7077`). You can view driver statuses on the
Spark cluster Web UI.
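For example, a minimal sketch of starting and stopping the dispatcher, assuming the master URL is passed through a `--master` flag (the flag name and host are assumptions for illustration):
{% highlight bash %}
# Start the dispatcher on a machine inside the cluster, pointing it at the Mesos master.
# --master is an assumed flag name; the script forwards its arguments to MesosClusterDispatcher.
./sbin/start-mesos-dispatcher.sh --master mesos://host:5050

# Stop the dispatcher when it is no longer needed.
./sbin/stop-mesos-dispatcher.sh
{% endhighlight %}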
# Mesos Run Modes

sbin/start-mesos-dispatcher.sh Executable file

@ -0,0 +1,40 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Starts the Mesos Cluster Dispatcher on the machine this script is executed on.
# The Mesos Cluster Dispatcher is responsible for launching the Mesos framework and
# REST server to handle driver requests for Mesos cluster mode.
# Only one cluster dispatcher is needed per Mesos cluster.
sbin="`dirname "$0"`"
sbin="`cd "$sbin"; pwd`"
. "$sbin/spark-config.sh"
. "$SPARK_PREFIX/bin/load-spark-env.sh"
if [ "$SPARK_MESOS_DISPATCHER_PORT" = "" ]; then
SPARK_MESOS_DISPATCHER_PORT=7077
fi
if [ "$SPARK_MESOS_DISPATCHER_HOST" = "" ]; then
SPARK_MESOS_DISPATCHER_HOST=`hostname`
fi
"$sbin"/spark-daemon.sh start org.apache.spark.deploy.mesos.MesosClusterDispatcher 1 --host $SPARK_MESOS_DISPATCHER_HOST --port $SPARK_MESOS_DISPATCHER_PORT "$@"

sbin/stop-mesos-dispatcher.sh Executable file

@ -0,0 +1,27 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Stops the Mesos Cluster Dispatcher on the machine this script is executed on.
sbin=`dirname "$0"`
sbin=`cd "$sbin"; pwd`
. "$sbin/spark-config.sh"
"$sbin"/spark-daemon.sh stop org.apache.spark.deploy.mesos.MesosClusterDispatcher 1