From fbe40c5806a01e38c56b12a09bc4b84681a99602 Mon Sep 17 00:00:00 2001 From: Vadim Chekan Date: Fri, 20 Sep 2013 12:13:48 -0700 Subject: [PATCH 01/16] Serialize and restore spark.cleaner.ttl to savepoint --- .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 2 ++ .../scala/org/apache/spark/streaming/StreamingContext.scala | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 2d8f072624..bb9febad38 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.Logging import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.MetadataCleaner private[streaming] @@ -40,6 +41,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val checkpointDir = ssc.checkpointDir val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.jobManager.getPendingTimes() + val delaySeconds = MetadataCleaner.getDelaySeconds def validate() { assert(master != null, "Checkpoint.master is null") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 878725c705..098081d245 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -100,6 +100,10 @@ class StreamingContext private ( "both SparkContext and checkpoint as null") } + if(cp_ != null && cp_.delaySeconds >= 0 && MetadataCleaner.getDelaySeconds < 0) { + MetadataCleaner.setDelaySeconds(cp_.delaySeconds) + } + if (MetadataCleaner.getDelaySeconds < 0) { throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; " + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)") From 7b5ae23a376f63f70eb4a368e76cfd5c7653309d Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Fri, 4 Oct 2013 13:56:43 -0700 Subject: [PATCH 02/16] Renamed StandaloneX to CoarseGrainedX. The previous names were confusing because the components weren't just used in Standalone mode -- in fact, the scheduler used for Standalone mode is called SparkDeploySchedulerBackend. So, the previous names were misleading. 
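For illustration, the coarse-grained backend is now constructed under its new name; this minimal sketch mirrors the SparkContext change in the diff below (only the class name differs from before):

    val backend = new CoarseGrainedSchedulerBackend(scheduler, this.env.actorSystem)
    scheduler.initialize(backend)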
--- .../scala/org/apache/spark/SparkContext.scala | 4 ++-- ...ala => CoarseGrainedExecutorBackend.scala} | 12 ++++++----- ...cala => CoarseGrainedClusterMessage.scala} | 20 +++++++++---------- ...la => CoarseGrainedSchedulerBackend.scala} | 19 ++++++++++-------- .../cluster/SparkDeploySchedulerBackend.scala | 6 +++--- .../mesos/CoarseMesosSchedulerBackend.scala | 15 +++++++------- 6 files changed, 41 insertions(+), 35 deletions(-) rename core/src/main/scala/org/apache/spark/executor/{StandaloneExecutorBackend.scala => CoarseGrainedExecutorBackend.scala} (89%) rename core/src/main/scala/org/apache/spark/scheduler/cluster/{StandaloneClusterMessage.scala => CoarseGrainedClusterMessage.scala} (74%) rename core/src/main/scala/org/apache/spark/scheduler/cluster/{StandaloneSchedulerBackend.scala => CoarseGrainedSchedulerBackend.scala} (89%) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 2fb4a53072..62409e7257 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -55,7 +55,7 @@ import org.apache.spark.deploy.LocalSparkCluster import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, +import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler} import org.apache.spark.scheduler.local.LocalScheduler import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} @@ -205,7 +205,7 @@ class SparkContext( throw new SparkException("YARN mode not available ?", th) } } - val backend = new StandaloneSchedulerBackend(scheduler, this.env.actorSystem) + val backend = new CoarseGrainedSchedulerBackend(scheduler, this.env.actorSystem) scheduler.initialize(backend) scheduler diff --git a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala similarity index 89% rename from core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala rename to core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 7839023868..7d74786f6b 100644 --- a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -24,11 +24,11 @@ import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClie import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.TaskState.TaskState -import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._ +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.{Utils, AkkaUtils} -private[spark] class StandaloneExecutorBackend( +private[spark] class CoarseGrainedExecutorBackend( driverUrl: String, executorId: String, hostPort: String, @@ -79,7 +79,7 @@ private[spark] class StandaloneExecutorBackend( } } -private[spark] object StandaloneExecutorBackend { +private[spark] object CoarseGrainedExecutorBackend { def run(driverUrl: String, executorId: String, hostname: String, cores: Int) { // Debug code Utils.checkHost(hostname) @@ -91,7 +91,7 @@ private[spark] object StandaloneExecutorBackend { val sparkHostPort = hostname + ":" + boundPort 
System.setProperty("spark.hostPort", sparkHostPort) val actor = actorSystem.actorOf( - Props(new StandaloneExecutorBackend(driverUrl, executorId, sparkHostPort, cores)), + Props(new CoarseGrainedExecutorBackend(driverUrl, executorId, sparkHostPort, cores)), name = "Executor") actorSystem.awaitTermination() } @@ -99,7 +99,9 @@ private[spark] object StandaloneExecutorBackend { def main(args: Array[String]) { if (args.length < 4) { //the reason we allow the last frameworkId argument is to make it easy to kill rogue executors - System.err.println("Usage: StandaloneExecutorBackend []") + System.err.println( + "Usage: CoarseGrainedExecutorBackend " + + "[]") System.exit(1) } run(args(0), args(1), args(2), args(3).toInt) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala similarity index 74% rename from core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala rename to core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index c0b836bf1a..d017725eac 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -24,26 +24,26 @@ import org.apache.spark.scheduler.TaskDescription import org.apache.spark.util.{Utils, SerializableBuffer} -private[spark] sealed trait StandaloneClusterMessage extends Serializable +private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable -private[spark] object StandaloneClusterMessages { +private[spark] object CoarseGrainedClusterMessages { // Driver to executors - case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage + case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage case class RegisteredExecutor(sparkProperties: Seq[(String, String)]) - extends StandaloneClusterMessage + extends CoarseGrainedClusterMessage - case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage + case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage // Executors to driver case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) - extends StandaloneClusterMessage { + extends CoarseGrainedClusterMessage { Utils.checkHostPort(hostPort, "Expected host port") } case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, - data: SerializableBuffer) extends StandaloneClusterMessage + data: SerializableBuffer) extends CoarseGrainedClusterMessage object StatusUpdate { /** Alternate factory method that takes a ByteBuffer directly for the data field */ @@ -54,10 +54,10 @@ private[spark] object StandaloneClusterMessages { } // Internal messages in driver - case object ReviveOffers extends StandaloneClusterMessage + case object ReviveOffers extends CoarseGrainedClusterMessage - case object StopDriver extends StandaloneClusterMessage + case object StopDriver extends CoarseGrainedClusterMessage - case class RemoveExecutor(executorId: String, reason: String) extends StandaloneClusterMessage + case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala similarity index 89% rename from 
core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala rename to core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index f3aeea43d5..11ed0e9b62 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -30,16 +30,19 @@ import akka.util.duration._ import org.apache.spark.{SparkException, Logging, TaskState} import org.apache.spark.scheduler.TaskDescription -import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._ +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.Utils /** - * A standalone scheduler backend, which waits for standalone executors to connect to it through - * Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained - * Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*). + * A scheduler backend that waits for coarse grained executors to connect to it through Akka. + * This backend holds onto each executor for the duration of the Spark job rather than relinquishing + * executors whenever a task is done and asking the scheduler to launch a new executor for + * each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the + * coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode + * (spark.deploy.*). */ private[spark] -class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem) +class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem) extends SchedulerBackend with Logging { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed @@ -159,7 +162,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor } } driverActor = actorSystem.actorOf( - Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME) + Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME) } private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") @@ -195,6 +198,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor } } -private[spark] object StandaloneSchedulerBackend { - val ACTOR_NAME = "StandaloneScheduler" +private[spark] object CoarseGrainedSchedulerBackend { + val ACTOR_NAME = "CoarseGrainedScheduler" } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 9c49768c0c..226ac59427 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -28,7 +28,7 @@ private[spark] class SparkDeploySchedulerBackend( sc: SparkContext, master: String, appName: String) - extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with ClientListener with Logging { @@ -44,10 +44,10 @@ private[spark] class SparkDeploySchedulerBackend( // The endpoint for executors to talk to us val driverUrl = "akka://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), 
System.getProperty("spark.driver.port"), - StandaloneSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME) val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}") val command = Command( - "org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs) + "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs) val sparkHome = sc.getSparkHome().getOrElse(null) val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome, "http://" + sc.ui.appUIAddress) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 8f2eef9a53..4b131d87ce 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -30,13 +30,14 @@ import org.apache.mesos._ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} import org.apache.spark.{SparkException, Logging, SparkContext, TaskState} -import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend} +import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend} /** * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the - * StandaloneBackend mechanism. This class is useful for lower and more predictable latency. + * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable + * latency. * * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to * remove this. @@ -46,7 +47,7 @@ private[spark] class CoarseMesosSchedulerBackend( sc: SparkContext, master: String, appName: String) - extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with MScheduler with Logging { @@ -122,20 +123,20 @@ private[spark] class CoarseMesosSchedulerBackend( val driverUrl = "akka://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), - StandaloneSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME) val uri = System.getProperty("spark.executor.uri") if (uri == null) { val runScript = new File(sparkHome, "spark-class").getCanonicalPath command.setValue( - "\"%s\" org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format( + "\"%s\" CoarseGrainedExecutorBackend %s %s %s %d".format( runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) } else { // Grab everything to the first '.'. We'll use that and '*' to // glob the directory "correctly". 
val basename = uri.split('/').last.split('.').head command.setValue( - "cd %s*; ./spark-class org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format( - basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) + "cd %s*; ./spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d" + .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) } return command.build() From fdc52b2f8b2fda38252bc1b6d56c74b21ec234d4 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Sun, 6 Oct 2013 18:45:43 -0700 Subject: [PATCH 03/16] Added back fully qualified class name --- .../scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 4b131d87ce..300fe693f1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -128,7 +128,7 @@ private[spark] class CoarseMesosSchedulerBackend( if (uri == null) { val runScript = new File(sparkHome, "spark-class").getCanonicalPath command.setValue( - "\"%s\" CoarseGrainedExecutorBackend %s %s %s %d".format( + "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format( runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) } else { // Grab everything to the first '.'. We'll use that and '*' to From d2c86e718891b40e1cea8b0719462673b6b64e4e Mon Sep 17 00:00:00 2001 From: KarthikTunga Date: Tue, 15 Oct 2013 00:35:44 -0700 Subject: [PATCH 04/16] SPARK-627 - reading --config argument --- bin/slaves.sh | 16 ++++++++++++++++ bin/spark-daemon.sh | 17 +++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/bin/slaves.sh b/bin/slaves.sh index 752565b759..bcb7760130 100755 --- a/bin/slaves.sh +++ b/bin/slaves.sh @@ -46,6 +46,22 @@ bin=`cd "$bin"; pwd` # spark-env.sh. Save it here. HOSTLIST=$SPARK_SLAVES +#check if conf dir passed as an argument +if [ "$1" == "--config" ] +then + shift + conf_dir=$1 + if [ ! -d "$conf_dir" ] + then + echo "$conf_dir is not a valid directory" + echo $usage + exit 1 + else + export SPARK_CONF_DIR=$conf_dir + fi + shift +fi + if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then . "${SPARK_CONF_DIR}/spark-env.sh" fi diff --git a/bin/spark-daemon.sh b/bin/spark-daemon.sh index 5bfe967fbf..48d552f3db 100755 --- a/bin/spark-daemon.sh +++ b/bin/spark-daemon.sh @@ -43,6 +43,23 @@ bin=`cd "$bin"; pwd` . "$bin/spark-config.sh" # get arguments + +# check if conf dir passed as an argument +if [ "$1" == "--config" ] +then + shift + conf_dir=$1 + if [ ! 
-d "$conf_dir" ] + then + echo "$conf_dir is not a valid directory" + echo "FOUL :"$usage + exit 1 + else + export SPARK_CONF_DIR=$conf_dir + fi + shift +fi + startStop=$1 shift command=$1 From 707ad8cc4fe9bd65c5bd20b251efbdbe1d00c1ba Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Tue, 15 Oct 2013 14:20:27 -0700 Subject: [PATCH 05/16] Unified daemon thread pools --- .../spark/broadcast/BitTorrentBroadcast.scala | 8 +++--- .../apache/spark/broadcast/MultiTracker.scala | 2 +- .../spark/broadcast/TreeBroadcast.scala | 4 +-- .../org/apache/spark/executor/Executor.scala | 3 +-- .../spark/network/ConnectionManager.scala | 3 ++- .../scheduler/cluster/TaskResultGetter.scala | 25 +++---------------- .../scala/org/apache/spark/util/Utils.scala | 22 ++++++++++------ 7 files changed, 29 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala index b6c484bfe1..5332510e87 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala @@ -326,7 +326,8 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: private var blocksInRequestBitVector = new BitSet(totalBlocks) override def run() { - var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots) + var threadPool = Utils.newDaemonFixedThreadPool( + MultiTracker.MaxChatSlots, "Bit Torrent Chatter") while (hasBlocks.get < totalBlocks) { var numThreadsToCreate = 0 @@ -736,7 +737,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: private var setOfCompletedSources = Set[SourceInfo]() override def run() { - var threadPool = Utils.newDaemonCachedThreadPool() + var threadPool = Utils.newDaemonCachedThreadPool("Bit torrent guide multiple requests") var serverSocket: ServerSocket = null serverSocket = new ServerSocket(0) @@ -927,7 +928,8 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: class ServeMultipleRequests extends Thread with Logging { // Server at most MultiTracker.MaxChatSlots peers - var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots) + var threadPool = Utils.newDaemonFixedThreadPool( + MultiTracker.MaxChatSlots, "Bit torrent serve multiple requests") override def run() { var serverSocket = new ServerSocket(0) diff --git a/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala b/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala index 21ec94659e..82ed64f190 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala @@ -137,7 +137,7 @@ extends Logging { class TrackMultipleValues extends Thread with Logging { override def run() { - var threadPool = Utils.newDaemonCachedThreadPool() + var threadPool = Utils.newDaemonCachedThreadPool("Track multiple values") var serverSocket: ServerSocket = null serverSocket = new ServerSocket(DriverTrackerPort) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala index e6674d49a7..51af80a35e 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala @@ -291,7 +291,7 @@ extends Broadcast[T](id) with Logging with Serializable { private var 
setOfCompletedSources = Set[SourceInfo]() override def run() { - var threadPool = Utils.newDaemonCachedThreadPool() + var threadPool = Utils.newDaemonCachedThreadPool("Tree broadcast guide multiple requests") var serverSocket: ServerSocket = null serverSocket = new ServerSocket(0) @@ -493,7 +493,7 @@ extends Broadcast[T](id) with Logging with Serializable { class ServeMultipleRequests extends Thread with Logging { - var threadPool = Utils.newDaemonCachedThreadPool() + var threadPool = Utils.newDaemonCachedThreadPool("Tree broadcast serve multiple requests") override def run() { var serverSocket = new ServerSocket(0) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 20323ea038..032eb04f43 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -121,8 +121,7 @@ private[spark] class Executor( } // Start worker thread pool - val threadPool = new ThreadPoolExecutor( - 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable], Utils.daemonThreadFactory) + val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker") // Maintains the list of running tasks. private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index e15a839c4e..9c2fee4023 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -79,7 +79,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging { private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)] private val registerRequests = new SynchronizedQueue[SendingConnection] - implicit val futureExecContext = ExecutionContext.fromExecutor(Utils.newDaemonCachedThreadPool()) + implicit val futureExecContext = ExecutionContext.fromExecutor( + Utils.newDaemonCachedThreadPool("Connection manager future execution context")) private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala index feec8ecfe4..4312c46cc1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala @@ -24,33 +24,16 @@ import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult} import org.apache.spark.serializer.SerializerInstance +import org.apache.spark.util.Utils /** * Runs a thread pool that deserializes and remotely fetches (if necessary) task results. 
*/ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler) extends Logging { - private val MIN_THREADS = System.getProperty("spark.resultGetter.minThreads", "4").toInt - private val MAX_THREADS = System.getProperty("spark.resultGetter.maxThreads", "4").toInt - private val getTaskResultExecutor = new ThreadPoolExecutor( - MIN_THREADS, - MAX_THREADS, - 0L, - TimeUnit.SECONDS, - new LinkedBlockingDeque[Runnable], - new ResultResolverThreadFactory) - - class ResultResolverThreadFactory extends ThreadFactory { - private var counter = 0 - private var PREFIX = "Result resolver thread" - - override def newThread(r: Runnable): Thread = { - val thread = new Thread(r, "%s-%s".format(PREFIX, counter)) - counter += 1 - thread.setDaemon(true) - return thread - } - } + private val THREADS = System.getProperty("spark.resultGetter.threads", "4").toInt + private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool( + THREADS, "Result resolver thread") protected val serializer = new ThreadLocal[SerializerInstance] { override def initialValue(): SerializerInstance = { diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index f384875cc9..a3b3968c5e 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -447,14 +447,17 @@ private[spark] object Utils extends Logging { hostPortParseResults.get(hostPort) } - private[spark] val daemonThreadFactory: ThreadFactory = - new ThreadFactoryBuilder().setDaemon(true).build() + private val daemonThreadFactoryBuilder: ThreadFactoryBuilder = + new ThreadFactoryBuilder().setDaemon(true) /** - * Wrapper over newCachedThreadPool. + * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a + * unique, sequentially assigned integer. */ - def newDaemonCachedThreadPool(): ThreadPoolExecutor = - Executors.newCachedThreadPool(daemonThreadFactory).asInstanceOf[ThreadPoolExecutor] + def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = { + val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build() + Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor] + } /** * Return the string to tell how long has passed in seconds. The passing parameter should be in @@ -465,10 +468,13 @@ private[spark] object Utils extends Logging { } /** - * Wrapper over newFixedThreadPool. + * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a + * unique, sequentially assigned integer. 
*/ - def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor = - Executors.newFixedThreadPool(nThreads, daemonThreadFactory).asInstanceOf[ThreadPoolExecutor] + def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = { + val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build() + Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor] + } private def listFilesSafely(file: File): Seq[File] = { val files = file.listFiles() From f95a2be04569a63763fa6740ee212b658f0808ab Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Tue, 15 Oct 2013 14:51:37 -0700 Subject: [PATCH 06/16] Fixed build error after merging in master --- .../spark/scheduler/cluster/CoarseGrainedClusterMessage.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 3d262063d0..a8230ec6bc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -31,7 +31,7 @@ private[spark] object CoarseGrainedClusterMessages { // Driver to executors case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage - case class KillTask(taskId: Long, executor: String) extends StandaloneClusterMessage + case class KillTask(taskId: Long, executor: String) extends CoarseGrainedClusterMessage case class RegisteredExecutor(sparkProperties: Seq[(String, String)]) extends CoarseGrainedClusterMessage From 5b8083fee5dec145f61684070a1977c9beb98cb2 Mon Sep 17 00:00:00 2001 From: Harvey Feng Date: Tue, 15 Oct 2013 18:05:34 -0700 Subject: [PATCH 07/16] Make TaskContext's stageId publicly accessible. --- core/src/main/scala/org/apache/spark/TaskContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 51584d686d..cae983ed4c 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.executor.TaskMetrics class TaskContext( - private[spark] val stageId: Int, + val stageId: Int, val partitionId: Int, val attemptId: Long, val runningLocally: Boolean = false, From c4c76e37a7d66a1fccc532cf18c17046f4f2fdc0 Mon Sep 17 00:00:00 2001 From: Harvey Feng Date: Tue, 15 Oct 2013 18:35:59 -0700 Subject: [PATCH 08/16] Fix line length > 100 chars in SparkHadoopWriter --- core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 2bab9d6e3d..2b9f005dc1 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -36,7 +36,8 @@ import org.apache.spark.SerializableWritable * Saves the RDD using a JobConf, which should contain an output key class, an output value class, * a filename to write to, etc, exactly like in a Hadoop MapReduce job. 
*/ -class SparkHadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoopMapRedUtil with Serializable { +class SparkHadoopWriter(@transient jobConf: JobConf) + extends Logging with SparkHadoopMapRedUtil with Serializable { private val now = new Date() private val conf = new SerializableWritable(jobConf) From 65b46236e7bb8273ab99252322ffff84752bb763 Mon Sep 17 00:00:00 2001 From: Harvey Feng Date: Tue, 15 Oct 2013 21:51:52 -0700 Subject: [PATCH 09/16] Proper formatting for SparkHadoopWriter class extensions. --- core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 2b9f005dc1..afa76a4a76 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -37,7 +37,9 @@ import org.apache.spark.SerializableWritable * a filename to write to, etc, exactly like in a Hadoop MapReduce job. */ class SparkHadoopWriter(@transient jobConf: JobConf) - extends Logging with SparkHadoopMapRedUtil with Serializable { + extends Logging + with SparkHadoopMapRedUtil + with Serializable { private val now = new Date() private val conf = new SerializableWritable(jobConf) From 35befe07bb79411f453cf05d482ff051fa827b47 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Tue, 15 Oct 2013 22:51:58 -0700 Subject: [PATCH 10/16] Fixing spark streaming example and a bug in examples build. - Examples assembly included a log4j.properties which clobbered Spark's - Example had an error where some classes weren't serializable - Did some other clean-up in this example --- .../examples/clickstream/PageViewGenerator.scala | 13 +++++++++---- project/SparkBuild.scala | 1 + 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala index 884d6d6f34..de70c50473 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala @@ -17,17 +17,19 @@ package org.apache.spark.streaming.examples.clickstream -import java.net.{InetAddress,ServerSocket,Socket,SocketException} -import java.io.{InputStreamReader, BufferedReader, PrintWriter} +import java.net.ServerSocket +import java.io.PrintWriter import util.Random /** Represents a page view on a website with associated dimension data.*/ -class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) { +class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) + extends Serializable { override def toString() : String = { "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID) } } -object PageView { + +object PageView extends Serializable { def fromString(in : String) : PageView = { val parts = in.split("\t") new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt) @@ -39,6 +41,9 @@ object PageView { * This should be used in tandem with PageViewStream.scala. 
Example: * $ ./run-example spark.streaming.examples.clickstream.PageViewGenerator 44444 10 * $ ./run-example spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444 + * + * When running this, you may want to set the root logging level to ERROR in + * conf/log4j.properties to reduce the verbosity of the output. * */ object PageViewGenerator { val pages = Map("http://foo.com/" -> .7, diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 973f1e2f11..f2bbe5358f 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -311,6 +311,7 @@ object SparkBuild extends Build { mergeStrategy in assembly := { case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard + case "log4j.properties" => MergeStrategy.discard case "META-INF/services/org.apache.hadoop.fs.FileSystem" => MergeStrategy.concat case "reference.conf" => MergeStrategy.concat case _ => MergeStrategy.first From cc7df2b3ccdee602a6a90964628676e7dc4e0954 Mon Sep 17 00:00:00 2001 From: tgravescs Date: Wed, 16 Oct 2013 10:09:16 -0500 Subject: [PATCH 11/16] Fix yarn build --- .../org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 6d6ef149cc..25da9aa917 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -22,7 +22,7 @@ import org.apache.spark.util.Utils import org.apache.spark.scheduler.SplitInfo import scala.collection import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container} -import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend} +import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend} import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse} import org.apache.hadoop.yarn.util.{RackResolver, Records} import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} @@ -211,7 +211,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM val workerId = workerIdCounter.incrementAndGet().toString val driverUrl = "akka://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), - StandaloneSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME) logInfo("launching container on " + containerId + " host " + workerHostname) // just to be safe, simply remove it from pendingReleaseContainers. Should not be there, but .. From ec512583ab95e44d7372bd6222ffa94264e61f24 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Wed, 16 Oct 2013 16:57:42 -0700 Subject: [PATCH 12/16] Removed TaskSchedulerListener interface. The interface was used only by the DAG scheduler (so it wasn't necessary to define the additional interface), and the naming makes it very confusing when reading the code (because "listener" was used to describe the DAG scheduler, rather than SparkListeners, which implement a nearly-identical interface but serve a different function). 
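A minimal sketch of the new wiring, with signatures taken from the diff below: the DAGScheduler now registers itself with the TaskScheduler directly rather than through the removed listener interface:

    // TaskScheduler trait (was: def setListener(listener: TaskSchedulerListener))
    def setDAGScheduler(dagScheduler: DAGScheduler): Unit

    // DAGScheduler constructor body (was: taskSched.setListener(this))
    taskSched.setDAGScheduler(this)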
--- .../apache/spark/scheduler/DAGScheduler.scala | 14 +++--- .../spark/scheduler/TaskScheduler.scala | 7 ++- .../scheduler/TaskSchedulerListener.scala | 44 ------------------- .../scheduler/cluster/ClusterScheduler.scala | 14 +++--- .../cluster/ClusterTaskSetManager.scala | 21 ++++----- .../scheduler/local/LocalScheduler.scala | 6 +-- .../scheduler/local/LocalTaskSetManager.scala | 11 ++--- 7 files changed, 37 insertions(+), 80 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 7fb614402b..15a04e6558 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -55,20 +55,20 @@ class DAGScheduler( mapOutputTracker: MapOutputTracker, blockManagerMaster: BlockManagerMaster, env: SparkEnv) - extends TaskSchedulerListener with Logging { + extends Logging { def this(taskSched: TaskScheduler) { this(taskSched, SparkEnv.get.mapOutputTracker, SparkEnv.get.blockManager.master, SparkEnv.get) } - taskSched.setListener(this) + taskSched.setDAGScheduler(this) // Called by TaskScheduler to report task's starting. - override def taskStarted(task: Task[_], taskInfo: TaskInfo) { + def taskStarted(task: Task[_], taskInfo: TaskInfo) { eventQueue.put(BeginEvent(task, taskInfo)) } // Called by TaskScheduler to report task completions or failures. - override def taskEnded( + def taskEnded( task: Task[_], reason: TaskEndReason, result: Any, @@ -79,18 +79,18 @@ class DAGScheduler( } // Called by TaskScheduler when an executor fails. - override def executorLost(execId: String) { + def executorLost(execId: String) { eventQueue.put(ExecutorLost(execId)) } // Called by TaskScheduler when a host is added - override def executorGained(execId: String, host: String) { + def executorGained(execId: String, host: String) { eventQueue.put(ExecutorGained(execId, host)) } // Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or // cancellation of the job itself. - override def taskSetFailed(taskSet: TaskSet, reason: String) { + def taskSetFailed(taskSet: TaskSet, reason: String) { eventQueue.put(TaskSetFailed(taskSet, reason)) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 6a51efe8d6..10e0478108 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -24,8 +24,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode * Each TaskScheduler schedulers task for a single SparkContext. * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage, * and are responsible for sending the tasks to the cluster, running them, retrying if there - * are failures, and mitigating stragglers. They return events to the DAGScheduler through - * the TaskSchedulerListener interface. + * are failures, and mitigating stragglers. They return events to the DAGScheduler. */ private[spark] trait TaskScheduler { @@ -48,8 +47,8 @@ private[spark] trait TaskScheduler { // Cancel a stage. def cancelTasks(stageId: Int) - // Set a listener for upcalls. This is guaranteed to be set before submitTasks is called. 
- def setListener(listener: TaskSchedulerListener): Unit + // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called. + def setDAGScheduler(dagScheduler: DAGScheduler): Unit // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs. def defaultParallelism(): Int diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala deleted file mode 100644 index 593fa9fb93..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler - -import scala.collection.mutable.Map - -import org.apache.spark.TaskEndReason -import org.apache.spark.executor.TaskMetrics - -/** - * Interface for getting events back from the TaskScheduler. - */ -private[spark] trait TaskSchedulerListener { - // A task has started. - def taskStarted(task: Task[_], taskInfo: TaskInfo) - - // A task has finished or failed. - def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any], - taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit - - // A node was added to the cluster. - def executorGained(execId: String, host: String): Unit - - // A node was lost from the cluster. - def executorLost(execId: String): Unit - - // The TaskScheduler wants to abort an entire task set. - def taskSetFailed(taskSet: TaskSet, reason: String): Unit -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index 7a72ff0474..4ea8bf8853 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -79,7 +79,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) private val executorIdToHost = new HashMap[String, String] // Listener object to pass upcalls into - var listener: TaskSchedulerListener = null + var dagScheduler: DAGScheduler = null var backend: SchedulerBackend = null @@ -94,8 +94,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext) // This is a var so that we can reset it for testing purposes. 
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this) - override def setListener(listener: TaskSchedulerListener) { - this.listener = listener + override def setDAGScheduler(dagScheduler: DAGScheduler) { + this.dagScheduler = dagScheduler } def initialize(context: SchedulerBackend) { @@ -297,7 +297,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } // Update the DAGScheduler without holding a lock on this, since that can deadlock if (failedExecutor != None) { - listener.executorLost(failedExecutor.get) + dagScheduler.executorLost(failedExecutor.get) backend.reviveOffers() } if (taskFailed) { @@ -397,9 +397,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) logError("Lost an executor " + executorId + " (already removed): " + reason) } } - // Call listener.executorLost without holding the lock on this to prevent deadlock + // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock if (failedExecutor != None) { - listener.executorLost(failedExecutor.get) + dagScheduler.executorLost(failedExecutor.get) backend.reviveOffers() } } @@ -418,7 +418,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } def executorGained(execId: String, host: String) { - listener.executorGained(execId, host) + dagScheduler.executorGained(execId, host) } def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala index 7bd3499300..29093e3b4f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala @@ -415,11 +415,11 @@ private[spark] class ClusterTaskSetManager( } private def taskStarted(task: Task[_], info: TaskInfo) { - sched.listener.taskStarted(task, info) + sched.dagScheduler.taskStarted(task, info) } /** - * Marks the task as successful and notifies the listener that a task has ended. + * Marks the task as successful and notifies the DAGScheduler that a task has ended. */ def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = { val info = taskInfos(tid) @@ -429,7 +429,7 @@ private[spark] class ClusterTaskSetManager( if (!successful(index)) { logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format( tid, info.duration, info.host, tasksSuccessful, numTasks)) - sched.listener.taskEnded( + sched.dagScheduler.taskEnded( tasks(index), Success, result.value, result.accumUpdates, info, result.metrics) // Mark successful and stop if all the tasks have succeeded. @@ -445,7 +445,8 @@ private[spark] class ClusterTaskSetManager( } /** - * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the listener. + * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the + * DAG Scheduler. 
*/ def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) { val info = taskInfos(tid) @@ -463,7 +464,7 @@ private[spark] class ClusterTaskSetManager( reason.foreach { case fetchFailed: FetchFailed => logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress) - sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null) + sched.dagScheduler.taskEnded(tasks(index), fetchFailed, null, null, info, null) successful(index) = true tasksSuccessful += 1 sched.taskSetFinished(this) @@ -472,11 +473,11 @@ private[spark] class ClusterTaskSetManager( case TaskKilled => logWarning("Task %d was killed.".format(tid)) - sched.listener.taskEnded(tasks(index), reason.get, null, null, info, null) + sched.dagScheduler.taskEnded(tasks(index), reason.get, null, null, info, null) return case ef: ExceptionFailure => - sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null)) + sched.dagScheduler.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null)) val key = ef.description val now = clock.getTime() val (printFull, dupCount) = { @@ -504,7 +505,7 @@ private[spark] class ClusterTaskSetManager( case TaskResultLost => logWarning("Lost result for TID %s on host %s".format(tid, info.host)) - sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null) + sched.dagScheduler.taskEnded(tasks(index), TaskResultLost, null, null, info, null) case _ => {} } @@ -533,7 +534,7 @@ private[spark] class ClusterTaskSetManager( failed = true causeOfFailure = message // TODO: Kill running tasks if we were not terminated due to a Mesos error - sched.listener.taskSetFailed(taskSet, message) + sched.dagScheduler.taskSetFailed(taskSet, message) removeAllRunningTasks() sched.taskSetFinished(this) } @@ -606,7 +607,7 @@ private[spark] class ClusterTaskSetManager( addPendingTask(index) // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our // stage finishes when a total of tasks.size tasks finish. - sched.listener.taskEnded(tasks(index), Resubmitted, null, null, info, null) + sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, null, info, null) } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala index b445260d1b..2699f0b33e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala @@ -81,7 +81,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: val env = SparkEnv.get val attemptId = new AtomicInteger - var listener: TaskSchedulerListener = null + var dagScheduler: DAGScheduler = null // Application dependencies (added through SparkContext) that we've fetched so far on this node. // Each map holds the master's timestamp for the version of that file or JAR we got. 
@@ -114,8 +114,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: localActor = env.actorSystem.actorOf(Props(new LocalActor(this, threads)), "Test") } - override def setListener(listener: TaskSchedulerListener) { - this.listener = listener + override def setDAGScheduler(dagScheduler: DAGScheduler) { + this.dagScheduler = dagScheduler } override def submitTasks(taskSet: TaskSet) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala index f72e77d40f..55f8313e87 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala @@ -133,7 +133,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas } def taskStarted(task: Task[_], info: TaskInfo) { - sched.listener.taskStarted(task, info) + sched.dagScheduler.taskStarted(task, info) } def taskEnded(tid: Long, state: TaskState, serializedData: ByteBuffer) { @@ -148,7 +148,8 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas } } result.metrics.resultSize = serializedData.limit() - sched.listener.taskEnded(task, Success, result.value, result.accumUpdates, info, result.metrics) + sched.dagScheduler.taskEnded(task, Success, result.value, result.accumUpdates, info, + result.metrics) numFinished += 1 decreaseRunningTasks(1) finished(index) = true @@ -165,7 +166,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas decreaseRunningTasks(1) val reason: ExceptionFailure = ser.deserialize[ExceptionFailure]( serializedData, getClass.getClassLoader) - sched.listener.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null)) + sched.dagScheduler.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null)) if (!finished(index)) { copiesRunning(index) -= 1 numFailures(index) += 1 @@ -176,7 +177,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format( taskSet.id, index, 4, reason.description) decreaseRunningTasks(runningTasks) - sched.listener.taskSetFailed(taskSet, errorMessage) + sched.dagScheduler.taskSetFailed(taskSet, errorMessage) // need to delete failed Taskset from schedule queue sched.taskSetFinished(this) } @@ -184,7 +185,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas } override def error(message: String) { - sched.listener.taskSetFailed(taskSet, message) + sched.dagScheduler.taskSetFailed(taskSet, message) sched.taskSetFinished(this) } } From a32aa6b351064f17779adcd8e89ebc0e98fc3096 Mon Sep 17 00:00:00 2001 From: KarthikTunga Date: Wed, 16 Oct 2013 22:51:09 -0700 Subject: [PATCH 13/16] Implementing --config argument in the scripts --- bin/slaves.sh | 7 ++++--- bin/spark-daemon.sh | 10 ++++++---- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/bin/slaves.sh b/bin/slaves.sh index bcb7760130..aeb0c0f6be 100755 --- a/bin/slaves.sh +++ b/bin/slaves.sh @@ -28,7 +28,7 @@ # SPARK_SSH_OPTS Options passed to ssh when running remote commands. ## -usage="Usage: slaves.sh [--config confdir] command..." +usage="Usage: slaves.sh [--config ] command..." # if no args specified, show usage if [ $# -le 0 ]; then @@ -46,14 +46,15 @@ bin=`cd "$bin"; pwd` # spark-env.sh. Save it here. 
HOSTLIST=$SPARK_SLAVES -#check if conf dir passed as an argument +# Check if --config is passed as an argument. It is an optional parameter. +# Exit if the argument is a directory. if [ "$1" == "--config" ] then shift conf_dir=$1 if [ ! -d "$conf_dir" ] then - echo "$conf_dir is not a valid directory" + echo "ERROR : $conf_dir is not a directory" echo $usage exit 1 else diff --git a/bin/spark-daemon.sh b/bin/spark-daemon.sh index 48d552f3db..262ef173be 100755 --- a/bin/spark-daemon.sh +++ b/bin/spark-daemon.sh @@ -29,7 +29,7 @@ # SPARK_NICENESS The scheduling priority for daemons. Defaults to 0. ## -usage="Usage: spark-daemon.sh [--config ] [--hosts hostlistfile] (start|stop) " +usage="Usage: spark-daemon.sh [--config ] (start|stop) " # if no args specified, show usage if [ $# -le 1 ]; then @@ -44,15 +44,17 @@ bin=`cd "$bin"; pwd` # get arguments -# check if conf dir passed as an argument +# Check if --config is passed as an argument. It is an optional parameter. +# Exit if the argument is a directory. + if [ "$1" == "--config" ] then shift conf_dir=$1 if [ ! -d "$conf_dir" ] then - echo "$conf_dir is not a valid directory" - echo "FOUL :"$usage + echo "ERROR : $conf_dir is not a directory" + echo $usage exit 1 else export SPARK_CONF_DIR=$conf_dir From ff4fb1f7ee3236f4a6d7fe5b677c11ba593c50c0 Mon Sep 17 00:00:00 2001 From: KarthikTunga Date: Wed, 16 Oct 2013 22:55:15 -0700 Subject: [PATCH 14/16] SPARK-627 , Implementing --config arguments in the scripts --- bin/slaves.sh | 2 +- bin/spark-daemon.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/slaves.sh b/bin/slaves.sh index aeb0c0f6be..c367c2fd8e 100755 --- a/bin/slaves.sh +++ b/bin/slaves.sh @@ -47,7 +47,7 @@ bin=`cd "$bin"; pwd` HOSTLIST=$SPARK_SLAVES # Check if --config is passed as an argument. It is an optional parameter. -# Exit if the argument is a directory. +# Exit if the argument is not a directory. if [ "$1" == "--config" ] then shift diff --git a/bin/spark-daemon.sh b/bin/spark-daemon.sh index 262ef173be..a0c0d44b58 100755 --- a/bin/spark-daemon.sh +++ b/bin/spark-daemon.sh @@ -45,7 +45,7 @@ bin=`cd "$bin"; pwd` # get arguments # Check if --config is passed as an argument. It is an optional parameter. -# Exit if the argument is a directory. +# Exit if the argument is not a directory. if [ "$1" == "--config" ] then From 8537f19268bf53e5f154dedb7ba35b711dfbefbd Mon Sep 17 00:00:00 2001 From: KarthikTunga Date: Wed, 16 Oct 2013 23:00:33 -0700 Subject: [PATCH 15/16] SPARK-627 , Implementing --config arguments in the scripts --- bin/spark-daemons.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/spark-daemons.sh b/bin/spark-daemons.sh index 354eb905a1..64286cb2da 100755 --- a/bin/spark-daemons.sh +++ b/bin/spark-daemons.sh @@ -19,7 +19,7 @@ # Run a Spark command on all slave hosts. -usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command instance-number args..." +usage="Usage: spark-daemons.sh [--config ] [start|stop] command instance-number args..." 
# if no args specified, show usage if [ $# -le 1 ]; then From 809f547633cd0d7e48dffc31cd792ca2cebb85ad Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Wed, 16 Oct 2013 23:16:12 -0700 Subject: [PATCH 16/16] Fixed unit tests --- .../spark/scheduler/DAGSchedulerSuite.scala | 2 +- .../cluster/ClusterTaskSetManagerSuite.scala | 49 ++++++++++--------- 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 838179c6b5..2a2f828be6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -60,7 +60,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont taskSets += taskSet } override def cancelTasks(stageId: Int) {} - override def setListener(listener: TaskSchedulerListener) = {} + override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala index 80d0c5a5e9..b97f2b19b5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala @@ -28,6 +28,30 @@ import org.apache.spark.executor.TaskMetrics import java.nio.ByteBuffer import org.apache.spark.util.{Utils, FakeClock} +class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler(taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo) { + taskScheduler.startedTasks += taskInfo.index + } + + override def taskEnded( + task: Task[_], + reason: TaskEndReason, + result: Any, + accumUpdates: mutable.Map[Long, Any], + taskInfo: TaskInfo, + taskMetrics: TaskMetrics) { + taskScheduler.endedTasks(taskInfo.index) = reason + } + + override def executorGained(execId: String, host: String) {} + + override def executorLost(execId: String) {} + + override def taskSetFailed(taskSet: TaskSet, reason: String) { + taskScheduler.taskSetsFailed += taskSet.id + } +} + /** * A mock ClusterScheduler implementation that just remembers information about tasks started and * feedback received from the TaskSetManagers. Note that it's important to initialize this with @@ -44,30 +68,7 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* val executors = new mutable.HashMap[String, String] ++ liveExecutors - listener = new TaskSchedulerListener { - def taskStarted(task: Task[_], taskInfo: TaskInfo) { - startedTasks += taskInfo.index - } - - def taskEnded( - task: Task[_], - reason: TaskEndReason, - result: Any, - accumUpdates: mutable.Map[Long, Any], - taskInfo: TaskInfo, - taskMetrics: TaskMetrics) - { - endedTasks(taskInfo.index) = reason - } - - def executorGained(execId: String, host: String) {} - - def executorLost(execId: String) {} - - def taskSetFailed(taskSet: TaskSet, reason: String) { - taskSetsFailed += taskSet.id - } - } + dagScheduler = new FakeDAGScheduler(this) def removeExecutor(execId: String): Unit = executors -= execId