Merge branch 'master' of https://github.com/apache/incubator-spark into indexedrdd_graphx

This commit is contained in:
Joseph E. Gonzalez 2013-10-18 12:21:19 -07:00
commit 1856b37e9d
32 changed files with 196 additions and 190 deletions

View file

@ -28,7 +28,7 @@
# SPARK_SSH_OPTS Options passed to ssh when running remote commands.
##
usage="Usage: slaves.sh [--config confdir] command..."
usage="Usage: slaves.sh [--config <conf-dir>] command..."
# if no args specified, show usage
if [ $# -le 0 ]; then
@ -46,6 +46,23 @@ bin=`cd "$bin"; pwd`
# spark-env.sh. Save it here.
HOSTLIST=$SPARK_SLAVES
# Check if --config is passed as an argument. It is an optional parameter.
# Exit if the argument is not a directory.
if [ "$1" == "--config" ]
then
shift
conf_dir=$1
if [ ! -d "$conf_dir" ]
then
echo "ERROR : $conf_dir is not a directory"
echo $usage
exit 1
else
export SPARK_CONF_DIR=$conf_dir
fi
shift
fi
if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then
. "${SPARK_CONF_DIR}/spark-env.sh"
fi

View file

@ -29,7 +29,7 @@
# SPARK_NICENESS The scheduling priority for daemons. Defaults to 0.
##
usage="Usage: spark-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <spark-command> <spark-instance-number> <args...>"
usage="Usage: spark-daemon.sh [--config <conf-dir>] (start|stop) <spark-command> <spark-instance-number> <args...>"
# if no args specified, show usage
if [ $# -le 1 ]; then
@ -43,6 +43,25 @@ bin=`cd "$bin"; pwd`
. "$bin/spark-config.sh"
# get arguments
# Check if --config is passed as an argument. It is an optional parameter.
# Exit if the argument is not a directory.
if [ "$1" == "--config" ]
then
shift
conf_dir=$1
if [ ! -d "$conf_dir" ]
then
echo "ERROR : $conf_dir is not a directory"
echo $usage
exit 1
else
export SPARK_CONF_DIR=$conf_dir
fi
shift
fi
startStop=$1
shift
command=$1

View file

@ -19,7 +19,7 @@
# Run a Spark command on all slave hosts.
usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command instance-number args..."
usage="Usage: spark-daemons.sh [--config <conf-dir>] [start|stop] command instance-number args..."
# if no args specified, show usage
if [ $# -le 1 ]; then

View file

@ -56,7 +56,7 @@ import org.apache.spark.deploy.LocalSparkCluster
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend,
ClusterScheduler}
import org.apache.spark.scheduler.local.LocalScheduler
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
@ -213,7 +213,7 @@ class SparkContext(
throw new SparkException("YARN mode not available ?", th)
}
}
val backend = new StandaloneSchedulerBackend(scheduler, this.env.actorSystem)
val backend = new CoarseGrainedSchedulerBackend(scheduler, this.env.actorSystem)
scheduler.initialize(backend)
scheduler

View file

@ -36,7 +36,10 @@ import org.apache.spark.SerializableWritable
* Saves the RDD using a JobConf, which should contain an output key class, an output value class,
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
class SparkHadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoopMapRedUtil with Serializable {
class SparkHadoopWriter(@transient jobConf: JobConf)
extends Logging
with SparkHadoopMapRedUtil
with Serializable {
private val now = new Date()
private val conf = new SerializableWritable(jobConf)

View file

@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.executor.TaskMetrics
class TaskContext(
private[spark] val stageId: Int,
val stageId: Int,
val partitionId: Int,
val attemptId: Long,
val runningLocally: Boolean = false,

View file

@ -326,7 +326,8 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
private var blocksInRequestBitVector = new BitSet(totalBlocks)
override def run() {
var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots)
var threadPool = Utils.newDaemonFixedThreadPool(
MultiTracker.MaxChatSlots, "Bit Torrent Chatter")
while (hasBlocks.get < totalBlocks) {
var numThreadsToCreate = 0
@ -736,7 +737,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
private var setOfCompletedSources = Set[SourceInfo]()
override def run() {
var threadPool = Utils.newDaemonCachedThreadPool()
var threadPool = Utils.newDaemonCachedThreadPool("Bit torrent guide multiple requests")
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket(0)
@ -927,7 +928,8 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
class ServeMultipleRequests
extends Thread with Logging {
// Server at most MultiTracker.MaxChatSlots peers
var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots)
var threadPool = Utils.newDaemonFixedThreadPool(
MultiTracker.MaxChatSlots, "Bit torrent serve multiple requests")
override def run() {
var serverSocket = new ServerSocket(0)

View file

@ -137,7 +137,7 @@ extends Logging {
class TrackMultipleValues
extends Thread with Logging {
override def run() {
var threadPool = Utils.newDaemonCachedThreadPool()
var threadPool = Utils.newDaemonCachedThreadPool("Track multiple values")
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket(DriverTrackerPort)

View file

@ -291,7 +291,7 @@ extends Broadcast[T](id) with Logging with Serializable {
private var setOfCompletedSources = Set[SourceInfo]()
override def run() {
var threadPool = Utils.newDaemonCachedThreadPool()
var threadPool = Utils.newDaemonCachedThreadPool("Tree broadcast guide multiple requests")
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket(0)
@ -493,7 +493,7 @@ extends Broadcast[T](id) with Logging with Serializable {
class ServeMultipleRequests
extends Thread with Logging {
var threadPool = Utils.newDaemonCachedThreadPool()
var threadPool = Utils.newDaemonCachedThreadPool("Tree broadcast serve multiple requests")
override def run() {
var serverSocket = new ServerSocket(0)

View file

@ -24,11 +24,11 @@ import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClie
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{Utils, AkkaUtils}
private[spark] class StandaloneExecutorBackend(
private[spark] class CoarseGrainedExecutorBackend(
driverUrl: String,
executorId: String,
hostPort: String,
@ -87,7 +87,7 @@ private[spark] class StandaloneExecutorBackend(
}
}
private[spark] object StandaloneExecutorBackend {
private[spark] object CoarseGrainedExecutorBackend {
def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
// Debug code
Utils.checkHost(hostname)
@ -99,7 +99,7 @@ private[spark] object StandaloneExecutorBackend {
val sparkHostPort = hostname + ":" + boundPort
System.setProperty("spark.hostPort", sparkHostPort)
val actor = actorSystem.actorOf(
Props(new StandaloneExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
Props(new CoarseGrainedExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
name = "Executor")
actorSystem.awaitTermination()
}
@ -107,7 +107,9 @@ private[spark] object StandaloneExecutorBackend {
def main(args: Array[String]) {
if (args.length < 4) {
//the reason we allow the last frameworkId argument is to make it easy to kill rogue executors
System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores> [<appid>]")
System.err.println(
"Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> <cores> " +
"[<appid>]")
System.exit(1)
}
run(args(0), args(1), args(2), args(3).toInt)

View file

@ -121,8 +121,7 @@ private[spark] class Executor(
}
// Start worker thread pool
val threadPool = new ThreadPoolExecutor(
1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable], Utils.daemonThreadFactory)
val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

View file

@ -79,7 +79,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
private val registerRequests = new SynchronizedQueue[SendingConnection]
implicit val futureExecContext = ExecutionContext.fromExecutor(Utils.newDaemonCachedThreadPool())
implicit val futureExecContext = ExecutionContext.fromExecutor(
Utils.newDaemonCachedThreadPool("Connection manager future execution context"))
private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null

View file

@ -55,20 +55,20 @@ class DAGScheduler(
mapOutputTracker: MapOutputTracker,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv)
extends TaskSchedulerListener with Logging {
extends Logging {
def this(taskSched: TaskScheduler) {
this(taskSched, SparkEnv.get.mapOutputTracker, SparkEnv.get.blockManager.master, SparkEnv.get)
}
taskSched.setListener(this)
taskSched.setDAGScheduler(this)
// Called by TaskScheduler to report task's starting.
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
eventQueue.put(BeginEvent(task, taskInfo))
}
// Called by TaskScheduler to report task completions or failures.
override def taskEnded(
def taskEnded(
task: Task[_],
reason: TaskEndReason,
result: Any,
@ -79,18 +79,18 @@ class DAGScheduler(
}
// Called by TaskScheduler when an executor fails.
override def executorLost(execId: String) {
def executorLost(execId: String) {
eventQueue.put(ExecutorLost(execId))
}
// Called by TaskScheduler when a host is added
override def executorGained(execId: String, host: String) {
def executorGained(execId: String, host: String) {
eventQueue.put(ExecutorGained(execId, host))
}
// Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or
// cancellation of the job itself.
override def taskSetFailed(taskSet: TaskSet, reason: String) {
def taskSetFailed(taskSet: TaskSet, reason: String) {
eventQueue.put(TaskSetFailed(taskSet, reason))
}

View file

@ -24,8 +24,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
* Each TaskScheduler schedulers task for a single SparkContext.
* These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
* and are responsible for sending the tasks to the cluster, running them, retrying if there
* are failures, and mitigating stragglers. They return events to the DAGScheduler through
* the TaskSchedulerListener interface.
* are failures, and mitigating stragglers. They return events to the DAGScheduler.
*/
private[spark] trait TaskScheduler {
@ -48,8 +47,8 @@ private[spark] trait TaskScheduler {
// Cancel a stage.
def cancelTasks(stageId: Int)
// Set a listener for upcalls. This is guaranteed to be set before submitTasks is called.
def setListener(listener: TaskSchedulerListener): Unit
// Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
def setDAGScheduler(dagScheduler: DAGScheduler): Unit
// Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
def defaultParallelism(): Int

View file

@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler
import scala.collection.mutable.Map
import org.apache.spark.TaskEndReason
import org.apache.spark.executor.TaskMetrics
/**
* Interface for getting events back from the TaskScheduler.
*/
private[spark] trait TaskSchedulerListener {
// A task has started.
def taskStarted(task: Task[_], taskInfo: TaskInfo)
// A task has finished or failed.
def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any],
taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit
// A node was added to the cluster.
def executorGained(execId: String, host: String): Unit
// A node was lost from the cluster.
def executorLost(execId: String): Unit
// The TaskScheduler wants to abort an entire task set.
def taskSetFailed(taskSet: TaskSet, reason: String): Unit
}

View file

@ -79,7 +79,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
private val executorIdToHost = new HashMap[String, String]
// Listener object to pass upcalls into
var listener: TaskSchedulerListener = null
var dagScheduler: DAGScheduler = null
var backend: SchedulerBackend = null
@ -94,8 +94,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
override def setListener(listener: TaskSchedulerListener) {
this.listener = listener
override def setDAGScheduler(dagScheduler: DAGScheduler) {
this.dagScheduler = dagScheduler
}
def initialize(context: SchedulerBackend) {
@ -297,7 +297,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
// Update the DAGScheduler without holding a lock on this, since that can deadlock
if (failedExecutor != None) {
listener.executorLost(failedExecutor.get)
dagScheduler.executorLost(failedExecutor.get)
backend.reviveOffers()
}
if (taskFailed) {
@ -397,9 +397,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
logError("Lost an executor " + executorId + " (already removed): " + reason)
}
}
// Call listener.executorLost without holding the lock on this to prevent deadlock
// Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
if (failedExecutor != None) {
listener.executorLost(failedExecutor.get)
dagScheduler.executorLost(failedExecutor.get)
backend.reviveOffers()
}
}
@ -418,7 +418,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
def executorGained(execId: String, host: String) {
listener.executorGained(execId, host)
dagScheduler.executorGained(execId, host)
}
def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {

View file

@ -415,11 +415,11 @@ private[spark] class ClusterTaskSetManager(
}
private def taskStarted(task: Task[_], info: TaskInfo) {
sched.listener.taskStarted(task, info)
sched.dagScheduler.taskStarted(task, info)
}
/**
* Marks the task as successful and notifies the listener that a task has ended.
* Marks the task as successful and notifies the DAGScheduler that a task has ended.
*/
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {
val info = taskInfos(tid)
@ -429,7 +429,7 @@ private[spark] class ClusterTaskSetManager(
if (!successful(index)) {
logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
tid, info.duration, info.host, tasksSuccessful, numTasks))
sched.listener.taskEnded(
sched.dagScheduler.taskEnded(
tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
// Mark successful and stop if all the tasks have succeeded.
@ -445,7 +445,8 @@ private[spark] class ClusterTaskSetManager(
}
/**
* Marks the task as failed, re-adds it to the list of pending tasks, and notifies the listener.
* Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
* DAG Scheduler.
*/
def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) {
val info = taskInfos(tid)
@ -463,7 +464,7 @@ private[spark] class ClusterTaskSetManager(
reason.foreach {
case fetchFailed: FetchFailed =>
logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
sched.dagScheduler.taskEnded(tasks(index), fetchFailed, null, null, info, null)
successful(index) = true
tasksSuccessful += 1
sched.taskSetFinished(this)
@ -472,11 +473,11 @@ private[spark] class ClusterTaskSetManager(
case TaskKilled =>
logWarning("Task %d was killed.".format(tid))
sched.listener.taskEnded(tasks(index), reason.get, null, null, info, null)
sched.dagScheduler.taskEnded(tasks(index), reason.get, null, null, info, null)
return
case ef: ExceptionFailure =>
sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
sched.dagScheduler.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
val key = ef.description
val now = clock.getTime()
val (printFull, dupCount) = {
@ -504,7 +505,7 @@ private[spark] class ClusterTaskSetManager(
case TaskResultLost =>
logWarning("Lost result for TID %s on host %s".format(tid, info.host))
sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
sched.dagScheduler.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
case _ => {}
}
@ -533,7 +534,7 @@ private[spark] class ClusterTaskSetManager(
failed = true
causeOfFailure = message
// TODO: Kill running tasks if we were not terminated due to a Mesos error
sched.listener.taskSetFailed(taskSet, message)
sched.dagScheduler.taskSetFailed(taskSet, message)
removeAllRunningTasks()
sched.taskSetFinished(this)
}
@ -606,7 +607,7 @@ private[spark] class ClusterTaskSetManager(
addPendingTask(index)
// Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
// stage finishes when a total of tasks.size tasks finish.
sched.listener.taskEnded(tasks(index), Resubmitted, null, null, info, null)
sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, null, info, null)
}
}
}

View file

@ -24,28 +24,28 @@ import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.util.{Utils, SerializableBuffer}
private[spark] sealed trait StandaloneClusterMessage extends Serializable
private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
private[spark] object StandaloneClusterMessages {
private[spark] object CoarseGrainedClusterMessages {
// Driver to executors
case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
case class KillTask(taskId: Long, executor: String) extends StandaloneClusterMessage
case class KillTask(taskId: Long, executor: String) extends CoarseGrainedClusterMessage
case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
extends StandaloneClusterMessage
extends CoarseGrainedClusterMessage
case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
// Executors to driver
case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
extends StandaloneClusterMessage {
extends CoarseGrainedClusterMessage {
Utils.checkHostPort(hostPort, "Expected host port")
}
case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
data: SerializableBuffer) extends StandaloneClusterMessage
data: SerializableBuffer) extends CoarseGrainedClusterMessage
object StatusUpdate {
/** Alternate factory method that takes a ByteBuffer directly for the data field */
@ -56,10 +56,10 @@ private[spark] object StandaloneClusterMessages {
}
// Internal messages in driver
case object ReviveOffers extends StandaloneClusterMessage
case object ReviveOffers extends CoarseGrainedClusterMessage
case object StopDriver extends StandaloneClusterMessage
case object StopDriver extends CoarseGrainedClusterMessage
case class RemoveExecutor(executorId: String, reason: String) extends StandaloneClusterMessage
case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
}

View file

@ -30,16 +30,19 @@ import akka.util.duration._
import org.apache.spark.{SparkException, Logging, TaskState}
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.Utils
/**
* A standalone scheduler backend, which waits for standalone executors to connect to it through
* Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
* Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
* This backend holds onto each executor for the duration of the Spark job rather than relinquishing
* executors whenever a task is done and asking the scheduler to launch a new executor for
* each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the
* coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode
* (spark.deploy.*).
*/
private[spark]
class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
extends SchedulerBackend with Logging
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
@ -162,7 +165,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
}
driverActor = actorSystem.actorOf(
Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}
private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
@ -202,6 +205,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
}
private[spark] object StandaloneSchedulerBackend {
val ACTOR_NAME = "StandaloneScheduler"
private[spark] object CoarseGrainedSchedulerBackend {
val ACTOR_NAME = "CoarseGrainedScheduler"
}

View file

@ -28,7 +28,7 @@ private[spark] class SparkDeploySchedulerBackend(
sc: SparkContext,
masters: Array[String],
appName: String)
extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
with ClientListener
with Logging {
@ -44,10 +44,10 @@ private[spark] class SparkDeploySchedulerBackend(
// The endpoint for executors to talk to us
val driverUrl = "akka://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
val command = Command(
"org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome().getOrElse(null)
val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
"http://" + sc.ui.appUIAddress)

View file

@ -24,33 +24,16 @@ import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.Utils
/**
* Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
*/
private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
extends Logging {
private val MIN_THREADS = System.getProperty("spark.resultGetter.minThreads", "4").toInt
private val MAX_THREADS = System.getProperty("spark.resultGetter.maxThreads", "4").toInt
private val getTaskResultExecutor = new ThreadPoolExecutor(
MIN_THREADS,
MAX_THREADS,
0L,
TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable],
new ResultResolverThreadFactory)
class ResultResolverThreadFactory extends ThreadFactory {
private var counter = 0
private var PREFIX = "Result resolver thread"
override def newThread(r: Runnable): Thread = {
val thread = new Thread(r, "%s-%s".format(PREFIX, counter))
counter += 1
thread.setDaemon(true)
return thread
}
}
private val THREADS = System.getProperty("spark.resultGetter.threads", "4").toInt
private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
THREADS, "Result resolver thread")
protected val serializer = new ThreadLocal[SerializerInstance] {
override def initialValue(): SerializerInstance = {

View file

@ -30,13 +30,14 @@ import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}
import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
* onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
* a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
* StandaloneBackend mechanism. This class is useful for lower and more predictable latency.
* CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable
* latency.
*
* Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
* remove this.
@ -46,7 +47,7 @@ private[spark] class CoarseMesosSchedulerBackend(
sc: SparkContext,
master: String,
appName: String)
extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
with MScheduler
with Logging {
@ -122,20 +123,20 @@ private[spark] class CoarseMesosSchedulerBackend(
val driverUrl = "akka://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"),
System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val uri = System.getProperty("spark.executor.uri")
if (uri == null) {
val runScript = new File(sparkHome, "spark-class").getCanonicalPath
command.setValue(
"\"%s\" org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
"\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue(
"cd %s*; ./spark-class org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
"cd %s*; ./spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
.format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
return command.build()

View file

@ -81,7 +81,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
val env = SparkEnv.get
val attemptId = new AtomicInteger
var listener: TaskSchedulerListener = null
var dagScheduler: DAGScheduler = null
// Application dependencies (added through SparkContext) that we've fetched so far on this node.
// Each map holds the master's timestamp for the version of that file or JAR we got.
@ -114,8 +114,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
localActor = env.actorSystem.actorOf(Props(new LocalActor(this, threads)), "Test")
}
override def setListener(listener: TaskSchedulerListener) {
this.listener = listener
override def setDAGScheduler(dagScheduler: DAGScheduler) {
this.dagScheduler = dagScheduler
}
override def submitTasks(taskSet: TaskSet) {

View file

@ -133,7 +133,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
def taskStarted(task: Task[_], info: TaskInfo) {
sched.listener.taskStarted(task, info)
sched.dagScheduler.taskStarted(task, info)
}
def taskEnded(tid: Long, state: TaskState, serializedData: ByteBuffer) {
@ -148,7 +148,8 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
}
result.metrics.resultSize = serializedData.limit()
sched.listener.taskEnded(task, Success, result.value, result.accumUpdates, info, result.metrics)
sched.dagScheduler.taskEnded(task, Success, result.value, result.accumUpdates, info,
result.metrics)
numFinished += 1
decreaseRunningTasks(1)
finished(index) = true
@ -165,7 +166,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
decreaseRunningTasks(1)
val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](
serializedData, getClass.getClassLoader)
sched.listener.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
sched.dagScheduler.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
if (!finished(index)) {
copiesRunning(index) -= 1
numFailures(index) += 1
@ -176,7 +177,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format(
taskSet.id, index, 4, reason.description)
decreaseRunningTasks(runningTasks)
sched.listener.taskSetFailed(taskSet, errorMessage)
sched.dagScheduler.taskSetFailed(taskSet, errorMessage)
// need to delete failed Taskset from schedule queue
sched.taskSetFinished(this)
}
@ -184,7 +185,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
override def error(message: String) {
sched.listener.taskSetFailed(taskSet, message)
sched.dagScheduler.taskSetFailed(taskSet, message)
sched.taskSetFinished(this)
}
}

View file

@ -447,14 +447,17 @@ private[spark] object Utils extends Logging {
hostPortParseResults.get(hostPort)
}
private[spark] val daemonThreadFactory: ThreadFactory =
new ThreadFactoryBuilder().setDaemon(true).build()
private val daemonThreadFactoryBuilder: ThreadFactoryBuilder =
new ThreadFactoryBuilder().setDaemon(true)
/**
* Wrapper over newCachedThreadPool.
* Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
* unique, sequentially assigned integer.
*/
def newDaemonCachedThreadPool(): ThreadPoolExecutor =
Executors.newCachedThreadPool(daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}
/**
* Return the string to tell how long has passed in seconds. The passing parameter should be in
@ -465,10 +468,13 @@ private[spark] object Utils extends Logging {
}
/**
* Wrapper over newFixedThreadPool.
* Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
* unique, sequentially assigned integer.
*/
def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor =
Executors.newFixedThreadPool(nThreads, daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
}
private def listFilesSafely(file: File): Seq[File] = {
val files = file.listFiles()

View file

@ -60,7 +60,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
taskSets += taskSet
}
override def cancelTasks(stageId: Int) {}
override def setListener(listener: TaskSchedulerListener) = {}
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
override def defaultParallelism() = 2
}

View file

@ -28,6 +28,30 @@ import org.apache.spark.executor.TaskMetrics
import java.nio.ByteBuffer
import org.apache.spark.util.{Utils, FakeClock}
class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler(taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
taskScheduler.startedTasks += taskInfo.index
}
override def taskEnded(
task: Task[_],
reason: TaskEndReason,
result: Any,
accumUpdates: mutable.Map[Long, Any],
taskInfo: TaskInfo,
taskMetrics: TaskMetrics) {
taskScheduler.endedTasks(taskInfo.index) = reason
}
override def executorGained(execId: String, host: String) {}
override def executorLost(execId: String) {}
override def taskSetFailed(taskSet: TaskSet, reason: String) {
taskScheduler.taskSetsFailed += taskSet.id
}
}
/**
* A mock ClusterScheduler implementation that just remembers information about tasks started and
* feedback received from the TaskSetManagers. Note that it's important to initialize this with
@ -44,30 +68,7 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*
val executors = new mutable.HashMap[String, String] ++ liveExecutors
listener = new TaskSchedulerListener {
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
startedTasks += taskInfo.index
}
def taskEnded(
task: Task[_],
reason: TaskEndReason,
result: Any,
accumUpdates: mutable.Map[Long, Any],
taskInfo: TaskInfo,
taskMetrics: TaskMetrics)
{
endedTasks(taskInfo.index) = reason
}
def executorGained(execId: String, host: String) {}
def executorLost(execId: String) {}
def taskSetFailed(taskSet: TaskSet, reason: String) {
taskSetsFailed += taskSet.id
}
}
dagScheduler = new FakeDAGScheduler(this)
def removeExecutor(execId: String): Unit = executors -= execId

View file

@ -17,17 +17,19 @@
package org.apache.spark.streaming.examples.clickstream
import java.net.{InetAddress,ServerSocket,Socket,SocketException}
import java.io.{InputStreamReader, BufferedReader, PrintWriter}
import java.net.ServerSocket
import java.io.PrintWriter
import util.Random
/** Represents a page view on a website with associated dimension data.*/
class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) {
class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int)
extends Serializable {
override def toString() : String = {
"%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID)
}
}
object PageView {
object PageView extends Serializable {
def fromString(in : String) : PageView = {
val parts = in.split("\t")
new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)
@ -39,6 +41,9 @@ object PageView {
* This should be used in tandem with PageViewStream.scala. Example:
* $ ./run-example spark.streaming.examples.clickstream.PageViewGenerator 44444 10
* $ ./run-example spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
*
* When running this, you may want to set the root logging level to ERROR in
* conf/log4j.properties to reduce the verbosity of the output.
* */
object PageViewGenerator {
val pages = Map("http://foo.com/" -> .7,

View file

@ -317,6 +317,7 @@ object SparkBuild extends Build {
mergeStrategy in assembly := {
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
case "log4j.properties" => MergeStrategy.discard
case "META-INF/services/org.apache.hadoop.fs.FileSystem" => MergeStrategy.concat
case "reference.conf" => MergeStrategy.concat
case _ => MergeStrategy.first

View file

@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.Logging
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.MetadataCleaner
private[streaming]
@ -40,6 +41,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.jobManager.getPendingTimes()
val delaySeconds = MetadataCleaner.getDelaySeconds
def validate() {
assert(master != null, "Checkpoint.master is null")

View file

@ -100,6 +100,10 @@ class StreamingContext private (
"both SparkContext and checkpoint as null")
}
if(cp_ != null && cp_.delaySeconds >= 0 && MetadataCleaner.getDelaySeconds < 0) {
MetadataCleaner.setDelaySeconds(cp_.delaySeconds)
}
if (MetadataCleaner.getDelaySeconds < 0) {
throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
+ "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")

View file

@ -22,7 +22,7 @@ import org.apache.spark.util.Utils
import org.apache.spark.scheduler.SplitInfo
import scala.collection
import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container}
import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}
import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
import org.apache.hadoop.yarn.util.{RackResolver, Records}
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
@ -211,7 +211,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
val workerId = workerIdCounter.incrementAndGet().toString
val driverUrl = "akka://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
CoarseGrainedSchedulerBackend.ACTOR_NAME)
logInfo("launching container on " + containerId + " host " + workerHostname)
// just to be safe, simply remove it from pendingReleaseContainers. Should not be there, but ..