Merge pull request #108 from alig/master

Changes to enable executing Spark through SIMR by using HDFS as a synchronization point between the driver and the executors, as well as ensuring that executors exit properly.
Matei Zaharia 2013-10-25 18:28:43 -07:00
commit bab496c120
7 changed files with 123 additions and 8 deletions
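How the HDFS synchronization works: the new SimrSchedulerBackend below writes the driver's Akka URL (plus an executor core count) into an HDFS file, and the SIMR-launched executors wait for that file to appear before connecting back. The executor-side reader is not part of this commit; the following is only a rough sketch of what it might look like, assuming the writeUTF/writeInt layout used in SimrSchedulerBackend.start() (the object and method names here are made up for illustration):

// Illustrative sketch only (not part of this commit): how a SIMR-launched executor
// might block on the HDFS driver file and recover the driver's connection details.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object SimrDriverFileReader {
  // Returns (driverUrl, maxCores) once the driver has renamed the temp file into place.
  def waitForDriver(driverFilePath: String, pollIntervalMs: Long = 1000L): (String, Int) = {
    val fs = FileSystem.get(new Configuration())
    val path = new Path(driverFilePath)
    while (!fs.exists(path)) {
      Thread.sleep(pollIntervalMs)   // the file only appears after the "atomic" rename
    }
    val in = fs.open(path)
    try {
      val driverUrl = in.readUTF()   // mirrors temp.writeUTF(driverUrl)
      val maxCores  = in.readInt()   // mirrors temp.writeInt(maxCores)
      (driverUrl, maxCores)
    } finally {
      in.close()
    }
  }
}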

File: SparkContext.scala

@@ -56,16 +56,15 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
-  SparkDeploySchedulerBackend, ClusterScheduler}
+  SparkDeploySchedulerBackend, ClusterScheduler, SimrSchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalScheduler
-import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType,
   TimeStampedHashMap, Utils}

 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
  * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
@@ -158,6 +157,8 @@ class SparkContext(
     val SPARK_REGEX = """spark://(.*)""".r
     // Regular expression for connection to Mesos cluster
     val MESOS_REGEX = """mesos://(.*)""".r
+    // Regular expression for connection to Simr cluster
+    val SIMR_REGEX = """simr://(.*)""".r

     master match {
       case "local" =>
@@ -176,6 +177,12 @@ class SparkContext(
         scheduler.initialize(backend)
         scheduler

+      case SIMR_REGEX(simrUrl) =>
+        val scheduler = new ClusterScheduler(this)
+        val backend = new SimrSchedulerBackend(scheduler, this, simrUrl)
+        scheduler.initialize(backend)
+        scheduler
+
       case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
         // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
         val memoryPerSlaveInt = memoryPerSlave.toInt
@@ -680,8 +687,7 @@ class SparkContext(
    */
  def addJar(path: String) {
    if (path == null) {
-     logWarning("null specified as parameter to addJar",
-       new SparkException("null specified as parameter to addJar"))
+     logWarning("null specified as parameter to addJar")
    } else {
      var key = ""
      if (path.contains("\\")) {
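For reference, the new SIMR_REGEX case means a driver selects this backend just by using a simr:// master URL; everything after the scheme is passed to SimrSchedulerBackend as driverFilePath. A hypothetical usage (the HDFS path here is invented):

// Hypothetical usage: "tmp/simr_driver_info" is captured by SIMR_REGEX and becomes
// the HDFS path that SimrSchedulerBackend writes the driver's Akka URL to.
val sc = new SparkContext("simr://tmp/simr_driver_info", "SimrExample")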

File: CoarseGrainedExecutorBackend.scala

@@ -80,6 +80,11 @@ private[spark] class CoarseGrainedExecutorBackend(
     case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
       logError("Driver terminated or disconnected! Shutting down.")
       System.exit(1)
+
+    case StopExecutor =>
+      logInfo("Driver commanded a shutdown")
+      context.stop(self)
+      context.system.shutdown()
   }

   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {

File: CoarseGrainedClusterMessages.scala

@@ -60,6 +60,10 @@ private[spark] object CoarseGrainedClusterMessages {
   case object StopDriver extends CoarseGrainedClusterMessage

+  case object StopExecutor extends CoarseGrainedClusterMessage
+
+  case object StopExecutors extends CoarseGrainedClusterMessage
+
   case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage

 }

File: CoarseGrainedSchedulerBackend.scala

@@ -101,6 +101,13 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
         sender ! true
         context.stop(self)

+      case StopExecutors =>
+        logInfo("Asking each executor to shut down")
+        for (executor <- executorActor.values) {
+          executor ! StopExecutor
+        }
+        sender ! true
+
       case RemoveExecutor(executorId, reason) =>
         removeExecutor(executorId, reason)
         sender ! true
@@ -170,11 +177,24 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
   private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")

+  def stopExecutors() {
+    try {
+      if (driverActor != null) {
+        logInfo("Shutting down all executors")
+        val future = driverActor.ask(StopExecutors)(timeout)
+        Await.ready(future, timeout)
+      }
+    } catch {
+      case e: Exception =>
+        throw new SparkException("Error asking standalone scheduler to shut down executors", e)
+    }
+  }
+
   override def stop() {
     try {
       if (driverActor != null) {
         val future = driverActor.ask(StopDriver)(timeout)
-        Await.result(future, timeout)
+        Await.ready(future, timeout)
       }
     } catch {
       case e: Exception =>
@@ -197,7 +217,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
   def removeExecutor(executorId: String, reason: String) {
     try {
       val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
-      Await.result(future, timeout)
+      Await.ready(future, timeout)
     } catch {
       case e: Exception =>
         throw new SparkException("Error notifying standalone scheduler's driver actor", e)

File: SimrSchedulerBackend.scala (new)

@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+import org.apache.spark.{Logging, SparkContext}
+
+private[spark] class SimrSchedulerBackend(
+    scheduler: ClusterScheduler,
+    sc: SparkContext,
+    driverFilePath: String)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+  with Logging {
+
+  val tmpPath = new Path(driverFilePath + "_tmp")
+  val filePath = new Path(driverFilePath)
+
+  val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt
+
+  override def start() {
+    super.start()
+
+    val driverUrl = "akka://spark@%s:%s/user/%s".format(
+      System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
+      CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+    val conf = new Configuration()
+    val fs = FileSystem.get(conf)
+
+    logInfo("Writing to HDFS file: " + driverFilePath)
+    logInfo("Writing Akka address: " + driverUrl)
+
+    // Create temporary file to prevent race condition where executors get empty driverUrl file
+    val temp = fs.create(tmpPath, true)
+    temp.writeUTF(driverUrl)
+    temp.writeInt(maxCores)
+    temp.close()
+
+    // "Atomic" rename
+    fs.rename(tmpPath, filePath)
+  }
+
+  override def stop() {
+    val conf = new Configuration()
+    val fs = FileSystem.get(conf)
+    fs.delete(new Path(driverFilePath), false)
+    super.stopExecutors()
+    super.stop()
+  }
+}
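A note on the temp-file-then-rename pattern in start(): an HDFS rename is a single metadata operation on the NameNode, so executors polling for filePath either see no file at all or see a fully written driver URL and core count, never a partially written file; writing directly to filePath would reintroduce the race condition the in-code comment refers to.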

File: SparkPi.scala

@@ -38,6 +38,6 @@ object SparkPi {
       if (x*x + y*y < 1) 1 else 0
     }.reduce(_ + _)
     println("Pi is roughly " + 4.0 * count / n)
-    System.exit(0)
+    spark.stop()
   }
 }

File: SparkILoop.scala

@@ -633,6 +633,20 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
     Result(true, shouldReplay)
   }

+  def addAllClasspath(args: Seq[String]): Unit = {
+    var added = false
+    var totalClasspath = ""
+    for (arg <- args) {
+      val f = File(arg).normalize
+      if (f.exists) {
+        added = true
+        addedClasspath = ClassPath.join(addedClasspath, f.path)
+        totalClasspath = ClassPath.join(settings.classpath.value, addedClasspath)
+      }
+    }
+    if (added) replay()
+  }
+
   def addClasspath(arg: String): Unit = {
     val f = File(arg).normalize
     if (f.exists) {