Merge remote-tracking branch 'upstream/master' into implicit-als

This commit is contained in:
Nick Pentreath 2013-10-04 13:25:34 +02:00
commit 1cbdcb9cb6
73 changed files with 873 additions and 362 deletions

View file

@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.8.0-SNAPSHOT</version>
<version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View file

@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.8.0-SNAPSHOT</version>
<version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View file

@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.8.0-SNAPSHOT</version>
<version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View file

@ -56,9 +56,9 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
ClusterScheduler, Schedulable, SchedulingMode}
ClusterScheduler}
import org.apache.spark.scheduler.local.LocalScheduler
import org.apache.spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap}
@ -145,7 +145,7 @@ class SparkContext(
}
// Create and start the scheduler
private var taskScheduler: TaskScheduler = {
private[spark] var taskScheduler: TaskScheduler = {
// Regular expression used for local[N] master format
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
@ -256,7 +256,9 @@ class SparkContext(
private[spark] var checkpointDir: Option[String] = None
// Thread Local variable that can be used by users to pass information down the stack
private val localProperties = new ThreadLocal[Properties]
private val localProperties = new InheritableThreadLocal[Properties] {
override protected def childValue(parent: Properties): Properties = new Properties(parent)
}
def initLocalProperties() {
localProperties.set(new Properties())
@ -273,6 +275,9 @@ class SparkContext(
}
}
def getLocalProperty(key: String): String =
Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
/** Set a human readable description of the current job. */
def setJobDescription(value: String) {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)

View file

@ -46,6 +46,10 @@ private[spark] case class ExceptionFailure(
metrics: Option[TaskMetrics])
extends TaskEndReason
private[spark] case class OtherFailure(message: String) extends TaskEndReason
/**
* The task finished successfully, but the result was lost from the executor's block manager before
* it was fetched.
*/
private[spark] case object TaskResultLost extends TaskEndReason
private[spark] case class TaskResultTooBigFailure() extends TaskEndReason
private[spark] case class OtherFailure(message: String) extends TaskEndReason

View file

@ -67,6 +67,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def map[R](f: JFunction[T, R]): JavaRDD[R] =
new JavaRDD(rdd.map(f)(f.returnType()))(f.returnType())
/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
def mapPartitionsWithIndex[R: ClassManifest](
f: JFunction2[Int, java.util.Iterator[T], java.util.Iterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] =
new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
preservesPartitioning))
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/

View file

@ -17,7 +17,7 @@
package org.apache.spark.executor
import java.io.{File}
import java.io.File
import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.concurrent._
@ -27,11 +27,11 @@ import scala.collection.mutable.HashMap
import org.apache.spark.scheduler._
import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
/**
* The Mesos executor for Spark.
* Spark executor used with Mesos and the standalone scheduler.
*/
private[spark] class Executor(
executorId: String,
@ -167,12 +167,20 @@ private[spark] class Executor(
// we need to serialize the task metrics first. If TaskMetrics had a custom serialized format, we could
// just change the relevants bytes in the byte buffer
val accumUpdates = Accumulators.values
val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null))
val serializedResult = ser.serialize(result)
logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
if (serializedResult.limit >= (akkaFrameSize - 1024)) {
context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskResultTooBigFailure()))
return
val directResult = new DirectTaskResult(value, accumUpdates, task.metrics.getOrElse(null))
val serializedDirectResult = ser.serialize(directResult)
logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
val serializedResult = {
if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
logInfo("Storing result for " + taskId + " in local BlockManager")
val blockId = "taskresult_" + taskId
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
ser.serialize(new IndirectTaskResult[Any](blockId))
} else {
logInfo("Sending result for " + taskId + " directly to driver")
serializedDirectResult
}
}
context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
logInfo("Finished task ID " + taskId)

View file

@ -753,24 +753,42 @@ abstract class RDD[T: ClassManifest](
}
/**
* Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
* it will be slow if a lot of partitions are required. In that case, use collect() to get the
* whole RDD instead.
* Take the first num elements of the RDD. It works by first scanning one partition, and use the
* results from that partition to estimate the number of additional partitions needed to satisfy
* the limit.
*/
def take(num: Int): Array[T] = {
if (num == 0) {
return new Array[T](0)
}
val buf = new ArrayBuffer[T]
var p = 0
while (buf.size < num && p < partitions.size) {
val totalParts = this.partitions.length
var partsScanned = 0
while (buf.size < num && partsScanned < totalParts) {
// The number of partitions to try in this iteration. It is ok for this number to be
// greater than totalParts because we actually cap it at totalParts in runJob.
var numPartsToTry = 1
if (partsScanned > 0) {
// If we didn't find any rows after the first iteration, just try all partitions next.
// Otherwise, interpolate the number of partitions we need to try, but overestimate it
// by 50%.
if (buf.size == 0) {
numPartsToTry = totalParts - 1
} else {
numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
}
}
numPartsToTry = math.max(0, numPartsToTry) // guard against negative num of partitions
val left = num - buf.size
val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p), true)
buf ++= res(0)
if (buf.size == num)
return buf.toArray
p += 1
val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
res.foreach(buf ++= _.take(num - buf.size))
partsScanned += numPartsToTry
}
return buf.toArray
}

View file

@ -28,7 +28,6 @@ import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.storage.{BlockManager, BlockManagerMaster}
import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
@ -553,7 +552,7 @@ class DAGScheduler(
SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head)
} catch {
case e: NotSerializableException =>
abortStage(stage, e.toString)
abortStage(stage, "Task not serializable: " + e.toString)
running -= stage
return
}
@ -705,6 +704,9 @@ class DAGScheduler(
case ExceptionFailure(className, description, stackTrace, metrics) =>
// Do nothing here, left up to the TaskScheduler to decide how to handle user failures
case TaskResultLost =>
// Do nothing here; the TaskScheduler handles these failures and resubmits the task.
case other =>
// Unrecognized failure - abort all jobs depending on this stage
abortStage(stageIdToStage(task.stageId), task + " failed: " + other)

View file

@ -19,7 +19,6 @@ package org.apache.spark.scheduler
import java.util.Properties
import org.apache.spark.scheduler.cluster.TaskInfo
import scala.collection.mutable.Map
import org.apache.spark._

View file

@ -30,7 +30,6 @@ import scala.io.Source
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.cluster.TaskInfo
// Used to record runtime information for each job, including RDD graph
// tasks' start/stop shuffle information and information from outside

View file

@ -15,13 +15,13 @@
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import org.apache.spark.Logging
import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
/**
* An Schedulable entity that represent collection of Pools or TaskSetManagers
@ -45,7 +45,7 @@ private[spark] class Pool(
var priority = 0
var stageId = 0
var name = poolName
var parent:Schedulable = null
var parent: Pool = null
var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
schedulingMode match {
@ -101,14 +101,14 @@ private[spark] class Pool(
return sortedTaskSetQueue
}
override def increaseRunningTasks(taskNum: Int) {
def increaseRunningTasks(taskNum: Int) {
runningTasks += taskNum
if (parent != null) {
parent.increaseRunningTasks(taskNum)
}
}
override def decreaseRunningTasks(taskNum: Int) {
def decreaseRunningTasks(taskNum: Int) {
runningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)

View file

@ -15,9 +15,9 @@
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler
import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import scala.collection.mutable.ArrayBuffer
/**
@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer
* there are two type of Schedulable entities(Pools and TaskSetManagers)
*/
private[spark] trait Schedulable {
var parent: Schedulable
var parent: Pool
// child queues
def schedulableQueue: ArrayBuffer[Schedulable]
def schedulingMode: SchedulingMode
@ -36,8 +36,6 @@ private[spark] trait Schedulable {
def stageId: Int
def name: String
def increaseRunningTasks(taskNum: Int): Unit
def decreaseRunningTasks(taskNum: Int): Unit
def addSchedulable(schedulable: Schedulable): Unit
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable

View file

@ -15,16 +15,14 @@
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler
import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException}
import java.util.Properties
import scala.xml.XML
import java.io.{FileInputStream, InputStream}
import java.util.{NoSuchElementException, Properties}
import org.apache.spark.Logging
import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
import scala.xml.XML
/**
* An interface to build Schedulable tree
@ -51,7 +49,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file")
val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file"))
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val DEFAULT_POOL_NAME = "default"
val MINIMUM_SHARES_PROPERTY = "minShare"
@ -64,48 +63,26 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
val DEFAULT_WEIGHT = 1
override def buildPools() {
if (schedulerAllocFile != null) {
val file = new File(schedulerAllocFile)
if (file.exists()) {
val xml = XML.loadFile(file)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
var schedulingMode = DEFAULT_SCHEDULING_MODE
var minShare = DEFAULT_MINIMUM_SHARE
var weight = DEFAULT_WEIGHT
val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
if (xmlSchedulingMode != "") {
try {
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
} catch {
case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
}
}
val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
if (xmlMinShare != "") {
minShare = xmlMinShare.toInt
}
val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
if (xmlWeight != "") {
weight = xmlWeight.toInt
}
val pool = new Pool(poolName, schedulingMode, minShare, weight)
rootPool.addSchedulable(pool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
var is: Option[InputStream] = None
try {
is = Option {
schedulerAllocFile.map { f =>
new FileInputStream(f)
}.getOrElse {
getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
}
} else {
throw new java.io.FileNotFoundException(
"Fair scheduler allocation file not found: " + schedulerAllocFile)
}
is.foreach { i => buildFairSchedulerPool(i) }
} finally {
is.foreach(_.close())
}
// finally create "default" pool
buildDefaultPool()
}
private def buildDefaultPool() {
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
@ -115,6 +92,42 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
}
}
private def buildFairSchedulerPool(is: InputStream) {
val xml = XML.load(is)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
var schedulingMode = DEFAULT_SCHEDULING_MODE
var minShare = DEFAULT_MINIMUM_SHARE
var weight = DEFAULT_WEIGHT
val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
if (xmlSchedulingMode != "") {
try {
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
} catch {
case e: NoSuchElementException =>
logWarning("Error xml schedulingMode, using default schedulingMode")
}
}
val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
if (xmlMinShare != "") {
minShare = xmlMinShare.toInt
}
val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
if (xmlWeight != "") {
weight = xmlWeight.toInt
}
val pool = new Pool(poolName, schedulingMode, minShare, weight)
rootPool.addSchedulable(pool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
}
}
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
var poolName = DEFAULT_POOL_NAME
var parentPool = rootPool.getSchedulableByName(poolName)

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler
/**
* An interface for sort algorithm

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler
/**
* "FAIR" and "FIFO" determines which policy is used

View file

@ -18,7 +18,6 @@
package org.apache.spark.scheduler
import java.util.Properties
import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.util.{Utils, Distribution}
import org.apache.spark.{Logging, SparkContext, TaskEndReason}
import org.apache.spark.executor.TaskMetrics

View file

@ -17,8 +17,8 @@
package org.apache.spark.scheduler
import org.apache.spark.scheduler.cluster.TaskInfo
import scala.collection._
import org.apache.spark.executor.TaskMetrics
case class StageInfo(

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler
import java.nio.ByteBuffer
import org.apache.spark.util.SerializableBuffer

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler
import org.apache.spark.util.Utils

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler
private[spark] object TaskLocality

View file

@ -26,12 +26,17 @@ import java.nio.ByteBuffer
import org.apache.spark.util.Utils
// Task result. Also contains updates to accumulator variables.
// TODO: Use of distributed cache to return result is a hack to get around
// what seems to be a bug with messages over 60KB in libprocess; fix it
private[spark] sealed trait TaskResult[T]
/** A reference to a DirectTaskResult that has been stored in the worker's BlockManager. */
private[spark]
class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
extends Externalizable
{
case class IndirectTaskResult[T](val blockId: String) extends TaskResult[T] with Serializable
/** A TaskResult that contains the task's return value and accumulator updates. */
private[spark]
class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
extends TaskResult[T] with Externalizable {
def this() = this(null.asInstanceOf[T], null, null)
override def writeExternal(out: ObjectOutput) {

View file

@ -17,10 +17,11 @@
package org.apache.spark.scheduler
import org.apache.spark.scheduler.cluster.Pool
import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
/**
* Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
* Each TaskScheduler schedulers task for a single SparkContext.
* These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
* and are responsible for sending the tasks to the cluster, running them, retrying if there
* are failures, and mitigating stragglers. They return events to the DAGScheduler through

View file

@ -17,7 +17,6 @@
package org.apache.spark.scheduler
import org.apache.spark.scheduler.cluster.TaskInfo
import scala.collection.mutable.Map
import org.apache.spark.TaskEndReason

View file

@ -15,12 +15,11 @@
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler
import java.nio.ByteBuffer
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.TaskSet
/**
* Tracks and schedules the tasks within a single TaskSet. This class keeps track of the status of
@ -45,7 +44,5 @@ private[spark] trait TaskSetManager extends Schedulable {
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription]
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
def error(message: String)
}

View file

@ -18,6 +18,9 @@
package org.apache.spark.scheduler.cluster
import java.lang.{Boolean => JBoolean}
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import java.util.{TimerTask, Timer}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
@ -26,10 +29,7 @@ import scala.collection.mutable.HashSet
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import java.util.{TimerTask, Timer}
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
/**
* The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
@ -55,7 +55,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// Threshold above which we warn user initial TaskSet may be starved
val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
val activeTaskSets = new HashMap[String, TaskSetManager]
// ClusterTaskSetManagers are not thread safe, so any access to one should be synchronized
// on this class.
val activeTaskSets = new HashMap[String, ClusterTaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
val taskIdToExecutorId = new HashMap[Long, String]
@ -65,7 +67,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
@volatile private var hasLaunchedTask = false
private val starvationTimer = new Timer(true)
// Incrementing Mesos task IDs
// Incrementing task IDs
val nextTaskId = new AtomicLong(0)
// Which executor IDs we have executors on
@ -96,6 +98,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val schedulingMode: SchedulingMode = SchedulingMode.withName(
System.getProperty("spark.scheduler.mode", "FIFO"))
// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
override def setListener(listener: TaskSchedulerListener) {
this.listener = listener
}
@ -234,7 +239,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var taskSetToUpdate: Option[TaskSetManager] = None
var failedExecutor: Option[String] = None
var taskFailed = false
synchronized {
@ -249,9 +253,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
taskIdToTaskSetId.get(tid) match {
case Some(taskSetId) =>
if (activeTaskSets.contains(taskSetId)) {
taskSetToUpdate = Some(activeTaskSets(taskSetId))
}
if (TaskState.isFinished(state)) {
taskIdToTaskSetId.remove(tid)
if (taskSetTaskIds.contains(taskSetId)) {
@ -262,6 +263,15 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
if (state == TaskState.FAILED) {
taskFailed = true
}
activeTaskSets.get(taskSetId).foreach { taskSet =>
if (state == TaskState.FINISHED) {
taskSet.removeRunningTask(tid)
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
taskSet.removeRunningTask(tid)
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
}
case None =>
logInfo("Ignoring update from TID " + tid + " because its task set is gone")
}
@ -269,10 +279,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
case e: Exception => logError("Exception in statusUpdate", e)
}
}
// Update the task set and DAGScheduler without holding a lock on this, since that can deadlock
if (taskSetToUpdate != None) {
taskSetToUpdate.get.statusUpdate(tid, state, serializedData)
}
// Update the DAGScheduler without holding a lock on this, since that can deadlock
if (failedExecutor != None) {
listener.executorLost(failedExecutor.get)
backend.reviveOffers()
@ -283,6 +290,25 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}
def handleSuccessfulTask(
taskSetManager: ClusterTaskSetManager,
tid: Long,
taskResult: DirectTaskResult[_]) = synchronized {
taskSetManager.handleSuccessfulTask(tid, taskResult)
}
def handleFailedTask(
taskSetManager: ClusterTaskSetManager,
tid: Long,
taskState: TaskState,
reason: Option[TaskEndReason]) = synchronized {
taskSetManager.handleFailedTask(tid, taskState, reason)
if (taskState == TaskState.FINISHED) {
// The task finished successfully but the result was lost, so we should revive offers.
backend.reviveOffers()
}
}
def error(message: String) {
synchronized {
if (activeTaskSets.size > 0) {
@ -311,6 +337,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
if (jarServer != null) {
jarServer.stop()
}
if (taskResultGetter != null) {
taskResultGetter.stop()
}
// sleeping for an arbitrary 5 seconds : to ensure that messages are sent out.
// TODO: Do something better !

View file

@ -25,15 +25,12 @@ import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.math.max
import scala.math.min
import scala.Some
import org.apache.spark.{FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskState}
import org.apache.spark.{ExceptionFailure, SparkException, TaskResultTooBigFailure}
import org.apache.spark.{ExceptionFailure, FetchFailed, Logging, Resubmitted, SparkEnv,
SparkException, Success, TaskEndReason, TaskResultLost, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler._
import scala.Some
import org.apache.spark.FetchFailed
import org.apache.spark.ExceptionFailure
import org.apache.spark.TaskResultTooBigFailure
import org.apache.spark.util.{SystemClock, Clock}
@ -71,18 +68,20 @@ private[spark] class ClusterTaskSetManager(
val tasks = taskSet.tasks
val numTasks = tasks.length
val copiesRunning = new Array[Int](numTasks)
val finished = new Array[Boolean](numTasks)
val successful = new Array[Boolean](numTasks)
val numFailures = new Array[Int](numTasks)
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
var tasksFinished = 0
var tasksSuccessful = 0
var weight = 1
var minShare = 0
var runningTasks = 0
var priority = taskSet.priority
var stageId = taskSet.stageId
var name = "TaskSet_"+taskSet.stageId.toString
var parent: Schedulable = null
var parent: Pool = null
var runningTasks = 0
private val runningTasksSet = new HashSet[Long]
// Set of pending tasks for each executor. These collections are actually
// treated as stacks, in which new tasks are added to the end of the
@ -223,7 +222,7 @@ private[spark] class ClusterTaskSetManager(
while (!list.isEmpty) {
val index = list.last
list.trimEnd(1)
if (copiesRunning(index) == 0 && !finished(index)) {
if (copiesRunning(index) == 0 && !successful(index)) {
return Some(index)
}
}
@ -243,7 +242,7 @@ private[spark] class ClusterTaskSetManager(
private def findSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value)] =
{
speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set
speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set
if (!speculatableTasks.isEmpty) {
// Check for process-local or preference-less tasks; note that tasks can be process-local
@ -344,7 +343,7 @@ private[spark] class ClusterTaskSetManager(
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
if (tasksSuccessful < numTasks && availableCpus >= CPUS_PER_TASK) {
val curTime = clock.getTime()
var allowedLocality = getAllowedLocalityLevel(curTime)
@ -375,7 +374,7 @@ private[spark] class ClusterTaskSetManager(
val serializedTask = Task.serializeWithDependencies(
task, sched.sc.addedFiles, sched.sc.addedJars, ser)
val timeTaken = clock.getTime() - startTime
increaseRunningTasks(1)
addRunningTask(taskId)
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
@ -417,94 +416,61 @@ private[spark] class ClusterTaskSetManager(
index
}
/** Called by cluster scheduler when one of our tasks changes state */
override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
SparkEnv.set(env)
state match {
case TaskState.FINISHED =>
taskFinished(tid, state, serializedData)
case TaskState.LOST =>
taskLost(tid, state, serializedData)
case TaskState.FAILED =>
taskLost(tid, state, serializedData)
case TaskState.KILLED =>
taskLost(tid, state, serializedData)
case _ =>
}
}
def taskStarted(task: Task[_], info: TaskInfo) {
private def taskStarted(task: Task[_], info: TaskInfo) {
sched.listener.taskStarted(task, info)
}
def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) {
/**
* Marks the task as successful and notifies the listener that a task has ended.
*/
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {
val info = taskInfos(tid)
if (info.failed) {
// We might get two task-lost messages for the same task in coarse-grained Mesos mode,
// or even from Mesos itself when acks get delayed.
return
}
val index = info.index
info.markSuccessful()
decreaseRunningTasks(1)
if (!finished(index)) {
tasksFinished += 1
removeRunningTask(tid)
if (!successful(index)) {
logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
tid, info.duration, info.host, tasksFinished, numTasks))
// Deserialize task result and pass it to the scheduler
try {
val result = ser.deserialize[TaskResult[_]](serializedData)
result.metrics.resultSize = serializedData.limit()
sched.listener.taskEnded(
tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
} catch {
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread().getContextClassLoader
throw new SparkException("ClassNotFound with classloader: " + loader, cnf)
case ex => throw ex
}
// Mark finished and stop if we've finished all the tasks
finished(index) = true
if (tasksFinished == numTasks) {
tid, info.duration, info.host, tasksSuccessful, numTasks))
sched.listener.taskEnded(
tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
// Mark successful and stop if all the tasks have succeeded.
tasksSuccessful += 1
successful(index) = true
if (tasksSuccessful == numTasks) {
sched.taskSetFinished(this)
}
} else {
logInfo("Ignoring task-finished event for TID " + tid +
" because task " + index + " is already finished")
logInfo("Ignorning task-finished event for TID " + tid + " because task " +
index + " has already completed successfully")
}
}
def taskLost(tid: Long, state: TaskState, serializedData: ByteBuffer) {
/**
* Marks the task as failed, re-adds it to the list of pending tasks, and notifies the listener.
*/
def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) {
val info = taskInfos(tid)
if (info.failed) {
// We might get two task-lost messages for the same task in coarse-grained Mesos mode,
// or even from Mesos itself when acks get delayed.
return
}
removeRunningTask(tid)
val index = info.index
info.markFailed()
decreaseRunningTasks(1)
if (!finished(index)) {
if (!successful(index)) {
logInfo("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
copiesRunning(index) -= 1
// Check if the problem is a map output fetch failure. In that case, this
// task will never succeed on any node, so tell the scheduler about it.
if (serializedData != null && serializedData.limit() > 0) {
val reason = ser.deserialize[TaskEndReason](serializedData, getClass.getClassLoader)
reason match {
reason.foreach {
_ match {
case fetchFailed: FetchFailed =>
logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)
sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
finished(index) = true
tasksFinished += 1
successful(index) = true
tasksSuccessful += 1
sched.taskSetFinished(this)
decreaseRunningTasks(runningTasks)
return
case taskResultTooBig: TaskResultTooBigFailure =>
logInfo("Loss was due to task %s result exceeding Akka frame size; aborting job".format(
tid))
abort("Task %s result exceeded Akka frame size".format(tid))
removeAllRunningTasks()
return
case ef: ExceptionFailure =>
@ -534,13 +500,16 @@ private[spark] class ClusterTaskSetManager(
logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
}
case TaskResultLost =>
logInfo("Lost result for TID %s on host %s".format(tid, info.host))
sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
case _ => {}
}
}
// On non-fetch failures, re-enqueue the task as pending for a max number of retries
addPendingTask(index)
// Count failed attempts only on FAILED and LOST state (not on KILLED)
if (state == TaskState.FAILED || state == TaskState.LOST) {
if (state != TaskState.KILLED) {
numFailures(index) += 1
if (numFailures(index) > MAX_TASK_FAILURES) {
logError("Task %s:%d failed more than %d times; aborting job".format(
@ -564,22 +533,36 @@ private[spark] class ClusterTaskSetManager(
causeOfFailure = message
// TODO: Kill running tasks if we were not terminated due to a Mesos error
sched.listener.taskSetFailed(taskSet, message)
decreaseRunningTasks(runningTasks)
removeAllRunningTasks()
sched.taskSetFinished(this)
}
override def increaseRunningTasks(taskNum: Int) {
runningTasks += taskNum
if (parent != null) {
parent.increaseRunningTasks(taskNum)
/** If the given task ID is not in the set of running tasks, adds it.
*
* Used to keep track of the number of running tasks, for enforcing scheduling policies.
*/
def addRunningTask(tid: Long) {
if (runningTasksSet.add(tid) && parent != null) {
parent.increaseRunningTasks(1)
}
runningTasks = runningTasksSet.size
}
override def decreaseRunningTasks(taskNum: Int) {
runningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)
/** If the given task ID is in the set of running tasks, removes it. */
def removeRunningTask(tid: Long) {
if (runningTasksSet.remove(tid) && parent != null) {
parent.decreaseRunningTasks(1)
}
runningTasks = runningTasksSet.size
}
private def removeAllRunningTasks() {
val numRunningTasks = runningTasksSet.size
runningTasksSet.clear()
if (parent != null) {
parent.decreaseRunningTasks(numRunningTasks)
}
runningTasks = 0
}
override def getSchedulableByName(name: String): Schedulable = {
@ -615,10 +598,10 @@ private[spark] class ClusterTaskSetManager(
if (tasks(0).isInstanceOf[ShuffleMapTask]) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
if (finished(index)) {
finished(index) = false
if (successful(index)) {
successful(index) = false
copiesRunning(index) -= 1
tasksFinished -= 1
tasksSuccessful -= 1
addPendingTask(index)
// Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
// stage finishes when a total of tasks.size tasks finish.
@ -628,7 +611,7 @@ private[spark] class ClusterTaskSetManager(
}
// Also re-enqueue any tasks that were running on the node
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
taskLost(tid, TaskState.KILLED, null)
handleFailedTask(tid, TaskState.KILLED, None)
}
}
@ -641,13 +624,13 @@ private[spark] class ClusterTaskSetManager(
*/
override def checkSpeculatableTasks(): Boolean = {
// Can't speculate if we only have one task, or if all tasks have finished.
if (numTasks == 1 || tasksFinished == numTasks) {
if (numTasks == 1 || tasksSuccessful == numTasks) {
return false
}
var foundTasks = false
val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
if (tasksFinished >= minFinishedForSpeculation) {
if (tasksSuccessful >= minFinishedForSpeculation) {
val time = clock.getTime()
val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
Arrays.sort(durations)
@ -658,7 +641,7 @@ private[spark] class ClusterTaskSetManager(
logDebug("Task length threshold for speculation: " + threshold)
for ((tid, info) <- taskInfos) {
val index = info.index
if (!finished(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
!speculatableTasks.contains(index)) {
logInfo(
"Marking task %s:%d (on %s) as speculatable because it ran more than %.0f ms".format(
@ -672,7 +655,7 @@ private[spark] class ClusterTaskSetManager(
}
override def hasPendingTasks(): Boolean = {
numTasks > 0 && tasksFinished < numTasks
numTasks > 0 && tasksSuccessful < numTasks
}
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {

View file

@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster
import java.nio.ByteBuffer
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.util.{Utils, SerializableBuffer}

View file

@ -29,6 +29,7 @@ import akka.util.Duration
import akka.util.duration._
import org.apache.spark.{SparkException, Logging, TaskState}
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
import org.apache.spark.util.Utils

View file

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster
import java.nio.ByteBuffer
import java.util.concurrent.{LinkedBlockingDeque, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
import org.apache.spark.serializer.SerializerInstance
/**
* Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
*/
private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
extends Logging {
private val MIN_THREADS = System.getProperty("spark.resultGetter.minThreads", "4").toInt
private val MAX_THREADS = System.getProperty("spark.resultGetter.maxThreads", "4").toInt
private val getTaskResultExecutor = new ThreadPoolExecutor(
MIN_THREADS,
MAX_THREADS,
0L,
TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable],
new ResultResolverThreadFactory)
class ResultResolverThreadFactory extends ThreadFactory {
private var counter = 0
private var PREFIX = "Result resolver thread"
override def newThread(r: Runnable): Thread = {
val thread = new Thread(r, "%s-%s".format(PREFIX, counter))
counter += 1
thread.setDaemon(true)
return thread
}
}
protected val serializer = new ThreadLocal[SerializerInstance] {
override def initialValue(): SerializerInstance = {
return sparkEnv.closureSerializer.newInstance()
}
}
def enqueueSuccessfulTask(
taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: ByteBuffer) {
getTaskResultExecutor.execute(new Runnable {
override def run() {
try {
val result = serializer.get().deserialize[TaskResult[_]](serializedData) match {
case directResult: DirectTaskResult[_] => directResult
case IndirectTaskResult(blockId) =>
logDebug("Fetching indirect task result for TID %s".format(tid))
val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
if (!serializedTaskResult.isDefined) {
/* We won't be able to get the task result if the machine that ran the task failed
* between when the task ended and when we tried to fetch the result, or if the
* block manager had to flush the result. */
scheduler.handleFailedTask(
taskSetManager, tid, TaskState.FINISHED, Some(TaskResultLost))
return
}
val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
serializedTaskResult.get)
sparkEnv.blockManager.master.removeBlock(blockId)
deserializedResult
}
result.metrics.resultSize = serializedData.limit()
scheduler.handleSuccessfulTask(taskSetManager, tid, result)
} catch {
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread.getContextClassLoader
taskSetManager.abort("ClassNotFound with classloader: " + loader)
case ex =>
taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
}
}
})
}
def enqueueFailedTask(taskSetManager: ClusterTaskSetManager, tid: Long, taskState: TaskState,
serializedData: ByteBuffer) {
var reason: Option[TaskEndReason] = None
getTaskResultExecutor.execute(new Runnable {
override def run() {
try {
if (serializedData != null && serializedData.limit() > 0) {
reason = Some(serializer.get().deserialize[TaskEndReason](
serializedData, getClass.getClassLoader))
}
} catch {
case cnd: ClassNotFoundException =>
// Log an error but keep going here -- the task failed, so not catastropic if we can't
// deserialize the reason.
val loader = Thread.currentThread.getContextClassLoader
logError(
"Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
case ex => {}
}
scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
}
})
}
def stop() {
getTaskResultExecutor.shutdownNow()
}
}

View file

@ -15,22 +15,22 @@
* limitations under the License.
*/
package org.apache.spark.scheduler.mesos
package org.apache.spark.scheduler.cluster.mesos
import java.io.File
import java.util.{ArrayList => JArrayList, List => JList}
import java.util.Collections
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConversions._
import com.google.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
import org.apache.spark.{SparkException, Logging, SparkContext}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConversions._
import java.io.File
import org.apache.spark.scheduler.cluster._
import java.util.{ArrayList => JArrayList, List => JList}
import java.util.Collections
import org.apache.spark.TaskState
import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}
/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds

View file

@ -15,22 +15,24 @@
* limitations under the License.
*/
package org.apache.spark.scheduler.mesos
package org.apache.spark.scheduler.cluster.mesos
import java.io.File
import java.util.{ArrayList => JArrayList, List => JList}
import java.util.Collections
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConversions._
import com.google.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
import org.apache.spark.{SparkException, Logging, SparkContext}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConversions._
import java.io.File
import org.apache.spark.scheduler.cluster._
import java.util.{ArrayList => JArrayList, List => JList}
import java.util.Collections
import org.apache.spark.TaskState
import org.apache.spark.{Logging, SparkException, SparkContext, TaskState}
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.{ClusterScheduler, ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.{SchedulerBackend, SlaveLost, WorkerOffer}
import org.apache.spark.util.Utils
/**

View file

@ -31,8 +31,7 @@ import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.ExecutorURLClassLoader
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster._
import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import akka.actor._
import org.apache.spark.util.Utils
@ -92,7 +91,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var rootPool: Pool = null
val schedulingMode: SchedulingMode = SchedulingMode.withName(
System.getProperty("spark.scheduler.mode", "FIFO"))
val activeTaskSets = new HashMap[String, TaskSetManager]
val activeTaskSets = new HashMap[String, LocalTaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
@ -211,7 +210,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
deserializedTask.metrics.get.executorRunTime = serviceTime.toInt
deserializedTask.metrics.get.jvmGCTime = getTotalGCTime - startGCTime
deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null))
val taskResult = new DirectTaskResult(
result, accumUpdates, deserializedTask.metrics.getOrElse(null))
val serializedResult = ser.serialize(taskResult)
localActor ! LocalStatusUpdate(taskId, TaskState.FINISHED, serializedResult)
} catch {

View file

@ -21,16 +21,16 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, Success, TaskState}
import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, SparkException, Success, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.{Task, TaskResult, TaskSet}
import org.apache.spark.scheduler.cluster.{Schedulable, TaskDescription, TaskInfo, TaskLocality, TaskSetManager}
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Pool, Schedulable, Task,
TaskDescription, TaskInfo, TaskLocality, TaskResult, TaskSet, TaskSetManager}
private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet)
extends TaskSetManager with Logging {
var parent: Schedulable = null
var parent: Pool = null
var weight: Int = 1
var minShare: Int = 0
var runningTasks: Int = 0
@ -49,14 +49,14 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val numFailures = new Array[Int](numTasks)
val MAX_TASK_FAILURES = sched.maxFailures
override def increaseRunningTasks(taskNum: Int): Unit = {
def increaseRunningTasks(taskNum: Int): Unit = {
runningTasks += taskNum
if (parent != null) {
parent.increaseRunningTasks(taskNum)
}
}
override def decreaseRunningTasks(taskNum: Int): Unit = {
def decreaseRunningTasks(taskNum: Int): Unit = {
runningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)
@ -132,7 +132,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
return None
}
override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
SparkEnv.set(env)
state match {
case TaskState.FINISHED =>
@ -152,7 +152,12 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val index = info.index
val task = taskSet.tasks(index)
info.markSuccessful()
val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader)
val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader) match {
case directResult: DirectTaskResult[_] => directResult
case IndirectTaskResult(blockId) => {
throw new SparkException("Expect only DirectTaskResults when using LocalScheduler")
}
}
result.metrics.resultSize = serializedData.limit()
sched.listener.taskEnded(task, Success, result.value, result.accumUpdates, info, result.metrics)
numFinished += 1

View file

@ -484,7 +484,7 @@ private[spark] class BlockManager(
for (loc <- locations) {
logDebug("Getting remote block " + blockId + " from " + loc)
val data = BlockManagerWorker.syncGetBlock(
GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
if (data != null) {
return Some(dataDeserialize(blockId, data))
}
@ -494,6 +494,31 @@ private[spark] class BlockManager(
return None
}
/**
* Get block from remote block managers as serialized bytes.
*/
def getRemoteBytes(blockId: String): Option[ByteBuffer] = {
// TODO: As with getLocalBytes, this is very similar to getRemote and perhaps should be
// refactored.
if (blockId == null) {
throw new IllegalArgumentException("Block Id is null")
}
logDebug("Getting remote block " + blockId + " as bytes")
val locations = master.getLocations(blockId)
for (loc <- locations) {
logDebug("Getting remote block " + blockId + " from " + loc)
val data = BlockManagerWorker.syncGetBlock(
GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
if (data != null) {
return Some(data)
}
logDebug("The value of block " + blockId + " is null")
}
logDebug("Block " + blockId + " not found")
return None
}
/**
* Get a block from the block manager (either local or remote).
*/

View file

@ -30,10 +30,10 @@ import org.apache.spark.util.{SizeEstimator, Utils}
private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
extends BlockStore(blockManager) {
case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false)
case class Entry(value: Any, size: Long, deserialized: Boolean)
private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
private var currentMemory = 0L
@volatile private var currentMemory = 0L
// Object used to ensure that only one thread is putting blocks and if necessary, dropping
// blocks from the memory store.
private val putLock = new Object()
@ -110,9 +110,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
override def remove(blockId: String): Boolean = {
entries.synchronized {
val entry = entries.get(blockId)
val entry = entries.remove(blockId)
if (entry != null) {
entries.remove(blockId)
currentMemory -= entry.size
logInfo("Block %s of size %d dropped from memory (free %d)".format(
blockId, entry.size, freeMemory))
@ -126,6 +125,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
override def clear() {
entries.synchronized {
entries.clear()
currentMemory = 0
}
logInfo("MemoryStore cleared")
}
@ -160,8 +160,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
putLock.synchronized {
if (ensureFreeSpace(blockId, size)) {
val entry = new Entry(value, size, deserialized)
entries.synchronized { entries.put(blockId, entry) }
currentMemory += size
entries.synchronized {
entries.put(blockId, entry)
currentMemory += size
}
if (deserialized) {
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))

View file

@ -21,7 +21,7 @@ import scala.util.Random
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.scheduler.cluster.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode
/**

View file

@ -26,8 +26,8 @@ import org.eclipse.jetty.server.Handler
import org.apache.spark.{ExceptionFailure, Logging, SparkContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.scheduler.{SparkListenerTaskStart, SparkListenerTaskEnd, SparkListener}
import org.apache.spark.scheduler.TaskInfo
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.Page.Executors
import org.apache.spark.ui.UIUtils

View file

@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.{NodeSeq, Node}
import org.apache.spark.scheduler.cluster.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode
import org.apache.spark.ui.Page._
import org.apache.spark.ui.UIUtils._

View file

@ -21,10 +21,8 @@ import scala.Seq
import scala.collection.mutable.{ListBuffer, HashMap, HashSet}
import org.apache.spark.{ExceptionFailure, SparkContext, Success}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.executor.TaskMetrics
import collection.mutable
import org.apache.spark.scheduler._
/**
* Tracks task-level information to be displayed in the UI.

View file

@ -32,8 +32,8 @@ import org.apache.spark.ui.JettyUtils._
import org.apache.spark.{ExceptionFailure, SparkContext, Success}
import org.apache.spark.scheduler._
import collection.mutable
import org.apache.spark.scheduler.cluster.SchedulingMode
import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.util.Utils
/** Web UI showing progress status of all jobs in the given SparkContext. */

View file

@ -21,8 +21,7 @@ import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.xml.Node
import org.apache.spark.scheduler.Stage
import org.apache.spark.scheduler.cluster.Schedulable
import org.apache.spark.scheduler.{Schedulable, Stage}
import org.apache.spark.ui.UIUtils
/** Table showing list of pools */

View file

@ -23,12 +23,12 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.{ExceptionFailure}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.ui.UIUtils._
import org.apache.spark.ui.Page._
import org.apache.spark.util.{Utils, Distribution}
import org.apache.spark.{ExceptionFailure}
import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.TaskInfo
/** Page showing statistics and task list for a given stage */
private[spark] class StagePage(parent: JobProgressUI) {

View file

@ -22,8 +22,7 @@ import java.util.Date
import scala.xml.Node
import scala.collection.mutable.HashSet
import org.apache.spark.scheduler.cluster.{SchedulingMode, TaskInfo}
import org.apache.spark.scheduler.Stage
import org.apache.spark.scheduler.{SchedulingMode, Stage, TaskInfo}
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils

View file

@ -319,19 +319,6 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
}
}
}
test("job should fail if TaskResult exceeds Akka frame size") {
// We must use local-cluster mode since results are returned differently
// when running under LocalScheduler:
sc = new SparkContext("local-cluster[1,1,512]", "test")
val akkaFrameSize =
sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
val rdd = sc.parallelize(Seq(1)).map{x => new Array[Byte](akkaFrameSize)}
val exception = intercept[SparkException] {
rdd.reduce((x, y) => x)
}
exception.getMessage should endWith("result exceeded Akka frame size")
}
}
object DistributedSuite {

View file

@ -40,17 +40,17 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
}
def resetSparkContext() = {
if (sc != null) {
LocalSparkContext.stop(sc)
sc = null
}
LocalSparkContext.stop(sc)
sc = null
}
}
object LocalSparkContext {
def stop(sc: SparkContext) {
sc.stop()
if (sc != null) {
sc.stop()
}
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
System.clearProperty("spark.hostPort")

View file

@ -33,10 +33,8 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
}
override def afterAll() {
if (_sc != null) {
LocalSparkContext.stop(_sc)
_sc = null
}
LocalSparkContext.stop(_sc)
_sc = null
super.afterAll()
}
}

View file

@ -40,7 +40,7 @@ object ThreadingSuiteState {
}
class ThreadingSuite extends FunSuite with LocalSparkContext {
test("accessing SparkContext form a different thread") {
sc = new SparkContext("local", "test")
val nums = sc.parallelize(1 to 10, 2)
@ -149,4 +149,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
fail("One or more threads didn't see runningThreads = 4")
}
}
test("set local properties in different thread") {
sc = new SparkContext("local", "test")
val sem = new Semaphore(0)
val threads = (1 to 5).map { i =>
new Thread() {
override def run() {
sc.setLocalProperty("test", i.toString)
assert(sc.getLocalProperty("test") === i.toString)
sem.release()
}
}
}
threads.foreach(_.start())
sem.acquire(5)
assert(sc.getLocalProperty("test") === null)
}
test("set and get local properties in parent-children thread") {
sc = new SparkContext("local", "test")
sc.setLocalProperty("test", "parent")
val sem = new Semaphore(0)
val threads = (1 to 5).map { i =>
new Thread() {
override def run() {
assert(sc.getLocalProperty("test") === "parent")
sc.setLocalProperty("test", i.toString)
assert(sc.getLocalProperty("test") === i.toString)
sem.release()
}
}
}
threads.foreach(_.start())
sem.acquire(5)
assert(sc.getLocalProperty("test") === "parent")
assert(sc.getLocalProperty("Foo") === null)
}
}

View file

@ -25,7 +25,6 @@ import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import scala.collection.parallel.mutable
import org.apache.spark._
import org.apache.spark.rdd.CoalescedRDDPartition
class RDDSuite extends FunSuite with SharedSparkContext {
@ -321,6 +320,44 @@ class RDDSuite extends FunSuite with SharedSparkContext {
for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
}
test("take") {
var nums = sc.makeRDD(Range(1, 1000), 1)
assert(nums.take(0).size === 0)
assert(nums.take(1) === Array(1))
assert(nums.take(3) === Array(1, 2, 3))
assert(nums.take(500) === (1 to 500).toArray)
assert(nums.take(501) === (1 to 501).toArray)
assert(nums.take(999) === (1 to 999).toArray)
assert(nums.take(1000) === (1 to 999).toArray)
nums = sc.makeRDD(Range(1, 1000), 2)
assert(nums.take(0).size === 0)
assert(nums.take(1) === Array(1))
assert(nums.take(3) === Array(1, 2, 3))
assert(nums.take(500) === (1 to 500).toArray)
assert(nums.take(501) === (1 to 501).toArray)
assert(nums.take(999) === (1 to 999).toArray)
assert(nums.take(1000) === (1 to 999).toArray)
nums = sc.makeRDD(Range(1, 1000), 100)
assert(nums.take(0).size === 0)
assert(nums.take(1) === Array(1))
assert(nums.take(3) === Array(1, 2, 3))
assert(nums.take(500) === (1 to 500).toArray)
assert(nums.take(501) === (1 to 501).toArray)
assert(nums.take(999) === (1 to 999).toArray)
assert(nums.take(1000) === (1 to 999).toArray)
nums = sc.makeRDD(Range(1, 1000), 1000)
assert(nums.take(0).size === 0)
assert(nums.take(1) === Array(1))
assert(nums.take(3) === Array(1, 2, 3))
assert(nums.take(500) === (1 to 500).toArray)
assert(nums.take(501) === (1 to 501).toArray)
assert(nums.take(999) === (1 to 999).toArray)
assert(nums.take(1000) === (1 to 999).toArray)
}
test("top with predefined ordering") {
val nums = Array.range(1, 100000)
val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2)

View file

@ -32,9 +32,7 @@ import org.apache.spark.{Dependency, ShuffleDependency, OneToOneDependency}
import org.apache.spark.{FetchFailed, Success, TaskEndReason}
import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster}
import org.apache.spark.scheduler.cluster.Pool
import org.apache.spark.scheduler.cluster.SchedulingMode
import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
/**
* Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler

View file

@ -43,16 +43,16 @@ class FakeTaskSetManager(
stageId = initStageId
name = "TaskSet_"+stageId
override val numTasks = initNumTasks
tasksFinished = 0
tasksSuccessful = 0
override def increaseRunningTasks(taskNum: Int) {
def increaseRunningTasks(taskNum: Int) {
runningTasks += taskNum
if (parent != null) {
parent.increaseRunningTasks(taskNum)
}
}
override def decreaseRunningTasks(taskNum: Int) {
def decreaseRunningTasks(taskNum: Int) {
runningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)
@ -79,7 +79,7 @@ class FakeTaskSetManager(
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
if (tasksFinished + runningTasks < numTasks) {
if (tasksSuccessful + runningTasks < numTasks) {
increaseRunningTasks(1)
return Some(new TaskDescription(0, execId, "task 0:0", 0, null))
}
@ -92,8 +92,8 @@ class FakeTaskSetManager(
def taskFinished() {
decreaseRunningTasks(1)
tasksFinished +=1
if (tasksFinished == numTasks) {
tasksSuccessful +=1
if (tasksSuccessful == numTasks) {
parent.removeSchedulable(this)
}
}
@ -114,7 +114,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
val taskSetQueue = rootPool.getSortedTaskSetQueue()
/* Just for Test*/
for (manager <- taskSetQueue) {
logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(
manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
}
for (taskSet <- taskSetQueue) {
taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {

View file

@ -40,6 +40,7 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*
val startedTasks = new ArrayBuffer[Long]
val endedTasks = new mutable.HashMap[Long, TaskEndReason]
val finishedManagers = new ArrayBuffer[TaskSetManager]
val taskSetsFailed = new ArrayBuffer[String]
val executors = new mutable.HashMap[String, String] ++ liveExecutors
@ -63,7 +64,9 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*
def executorLost(execId: String) {}
def taskSetFailed(taskSet: TaskSet, reason: String) {}
def taskSetFailed(taskSet: TaskSet, reason: String) {
taskSetsFailed += taskSet.id
}
}
def removeExecutor(execId: String): Unit = executors -= execId
@ -101,7 +104,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
assert(manager.resourceOffer("exec1", "host1", 2, PROCESS_LOCAL) === None)
// Tell it the task has finished
manager.statusUpdate(0, TaskState.FINISHED, createTaskResult(0))
manager.handleSuccessfulTask(0, createTaskResult(0))
assert(sched.endedTasks(0) === Success)
assert(sched.finishedManagers.contains(manager))
}
@ -125,14 +128,14 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
// Finish the first two tasks
manager.statusUpdate(0, TaskState.FINISHED, createTaskResult(0))
manager.statusUpdate(1, TaskState.FINISHED, createTaskResult(1))
manager.handleSuccessfulTask(0, createTaskResult(0))
manager.handleSuccessfulTask(1, createTaskResult(1))
assert(sched.endedTasks(0) === Success)
assert(sched.endedTasks(1) === Success)
assert(!sched.finishedManagers.contains(manager))
// Finish the last task
manager.statusUpdate(2, TaskState.FINISHED, createTaskResult(2))
manager.handleSuccessfulTask(2, createTaskResult(2))
assert(sched.endedTasks(2) === Success)
assert(sched.finishedManagers.contains(manager))
}
@ -253,6 +256,47 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
}
test("task result lost") {
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(1)
val clock = new FakeClock
val manager = new ClusterTaskSetManager(sched, taskSet, clock)
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
// Tell it the task has finished but the result was lost.
manager.handleFailedTask(0, TaskState.FINISHED, Some(TaskResultLost))
assert(sched.endedTasks(0) === TaskResultLost)
// Re-offer the host -- now we should get task 0 again.
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
}
test("repeated failures lead to task set abortion") {
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(1)
val clock = new FakeClock
val manager = new ClusterTaskSetManager(sched, taskSet, clock)
// Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
// after the last failure.
(0 until manager.MAX_TASK_FAILURES).foreach { index =>
val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
assert(offerResult != None,
"Expect resource offer on iteration %s to return a task".format(index))
assert(offerResult.get.index === 0)
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost))
if (index < manager.MAX_TASK_FAILURES) {
assert(!sched.taskSetsFailed.contains(taskSet.id))
} else {
assert(sched.taskSetsFailed.contains(taskSet.id))
}
}
}
/**
* Utility method to create a TaskSet, potentially setting a particular sequence of preferred
* locations for each task (given as varargs) if this sequence is not empty.
@ -267,7 +311,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
new TaskSet(tasks, 0, 0, 0, null)
}
def createTaskResult(id: Int): ByteBuffer = {
ByteBuffer.wrap(Utils.serialize(new TaskResult[Int](id, mutable.Map.empty, new TaskMetrics)))
def createTaskResult(id: Int): DirectTaskResult[Int] = {
new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics)
}
}

View file

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster
import java.nio.ByteBuffer
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
/**
* Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter.
*
* Used to test the case where a BlockManager evicts the task result (or dies) before the
* TaskResult is retrieved.
*/
class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
extends TaskResultGetter(sparkEnv, scheduler) {
var removedResult = false
override def enqueueSuccessfulTask(
taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: ByteBuffer) {
if (!removedResult) {
// Only remove the result once, since we'd like to test the case where the task eventually
// succeeds.
serializer.get().deserialize[TaskResult[_]](serializedData) match {
case IndirectTaskResult(blockId) =>
sparkEnv.blockManager.master.removeBlock(blockId)
case directResult: DirectTaskResult[_] =>
taskSetManager.abort("Internal error: expect only indirect results")
}
serializedData.rewind()
removedResult = true
}
super.enqueueSuccessfulTask(taskSetManager, tid, serializedData)
}
}
/**
* Tests related to handling task results (both direct and indirect).
*/
class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
with LocalSparkContext {
override def beforeAll {
// Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
// as we can make it) so the tests don't take too long.
System.setProperty("spark.akka.frameSize", "1")
}
before {
// Use local-cluster mode because results are returned differently when running with the
// LocalScheduler.
sc = new SparkContext("local-cluster[1,1,512]", "test")
}
override def afterAll {
System.clearProperty("spark.akka.frameSize")
}
test("handling results smaller than Akka frame size") {
val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
assert(result === 2)
}
test("handling results larger than Akka frame size") {
val akkaFrameSize =
sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
assert(result === 1.to(akkaFrameSize).toArray)
val RESULT_BLOCK_ID = "taskresult_0"
assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0,
"Expect result to be removed from the block manager.")
}
test("task retried if result missing from block manager") {
// If this test hangs, it's probably because no resource offers were made after the task
// failed.
val scheduler: ClusterScheduler = sc.taskScheduler match {
case clusterScheduler: ClusterScheduler =>
clusterScheduler
case _ =>
assert(false, "Expect local cluster to use ClusterScheduler")
throw new ClassCastException
}
scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
val akkaFrameSize =
sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
assert(result === 1.to(akkaFrameSize).toArray)
// Make sure two tasks were run (one failed one, and a second retried one).
assert(scheduler.nextTaskId.get() === 2)
}
}

View file

@ -3,8 +3,8 @@ markdown: kramdown
# These allow the documentation to be updated with nerw releases
# of Spark, Scala, and Mesos.
SPARK_VERSION: 0.8.0-SNAPSHOT
SPARK_VERSION_SHORT: 0.8.0
SPARK_VERSION: 0.9.0-incubating-SNAPSHOT
SPARK_VERSION_SHORT: 0.9.0-SNAPSHOT
SCALA_VERSION: 2.9.3
MESOS_VERSION: 0.13.0
SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net

View file

@ -50,6 +50,7 @@ The command to launch the YARN Client is as follows:
--master-memory <MEMORY_FOR_MASTER> \
--worker-memory <MEMORY_PER_WORKER> \
--worker-cores <CORES_PER_WORKER> \
--name <application_name> \
--queue <queue_name>
For example:

View file

@ -1,4 +1,4 @@
This folder contains a script, spark-ec2, for launching Spark clusters on
Amazon EC2. Usage instructions are available online at:
http://spark-project.org/docs/latest/ec2-scripts.html
http://spark.incubator.apache.org/docs/latest/ec2-scripts.html

View file

@ -23,6 +23,7 @@ from __future__ import with_statement
import logging
import os
import pipes
import random
import shutil
import subprocess
@ -36,6 +37,9 @@ import boto
from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
from boto import ec2
class UsageError(Exception):
pass
# A URL prefix from which to fetch AMI information
AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list"
@ -103,11 +107,7 @@ def parse_args():
parser.print_help()
sys.exit(1)
(action, cluster_name) = args
if opts.identity_file == None and action in ['launch', 'login', 'start']:
print >> stderr, ("ERROR: The -i or --identity-file argument is " +
"required for " + action)
sys.exit(1)
# Boto config check
# http://boto.cloudhackers.com/en/latest/boto_config_tut.html
home_dir = os.getenv('HOME')
@ -390,10 +390,18 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
master = master_nodes[0].public_dns_name
if deploy_ssh_key:
print "Copying SSH key %s to master..." % opts.identity_file
ssh(master, opts, 'mkdir -p ~/.ssh')
scp(master, opts, opts.identity_file, '~/.ssh/id_rsa')
ssh(master, opts, 'chmod 600 ~/.ssh/id_rsa')
print "Generating cluster's SSH key on master..."
key_setup = """
[ -f ~/.ssh/id_rsa ] ||
(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa &&
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)
"""
ssh(master, opts, key_setup)
dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
print "Transferring cluster's SSH key to slaves..."
for slave in slave_nodes:
print slave.public_dns_name
ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar)
modules = ['spark', 'shark', 'ephemeral-hdfs', 'persistent-hdfs',
'mapreduce', 'spark-standalone']
@ -535,18 +543,33 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
dest.write(text)
dest.close()
# rsync the whole directory over to the master machine
command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " +
"'%s/' '%s@%s:/'") % (opts.identity_file, tmp_dir, opts.user, active_master))
subprocess.check_call(command, shell=True)
command = [
'rsync', '-rv',
'-e', stringify_command(ssh_command(opts)),
"%s/" % tmp_dir,
"%s@%s:/" % (opts.user, active_master)
]
subprocess.check_call(command)
# Remove the temp directory we created above
shutil.rmtree(tmp_dir)
# Copy a file to a given host through scp, throwing an exception if scp fails
def scp(host, opts, local_file, dest_file):
subprocess.check_call(
"scp -q -o StrictHostKeyChecking=no -i %s '%s' '%s@%s:%s'" %
(opts.identity_file, local_file, opts.user, host, dest_file), shell=True)
def stringify_command(parts):
if isinstance(parts, str):
return parts
else:
return ' '.join(map(pipes.quote, parts))
def ssh_args(opts):
parts = ['-o', 'StrictHostKeyChecking=no']
if opts.identity_file is not None:
parts += ['-i', opts.identity_file]
return parts
def ssh_command(opts):
return ['ssh'] + ssh_args(opts)
# Run a command on a host through ssh, retrying up to two times
@ -556,18 +579,42 @@ def ssh(host, opts, command):
while True:
try:
return subprocess.check_call(
"ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
(opts.identity_file, opts.user, host, command), shell=True)
ssh_command(opts) + ['-t', '%s@%s' % (opts.user, host), stringify_command(command)])
except subprocess.CalledProcessError as e:
if (tries > 2):
raise e
print "Couldn't connect to host {0}, waiting 30 seconds".format(e)
# If this was an ssh failure, provide the user with hints.
if e.returncode == 255:
raise UsageError("Failed to SSH to remote host {0}.\nPlease check that you have provided the correct --identity-file and --key-pair parameters and try again.".format(host))
else:
raise e
print >> stderr, "Error executing remote command, retrying after 30 seconds: {0}".format(e)
time.sleep(30)
tries = tries + 1
def ssh_read(host, opts, command):
return subprocess.check_output(
ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)])
def ssh_write(host, opts, command, input):
tries = 0
while True:
proc = subprocess.Popen(
ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)],
stdin=subprocess.PIPE)
proc.stdin.write(input)
proc.stdin.close()
status = proc.wait()
if status == 0:
break
elif (tries > 2):
raise RuntimeError("ssh_write failed with error %s" % proc.returncode)
else:
print >> stderr, "Error {0} while executing remote command, retrying after 30 seconds".format(status)
time.sleep(30)
tries = tries + 1
# Gets a list of zones to launch instances in
def get_zones(conn, opts):
@ -586,7 +633,7 @@ def get_partition(total, num_partitions, current_partitions):
return num_slaves_this_zone
def main():
def real_main():
(opts, action, cluster_name) = parse_args()
try:
conn = ec2.connect_to_region(opts.region)
@ -669,11 +716,11 @@ def main():
conn, opts, cluster_name)
master = master_nodes[0].public_dns_name
print "Logging into master " + master + "..."
proxy_opt = ""
proxy_opt = []
if opts.proxy_port != None:
proxy_opt = "-D " + opts.proxy_port
subprocess.check_call("ssh -o StrictHostKeyChecking=no -i %s %s %s@%s" %
(opts.identity_file, proxy_opt, opts.user, master), shell=True)
proxy_opt = ['-D', opts.proxy_port]
subprocess.check_call(
ssh_command(opts) + proxy_opt + ['-t', "%s@%s" % (opts.user, master)])
elif action == "get-master":
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
@ -715,6 +762,13 @@ def main():
sys.exit(1)
def main():
try:
real_main()
except UsageError, e:
print >> stderr, "\nError:\n", e
if __name__ == "__main__":
logging.basicConfig()
main()

View file

@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.8.0-SNAPSHOT</version>
<version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View file

@ -95,7 +95,7 @@ cp $FWDIR/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/jars/"
# Copy other things
mkdir "$DISTDIR"/conf
cp "$FWDIR/conf/*.template" "$DISTDIR"/conf
cp "$FWDIR"/conf/*.template "$DISTDIR"/conf
cp -r "$FWDIR/bin" "$DISTDIR"
cp -r "$FWDIR/python" "$DISTDIR"
cp "$FWDIR/spark-class" "$DISTDIR"

View file

@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.8.0-SNAPSHOT</version>
<version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View file

@ -25,7 +25,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.8.0-SNAPSHOT</version>
<version>0.9.0-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Spark Project Parent POM</name>
<url>http://spark.incubator.apache.org/</url>
@ -557,7 +557,6 @@
<useZincServer>true</useZincServer>
<args>
<arg>-unchecked</arg>
<arg>-optimise</arg>
<arg>-deprecation</arg>
</args>
<jvmArgs>

View file

@ -79,9 +79,9 @@ object SparkBuild extends Build {
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.apache.spark",
version := "0.8.0-SNAPSHOT",
version := "0.9.0-incubating-SNAPSHOT",
scalaVersion := "2.9.3",
scalacOptions := Seq("-unchecked", "-optimize", "-deprecation",
scalacOptions := Seq("-Xmax-classfile-name", "120", "-unchecked", "-deprecation",
"-target:" + SCALAC_JVM_VERSION),
javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION),
unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
@ -97,6 +97,9 @@ object SparkBuild extends Build {
// Only allow one test at a time, even across projects, since they run in the same JVM
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
// also check the local Maven repository ~/.m2
resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))),
// Shared between both core and streaming.
resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"),

View file

@ -35,7 +35,7 @@ print """Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 0.8.0
/__ / .__/\_,_/_/ /_/\_\ version 0.9.0-SNAPSHOT
/_/
"""
print "Using Python version %s (%s, %s)" % (

View file

@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.8.0-SNAPSHOT</version>
<version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View file

@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.8.0-SNAPSHOT</version>
<version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View file

@ -200,7 +200,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 0.8.0
/___/ .__/\_,_/_/ /_/\_\ version 0.9.0-SNAPSHOT
/_/
""")
import Properties._

View file

@ -37,7 +37,7 @@ fi
# If this is a standalone cluster daemon, reset SPARK_JAVA_OPTS and SPARK_MEM to reasonable
# values for that; it doesn't need a lot
if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then
if [ "$1" = "org.apache.spark.deploy.master.Master" -o "$1" = "org.apache.spark.deploy.worker.Worker" ]; then
SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m}
SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
# Do not overwrite SPARK_JAVA_OPTS environment variable in this script
@ -49,19 +49,19 @@ fi
# Add java opts for master, worker, executor. The opts maybe null
case "$1" in
'spark.deploy.master.Master')
'org.apache.spark.deploy.master.Master')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_MASTER_OPTS"
;;
'spark.deploy.worker.Worker')
'org.apache.spark.deploy.worker.Worker')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_WORKER_OPTS"
;;
'spark.executor.StandaloneExecutorBackend')
'org.apache.spark.executor.StandaloneExecutorBackend')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
;;
'spark.executor.MesosExecutorBackend')
'org.apache.spark.executor.MesosExecutorBackend')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
;;
'spark.repl.Main')
'org.apache.spark.repl.Main')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_REPL_OPTS"
;;
esac

View file

@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.8.0-SNAPSHOT</version>
<version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View file

@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.8.0-SNAPSHOT</version>
<version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View file

@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.8.0-SNAPSHOT</version>
<version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View file

@ -106,7 +106,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
logInfo("Setting up application submission context for ASM")
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
appContext.setApplicationId(appId)
appContext.setApplicationName("Spark")
appContext.setApplicationName(args.appName)
return appContext
}
@ -224,8 +224,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// Add Xmx for am memory
JAVA_OPTS += "-Xmx" + amMemory + "m "
JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(),
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
JAVA_OPTS += " -Djava.io.tmpdir=" +
new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
// Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
@ -241,6 +241,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
}
if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
}

View file

@ -32,6 +32,7 @@ class ClientArguments(val args: Array[String]) {
var numWorkers = 2
var amQueue = System.getProperty("QUEUE", "default")
var amMemory: Int = 512
var appName: String = "Spark"
// TODO
var inputFormatInfo: List[InputFormatInfo] = null
@ -78,6 +79,10 @@ class ClientArguments(val args: Array[String]) {
amQueue = value
args = tail
case ("--name") :: value :: tail =>
appName = value
args = tail
case Nil =>
if (userJar == null || userClass == null) {
printUsageAndExit(1)
@ -108,6 +113,7 @@ class ClientArguments(val args: Array[String]) {
" --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
" --name NAME The name of your application (Default: Spark)\n" +
" --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')"
)
System.exit(exitCode)

View file

@ -77,8 +77,9 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
}
JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(),
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
JAVA_OPTS += " -Djava.io.tmpdir=" +
new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
// Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
// The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same