Merge branch 'master' of github.com:mesos/spark into spark-633

Reynold Xin 2012-12-13 22:06:47 -08:00
commit 41e58a519a
12 changed files with 138 additions and 21 deletions

View file

@@ -11,7 +11,7 @@ bin=`cd "$bin"; pwd`
 . "$bin/spark-config.sh"
 
 # Start Master
-"$bin"/start-master.sh --config $SPARK_CONF_DIR
+"$bin"/start-master.sh
 
 # Start Workers
-"$bin"/start-slaves.sh --config $SPARK_CONF_DIR
+"$bin"/start-slaves.sh

View file

@@ -7,13 +7,28 @@ bin=`cd "$bin"; pwd`
 
 . "$bin/spark-config.sh"
 
+if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then
+  . "${SPARK_CONF_DIR}/spark-env.sh"
+fi
+
+if [ "$SPARK_MASTER_PORT" = "" ]; then
+  SPARK_MASTER_PORT=7077
+fi
+
+if [ "$SPARK_MASTER_IP" = "" ]; then
+  SPARK_MASTER_IP=`hostname`
+fi
+
+if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then
+  SPARK_MASTER_WEBUI_PORT=8080
+fi
+
 # Set SPARK_PUBLIC_DNS so the master report the correct webUI address to the slaves
 if [ "$SPARK_PUBLIC_DNS" = "" ]; then
     # If we appear to be running on EC2, use the public address by default:
     if [[ `hostname` == *ec2.internal ]]; then
-        echo "RUNNING ON EC2"
         export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname`
     fi
 fi
 
-"$bin"/spark-daemon.sh start spark.deploy.master.Master
+"$bin"/spark-daemon.sh start spark.deploy.master.Master --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT

View file

@@ -7,7 +7,6 @@ bin=`cd "$bin"; pwd`
 if [ "$SPARK_PUBLIC_DNS" = "" ]; then
     # If we appear to be running on EC2, use the public address by default:
     if [[ `hostname` == *ec2.internal ]]; then
-        echo "RUNNING ON EC2"
         export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname`
     fi
 fi

View file

@@ -50,9 +50,14 @@ private[spark] class Executor extends Logging {
       override def uncaughtException(thread: Thread, exception: Throwable) {
         try {
           logError("Uncaught exception in thread " + thread, exception)
-          System.exit(1)
+          if (exception.isInstanceOf[OutOfMemoryError]) {
+            System.exit(ExecutorExitCode.OOM)
+          } else {
+            System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+          }
         } catch {
-          case t: Throwable => System.exit(2)
+          case oom: OutOfMemoryError => System.exit(ExecutorExitCode.OOM)
+          case t: Throwable => System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
         }
       }
     }
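
For readers unfamiliar with how such a handler is wired up, here is a minimal, self-contained sketch (not taken from this commit) of installing a JVM-wide uncaught-exception handler that maps fatal errors to distinct exit codes; the constant values simply mirror ExecutorExitCode, introduced later in this diff.

// Minimal sketch, not from this commit: install a default uncaught-exception
// handler so that a cluster manager which only sees the process exit status
// can still distinguish an OutOfMemoryError from an arbitrary uncaught
// exception. The constants mirror ExecutorExitCode defined below.
object UncaughtExceptionSketch {
  val UNCAUGHT_EXCEPTION = 50
  val OOM = 52

  def install() {
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(thread: Thread, exception: Throwable) {
        exception match {
          case _: OutOfMemoryError => System.exit(OOM)
          case _                   => System.exit(UNCAUGHT_EXCEPTION)
        }
      }
    })
  }
}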

View file

@@ -0,0 +1,43 @@
+package spark.executor
+
+/**
+ * These are exit codes that executors should use to provide the master with information about
+ * executor failures assuming that cluster management framework can capture the exit codes (but
+ * perhaps not log files). The exit code constants here are chosen to be unlikely to conflict
+ * with "natural" exit statuses that may be caused by the JVM or user code. In particular,
+ * exit codes 128+ arise on some Unix-likes as a result of signals, and it appears that the
+ * OpenJDK JVM may use exit code 1 in some of its own "last chance" code.
+ */
+private[spark]
+object ExecutorExitCode {
+  /** The default uncaught exception handler was reached. */
+  val UNCAUGHT_EXCEPTION = 50
+
+  /** The default uncaught exception handler was called and an exception was encountered while
+      logging the exception. */
+  val UNCAUGHT_EXCEPTION_TWICE = 51
+
+  /** The default uncaught exception handler was reached, and the uncaught exception was an
+      OutOfMemoryError. */
+  val OOM = 52
+
+  /** DiskStore failed to create a local temporary directory after many attempts. */
+  val DISK_STORE_FAILED_TO_CREATE_DIR = 53
+
+  def explainExitCode(exitCode: Int): String = {
+    exitCode match {
+      case UNCAUGHT_EXCEPTION => "Uncaught exception"
+      case UNCAUGHT_EXCEPTION_TWICE => "Uncaught exception, and logging the exception failed"
+      case OOM => "OutOfMemoryError"
+      case DISK_STORE_FAILED_TO_CREATE_DIR =>
+        "Failed to create local directory (bad spark.local.dir?)"
+      case _ =>
+        "Unknown executor exit code (" + exitCode + ")" + (
+          if (exitCode > 128)
+            " (died from signal " + (exitCode - 128) + "?)"
+          else
+            ""
+        )
+    }
+  }
+}
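
The header comment above explains why the constants live in the 50s. The sketch below (an illustrative demo, assuming the ExecutorExitCode object above is on the classpath) shows how explainExitCode turns a raw exit status into a message, including the signal hint for codes above 128.

// Illustrative usage of the object above (hypothetical demo, not part of the
// commit): translate raw exit statuses into human-readable explanations.
object ExitCodeDemo {
  def main(args: Array[String]) {
    // A known code maps to its specific message.
    println(spark.executor.ExecutorExitCode.explainExitCode(52))
    // prints: OutOfMemoryError

    // An unknown code above 128 is flagged as a possible death by signal.
    println(spark.executor.ExecutorExitCode.explainExitCode(137))
    // prints: Unknown executor exit code (137) (died from signal 9?)
  }
}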

View file

@@ -249,15 +249,22 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
   }
 
-  def slaveLost(slaveId: String) {
+  def slaveLost(slaveId: String, reason: ExecutorLossReason) {
     var failedHost: Option[String] = None
     synchronized {
       val host = slaveIdToHost(slaveId)
       if (hostsAlive.contains(host)) {
+        logError("Lost an executor on " + host + ": " + reason)
         slaveIdsWithExecutors -= slaveId
         hostsAlive -= host
         activeTaskSetsQueue.foreach(_.hostLost(host))
         failedHost = Some(host)
+      } else {
+        // We may get multiple slaveLost() calls with different loss reasons. For example, one
+        // may be triggered by a dropped connection from the slave while another may be a report
+        // of executor termination from Mesos. We produce log messages for both so we eventually
+        // report the termination reason.
+        logError("Lost an executor on " + host + " (already removed): " + reason)
       }
     }
     if (failedHost != None) {

View file

@@ -0,0 +1,21 @@
+package spark.scheduler.cluster
+
+import spark.executor.ExecutorExitCode
+
+/**
+ * Represents an explanation for a executor or whole slave failing or exiting.
+ */
+private[spark]
+class ExecutorLossReason(val message: String) {
+  override def toString: String = message
+}
+
+private[spark]
+case class ExecutorExited(val exitCode: Int)
+  extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) {
+}
+
+private[spark]
+case class SlaveLost(_message: String = "Slave lost")
+  extends ExecutorLossReason(_message) {
+}
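
A hypothetical consumer of these classes (not part of the commit) might pattern-match on the loss reason to distinguish a known exit code from a generic slave loss, for example:

// Hypothetical sketch of consuming the classes above.
import spark.scheduler.cluster.{ExecutorExited, ExecutorLossReason, SlaveLost}

object LossReasonSketch {
  def describe(reason: ExecutorLossReason): String = reason match {
    case ExecutorExited(code) => "executor exited with code " + code + " (" + reason.message + ")"
    case SlaveLost(msg)       => "slave lost: " + msg
    case other                => other.message
  }

  def main(args: Array[String]) {
    println(describe(ExecutorExited(52)))  // exit code 52 is explained as an OutOfMemoryError
    println(describe(SlaveLost()))         // uses the default message "Slave lost"
  }
}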

View file

@@ -19,6 +19,7 @@ private[spark] class SparkDeploySchedulerBackend(
   var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
 
   val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+  val executorIdToSlaveId = new HashMap[String, String]
 
   // Memory used by each executor (in megabytes)
   val executorMemory = {
@@ -65,9 +66,27 @@ private[spark] class SparkDeploySchedulerBackend(
   }
 
   def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {
+    executorIdToSlaveId += id -> workerId
     logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format(
       id, host, cores, Utils.memoryMegabytesToString(memory)))
   }
 
-  def executorRemoved(id: String, message: String) {}
+  def executorRemoved(id: String, message: String) {
+    var reason: ExecutorLossReason = SlaveLost(message)
+    if (message.startsWith("Command exited with code ")) {
+      try {
+        reason = ExecutorExited(message.substring("Command exited with code ".length).toInt)
+      } catch {
+        case nfe: NumberFormatException => {}
+      }
+    }
+    logInfo("Executor %s removed: %s".format(id, message))
+    executorIdToSlaveId.get(id) match {
+      case Some(slaveId) =>
+        executorIdToSlaveId.remove(id)
+        scheduler.slaveLost(slaveId, reason)
+      case None =>
+        logInfo("No slave ID known for executor %s".format(id))
+    }
+  }
 }
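
The executorRemoved callback above parses the deploy master's free-form removal message. The standalone sketch below (hypothetical helper name, same parsing rule, not part of the commit) shows that parse-or-fallback step in isolation.

// Standalone sketch of the parsing above, using a hypothetical helper name.
// A message of the form "Command exited with code N" becomes ExecutorExited(N);
// anything else, including a non-numeric suffix, falls back to SlaveLost.
import spark.scheduler.cluster.{ExecutorExited, ExecutorLossReason, SlaveLost}

object RemovalMessageSketch {
  private val prefix = "Command exited with code "

  def toLossReason(message: String): ExecutorLossReason = {
    if (message.startsWith(prefix)) {
      try {
        ExecutorExited(message.substring(prefix.length).toInt)
      } catch {
        case nfe: NumberFormatException => SlaveLost(message)
      }
    } else {
      SlaveLost(message)
    }
  }

  def main(args: Array[String]) {
    println(toLossReason("Command exited with code 52"))  // ExecutorExited(52)
    println(toLossReason("Worker shutting down"))         // SlaveLost(Worker shutting down)
  }
}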

View file

@@ -69,13 +69,13 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
         context.stop(self)
 
       case Terminated(actor) =>
-        actorToSlaveId.get(actor).foreach(removeSlave)
+        actorToSlaveId.get(actor).foreach(removeSlave(_, "Akka actor terminated"))
 
       case RemoteClientDisconnected(transport, address) =>
-        addressToSlaveId.get(address).foreach(removeSlave)
+        addressToSlaveId.get(address).foreach(removeSlave(_, "remote Akka client disconnected"))
 
       case RemoteClientShutdown(transport, address) =>
-        addressToSlaveId.get(address).foreach(removeSlave)
+        addressToSlaveId.get(address).foreach(removeSlave(_, "remote Akka client shutdown"))
     }
 
     // Make fake resource offers on all slaves
@@ -99,7 +99,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
     }
 
     // Remove a disconnected slave from the cluster
-    def removeSlave(slaveId: String) {
+    def removeSlave(slaveId: String, reason: String) {
      logInfo("Slave " + slaveId + " disconnected, so removing it")
      val numCores = freeCores(slaveId)
      actorToSlaveId -= slaveActor(slaveId)
@@ -109,7 +109,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
      freeCores -= slaveId
      slaveHost -= slaveId
      totalCoreCount.addAndGet(-numCores)
-      scheduler.slaveLost(slaveId)
+      scheduler.slaveLost(slaveId, SlaveLost(reason))
    }
  }

View file

@@ -267,17 +267,23 @@ private[spark] class MesosSchedulerBackend(
 
   override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
 
-  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+  private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
     logInfo("Mesos slave lost: " + slaveId.getValue)
     synchronized {
       slaveIdsWithExecutors -= slaveId.getValue
     }
-    scheduler.slaveLost(slaveId.getValue)
+    scheduler.slaveLost(slaveId.getValue, reason)
   }
 
-  override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
-    logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
-    slaveLost(d, s)
+  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+    recordSlaveLost(d, slaveId, SlaveLost())
+  }
+
+  override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
+                            slaveId: SlaveID, status: Int) {
+    logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
+                                                                 slaveId.getValue))
+    recordSlaveLost(d, slaveId, ExecutorExited(status))
   }
 
   // TODO: query Mesos for number of cores

View file

@@ -10,6 +10,8 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
 
 import scala.collection.mutable.ArrayBuffer
 
+import spark.executor.ExecutorExitCode
+
 import spark.Utils
 
 /**
@@ -162,7 +164,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
       if (!foundLocalDir) {
         logError("Failed " + MAX_DIR_CREATION_ATTEMPTS +
           " attempts to create local dir in " + rootDir)
-        System.exit(1)
+        System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
       }
       logInfo("Created local directory at " + localDir)
       localDir

View file

@@ -68,7 +68,7 @@ Finally, the following configuration options can be passed to the master and wor
 
 To launch a Spark standalone cluster with the deploy scripts, you need to set up two files, `conf/spark-env.sh` and `conf/slaves`. The `conf/spark-env.sh` file lets you specify global settings for the master and slave instances, such as memory, or port numbers to bind to, while `conf/slaves` is a list of slave nodes. The system requires that all the slave machines have the same configuration files, so *copy these files to each machine*.
 
-In `conf/spark-env.sh`, you can set the following parameters, in addition to the [standard Spark configuration settongs](configuration.html):
+In `conf/spark-env.sh`, you can set the following parameters, in addition to the [standard Spark configuration settings](configuration.html):
 
 <table class="table">
   <tr><th style="width:21%">Environment Variable</th><th>Meaning</th></tr>