Merge branch 'indexedrdd_graphx' of github.com:amplab/graphx into indexedrdd_graphx
This commit is contained in:
commit 8bd5f89662
@@ -28,7 +28,7 @@
 #   SPARK_SSH_OPTS Options passed to ssh when running remote commands.
 ##

-usage="Usage: slaves.sh [--config confdir] command..."
+usage="Usage: slaves.sh [--config <conf-dir>] command..."

 # if no args specified, show usage
 if [ $# -le 0 ]; then
@@ -46,6 +46,23 @@ bin=`cd "$bin"; pwd`
 # spark-env.sh. Save it here.
 HOSTLIST=$SPARK_SLAVES

+# Check if --config is passed as an argument. It is an optional parameter.
+# Exit if the argument is not a directory.
+if [ "$1" == "--config" ]
+then
+  shift
+  conf_dir=$1
+  if [ ! -d "$conf_dir" ]
+  then
+    echo "ERROR : $conf_dir is not a directory"
+    echo $usage
+    exit 1
+  else
+    export SPARK_CONF_DIR=$conf_dir
+  fi
+  shift
+fi
+
 if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then
   . "${SPARK_CONF_DIR}/spark-env.sh"
 fi
@@ -29,7 +29,7 @@
 #   SPARK_NICENESS The scheduling priority for daemons. Defaults to 0.
 ##

-usage="Usage: spark-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <spark-command> <spark-instance-number> <args...>"
+usage="Usage: spark-daemon.sh [--config <conf-dir>] (start|stop) <spark-command> <spark-instance-number> <args...>"

 # if no args specified, show usage
 if [ $# -le 1 ]; then
@@ -43,6 +43,25 @@ bin=`cd "$bin"; pwd`
 . "$bin/spark-config.sh"

 # get arguments
+
+# Check if --config is passed as an argument. It is an optional parameter.
+# Exit if the argument is not a directory.
+
+if [ "$1" == "--config" ]
+then
+  shift
+  conf_dir=$1
+  if [ ! -d "$conf_dir" ]
+  then
+    echo "ERROR : $conf_dir is not a directory"
+    echo $usage
+    exit 1
+  else
+    export SPARK_CONF_DIR=$conf_dir
+  fi
+  shift
+fi
+
 startStop=$1
 shift
 command=$1
@@ -19,7 +19,7 @@

 # Run a Spark command on all slave hosts.

-usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command instance-number args..."
+usage="Usage: spark-daemons.sh [--config <conf-dir>] [start|stop] command instance-number args..."

 # if no args specified, show usage
 if [ $# -le 1 ]; then
@@ -56,7 +56,7 @@ import org.apache.spark.deploy.LocalSparkCluster
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend,
   ClusterScheduler}
 import org.apache.spark.scheduler.local.LocalScheduler
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
@@ -213,7 +213,7 @@ class SparkContext(
           throw new SparkException("YARN mode not available ?", th)
         }
       }
-      val backend = new StandaloneSchedulerBackend(scheduler, this.env.actorSystem)
+      val backend = new CoarseGrainedSchedulerBackend(scheduler, this.env.actorSystem)
       scheduler.initialize(backend)
       scheduler
@@ -36,7 +36,10 @@ import org.apache.spark.SerializableWritable
 * Saves the RDD using a JobConf, which should contain an output key class, an output value class,
 * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
 */
-class SparkHadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoopMapRedUtil with Serializable {
+class SparkHadoopWriter(@transient jobConf: JobConf)
+  extends Logging
+  with SparkHadoopMapRedUtil
+  with Serializable {

   private val now = new Date()
   private val conf = new SerializableWritable(jobConf)
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.executor.TaskMetrics

 class TaskContext(
-  private[spark] val stageId: Int,
+  val stageId: Int,
   val partitionId: Int,
   val attemptId: Long,
   val runningLocally: Boolean = false,
@@ -326,7 +326,8 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
     private var blocksInRequestBitVector = new BitSet(totalBlocks)

     override def run() {
-      var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots)
+      var threadPool = Utils.newDaemonFixedThreadPool(
+        MultiTracker.MaxChatSlots, "Bit Torrent Chatter")

       while (hasBlocks.get < totalBlocks) {
         var numThreadsToCreate = 0
@@ -736,7 +737,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
     private var setOfCompletedSources = Set[SourceInfo]()

     override def run() {
-      var threadPool = Utils.newDaemonCachedThreadPool()
+      var threadPool = Utils.newDaemonCachedThreadPool("Bit torrent guide multiple requests")
       var serverSocket: ServerSocket = null

       serverSocket = new ServerSocket(0)
@@ -927,7 +928,8 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
   class ServeMultipleRequests
   extends Thread with Logging {
     // Server at most MultiTracker.MaxChatSlots peers
-    var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots)
+    var threadPool = Utils.newDaemonFixedThreadPool(
+      MultiTracker.MaxChatSlots, "Bit torrent serve multiple requests")

     override def run() {
       var serverSocket = new ServerSocket(0)
@@ -137,7 +137,7 @@ extends Logging {
   class TrackMultipleValues
   extends Thread with Logging {
     override def run() {
-      var threadPool = Utils.newDaemonCachedThreadPool()
+      var threadPool = Utils.newDaemonCachedThreadPool("Track multiple values")
       var serverSocket: ServerSocket = null

       serverSocket = new ServerSocket(DriverTrackerPort)
@@ -291,7 +291,7 @@ extends Broadcast[T](id) with Logging with Serializable {
     private var setOfCompletedSources = Set[SourceInfo]()

     override def run() {
-      var threadPool = Utils.newDaemonCachedThreadPool()
+      var threadPool = Utils.newDaemonCachedThreadPool("Tree broadcast guide multiple requests")
       var serverSocket: ServerSocket = null

       serverSocket = new ServerSocket(0)
@@ -493,7 +493,7 @@ extends Broadcast[T](id) with Logging with Serializable {
   class ServeMultipleRequests
   extends Thread with Logging {

-    var threadPool = Utils.newDaemonCachedThreadPool()
+    var threadPool = Utils.newDaemonCachedThreadPool("Tree broadcast serve multiple requests")

     override def run() {
       var serverSocket = new ServerSocket(0)
@@ -24,11 +24,11 @@ import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClie

 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{Utils, AkkaUtils}


-private[spark] class StandaloneExecutorBackend(
+private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
     executorId: String,
     hostPort: String,
@@ -87,7 +87,7 @@ private[spark] class StandaloneExecutorBackend(
   }
 }

-private[spark] object StandaloneExecutorBackend {
+private[spark] object CoarseGrainedExecutorBackend {
   def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
     // Debug code
     Utils.checkHost(hostname)
@@ -99,7 +99,7 @@ private[spark] object StandaloneExecutorBackend {
     val sparkHostPort = hostname + ":" + boundPort
     System.setProperty("spark.hostPort", sparkHostPort)
     val actor = actorSystem.actorOf(
-      Props(new StandaloneExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
+      Props(new CoarseGrainedExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
       name = "Executor")
     actorSystem.awaitTermination()
   }
@@ -107,7 +107,9 @@ private[spark] object StandaloneExecutorBackend {
   def main(args: Array[String]) {
     if (args.length < 4) {
       //the reason we allow the last frameworkId argument is to make it easy to kill rogue executors
-      System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores> [<appid>]")
+      System.err.println(
+        "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> <cores> " +
+        "[<appid>]")
       System.exit(1)
     }
     run(args(0), args(1), args(2), args(3).toInt)
@@ -121,8 +121,7 @@ private[spark] class Executor(
   }

   // Start worker thread pool
-  val threadPool = new ThreadPoolExecutor(
-    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable], Utils.daemonThreadFactory)
+  val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
@@ -79,7 +79,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
   private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
   private val registerRequests = new SynchronizedQueue[SendingConnection]

-  implicit val futureExecContext = ExecutionContext.fromExecutor(Utils.newDaemonCachedThreadPool())
+  implicit val futureExecContext = ExecutionContext.fromExecutor(
+    Utils.newDaemonCachedThreadPool("Connection manager future execution context"))

   private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null

@@ -55,20 +55,20 @@ class DAGScheduler(
     mapOutputTracker: MapOutputTracker,
     blockManagerMaster: BlockManagerMaster,
     env: SparkEnv)
-  extends TaskSchedulerListener with Logging {
+  extends Logging {

   def this(taskSched: TaskScheduler) {
     this(taskSched, SparkEnv.get.mapOutputTracker, SparkEnv.get.blockManager.master, SparkEnv.get)
   }
-  taskSched.setListener(this)
+  taskSched.setDAGScheduler(this)

   // Called by TaskScheduler to report task's starting.
-  override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
+  def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventQueue.put(BeginEvent(task, taskInfo))
   }

   // Called by TaskScheduler to report task completions or failures.
-  override def taskEnded(
+  def taskEnded(
       task: Task[_],
       reason: TaskEndReason,
       result: Any,
@@ -79,18 +79,18 @@ class DAGScheduler(
   }

   // Called by TaskScheduler when an executor fails.
-  override def executorLost(execId: String) {
+  def executorLost(execId: String) {
     eventQueue.put(ExecutorLost(execId))
   }

   // Called by TaskScheduler when a host is added
-  override def executorGained(execId: String, host: String) {
+  def executorGained(execId: String, host: String) {
     eventQueue.put(ExecutorGained(execId, host))
   }

   // Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or
   // cancellation of the job itself.
-  override def taskSetFailed(taskSet: TaskSet, reason: String) {
+  def taskSetFailed(taskSet: TaskSet, reason: String) {
     eventQueue.put(TaskSetFailed(taskSet, reason))
   }

@@ -24,8 +24,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 * Each TaskScheduler schedulers task for a single SparkContext.
 * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
 * and are responsible for sending the tasks to the cluster, running them, retrying if there
- * are failures, and mitigating stragglers. They return events to the DAGScheduler through
- * the TaskSchedulerListener interface.
+ * are failures, and mitigating stragglers. They return events to the DAGScheduler.
 */
 private[spark] trait TaskScheduler {

@@ -48,8 +47,8 @@ private[spark] trait TaskScheduler {
   // Cancel a stage.
   def cancelTasks(stageId: Int)

-  // Set a listener for upcalls. This is guaranteed to be set before submitTasks is called.
-  def setListener(listener: TaskSchedulerListener): Unit
+  // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
+  def setDAGScheduler(dagScheduler: DAGScheduler): Unit

   // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
   def defaultParallelism(): Int
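The hunks above drop the TaskSchedulerListener indirection: the task scheduler now holds a DAGScheduler reference directly and calls its upcalls. Below is a minimal, self-contained sketch of that wiring pattern; the trait and class names are simplified stand-ins for illustration, not Spark's actual classes.

// Simplified stand-ins for TaskScheduler and DAGScheduler; illustrative only.
trait TaskSchedulerLike {
  // Set once, before any tasks are submitted, and used for upcalls afterwards.
  def setDAGScheduler(dagScheduler: DAGSchedulerLike): Unit
  def submitTask(taskId: Long): Unit
}

class DAGSchedulerLike(taskSched: TaskSchedulerLike) {
  // Mirrors the diff: the DAG scheduler registers itself with the task scheduler
  // in its constructor instead of the scheduler holding a separate listener object.
  taskSched.setDAGScheduler(this)

  def taskStarted(taskId: Long): Unit = println(s"task $taskId started")
}

class SimpleTaskScheduler extends TaskSchedulerLike {
  private var dagScheduler: DAGSchedulerLike = null
  override def setDAGScheduler(dagScheduler: DAGSchedulerLike): Unit = {
    this.dagScheduler = dagScheduler
  }
  // Upcalls go straight to the DAG scheduler reference.
  override def submitTask(taskId: Long): Unit = dagScheduler.taskStarted(taskId)
}

object WiringExample extends App {
  val sched = new SimpleTaskScheduler
  new DAGSchedulerLike(sched) // constructor performs the registration
  sched.submitTask(1L)
}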
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler
-
-import scala.collection.mutable.Map
-
-import org.apache.spark.TaskEndReason
-import org.apache.spark.executor.TaskMetrics
-
-/**
- * Interface for getting events back from the TaskScheduler.
- */
-private[spark] trait TaskSchedulerListener {
-  // A task has started.
-  def taskStarted(task: Task[_], taskInfo: TaskInfo)
-
-  // A task has finished or failed.
-  def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any],
-                taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit
-
-  // A node was added to the cluster.
-  def executorGained(execId: String, host: String): Unit
-
-  // A node was lost from the cluster.
-  def executorLost(execId: String): Unit
-
-  // The TaskScheduler wants to abort an entire task set.
-  def taskSetFailed(taskSet: TaskSet, reason: String): Unit
-}
@@ -79,7 +79,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   private val executorIdToHost = new HashMap[String, String]

   // Listener object to pass upcalls into
-  var listener: TaskSchedulerListener = null
+  var dagScheduler: DAGScheduler = null

   var backend: SchedulerBackend = null

@@ -94,8 +94,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   // This is a var so that we can reset it for testing purposes.
   private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)

-  override def setListener(listener: TaskSchedulerListener) {
-    this.listener = listener
+  override def setDAGScheduler(dagScheduler: DAGScheduler) {
+    this.dagScheduler = dagScheduler
   }

   def initialize(context: SchedulerBackend) {
@@ -297,7 +297,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
     // Update the DAGScheduler without holding a lock on this, since that can deadlock
     if (failedExecutor != None) {
-      listener.executorLost(failedExecutor.get)
+      dagScheduler.executorLost(failedExecutor.get)
       backend.reviveOffers()
     }
     if (taskFailed) {
@@ -397,9 +397,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
         logError("Lost an executor " + executorId + " (already removed): " + reason)
       }
     }
-    // Call listener.executorLost without holding the lock on this to prevent deadlock
+    // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
     if (failedExecutor != None) {
-      listener.executorLost(failedExecutor.get)
+      dagScheduler.executorLost(failedExecutor.get)
       backend.reviveOffers()
     }
   }
@@ -418,7 +418,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   }

   def executorGained(execId: String, host: String) {
-    listener.executorGained(execId, host)
+    dagScheduler.executorGained(execId, host)
   }

   def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
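The "without holding a lock on this" comments above describe the locking rule these hunks preserve: the failed executor is recorded while synchronized on the scheduler, but the upcall into the DAG scheduler happens only after the synchronized block ends. A small, self-contained sketch of that pattern follows; the class and method names are hypothetical, not the real ClusterScheduler.

// Illustrative only: "collect under the lock, notify outside it" to avoid deadlock
// when the callee may call back into this object.
class ExecutorTracker(notifyLost: String => Unit) {
  private val alive = scala.collection.mutable.Set("exec-1", "exec-2")

  def executorFailed(execId: String): Unit = {
    var failed: Option[String] = None
    synchronized {
      if (alive.remove(execId)) {
        failed = Some(execId) // only record the failure while holding the lock
      }
    }
    // The upcall runs outside the synchronized block, so a callee that re-enters
    // this class cannot deadlock on the same monitor.
    failed.foreach(notifyLost)
  }
}

object DeadlockAvoidanceExample extends App {
  val tracker = new ExecutorTracker(id => println(s"lost $id"))
  tracker.executorFailed("exec-1")
}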
@@ -415,11 +415,11 @@ private[spark] class ClusterTaskSetManager(
   }

   private def taskStarted(task: Task[_], info: TaskInfo) {
-    sched.listener.taskStarted(task, info)
+    sched.dagScheduler.taskStarted(task, info)
   }

   /**
-   * Marks the task as successful and notifies the listener that a task has ended.
+   * Marks the task as successful and notifies the DAGScheduler that a task has ended.
   */
  def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {
    val info = taskInfos(tid)
@@ -429,7 +429,7 @@ private[spark] class ClusterTaskSetManager(
     if (!successful(index)) {
       logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
         tid, info.duration, info.host, tasksSuccessful, numTasks))
-      sched.listener.taskEnded(
+      sched.dagScheduler.taskEnded(
         tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)

       // Mark successful and stop if all the tasks have succeeded.
@@ -445,7 +445,8 @@ private[spark] class ClusterTaskSetManager(
   }

   /**
-   * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the listener.
+   * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
+   * DAG Scheduler.
   */
  def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) {
    val info = taskInfos(tid)
@@ -463,7 +464,7 @@ private[spark] class ClusterTaskSetManager(
     reason.foreach {
       case fetchFailed: FetchFailed =>
         logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
-        sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
+        sched.dagScheduler.taskEnded(tasks(index), fetchFailed, null, null, info, null)
         successful(index) = true
         tasksSuccessful += 1
         sched.taskSetFinished(this)
@@ -472,11 +473,11 @@ private[spark] class ClusterTaskSetManager(

       case TaskKilled =>
         logWarning("Task %d was killed.".format(tid))
-        sched.listener.taskEnded(tasks(index), reason.get, null, null, info, null)
+        sched.dagScheduler.taskEnded(tasks(index), reason.get, null, null, info, null)
         return

       case ef: ExceptionFailure =>
-        sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
+        sched.dagScheduler.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
         val key = ef.description
         val now = clock.getTime()
         val (printFull, dupCount) = {
@@ -504,7 +505,7 @@ private[spark] class ClusterTaskSetManager(

       case TaskResultLost =>
         logWarning("Lost result for TID %s on host %s".format(tid, info.host))
-        sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
+        sched.dagScheduler.taskEnded(tasks(index), TaskResultLost, null, null, info, null)

       case _ => {}
     }
@@ -533,7 +534,7 @@ private[spark] class ClusterTaskSetManager(
     failed = true
     causeOfFailure = message
     // TODO: Kill running tasks if we were not terminated due to a Mesos error
-    sched.listener.taskSetFailed(taskSet, message)
+    sched.dagScheduler.taskSetFailed(taskSet, message)
     removeAllRunningTasks()
     sched.taskSetFinished(this)
   }
@@ -606,7 +607,7 @@ private[spark] class ClusterTaskSetManager(
         addPendingTask(index)
         // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
         // stage finishes when a total of tasks.size tasks finish.
-        sched.listener.taskEnded(tasks(index), Resubmitted, null, null, info, null)
+        sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, null, info, null)
       }
     }
   }
@@ -24,28 +24,28 @@ import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.util.{Utils, SerializableBuffer}


-private[spark] sealed trait StandaloneClusterMessage extends Serializable
+private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable

-private[spark] object StandaloneClusterMessages {
+private[spark] object CoarseGrainedClusterMessages {

   // Driver to executors
-  case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
+  case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage

-  case class KillTask(taskId: Long, executor: String) extends StandaloneClusterMessage
+  case class KillTask(taskId: Long, executor: String) extends CoarseGrainedClusterMessage

   case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
-    extends StandaloneClusterMessage
+    extends CoarseGrainedClusterMessage

-  case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
+  case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage

   // Executors to driver
   case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
-    extends StandaloneClusterMessage {
+    extends CoarseGrainedClusterMessage {
     Utils.checkHostPort(hostPort, "Expected host port")
   }

   case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
-    data: SerializableBuffer) extends StandaloneClusterMessage
+    data: SerializableBuffer) extends CoarseGrainedClusterMessage

   object StatusUpdate {
     /** Alternate factory method that takes a ByteBuffer directly for the data field */
@@ -56,10 +56,10 @@ private[spark] object StandaloneClusterMessages {
   }

   // Internal messages in driver
-  case object ReviveOffers extends StandaloneClusterMessage
+  case object ReviveOffers extends CoarseGrainedClusterMessage

-  case object StopDriver extends StandaloneClusterMessage
+  case object StopDriver extends CoarseGrainedClusterMessage

-  case class RemoveExecutor(executorId: String, reason: String) extends StandaloneClusterMessage
+  case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage

 }
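The renamed messages above remain a sealed trait with case classes and case objects, so the driver and executor actors can match on them exhaustively. Below is a minimal, self-contained analog of that message-hierarchy pattern; the message set is simplified for illustration and is not the real CoarseGrainedClusterMessages.

// Simplified analog of a sealed cluster-message hierarchy.
sealed trait ClusterMessage extends Serializable
case class LaunchTask(taskId: Long) extends ClusterMessage
case class KillTask(taskId: Long, executor: String) extends ClusterMessage
case object ReviveOffers extends ClusterMessage

object MessageHandlingExample extends App {
  // Because the trait is sealed, the compiler can warn if a message type is missed.
  def handle(msg: ClusterMessage): String = msg match {
    case LaunchTask(id)     => s"launching task $id"
    case KillTask(id, exec) => s"killing task $id on $exec"
    case ReviveOffers       => "reviving offers"
  }

  Seq(LaunchTask(7L), KillTask(7L, "exec-1"), ReviveOffers).map(handle).foreach(println)
}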
@@ -30,16 +30,19 @@ import akka.util.duration._

 import org.apache.spark.{SparkException, Logging, TaskState}
 import org.apache.spark.scheduler.TaskDescription
-import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.Utils

 /**
- * A standalone scheduler backend, which waits for standalone executors to connect to it through
- * Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
- * Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
+ * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
+ * This backend holds onto each executor for the duration of the Spark job rather than relinquishing
+ * executors whenever a task is done and asking the scheduler to launch a new executor for
+ * each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the
+ * coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode
+ * (spark.deploy.*).
 */
 private[spark]
-class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
+class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
   extends SchedulerBackend with Logging
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
@@ -162,7 +165,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
       }
     }
     driverActor = actorSystem.actorOf(
-      Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
+      Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
   }

   private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
@@ -202,6 +205,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
   }
 }

-private[spark] object StandaloneSchedulerBackend {
-  val ACTOR_NAME = "StandaloneScheduler"
+private[spark] object CoarseGrainedSchedulerBackend {
+  val ACTOR_NAME = "CoarseGrainedScheduler"
 }
@@ -28,7 +28,7 @@ private[spark] class SparkDeploySchedulerBackend(
     sc: SparkContext,
     masters: Array[String],
     appName: String)
-  extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with ClientListener
   with Logging {

@@ -44,10 +44,10 @@ private[spark] class SparkDeploySchedulerBackend(
     // The endpoint for executors to talk to us
     val driverUrl = "akka://spark@%s:%s/user/%s".format(
       System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
-      StandaloneSchedulerBackend.ACTOR_NAME)
+      CoarseGrainedSchedulerBackend.ACTOR_NAME)
     val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
     val command = Command(
-      "org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
+      "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome().getOrElse(null)
     val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
       "http://" + sc.ui.appUIAddress)
@@ -24,33 +24,16 @@ import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
 import org.apache.spark.serializer.SerializerInstance
+import org.apache.spark.util.Utils

 /**
 * Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
 */
 private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
   extends Logging {
-  private val MIN_THREADS = System.getProperty("spark.resultGetter.minThreads", "4").toInt
-  private val MAX_THREADS = System.getProperty("spark.resultGetter.maxThreads", "4").toInt
-  private val getTaskResultExecutor = new ThreadPoolExecutor(
-    MIN_THREADS,
-    MAX_THREADS,
-    0L,
-    TimeUnit.SECONDS,
-    new LinkedBlockingDeque[Runnable],
-    new ResultResolverThreadFactory)
-
-  class ResultResolverThreadFactory extends ThreadFactory {
-    private var counter = 0
-    private var PREFIX = "Result resolver thread"
-
-    override def newThread(r: Runnable): Thread = {
-      val thread = new Thread(r, "%s-%s".format(PREFIX, counter))
-      counter += 1
-      thread.setDaemon(true)
-      return thread
-    }
-  }
+  private val THREADS = System.getProperty("spark.resultGetter.threads", "4").toInt
+  private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
+    THREADS, "Result resolver thread")

   protected val serializer = new ThreadLocal[SerializerInstance] {
     override def initialValue(): SerializerInstance = {
@@ -30,13 +30,14 @@ import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}

 import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
-import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}

 /**
 * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
 * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
 * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
- * StandaloneBackend mechanism. This class is useful for lower and more predictable latency.
+ * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable
+ * latency.
 *
 * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
 * remove this.
@@ -46,7 +47,7 @@ private[spark] class CoarseMesosSchedulerBackend(
     sc: SparkContext,
     master: String,
     appName: String)
-  extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with MScheduler
   with Logging {

@@ -122,20 +123,20 @@ private[spark] class CoarseMesosSchedulerBackend(
     val driverUrl = "akka://spark@%s:%s/user/%s".format(
       System.getProperty("spark.driver.host"),
       System.getProperty("spark.driver.port"),
-      StandaloneSchedulerBackend.ACTOR_NAME)
+      CoarseGrainedSchedulerBackend.ACTOR_NAME)
     val uri = System.getProperty("spark.executor.uri")
     if (uri == null) {
       val runScript = new File(sparkHome, "spark-class").getCanonicalPath
       command.setValue(
-        "\"%s\" org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.split('/').last.split('.').head
       command.setValue(
-        "cd %s*; ./spark-class org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
-          basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+        "cd %s*; ./spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
+          .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
     return command.build()
@@ -81,7 +81,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:

   val env = SparkEnv.get
   val attemptId = new AtomicInteger
-  var listener: TaskSchedulerListener = null
+  var dagScheduler: DAGScheduler = null

   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
   // Each map holds the master's timestamp for the version of that file or JAR we got.
@@ -114,8 +114,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
     localActor = env.actorSystem.actorOf(Props(new LocalActor(this, threads)), "Test")
   }

-  override def setListener(listener: TaskSchedulerListener) {
-    this.listener = listener
+  override def setDAGScheduler(dagScheduler: DAGScheduler) {
+    this.dagScheduler = dagScheduler
   }

   override def submitTasks(taskSet: TaskSet) {
@@ -133,7 +133,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
   }

   def taskStarted(task: Task[_], info: TaskInfo) {
-    sched.listener.taskStarted(task, info)
+    sched.dagScheduler.taskStarted(task, info)
   }

   def taskEnded(tid: Long, state: TaskState, serializedData: ByteBuffer) {
@@ -148,7 +148,8 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
       }
     }
     result.metrics.resultSize = serializedData.limit()
-    sched.listener.taskEnded(task, Success, result.value, result.accumUpdates, info, result.metrics)
+    sched.dagScheduler.taskEnded(task, Success, result.value, result.accumUpdates, info,
+      result.metrics)
     numFinished += 1
     decreaseRunningTasks(1)
     finished(index) = true
@@ -165,7 +166,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
     decreaseRunningTasks(1)
     val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](
       serializedData, getClass.getClassLoader)
-    sched.listener.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
+    sched.dagScheduler.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
     if (!finished(index)) {
       copiesRunning(index) -= 1
       numFailures(index) += 1
@@ -176,7 +177,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
         val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format(
           taskSet.id, index, 4, reason.description)
         decreaseRunningTasks(runningTasks)
-        sched.listener.taskSetFailed(taskSet, errorMessage)
+        sched.dagScheduler.taskSetFailed(taskSet, errorMessage)
         // need to delete failed Taskset from schedule queue
         sched.taskSetFinished(this)
       }
@@ -184,7 +185,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
   }

   override def error(message: String) {
-    sched.listener.taskSetFailed(taskSet, message)
+    sched.dagScheduler.taskSetFailed(taskSet, message)
     sched.taskSetFinished(this)
   }
 }
@@ -447,14 +447,17 @@ private[spark] object Utils extends Logging {
     hostPortParseResults.get(hostPort)
   }

-  private[spark] val daemonThreadFactory: ThreadFactory =
-    new ThreadFactoryBuilder().setDaemon(true).build()
+  private val daemonThreadFactoryBuilder: ThreadFactoryBuilder =
+    new ThreadFactoryBuilder().setDaemon(true)

   /**
-   * Wrapper over newCachedThreadPool.
+   * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
   */
-  def newDaemonCachedThreadPool(): ThreadPoolExecutor =
-    Executors.newCachedThreadPool(daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
+  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
+    val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
+    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }

   /**
   * Return the string to tell how long has passed in seconds. The passing parameter should be in
@@ -465,10 +468,13 @@ private[spark] object Utils extends Logging {
   }

   /**
-   * Wrapper over newFixedThreadPool.
+   * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
   */
-  def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor =
-    Executors.newFixedThreadPool(nThreads, daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
+  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
+    val threadFactory = daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
+    Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }

   private def listFilesSafely(file: File): Seq[File] = {
     val files = file.listFiles()
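The Utils hunk above replaces the single shared daemon ThreadFactory with a ThreadFactoryBuilder so every pool can stamp its threads with a caller-supplied name prefix, which is what the thread-pool call sites earlier in the diff now pass in. Below is a self-contained sketch of the same construction, written as a standalone object (assuming Guava is on the classpath) rather than as part of the real Utils.

import java.util.concurrent.{Executors, ThreadPoolExecutor}
import com.google.common.util.concurrent.ThreadFactoryBuilder

object NamedDaemonPools {
  // Same idea as the new Utils helpers: daemon threads named "<prefix>-0", "<prefix>-1", ...
  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
    val factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()
    Executors.newCachedThreadPool(factory).asInstanceOf[ThreadPoolExecutor]
  }

  def main(args: Array[String]): Unit = {
    val pool = newDaemonCachedThreadPool("Result resolver thread")
    pool.execute(new Runnable {
      // Prints something like "Result resolver thread-0", which makes thread dumps readable.
      override def run(): Unit = println(Thread.currentThread().getName)
    })
    pool.shutdown()
  }
}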
@@ -60,7 +60,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
       taskSets += taskSet
     }
     override def cancelTasks(stageId: Int) {}
-    override def setListener(listener: TaskSchedulerListener) = {}
+    override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
     override def defaultParallelism() = 2
   }

@@ -28,6 +28,30 @@ import org.apache.spark.executor.TaskMetrics
 import java.nio.ByteBuffer
 import org.apache.spark.util.{Utils, FakeClock}

+class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler(taskScheduler) {
+  override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
+    taskScheduler.startedTasks += taskInfo.index
+  }
+
+  override def taskEnded(
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: mutable.Map[Long, Any],
+      taskInfo: TaskInfo,
+      taskMetrics: TaskMetrics) {
+    taskScheduler.endedTasks(taskInfo.index) = reason
+  }
+
+  override def executorGained(execId: String, host: String) {}
+
+  override def executorLost(execId: String) {}
+
+  override def taskSetFailed(taskSet: TaskSet, reason: String) {
+    taskScheduler.taskSetsFailed += taskSet.id
+  }
+}
+
 /**
 * A mock ClusterScheduler implementation that just remembers information about tasks started and
 * feedback received from the TaskSetManagers. Note that it's important to initialize this with
@@ -44,30 +68,7 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*

   val executors = new mutable.HashMap[String, String] ++ liveExecutors

-  listener = new TaskSchedulerListener {
-    def taskStarted(task: Task[_], taskInfo: TaskInfo) {
-      startedTasks += taskInfo.index
-    }
-
-    def taskEnded(
-        task: Task[_],
-        reason: TaskEndReason,
-        result: Any,
-        accumUpdates: mutable.Map[Long, Any],
-        taskInfo: TaskInfo,
-        taskMetrics: TaskMetrics)
-    {
-      endedTasks(taskInfo.index) = reason
-    }
-
-    def executorGained(execId: String, host: String) {}
-
-    def executorLost(execId: String) {}
-
-    def taskSetFailed(taskSet: TaskSet, reason: String) {
-      taskSetsFailed += taskSet.id
-    }
-  }
+  dagScheduler = new FakeDAGScheduler(this)

   def removeExecutor(execId: String): Unit = executors -= execId

@@ -17,17 +17,19 @@

 package org.apache.spark.streaming.examples.clickstream

-import java.net.{InetAddress,ServerSocket,Socket,SocketException}
-import java.io.{InputStreamReader, BufferedReader, PrintWriter}
+import java.net.ServerSocket
+import java.io.PrintWriter
 import util.Random

 /** Represents a page view on a website with associated dimension data.*/
-class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) {
+class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int)
+    extends Serializable {
   override def toString() : String = {
     "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID)
   }
 }
-object PageView {
+
+object PageView extends Serializable {
   def fromString(in : String) : PageView = {
     val parts = in.split("\t")
     new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)
@@ -39,6 +41,9 @@ object PageView {
 * This should be used in tandem with PageViewStream.scala. Example:
 * $ ./run-example spark.streaming.examples.clickstream.PageViewGenerator 44444 10
 * $ ./run-example spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+*
+* When running this, you may want to set the root logging level to ERROR in
+* conf/log4j.properties to reduce the verbosity of the output.
 * */
 object PageViewGenerator {
   val pages = Map("http://foo.com/" -> .7,
@@ -317,6 +317,7 @@ object SparkBuild extends Build {
     mergeStrategy in assembly := {
       case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
       case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
       case "log4j.properties" => MergeStrategy.discard
+      case "META-INF/services/org.apache.hadoop.fs.FileSystem" => MergeStrategy.concat
       case "reference.conf" => MergeStrategy.concat
       case _ => MergeStrategy.first
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration

 import org.apache.spark.Logging
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.MetadataCleaner


 private[streaming]
@@ -40,6 +41,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val checkpointDir = ssc.checkpointDir
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.jobManager.getPendingTimes()
+  val delaySeconds = MetadataCleaner.getDelaySeconds

   def validate() {
     assert(master != null, "Checkpoint.master is null")
@@ -100,6 +100,10 @@ class StreamingContext private (
     "both SparkContext and checkpoint as null")
   }

+  if(cp_ != null && cp_.delaySeconds >= 0 && MetadataCleaner.getDelaySeconds < 0) {
+    MetadataCleaner.setDelaySeconds(cp_.delaySeconds)
+  }
+
   if (MetadataCleaner.getDelaySeconds < 0) {
     throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
       + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")
@@ -22,7 +22,7 @@ import org.apache.spark.util.Utils
 import org.apache.spark.scheduler.SplitInfo
 import scala.collection
 import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container}
-import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
 import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
 import org.apache.hadoop.yarn.util.{RackResolver, Records}
 import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
@@ -211,7 +211,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
           val workerId = workerIdCounter.incrementAndGet().toString
           val driverUrl = "akka://spark@%s:%s/user/%s".format(
             System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
-            StandaloneSchedulerBackend.ACTOR_NAME)
+            CoarseGrainedSchedulerBackend.ACTOR_NAME)

           logInfo("launching container on " + containerId + " host " + workerHostname)
           // just to be safe, simply remove it from pendingReleaseContainers. Should not be there, but ..