upmerge with latest mesos/spark master and fix hbase compile with hadoop2-yarn profile

Thomas Graves 2013-06-19 14:39:13 -05:00
commit bad51c7cb4
38 changed files with 2069 additions and 916 deletions

View file

@ -32,8 +32,8 @@
<artifactId>compress-lzf</artifactId>
</dependency>
<dependency>
<groupId>asm</groupId>
<artifactId>asm-all</artifactId>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>

View file

@ -5,8 +5,7 @@ import java.lang.reflect.Field
import scala.collection.mutable.Map
import scala.collection.mutable.Set
import org.objectweb.asm.{ClassReader, MethodVisitor, Type}
import org.objectweb.asm.commons.EmptyVisitor
import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
import org.objectweb.asm.Opcodes._
import java.io.{InputStream, IOException, ByteArrayOutputStream, ByteArrayInputStream, BufferedInputStream}
@ -162,10 +161,10 @@ private[spark] object ClosureCleaner extends Logging {
}
}
private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends EmptyVisitor {
private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
return new EmptyVisitor {
return new MethodVisitor(ASM4) {
override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) {
if (op == GETFIELD) {
for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
@ -188,7 +187,7 @@ private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) exten
}
}
private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends EmptyVisitor {
private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM4) {
var myName: String = null
override def visit(version: Int, access: Int, name: String, sig: String,
@ -198,7 +197,7 @@ private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends EmptyVisi
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
return new EmptyVisitor {
return new MethodVisitor(ASM4) {
override def visitMethodInsn(op: Int, owner: String, name: String,
desc: String) {
val argTypes = Type.getArgumentTypes(desc)
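
The ClosureCleaner change above tracks the ASM 4 API: the old EmptyVisitor helper is gone, so visitors now subclass ClassVisitor / MethodVisitor and pass the API level (ASM4) to the constructor. A minimal sketch of that pattern, not part of this commit (class and object names are illustrative):

import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor}
import org.objectweb.asm.Opcodes.ASM4

// Prints the name and descriptor of every method in a class, ASM 4 style.
class MethodNamePrinter extends ClassVisitor(ASM4) {
  override def visitMethod(access: Int, name: String, desc: String,
      sig: String, exceptions: Array[String]): MethodVisitor = {
    println("method: " + name + desc)
    new MethodVisitor(ASM4) {}   // no-op visitor for the method body
  }
}

object MethodNamePrinterExample {
  def main(args: Array[String]) {
    // Load the bytecode of a class from the classpath and walk it.
    new ClassReader("java.lang.String").accept(new MethodNamePrinter, 0)
  }
}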

View file

@ -10,6 +10,8 @@ import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.mapred.FileOutputCommitter
import org.apache.hadoop.mapred.FileOutputFormat
import org.apache.hadoop.mapred.HadoopWriter
@ -17,7 +19,7 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil, TaskAttemptID, TaskAttemptContext}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil}
import org.apache.hadoop.security.UserGroupInformation
import spark.partial.BoundedDouble
@ -186,11 +188,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
*/
def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
def createCombiner(v: V) = ArrayBuffer(v)
def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
val bufs = combineByKey[ArrayBuffer[V]](
createCombiner _, mergeValue _, mergeCombiners _, partitioner)
createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
bufs.asInstanceOf[RDD[(K, Seq[V])]]
}
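
To make the trade-off described in the comment above concrete, here is a hedged driver-side sketch (not from this commit; master URL and data are illustrative): groupByKey now ships every value through the shuffle unchanged, whereas an aggregation such as reduceByKey still benefits from map-side combining because it shrinks the data before the shuffle.

import spark.SparkContext
import spark.SparkContext._

object CombineExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "combineExample")
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    val grouped = pairs.groupByKey()        // no map-side combine after this change
    val summed  = pairs.reduceByKey(_ + _)  // map-side combine still pays off here
    println(grouped.collect().toSeq)        // e.g. ((a, ArrayBuffer(1, 2)), (b, ArrayBuffer(3)))
    println(summed.collect().toSeq)         // e.g. ((a, 3), (b, 3))
  }
}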
@ -516,6 +520,16 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
}
/**
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD. Compress the result with the
* supplied codec.
*/
def saveAsHadoopFile[F <: OutputFormat[K, V]](
path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassManifest[F]) {
saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]], codec)
}
/**
* Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
* (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
@ -575,6 +589,20 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
jobCommitter.cleanupJob(jobTaskContext)
}
/**
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD. Compress with the supplied codec.
*/
def saveAsHadoopFile(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
codec: Class[_ <: CompressionCodec]) {
saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass,
new JobConf(self.context.hadoopConfiguration), Some(codec))
}
/**
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD.
@ -584,7 +612,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf = new JobConf(self.context.hadoopConfiguration)) {
conf: JobConf = new JobConf(self.context.hadoopConfiguration),
codec: Option[Class[_ <: CompressionCodec]] = None) {
// make sure to propagate any credentials from the current user to the jobConf
// for Hadoop security
val jobCreds = conf.getCredentials();
@ -593,6 +622,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
conf.setOutputValueClass(valueClass)
// conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
conf.set("mapred.output.format.class", outputFormatClass.getName)
for (c <- codec) {
conf.setCompressMapOutput(true)
conf.set("mapred.output.compress", "true")
conf.setMapOutputCompressorClass(c)
conf.set("mapred.output.compression.codec", c.getCanonicalName)
conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
}
conf.setOutputCommitter(classOf[FileOutputCommitter])
FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf))
saveAsHadoopDataset(conf)
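
A hedged usage sketch for the new codec-aware overloads (not part of the commit; the output path is a placeholder): the codec class is threaded into the JobConf exactly as in the loop above, so the part files come out compressed.

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.TextOutputFormat
import spark.SparkContext
import spark.SparkContext._

object SaveWithCodecExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "saveWithCodec")
    val pairs = sc.parallelize(Seq(("k1", "v1"), ("k2", "v2")))
    // Written through the old mapred API; each part file ends up gzip-compressed.
    pairs.saveAsHadoopFile[TextOutputFormat[String, String]]("/tmp/save-with-codec", classOf[GzipCodec])
  }
}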

View file

@ -7,12 +7,14 @@ import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.TextOutputFormat
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
import spark.broadcast.Broadcast
import spark.Partitioner._
import spark.partial.BoundedDouble
import spark.partial.CountEvaluator
@ -35,6 +37,7 @@ import spark.rdd.ZippedPartitionsRDD2
import spark.rdd.ZippedPartitionsRDD3
import spark.rdd.ZippedPartitionsRDD4
import spark.storage.StorageLevel
import spark.util.BoundedPriorityQueue
import SparkContext._
@ -352,13 +355,36 @@ abstract class RDD[T: ClassManifest](
/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: Seq[String]): RDD[String] = new PipedRDD(this, command)
def pipe(command: String, env: Map[String, String]): RDD[String] =
new PipedRDD(this, command, env)
/**
* Return an RDD created by piping elements to a forked external process.
* The print behavior can be customized by providing two functions.
*
* @param command command to run in forked process.
* @param env environment variables to set.
* @param printPipeContext Before piping elements, this function is called as an opportunity
* to pipe context data. The print-line function (like out.println) will be
* passed as printPipeContext's parameter.
* @param printRDDElement Use this function to customize how to pipe elements. This function
* will be called with each RDD element as the 1st parameter, and the
* print-line function (like out.println()) as the 2nd parameter.
* For example, to pipe the RDD data of groupBy() in a streaming way,
* instead of constructing one huge String that concatenates all the elements:
* def printRDDElement(record: (String, Seq[String]), f: String => Unit) =
* for (e <- record._2) { f(e) }
* @return the result RDD
*/
def pipe(command: Seq[String], env: Map[String, String]): RDD[String] =
new PipedRDD(this, command, env)
def pipe(
command: Seq[String],
env: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null): RDD[String] =
new PipedRDD(this, command, env,
if (printPipeContext ne null) sc.clean(printPipeContext) else null,
if (printRDDElement ne null) sc.clean(printRDDElement) else null)
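
A hedged sketch of the new pipe parameters (not from the commit; the external command is just cat for illustration): printPipeContext writes a header before any element, and printRDDElement streams each grouped value on its own line instead of building one large string.

import spark.SparkContext
import spark.SparkContext._

object PipeExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "pipeExample")
    val grouped = sc.parallelize(Seq(("a", "1"), ("a", "2"), ("b", "3"))).groupByKey()
    val piped = grouped.pipe(
      command = Seq("cat"),
      printPipeContext = (out: String => Unit) => out("# header written once per task"),
      printRDDElement = (rec: (String, Seq[String]), out: String => Unit) =>
        rec._2.foreach(out))   // one line per value, streamed into the subprocess
    piped.collect().foreach(println)
  }
}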
/**
* Return a new RDD by applying a function to each partition of this RDD.
@ -722,6 +748,24 @@ abstract class RDD[T: ClassManifest](
case _ => throw new UnsupportedOperationException("empty collection")
}
/**
* Returns the top K elements from this RDD as defined by
* the specified implicit Ordering[T].
* @param num the number of top elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
*/
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
mapPartitions { items =>
val queue = new BoundedPriorityQueue[T](num)
queue ++= items
Iterator.single(queue)
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray
}
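
A hedged usage sketch for the new top(k) (not part of the commit): each partition keeps at most k elements in a bounded priority queue, and the queues are merged in the reduce, so only k elements per partition ever reach the driver.

import spark.SparkContext

object TopExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "topExample")
    val nums = sc.parallelize(Seq(7, 3, 9, 1, 5))
    // The three largest values under the implicit Ordering[Int]; the returned
    // array is not guaranteed to be sorted by this implementation.
    println(nums.top(3).toSeq)
  }
}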
/**
* Save this RDD as a text file, using string representations of elements.
*/
@ -730,6 +774,14 @@ abstract class RDD[T: ClassManifest](
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
/**
* Save this RDD as a compressed text file, using string representations of elements.
*/
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
this.map(x => (NullWritable.get(), new Text(x.toString)))
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
}
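
And a matching sketch for the plain-RDD variant (not from the commit; the path is a placeholder):

import org.apache.hadoop.io.compress.GzipCodec
import spark.SparkContext

object CompressedTextExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "compressedText")
    val lines = sc.parallelize(Seq("alpha", "beta", "gamma"))
    lines.saveAsTextFile("/tmp/compressed-text", classOf[GzipCodec])   // part files end in .gz
  }
}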
/**
* Save this RDD as a SequenceFile of serialized objects.
*/

View file

@ -18,6 +18,7 @@ import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.hadoop.mapred.OutputCommitter
import org.apache.hadoop.mapred.FileOutputCommitter
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.Writable
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.BytesWritable
@ -62,7 +63,7 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
* byte arrays to BytesWritable, and Strings to Text. The `path` can be on any Hadoop-supported
* file system.
*/
def saveAsSequenceFile(path: String) {
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) {
def anyToWritable[U <% Writable](u: U): Writable = u
val keyClass = getWritableClass[K]
@ -72,14 +73,18 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
val jobConf = new JobConf(self.context.hadoopConfiguration)
if (!convertKey && !convertValue) {
self.saveAsHadoopFile(path, keyClass, valueClass, format)
self.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
} else if (!convertKey && convertValue) {
self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(
path, keyClass, valueClass, format, jobConf, codec)
} else if (convertKey && !convertValue) {
self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format)
self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(
path, keyClass, valueClass, format, jobConf, codec)
} else if (convertKey && convertValue) {
self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(
path, keyClass, valueClass, format, jobConf, codec)
}
}
}

View file

@ -116,8 +116,8 @@ private object Utils extends Logging {
while (dir == null) {
attempts += 1
if (attempts > maxAttempts) {
throw new IOException("Failed to create a temp directory after " + maxAttempts +
" attempts!")
throw new IOException("Failed to create a temp directory (under " + root + ") after " +
maxAttempts + " attempts!")
}
try {
dir = new File(root, "spark-" + UUID.randomUUID.toString)

View file

@ -6,6 +6,7 @@ import java.util.Comparator
import scala.Tuple2
import scala.collection.JavaConversions._
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
@ -459,6 +460,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass)
}
/** Output the RDD to any Hadoop-supported file system, compressing with the supplied codec. */
def saveAsHadoopFile[F <: OutputFormat[_, _]](
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[F],
codec: Class[_ <: CompressionCodec]) {
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, codec)
}
/** Output the RDD to any Hadoop-supported file system. */
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
path: String,

View file

@ -86,7 +86,6 @@ JavaRDDLike[T, JavaRDD[T]] {
*/
def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
wrapRDD(rdd.subtract(other, p))
}
object JavaRDD {

View file

@ -1,9 +1,10 @@
package spark.api.java
import java.util.{List => JList}
import java.util.{List => JList, Comparator}
import scala.Tuple2
import scala.collection.JavaConversions._
import org.apache.hadoop.io.compress.CompressionCodec
import spark.{SparkContext, Partition, RDD, TaskContext}
import spark.api.java.JavaPairRDD._
import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
@ -310,6 +311,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def saveAsTextFile(path: String) = rdd.saveAsTextFile(path)
/**
* Save this RDD as a compressed text file, using string representations of elements.
*/
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) =
rdd.saveAsTextFile(path, codec)
/**
* Save this RDD as a SequenceFile of serialized objects.
*/
@ -351,4 +359,29 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def toDebugString(): String = {
rdd.toDebugString
}
/**
* Returns the top K elements from this RDD as defined by
* the specified Comparator[T].
* @param num the number of top elements to return
* @param comp the comparator that defines the order
* @return an array of top elements
*/
def top(num: Int, comp: Comparator[T]): JList[T] = {
import scala.collection.JavaConversions._
val topElems = rdd.top(num)(Ordering.comparatorToOrdering(comp))
val arr: java.util.Collection[T] = topElems.toSeq
new java.util.ArrayList(arr)
}
/**
* Returns the top K elements from this RDD using the
* natural ordering for T.
* @param num the number of top elements to return
* @return an array of top elements
*/
def top(num: Int): JList[T] = {
val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
top(num, comp)
}
}

View file

@ -6,7 +6,7 @@ import java.util.{HashMap => JHashMap}
import scala.collection.JavaConversions
import scala.collection.mutable.ArrayBuffer
import spark.{Aggregator, Logging, Partition, Partitioner, RDD, SparkEnv, TaskContext}
import spark.{Aggregator, Partition, Partitioner, RDD, SparkEnv, TaskContext}
import spark.{Dependency, OneToOneDependency, ShuffleDependency}
@ -49,12 +49,16 @@ private[spark] class CoGroupAggregator
*
* @param rdds parent RDDs.
* @param part partitioner used to partition the shuffle output.
* @param mapSideCombine flag indicating whether to merge values before shuffle step.
* @param mapSideCombine flag indicating whether to merge values before shuffle step. If the flag
* is on, Spark does an extra pass over the data on the map side to merge
* all values belonging to the same key together. This can reduce the amount
* of data shuffled if and only if the number of distinct keys is very small,
* and the ratio of key size to value size is also very small.
*/
class CoGroupedRDD[K](
@transient var rdds: Seq[RDD[(K, _)]],
part: Partitioner,
val mapSideCombine: Boolean = true,
val mapSideCombine: Boolean = false,
val serializerClass: String = null)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
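
For context, a hedged sketch of the code path this default affects (not from the commit): cogroup() on a pair RDD builds a CoGroupedRDD underneath, so with the new default it no longer does a map-side combining pass unless the caller opts in.

import spark.SparkContext
import spark.SparkContext._

object CoGroupExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "cogroupExample")
    val left  = sc.parallelize(Seq((1, "x"), (2, "y")))
    val right = sc.parallelize(Seq((1, "z")))
    // cogroup constructs a CoGroupedRDD internally; mapSideCombine now defaults to false.
    left.cogroup(right).collect().foreach(println)
  }
}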

View file

@ -9,6 +9,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import spark.{RDD, SparkEnv, Partition, TaskContext}
import spark.broadcast.Broadcast
/**
@ -18,14 +19,21 @@ import spark.{RDD, SparkEnv, Partition, TaskContext}
class PipedRDD[T: ClassManifest](
prev: RDD[T],
command: Seq[String],
envVars: Map[String, String])
envVars: Map[String, String],
printPipeContext: (String => Unit) => Unit,
printRDDElement: (T, String => Unit) => Unit)
extends RDD[String](prev) {
def this(prev: RDD[T], command: Seq[String]) = this(prev, command, Map())
// Similar to Runtime.exec(), if we are given a single string, split it into words
// using a standard StringTokenizer (i.e. by spaces)
def this(prev: RDD[T], command: String) = this(prev, PipedRDD.tokenize(command))
def this(
prev: RDD[T],
command: String,
envVars: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null) =
this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement)
override def getPartitions: Array[Partition] = firstParent[T].partitions
@ -52,8 +60,17 @@ class PipedRDD[T: ClassManifest](
override def run() {
SparkEnv.set(env)
val out = new PrintWriter(proc.getOutputStream)
// print the pipe context first, if one was provided
if (printPipeContext != null) {
printPipeContext(out.println(_))
}
for (elem <- firstParent[T].iterator(split, context)) {
out.println(elem)
if (printRDDElement != null) {
printRDDElement(elem, out.println(_))
} else {
out.println(elem)
}
}
out.close()
}

View file

@ -53,14 +53,10 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
val exactMatchLocations = exactMatchPreferredLocations.reduce((x, y) => x.intersect(y))
// Remove exact match and then do host local match.
val otherNodePreferredLocations = rddSplitZip.map(x => {
x._1.preferredLocations(x._2).map(hostPort => {
val host = Utils.parseHostPort(hostPort)._1
if (exactMatchLocations.contains(host)) null else host
}).filter(_ != null)
})
val otherNodeLocalLocations = otherNodePreferredLocations.reduce((x, y) => x.intersect(y))
val exactMatchHosts = exactMatchLocations.map(Utils.parseHostPort(_)._1)
val matchPreferredHosts = exactMatchPreferredLocations.map(locs => locs.map(Utils.parseHostPort(_)._1))
.reduce((x, y) => x.intersect(y))
val otherNodeLocalLocations = matchPreferredHosts.filter { s => !exactMatchHosts.contains(s) }
otherNodeLocalLocations ++ exactMatchLocations
}

View file

@ -177,7 +177,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
val manager = new TaskSetManager(this, taskSet)
val manager = new ClusterTaskSetManager(this, taskSet)
activeTaskSets(taskSet.id) = manager
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
taskSetTaskIds(taskSet.id) = new HashSet[Long]()

View file

@ -0,0 +1,747 @@
package spark.scheduler.cluster
import java.util.{HashMap => JHashMap, NoSuchElementException, Arrays}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.math.max
import scala.math.min
import spark._
import spark.scheduler._
import spark.TaskState.TaskState
import java.nio.ByteBuffer
private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") with Logging {
// process local is expected to be used ONLY within tasksetmanager for now.
val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
type TaskLocality = Value
def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
// Must not be the constraint.
assert (constraint != TaskLocality.PROCESS_LOCAL)
constraint match {
case TaskLocality.NODE_LOCAL => condition == TaskLocality.NODE_LOCAL
case TaskLocality.RACK_LOCAL => condition == TaskLocality.NODE_LOCAL || condition == TaskLocality.RACK_LOCAL
// For anything else, allow
case _ => true
}
}
def parse(str: String): TaskLocality = {
// better way to do this ?
try {
val retval = TaskLocality.withName(str)
// Must not specify PROCESS_LOCAL !
assert (retval != TaskLocality.PROCESS_LOCAL)
retval
} catch {
case nEx: NoSuchElementException => {
logWarning("Invalid task locality specified '" + str + "', defaulting to NODE_LOCAL");
// default to preserve earlier behavior
NODE_LOCAL
}
}
}
}
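
A hedged illustration of the locality lattice defined above (hypothetical demo object; it would have to live inside the spark package because TaskLocality is private[spark]):

package spark.scheduler.cluster

object TaskLocalityDemo {
  def main(args: Array[String]) {
    import TaskLocality._
    println(isAllowed(NODE_LOCAL, NODE_LOCAL))   // true
    println(isAllowed(RACK_LOCAL, NODE_LOCAL))   // true: node-local satisfies a rack-local constraint
    println(isAllowed(NODE_LOCAL, RACK_LOCAL))   // false: rack-local is too loose for a node-local constraint
    println(isAllowed(ANY, PROCESS_LOCAL))       // true: ANY admits everything
    println(parse("bogus"))                      // warns and falls back to NODE_LOCAL
  }
}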
/**
* Schedules the tasks within a single TaskSet in the ClusterScheduler.
*/
private[spark] class ClusterTaskSetManager(
sched: ClusterScheduler,
val taskSet: TaskSet)
extends TaskSetManager
with Logging {
// Maximum time to wait to run a task in a preferred location (in ms)
val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
// CPUs to request per task
val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
// Maximum times a task is allowed to fail before failing the job
val MAX_TASK_FAILURES = 4
// Quantile of tasks at which to start speculation
val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
// Serializer for closures and tasks.
val ser = SparkEnv.get.closureSerializer.newInstance()
val tasks = taskSet.tasks
val numTasks = tasks.length
val copiesRunning = new Array[Int](numTasks)
val finished = new Array[Boolean](numTasks)
val numFailures = new Array[Int](numTasks)
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
var tasksFinished = 0
var weight = 1
var minShare = 0
var runningTasks = 0
var priority = taskSet.priority
var stageId = taskSet.stageId
var name = "TaskSet_"+taskSet.stageId.toString
var parent:Schedulable = null
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
// List of pending tasks for each node (process local to container). These collections are actually
// treated as stacks, in which new tasks are added to the end of the
// ArrayBuffer and removed from the end. This makes it faster to detect
// tasks that repeatedly fail because whenever a task failed, it is put
// back at the head of the stack. They are also only cleaned up lazily;
// when a task is launched, it remains in all the pending lists except
// the one that it was launched from, but gets removed from them later.
private val pendingTasksForHostPort = new HashMap[String, ArrayBuffer[Int]]
// List of pending tasks for each node.
// Essentially, similar to pendingTasksForHostPort, except at host level
private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
// List of pending tasks for each node based on rack locality.
// Essentially, similar to pendingTasksForHost, except at rack level
private val pendingRackLocalTasksForHost = new HashMap[String, ArrayBuffer[Int]]
// List containing pending tasks with no locality preferences
val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
// List containing all pending tasks (also used as a stack, as above)
val allPendingTasks = new ArrayBuffer[Int]
// Tasks that can be speculated. Since these will be a small fraction of total
// tasks, we'll just hold them in a HashSet.
val speculatableTasks = new HashSet[Int]
// Task index, start and finish time for each task attempt (indexed by task ID)
val taskInfos = new HashMap[Long, TaskInfo]
// Did the job fail?
var failed = false
var causeOfFailure = ""
// How frequently to reprint duplicate exceptions in full, in milliseconds
val EXCEPTION_PRINT_INTERVAL =
System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
// Map of recent exceptions (identified by string representation and
// top stack frame) to duplicate count (how many times the same
// exception has appeared) and time the full exception was
// printed. This should ideally be an LRU map that can drop old
// exceptions automatically.
val recentExceptions = HashMap[String, (Int, Long)]()
// Figure out the current map output tracker generation and set it on all tasks
val generation = sched.mapOutputTracker.getGeneration
logDebug("Generation for " + taskSet.id + ": " + generation)
for (t <- tasks) {
t.generation = generation
}
// Add all our tasks to the pending lists. We do this in reverse order
// of task index so that tasks with low indices get launched first.
for (i <- (0 until numTasks).reverse) {
addPendingTask(i)
}
// Note that it follows the hierarchy.
// if we search for NODE_LOCAL, the output will include PROCESS_LOCAL and
// if we search for RACK_LOCAL, it will include PROCESS_LOCAL & NODE_LOCAL
private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler,
taskLocality: TaskLocality.TaskLocality): HashSet[String] = {
if (TaskLocality.PROCESS_LOCAL == taskLocality) {
// Straightforward comparison! Special-case it.
val retval = new HashSet[String]()
scheduler.synchronized {
for (location <- _taskPreferredLocations) {
if (scheduler.isExecutorAliveOnHostPort(location)) {
retval += location
}
}
}
return retval
}
val taskPreferredLocations =
if (TaskLocality.NODE_LOCAL == taskLocality) {
_taskPreferredLocations
} else {
assert (TaskLocality.RACK_LOCAL == taskLocality)
// Expand set to include all 'seen' rack local hosts.
// This works since container allocation/management happens within the master - so any rack locality information is updated in the master.
// Best case effort, and maybe sort of kludge for now ... rework it later ?
val hosts = new HashSet[String]
_taskPreferredLocations.foreach(h => {
val rackOpt = scheduler.getRackForHost(h)
if (rackOpt.isDefined) {
val hostsOpt = scheduler.getCachedHostsForRack(rackOpt.get)
if (hostsOpt.isDefined) {
hosts ++= hostsOpt.get
}
}
// Ensure that irrespective of what scheduler says, host is always added !
hosts += h
})
hosts
}
val retval = new HashSet[String]
scheduler.synchronized {
for (prefLocation <- taskPreferredLocations) {
val aliveLocationsOpt = scheduler.getExecutorsAliveOnHost(Utils.parseHostPort(prefLocation)._1)
if (aliveLocationsOpt.isDefined) {
retval ++= aliveLocationsOpt.get
}
}
}
retval
}
// Add a task to all the pending-task lists that it should be on.
private def addPendingTask(index: Int) {
// We can infer hostLocalLocations from rackLocalLocations by joining it against tasks(index).preferredLocations (with appropriate
// hostPort <-> host conversion). But we are not doing it for simplicity's sake. If this becomes a performance issue, modify it.
val processLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.PROCESS_LOCAL)
val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
if (rackLocalLocations.size == 0) {
// Current impl ensures this.
assert (processLocalLocations.size == 0)
assert (hostLocalLocations.size == 0)
pendingTasksWithNoPrefs += index
} else {
// process local locality
for (hostPort <- processLocalLocations) {
// DEBUG Code
Utils.checkHostPort(hostPort)
val hostPortList = pendingTasksForHostPort.getOrElseUpdate(hostPort, ArrayBuffer())
hostPortList += index
}
// host locality (includes process local)
for (hostPort <- hostLocalLocations) {
// DEBUG Code
Utils.checkHostPort(hostPort)
val host = Utils.parseHostPort(hostPort)._1
val hostList = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer())
hostList += index
}
// rack locality (includes process local and host local)
for (rackLocalHostPort <- rackLocalLocations) {
// DEBUG Code
Utils.checkHostPort(rackLocalHostPort)
val rackLocalHost = Utils.parseHostPort(rackLocalHostPort)._1
val list = pendingRackLocalTasksForHost.getOrElseUpdate(rackLocalHost, ArrayBuffer())
list += index
}
}
allPendingTasks += index
}
// Return the pending tasks list for a given host port (process local), or an empty list if
// there is no map entry for that host
private def getPendingTasksForHostPort(hostPort: String): ArrayBuffer[Int] = {
// DEBUG Code
Utils.checkHostPort(hostPort)
pendingTasksForHostPort.getOrElse(hostPort, ArrayBuffer())
}
// Return the pending tasks list for a given host, or an empty list if
// there is no map entry for that host
private def getPendingTasksForHost(hostPort: String): ArrayBuffer[Int] = {
val host = Utils.parseHostPort(hostPort)._1
pendingTasksForHost.getOrElse(host, ArrayBuffer())
}
// Return the pending tasks (rack level) list for a given host, or an empty list if
// there is no map entry for that host
private def getRackLocalPendingTasksForHost(hostPort: String): ArrayBuffer[Int] = {
val host = Utils.parseHostPort(hostPort)._1
pendingRackLocalTasksForHost.getOrElse(host, ArrayBuffer())
}
// Number of pending tasks for a given host Port (which would be process local)
def numPendingTasksForHostPort(hostPort: String): Int = {
getPendingTasksForHostPort(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
}
// Number of pending tasks for a given host (which would be data local)
def numPendingTasksForHost(hostPort: String): Int = {
getPendingTasksForHost(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
}
// Number of pending rack local tasks for a given host
def numRackLocalPendingTasksForHost(hostPort: String): Int = {
getRackLocalPendingTasksForHost(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
}
// Dequeue a pending task from the given list and return its index.
// Return None if the list is empty.
// This method also cleans up any tasks in the list that have already
// been launched, since we want that to happen lazily.
private def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
while (!list.isEmpty) {
val index = list.last
list.trimEnd(1)
if (copiesRunning(index) == 0 && !finished(index)) {
return Some(index)
}
}
return None
}
// Return a speculative task for a given host if any are available. The task should not have an
// attempt running on this host, in case the host is slow. In addition, if locality is set, the
// task must have a preference for this host/rack/no preferred locations at all.
private def findSpeculativeTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
assert (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL))
speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set
if (speculatableTasks.size > 0) {
val localTask = speculatableTasks.find {
index =>
val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
val attemptLocs = taskAttempts(index).map(_.hostPort)
(locations.size == 0 || locations.contains(hostPort)) && !attemptLocs.contains(hostPort)
}
if (localTask != None) {
speculatableTasks -= localTask.get
return localTask
}
// check for rack locality
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
val rackTask = speculatableTasks.find {
index =>
val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
val attemptLocs = taskAttempts(index).map(_.hostPort)
locations.contains(hostPort) && !attemptLocs.contains(hostPort)
}
if (rackTask != None) {
speculatableTasks -= rackTask.get
return rackTask
}
}
// Any task ...
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
// Check for attemptLocs also ?
val nonLocalTask = speculatableTasks.find(i => !taskAttempts(i).map(_.hostPort).contains(hostPort))
if (nonLocalTask != None) {
speculatableTasks -= nonLocalTask.get
return nonLocalTask
}
}
}
return None
}
// Dequeue a pending task for a given node and return its index.
// If localOnly is set to false, allow non-local tasks as well.
private def findTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
val processLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort))
if (processLocalTask != None) {
return processLocalTask
}
val localTask = findTaskFromList(getPendingTasksForHost(hostPort))
if (localTask != None) {
return localTask
}
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
val rackLocalTask = findTaskFromList(getRackLocalPendingTasksForHost(hostPort))
if (rackLocalTask != None) {
return rackLocalTask
}
}
// Look for no pref tasks AFTER rack local tasks - this has side effect that we will get to failed tasks later rather than sooner.
// TODO: That code path needs to be revisited (adding to no prefs list when host:port goes down).
val noPrefTask = findTaskFromList(pendingTasksWithNoPrefs)
if (noPrefTask != None) {
return noPrefTask
}
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
val nonLocalTask = findTaskFromList(allPendingTasks)
if (nonLocalTask != None) {
return nonLocalTask
}
}
// Finally, if all else has failed, find a speculative task
return findSpeculativeTask(hostPort, locality)
}
private def isProcessLocalLocation(task: Task[_], hostPort: String): Boolean = {
Utils.checkHostPort(hostPort)
val locs = task.preferredLocations
locs.contains(hostPort)
}
private def isHostLocalLocation(task: Task[_], hostPort: String): Boolean = {
val locs = task.preferredLocations
// If no preference, consider it as host local
if (locs.isEmpty) return true
val host = Utils.parseHostPort(hostPort)._1
locs.find(h => Utils.parseHostPort(h)._1 == host).isDefined
}
// Does a host count as a rack local preferred location for a task? (assumes host is NOT preferred location).
// This is true if either the task has preferred locations and this host is one, or it has
// no preferred locations (in which we still count the launch as preferred).
private def isRackLocalLocation(task: Task[_], hostPort: String): Boolean = {
val locs = task.preferredLocations
val preferredRacks = new HashSet[String]()
for (preferredHost <- locs) {
val rack = sched.getRackForHost(preferredHost)
if (None != rack) preferredRacks += rack.get
}
if (preferredRacks.isEmpty) return false
val hostRack = sched.getRackForHost(hostPort)
return None != hostRack && preferredRacks.contains(hostRack.get)
}
// Respond to an offer of a single slave from the scheduler by finding a task
def slaveOffer(execId: String, hostPort: String, availableCpus: Double, overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] = {
if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
// If explicitly specified, use that
val locality = if (overrideLocality != null) overrideLocality else {
// expand only if we have waited for more than LOCALITY_WAIT for a host local task ...
val time = System.currentTimeMillis
if (time - lastPreferredLaunchTime < LOCALITY_WAIT) TaskLocality.NODE_LOCAL else TaskLocality.ANY
}
findTask(hostPort, locality) match {
case Some(index) => {
// Found a task; do some bookkeeping and return a Mesos task for it
val task = tasks(index)
val taskId = sched.newTaskId()
// Figure out whether this should count as a preferred launch
val taskLocality =
if (isProcessLocalLocation(task, hostPort)) TaskLocality.PROCESS_LOCAL else
if (isHostLocalLocation(task, hostPort)) TaskLocality.NODE_LOCAL else
if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else
TaskLocality.ANY
val prefStr = taskLocality.toString
logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format(
taskSet.id, index, taskId, execId, hostPort, prefStr))
// Do various bookkeeping
copiesRunning(index) += 1
val time = System.currentTimeMillis
val info = new TaskInfo(taskId, index, time, execId, hostPort, taskLocality)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
if (TaskLocality.NODE_LOCAL == taskLocality) {
lastPreferredLaunchTime = time
}
// Serialize and return the task
val startTime = System.currentTimeMillis
val serializedTask = Task.serializeWithDependencies(
task, sched.sc.addedFiles, sched.sc.addedJars, ser)
val timeTaken = System.currentTimeMillis - startTime
increaseRunningTasks(1)
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
return Some(new TaskDescription(taskId, execId, taskName, serializedTask))
}
case _ =>
}
}
return None
}
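
The delay-scheduling behavior above is driven by plain system properties (spark.locality.wait, spark.task.cpus, and the speculation knobs read near the top of the class), so a driver can tune them before constructing its context. A hedged sketch with illustrative values only:

object LocalityTuningExample {
  def main(args: Array[String]) {
    // Wait up to 10 seconds for a node-local slot before falling back to ANY.
    System.setProperty("spark.locality.wait", "10000")
    // Ask the cluster scheduler to reserve two CPUs per task.
    System.setProperty("spark.task.cpus", "2")
    val sc = new spark.SparkContext("local", "localityTuning")
    // ... submit jobs as usual ...
    sc.stop()
  }
}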
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
state match {
case TaskState.FINISHED =>
taskFinished(tid, state, serializedData)
case TaskState.LOST =>
taskLost(tid, state, serializedData)
case TaskState.FAILED =>
taskLost(tid, state, serializedData)
case TaskState.KILLED =>
taskLost(tid, state, serializedData)
case _ =>
}
}
def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
if (info.failed) {
// We might get two task-lost messages for the same task in coarse-grained Mesos mode,
// or even from Mesos itself when acks get delayed.
return
}
val index = info.index
info.markSuccessful()
decreaseRunningTasks(1)
if (!finished(index)) {
tasksFinished += 1
logInfo("Finished TID %s in %d ms (progress: %d/%d)".format(
tid, info.duration, tasksFinished, numTasks))
// Deserialize task result and pass it to the scheduler
try {
val result = ser.deserialize[TaskResult[_]](serializedData)
result.metrics.resultSize = serializedData.limit()
sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
} catch {
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread().getContextClassLoader
throw new SparkException("ClassNotFound with classloader: " + loader, cnf)
case ex => throw ex
}
// Mark finished and stop if we've finished all the tasks
finished(index) = true
if (tasksFinished == numTasks) {
sched.taskSetFinished(this)
}
} else {
logInfo("Ignoring task-finished event for TID " + tid +
" because task " + index + " is already finished")
}
}
def taskLost(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
if (info.failed) {
// We might get two task-lost messages for the same task in coarse-grained Mesos mode,
// or even from Mesos itself when acks get delayed.
return
}
val index = info.index
info.markFailed()
decreaseRunningTasks(1)
if (!finished(index)) {
logInfo("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
copiesRunning(index) -= 1
// Check if the problem is a map output fetch failure. In that case, this
// task will never succeed on any node, so tell the scheduler about it.
if (serializedData != null && serializedData.limit() > 0) {
val reason = ser.deserialize[TaskEndReason](serializedData, getClass.getClassLoader)
reason match {
case fetchFailed: FetchFailed =>
logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)
sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
finished(index) = true
tasksFinished += 1
sched.taskSetFinished(this)
decreaseRunningTasks(runningTasks)
return
case taskResultTooBig: TaskResultTooBigFailure =>
logInfo("Loss was due to task %s result exceeding Akka frame size; " +
"aborting job".format(tid))
abort("Task %s result exceeded Akka frame size".format(tid))
return
case ef: ExceptionFailure =>
val key = ef.description
val now = System.currentTimeMillis
val (printFull, dupCount) = {
if (recentExceptions.contains(key)) {
val (dupCount, printTime) = recentExceptions(key)
if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
recentExceptions(key) = (0, now)
(true, 0)
} else {
recentExceptions(key) = (dupCount + 1, printTime)
(false, dupCount + 1)
}
} else {
recentExceptions(key) = (0, now)
(true, 0)
}
}
if (printFull) {
val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
logInfo("Loss was due to %s\n%s\n%s".format(
ef.className, ef.description, locs.mkString("\n")))
} else {
logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
}
case _ => {}
}
}
// On non-fetch failures, re-enqueue the task as pending for a max number of retries
addPendingTask(index)
// Count failed attempts only on FAILED and LOST state (not on KILLED)
if (state == TaskState.FAILED || state == TaskState.LOST) {
numFailures(index) += 1
if (numFailures(index) > MAX_TASK_FAILURES) {
logError("Task %s:%d failed more than %d times; aborting job".format(
taskSet.id, index, MAX_TASK_FAILURES))
abort("Task %s:%d failed more than %d times".format(taskSet.id, index, MAX_TASK_FAILURES))
}
}
} else {
logInfo("Ignoring task-lost event for TID " + tid +
" because task " + index + " is already finished")
}
}
def error(message: String) {
// Save the error message
abort("Error: " + message)
}
def abort(message: String) {
failed = true
causeOfFailure = message
// TODO: Kill running tasks if we were not terminated due to a Mesos error
sched.listener.taskSetFailed(taskSet, message)
decreaseRunningTasks(runningTasks)
sched.taskSetFinished(this)
}
override def increaseRunningTasks(taskNum: Int) {
runningTasks += taskNum
if (parent != null) {
parent.increaseRunningTasks(taskNum)
}
}
override def decreaseRunningTasks(taskNum: Int) {
runningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)
}
}
//TODO: for now we just find Pool not TaskSetManager, we can extend this function in future if needed
override def getSchedulableByName(name: String): Schedulable = {
return null
}
override def addSchedulable(schedulable:Schedulable) {
//nothing
}
override def removeSchedulable(schedulable:Schedulable) {
//nothing
}
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
sortedTaskSetQueue += this
return sortedTaskSetQueue
}
override def executorLost(execId: String, hostPort: String) {
logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
// If some task has preferred locations only on hostname, and there are no more executors there,
// put it in the no-prefs list to avoid the wait from delay scheduling
// host local tasks - should we push this to rack local or no pref list ? For now, preserving behavior and moving to
// no prefs list. Note, this was done due to implications related to 'waiting' for data local tasks, etc.
// Note: NOT checking the process local list - since the host local list is a superset of that. We need to add to no prefs only if
// there is no host local node for the task (not if there is no process local node for the task)
for (index <- getPendingTasksForHost(Utils.parseHostPort(hostPort)._1)) {
// val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
if (newLocs.isEmpty) {
pendingTasksWithNoPrefs += index
}
}
// Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
if (tasks(0).isInstanceOf[ShuffleMapTask]) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
if (finished(index)) {
finished(index) = false
copiesRunning(index) -= 1
tasksFinished -= 1
addPendingTask(index)
// Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
// stage finishes when a total of tasks.size tasks finish.
sched.listener.taskEnded(tasks(index), Resubmitted, null, null, info, null)
}
}
}
// Also re-enqueue any tasks that were running on the node
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
taskLost(tid, TaskState.KILLED, null)
}
}
/**
* Check for tasks to be speculated and return true if there are any. This is called periodically
* by the ClusterScheduler.
*
* TODO: To make this scale to large jobs, we need to maintain a list of running tasks, so that
* we don't scan the whole task set. It might also help to make this sorted by launch time.
*/
override def checkSpeculatableTasks(): Boolean = {
// Can't speculate if we only have one task, or if all tasks have finished.
if (numTasks == 1 || tasksFinished == numTasks) {
return false
}
var foundTasks = false
val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
if (tasksFinished >= minFinishedForSpeculation) {
val time = System.currentTimeMillis()
val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
Arrays.sort(durations)
val medianDuration = durations(min((0.5 * numTasks).round.toInt, durations.size - 1))
val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100)
// TODO: Threshold should also look at standard deviation of task durations and have a lower
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)
for ((tid, info) <- taskInfos) {
val index = info.index
if (!finished(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
!speculatableTasks.contains(index)) {
logInfo(
"Marking task %s:%d (on %s) as speculatable because it ran more than %.0f ms".format(
taskSet.id, index, info.hostPort, threshold))
speculatableTasks += index
foundTasks = true
}
}
}
return foundTasks
}
override def hasPendingTasks(): Boolean = {
numTasks > 0 && tasksFinished < numTasks
}
}

View file

@ -13,11 +13,11 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val priority1 = s1.priority
val priority2 = s2.priority
var res = Math.signum(priority1 - priority2)
var res = math.signum(priority1 - priority2)
if (res == 0) {
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = Math.signum(stageId1 - stageId2)
res = math.signum(stageId1 - stageId2)
}
if (res < 0) {
return true
@ -35,22 +35,30 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
val runningTasks2 = s2.runningTasks
val s1Needy = runningTasks1 < minShare1
val s2Needy = runningTasks2 < minShare2
val minShareRatio1 = runningTasks1.toDouble / Math.max(minShare1, 1.0).toDouble
val minShareRatio2 = runningTasks2.toDouble / Math.max(minShare2, 1.0).toDouble
val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var res:Boolean = true
var compare:Int = 0
if (s1Needy && !s2Needy) {
res = true
return true
} else if (!s1Needy && s2Needy) {
res = false
return false
} else if (s1Needy && s2Needy) {
res = minShareRatio1 <= minShareRatio2
compare = minShareRatio1.compareTo(minShareRatio2)
} else {
res = taskToWeightRatio1 <= taskToWeightRatio2
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
}
if (compare < 0) {
return true
} else if (compare > 0) {
return false
} else {
return s1.name < s2.name
}
return res
}
}

View file

@ -1,747 +1,17 @@
package spark.scheduler.cluster
import java.util.{HashMap => JHashMap, NoSuchElementException, Arrays}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.math.max
import scala.math.min
import spark._
import spark.scheduler._
import spark.TaskState.TaskState
import java.nio.ByteBuffer
private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") with Logging {
// process local is expected to be used ONLY within tasksetmanager for now.
val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
type TaskLocality = Value
def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
// Must not be the constraint.
assert (constraint != TaskLocality.PROCESS_LOCAL)
constraint match {
case TaskLocality.NODE_LOCAL => condition == TaskLocality.NODE_LOCAL
case TaskLocality.RACK_LOCAL => condition == TaskLocality.NODE_LOCAL || condition == TaskLocality.RACK_LOCAL
// For anything else, allow
case _ => true
}
}
def parse(str: String): TaskLocality = {
// better way to do this ?
try {
val retval = TaskLocality.withName(str)
// Must not specify PROCESS_LOCAL !
assert (retval != TaskLocality.PROCESS_LOCAL)
retval
} catch {
case nEx: NoSuchElementException => {
logWarning("Invalid task locality specified '" + str + "', defaulting to NODE_LOCAL");
// default to preserve earlier behavior
NODE_LOCAL
}
}
}
}
/**
* Schedules the tasks within a single TaskSet in the ClusterScheduler.
*/
private[spark] class TaskSetManager(
sched: ClusterScheduler,
val taskSet: TaskSet)
extends Schedulable
with Logging {
// Maximum time to wait to run a task in a preferred location (in ms)
val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
// CPUs to request per task
val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
// Maximum times a task is allowed to fail before failing the job
val MAX_TASK_FAILURES = 4
// Quantile of tasks at which to start speculation
val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
// Serializer for closures and tasks.
val ser = SparkEnv.get.closureSerializer.newInstance()
val tasks = taskSet.tasks
val numTasks = tasks.length
val copiesRunning = new Array[Int](numTasks)
val finished = new Array[Boolean](numTasks)
val numFailures = new Array[Int](numTasks)
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
var tasksFinished = 0
var weight = 1
var minShare = 0
var runningTasks = 0
var priority = taskSet.priority
var stageId = taskSet.stageId
var name = "TaskSet_"+taskSet.stageId.toString
var parent:Schedulable = null
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
// List of pending tasks for each node (process local to container). These collections are actually
// treated as stacks, in which new tasks are added to the end of the
// ArrayBuffer and removed from the end. This makes it faster to detect
// tasks that repeatedly fail because whenever a task failed, it is put
// back at the head of the stack. They are also only cleaned up lazily;
// when a task is launched, it remains in all the pending lists except
// the one that it was launched from, but gets removed from them later.
private val pendingTasksForHostPort = new HashMap[String, ArrayBuffer[Int]]
// List of pending tasks for each node.
// Essentially, similar to pendingTasksForHostPort, except at host level
private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
// List of pending tasks for each node based on rack locality.
// Essentially, similar to pendingTasksForHost, except at rack level
private val pendingRackLocalTasksForHost = new HashMap[String, ArrayBuffer[Int]]
// List containing pending tasks with no locality preferences
val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
// List containing all pending tasks (also used as a stack, as above)
val allPendingTasks = new ArrayBuffer[Int]
// Tasks that can be speculated. Since these will be a small fraction of total
// tasks, we'll just hold them in a HashSet.
val speculatableTasks = new HashSet[Int]
// Task index, start and finish time for each task attempt (indexed by task ID)
val taskInfos = new HashMap[Long, TaskInfo]
// Did the job fail?
var failed = false
var causeOfFailure = ""
// How frequently to reprint duplicate exceptions in full, in milliseconds
val EXCEPTION_PRINT_INTERVAL =
System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
// Map of recent exceptions (identified by string representation and
// top stack frame) to duplicate count (how many times the same
// exception has appeared) and time the full exception was
// printed. This should ideally be an LRU map that can drop old
// exceptions automatically.
val recentExceptions = HashMap[String, (Int, Long)]()
// Figure out the current map output tracker generation and set it on all tasks
val generation = sched.mapOutputTracker.getGeneration
logDebug("Generation for " + taskSet.id + ": " + generation)
for (t <- tasks) {
t.generation = generation
}
// Add all our tasks to the pending lists. We do this in reverse order
// of task index so that tasks with low indices get launched first.
for (i <- (0 until numTasks).reverse) {
addPendingTask(i)
}
// Note that it follows the hierarchy.
// if we search for NODE_LOCAL, the output will include PROCESS_LOCAL and
// if we search for RACK_LOCAL, it will include PROCESS_LOCAL & NODE_LOCAL
private def findPreferredLocations(_taskPreferredLocations: Seq[String], scheduler: ClusterScheduler,
taskLocality: TaskLocality.TaskLocality): HashSet[String] = {
if (TaskLocality.PROCESS_LOCAL == taskLocality) {
// Straightforward comparison! Special-case it.
val retval = new HashSet[String]()
scheduler.synchronized {
for (location <- _taskPreferredLocations) {
if (scheduler.isExecutorAliveOnHostPort(location)) {
retval += location
}
}
}
return retval
}
val taskPreferredLocations =
if (TaskLocality.NODE_LOCAL == taskLocality) {
_taskPreferredLocations
} else {
assert (TaskLocality.RACK_LOCAL == taskLocality)
// Expand set to include all 'seen' rack local hosts.
// This works since container allocation/management happens within the master - so any rack locality information is updated in the master.
// Best case effort, and maybe sort of kludge for now ... rework it later ?
val hosts = new HashSet[String]
_taskPreferredLocations.foreach(h => {
val rackOpt = scheduler.getRackForHost(h)
if (rackOpt.isDefined) {
val hostsOpt = scheduler.getCachedHostsForRack(rackOpt.get)
if (hostsOpt.isDefined) {
hosts ++= hostsOpt.get
}
}
// Ensure that irrespective of what scheduler says, host is always added !
hosts += h
})
hosts
}
val retval = new HashSet[String]
scheduler.synchronized {
for (prefLocation <- taskPreferredLocations) {
val aliveLocationsOpt = scheduler.getExecutorsAliveOnHost(Utils.parseHostPort(prefLocation)._1)
if (aliveLocationsOpt.isDefined) {
retval ++= aliveLocationsOpt.get
}
}
}
retval
}
// Add a task to all the pending-task lists that it should be on.
private def addPendingTask(index: Int) {
// We can infer hostLocalLocations from rackLocalLocations by joining it against tasks(index).preferredLocations (with appropriate
// hostPort <-> host conversion). But we are not doing it for simplicity's sake. If this becomes a performance issue, modify it.
val processLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.PROCESS_LOCAL)
val hostLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
val rackLocalLocations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
if (rackLocalLocations.size == 0) {
// Current impl ensures this.
assert (processLocalLocations.size == 0)
assert (hostLocalLocations.size == 0)
pendingTasksWithNoPrefs += index
} else {
// process local locality
for (hostPort <- processLocalLocations) {
// DEBUG Code
Utils.checkHostPort(hostPort)
val hostPortList = pendingTasksForHostPort.getOrElseUpdate(hostPort, ArrayBuffer())
hostPortList += index
}
// host locality (includes process local)
for (hostPort <- hostLocalLocations) {
// DEBUG Code
Utils.checkHostPort(hostPort)
val host = Utils.parseHostPort(hostPort)._1
val hostList = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer())
hostList += index
}
// rack locality (includes process local and host local)
for (rackLocalHostPort <- rackLocalLocations) {
// DEBUG Code
Utils.checkHostPort(rackLocalHostPort)
val rackLocalHost = Utils.parseHostPort(rackLocalHostPort)._1
val list = pendingRackLocalTasksForHost.getOrElseUpdate(rackLocalHost, ArrayBuffer())
list += index
}
}
allPendingTasks += index
}
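For illustration only, a tiny sketch of the getOrElseUpdate pattern that addPendingTask uses above to keep one ArrayBuffer of task indices per key; the hosts and indices are hypothetical.
import scala.collection.mutable.{ArrayBuffer, HashMap}
object PendingListSketch {
  def main(args: Array[String]) {
    val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
    // Register task indices under their preferred hosts (made-up values).
    for ((host, index) <- Seq(("host1", 0), ("host1", 1), ("host2", 2))) {
      val list = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer())
      list += index
    }
    // Missing keys fall back to an empty list, as in the getPendingTasksFor* helpers below.
    println(pendingTasksForHost.getOrElse("host1", ArrayBuffer()))  // ArrayBuffer(0, 1)
    println(pendingTasksForHost.getOrElse("host3", ArrayBuffer()))  // ArrayBuffer()
  }
}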
// Return the pending tasks list for a given host port (process local), or an empty list if
// there is no map entry for that host port
private def getPendingTasksForHostPort(hostPort: String): ArrayBuffer[Int] = {
// DEBUG Code
Utils.checkHostPort(hostPort)
pendingTasksForHostPort.getOrElse(hostPort, ArrayBuffer())
}
// Return the pending tasks list for a given host, or an empty list if
// there is no map entry for that host
private def getPendingTasksForHost(hostPort: String): ArrayBuffer[Int] = {
val host = Utils.parseHostPort(hostPort)._1
pendingTasksForHost.getOrElse(host, ArrayBuffer())
}
// Return the pending tasks (rack level) list for a given host, or an empty list if
// there is no map entry for that host
private def getRackLocalPendingTasksForHost(hostPort: String): ArrayBuffer[Int] = {
val host = Utils.parseHostPort(hostPort)._1
pendingRackLocalTasksForHost.getOrElse(host, ArrayBuffer())
}
// Number of pending tasks for a given host port (which would be process local)
def numPendingTasksForHostPort(hostPort: String): Int = {
getPendingTasksForHostPort(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
}
// Number of pending tasks for a given host (which would be data local)
def numPendingTasksForHost(hostPort: String): Int = {
getPendingTasksForHost(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
}
// Number of pending rack local tasks for a given host
def numRackLocalPendingTasksForHost(hostPort: String): Int = {
getRackLocalPendingTasksForHost(hostPort).count( index => copiesRunning(index) == 0 && !finished(index) )
}
// Dequeue a pending task from the given list and return its index.
// Return None if the list is empty.
// This method also cleans up any tasks in the list that have already
// been launched, since we want that to happen lazily.
private def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
while (!list.isEmpty) {
val index = list.last
list.trimEnd(1)
if (copiesRunning(index) == 0 && !finished(index)) {
return Some(index)
}
}
return None
}
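A self-contained sketch of the same tail-pop-and-skip discipline as findTaskFromList, with a hypothetical runnable predicate standing in for the copiesRunning/finished checks.
import scala.collection.mutable.ArrayBuffer
object FindTaskSketch {
  // Pop from the tail and lazily skip entries that are no longer runnable.
  def findRunnable(list: ArrayBuffer[Int], runnable: Int => Boolean): Option[Int] = {
    while (!list.isEmpty) {
      val index = list.last
      list.trimEnd(1)
      if (runnable(index)) {
        return Some(index)
      }
    }
    None
  }
  def main(args: Array[String]) {
    val pending = ArrayBuffer(0, 1, 2, 3)
    val alreadyLaunched = Set(2, 3)
    println(findRunnable(pending, i => !alreadyLaunched(i)))  // Some(1)
    println(pending)                                          // ArrayBuffer(0) - launched entries were dropped lazily
  }
}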
// Return a speculative task for a given host if any are available. The task should not have an
// attempt running on this host, in case the host is slow. In addition, if locality is set, the
// task must either prefer this host/rack or have no preferred locations at all.
private def findSpeculativeTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
assert (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL))
speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set
if (speculatableTasks.size > 0) {
val localTask = speculatableTasks.find {
index =>
val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
val attemptLocs = taskAttempts(index).map(_.hostPort)
(locations.size == 0 || locations.contains(hostPort)) && !attemptLocs.contains(hostPort)
}
if (localTask != None) {
speculatableTasks -= localTask.get
return localTask
}
// check for rack locality
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
val rackTask = speculatableTasks.find {
index =>
val locations = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
val attemptLocs = taskAttempts(index).map(_.hostPort)
locations.contains(hostPort) && !attemptLocs.contains(hostPort)
}
if (rackTask != None) {
speculatableTasks -= rackTask.get
return rackTask
}
}
// Any task ...
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
// Should we check attemptLocs here as well?
val nonLocalTask = speculatableTasks.find(i => !taskAttempts(i).map(_.hostPort).contains(hostPort))
if (nonLocalTask != None) {
speculatableTasks -= nonLocalTask.get
return nonLocalTask
}
}
}
return None
}
// Dequeue a pending task for a given node and return its index.
// The locality argument controls how far down the locality hierarchy (rack local, any) we are allowed to fall back.
private def findTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
val processLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort))
if (processLocalTask != None) {
return processLocalTask
}
val localTask = findTaskFromList(getPendingTasksForHost(hostPort))
if (localTask != None) {
return localTask
}
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
val rackLocalTask = findTaskFromList(getRackLocalPendingTasksForHost(hostPort))
if (rackLocalTask != None) {
return rackLocalTask
}
}
// Look for no-pref tasks AFTER rack local tasks - this has the side effect that we get to failed tasks later rather than sooner.
// TODO: That code path needs to be revisited (adding to the no-prefs list when a host:port goes down).
val noPrefTask = findTaskFromList(pendingTasksWithNoPrefs)
if (noPrefTask != None) {
return noPrefTask
}
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
val nonLocalTask = findTaskFromList(allPendingTasks)
if (nonLocalTask != None) {
return nonLocalTask
}
}
// Finally, if all else has failed, find a speculative task
return findSpeculativeTask(hostPort, locality)
}
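The fallback order in findTask above can be read as an Option chain; a small sketch with stubbed-out lookups (all names and values hypothetical) that mirrors the process-local -> host-local -> rack-local -> no-prefs -> any -> speculative ordering.
object LocalityFallbackSketch {
  // Stubbed lookups; each returns the index of a candidate task, if any.
  def processLocal(): Option[Int] = None
  def hostLocal(): Option[Int] = None
  def rackLocal(): Option[Int] = Some(7)
  def noPrefs(): Option[Int] = Some(9)
  def any(): Option[Int] = None
  def main(args: Array[String]) {
    // Rack-local is tried before the no-prefs list, matching the comment above.
    val chosen = processLocal() orElse hostLocal() orElse rackLocal() orElse noPrefs() orElse any()
    println(chosen)  // Some(7)
  }
}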
private def isProcessLocalLocation(task: Task[_], hostPort: String): Boolean = {
Utils.checkHostPort(hostPort)
val locs = task.preferredLocations
locs.contains(hostPort)
}
private def isHostLocalLocation(task: Task[_], hostPort: String): Boolean = {
val locs = task.preferredLocations
// If no preference, consider it as host local
if (locs.isEmpty) return true
val host = Utils.parseHostPort(hostPort)._1
locs.find(h => Utils.parseHostPort(h)._1 == host).isDefined
}
// Does a host count as a rack local preferred location for a task? (Assumes the host itself is NOT a preferred location.)
// This is true if the task has preferred locations and this host shares a rack with at least one of them.
private def isRackLocalLocation(task: Task[_], hostPort: String): Boolean = {
val locs = task.preferredLocations
val preferredRacks = new HashSet[String]()
for (preferredHost <- locs) {
val rack = sched.getRackForHost(preferredHost)
if (None != rack) preferredRacks += rack.get
}
if (preferredRacks.isEmpty) return false
val hostRack = sched.getRackForHost(hostPort)
return None != hostRack && preferredRacks.contains(hostRack.get)
}
// Respond to an offer of a single slave from the scheduler by finding a task
def slaveOffer(execId: String, hostPort: String, availableCpus: Double, overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] = {
if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
// If explicitly specified, use that
val locality = if (overrideLocality != null) overrideLocality else {
// expand only if we have waited for more than LOCALITY_WAIT for a host local task ...
val time = System.currentTimeMillis
if (time - lastPreferredLaunchTime < LOCALITY_WAIT) TaskLocality.NODE_LOCAL else TaskLocality.ANY
}
findTask(hostPort, locality) match {
case Some(index) => {
// Found a task; do some bookkeeping and return a Mesos task for it
val task = tasks(index)
val taskId = sched.newTaskId()
// Figure out whether this should count as a preferred launch
val taskLocality =
if (isProcessLocalLocation(task, hostPort)) TaskLocality.PROCESS_LOCAL else
if (isHostLocalLocation(task, hostPort)) TaskLocality.NODE_LOCAL else
if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL else
TaskLocality.ANY
val prefStr = taskLocality.toString
logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format(
taskSet.id, index, taskId, execId, hostPort, prefStr))
// Do various bookkeeping
copiesRunning(index) += 1
val time = System.currentTimeMillis
val info = new TaskInfo(taskId, index, time, execId, hostPort, taskLocality)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
if (TaskLocality.NODE_LOCAL == taskLocality) {
lastPreferredLaunchTime = time
}
// Serialize and return the task
val startTime = System.currentTimeMillis
val serializedTask = Task.serializeWithDependencies(
task, sched.sc.addedFiles, sched.sc.addedJars, ser)
val timeTaken = System.currentTimeMillis - startTime
increaseRunningTasks(1)
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
return Some(new TaskDescription(taskId, execId, taskName, serializedTask))
}
case _ =>
}
}
return None
}
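A minimal sketch of the delay-scheduling decision at the top of slaveOffer, with a made-up localityWait value standing in for LOCALITY_WAIT and plain strings standing in for the TaskLocality values.
object DelaySchedulingSketch {
  // Only relax from NODE_LOCAL to ANY once we have waited longer than localityWait
  // since the last preferred (node-local) launch.
  def allowedLocality(now: Long, lastPreferredLaunchTime: Long, localityWait: Long): String = {
    if (now - lastPreferredLaunchTime < localityWait) "NODE_LOCAL" else "ANY"
  }
  def main(args: Array[String]) {
    val wait = 3000L
    println(allowedLocality(now = 10000L, lastPreferredLaunchTime = 9000L, localityWait = wait))  // NODE_LOCAL
    println(allowedLocality(now = 20000L, lastPreferredLaunchTime = 9000L, localityWait = wait))  // ANY
  }
}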
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
state match {
case TaskState.FINISHED =>
taskFinished(tid, state, serializedData)
case TaskState.LOST =>
taskLost(tid, state, serializedData)
case TaskState.FAILED =>
taskLost(tid, state, serializedData)
case TaskState.KILLED =>
taskLost(tid, state, serializedData)
case _ =>
}
}
def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
if (info.failed) {
// We might get two task-lost messages for the same task in coarse-grained Mesos mode,
// or even from Mesos itself when acks get delayed.
return
}
val index = info.index
info.markSuccessful()
decreaseRunningTasks(1)
if (!finished(index)) {
tasksFinished += 1
logInfo("Finished TID %s in %d ms (progress: %d/%d)".format(
tid, info.duration, tasksFinished, numTasks))
// Deserialize task result and pass it to the scheduler
try {
val result = ser.deserialize[TaskResult[_]](serializedData)
result.metrics.resultSize = serializedData.limit()
sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
} catch {
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread().getContextClassLoader
throw new SparkException("ClassNotFound with classloader: " + loader, cnf)
case ex => throw ex
}
// Mark finished and stop if we've finished all the tasks
finished(index) = true
if (tasksFinished == numTasks) {
sched.taskSetFinished(this)
}
} else {
logInfo("Ignoring task-finished event for TID " + tid +
" because task " + index + " is already finished")
}
}
def taskLost(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
if (info.failed) {
// We might get two task-lost messages for the same task in coarse-grained Mesos mode,
// or even from Mesos itself when acks get delayed.
return
}
val index = info.index
info.markFailed()
decreaseRunningTasks(1)
if (!finished(index)) {
logInfo("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
copiesRunning(index) -= 1
// Check if the problem is a map output fetch failure. In that case, this
// task will never succeed on any node, so tell the scheduler about it.
if (serializedData != null && serializedData.limit() > 0) {
val reason = ser.deserialize[TaskEndReason](serializedData, getClass.getClassLoader)
reason match {
case fetchFailed: FetchFailed =>
logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)
sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
finished(index) = true
tasksFinished += 1
sched.taskSetFinished(this)
decreaseRunningTasks(runningTasks)
return
case taskResultTooBig: TaskResultTooBigFailure =>
logInfo("Loss was due to task %s result exceeding Akka frame size;" +
"aborting job".format(tid))
abort("Task %s result exceeded Akka frame size".format(tid))
return
case ef: ExceptionFailure =>
val key = ef.description
val now = System.currentTimeMillis
val (printFull, dupCount) = {
if (recentExceptions.contains(key)) {
val (dupCount, printTime) = recentExceptions(key)
if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
recentExceptions(key) = (0, now)
(true, 0)
} else {
recentExceptions(key) = (dupCount + 1, printTime)
(false, dupCount + 1)
}
} else {
recentExceptions(key) = (0, now)
(true, 0)
}
}
if (printFull) {
val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
logInfo("Loss was due to %s\n%s\n%s".format(
ef.className, ef.description, locs.mkString("\n")))
} else {
logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
}
case _ => {}
}
}
// On non-fetch failures, re-enqueue the task as pending for a max number of retries
addPendingTask(index)
// Count failed attempts only on FAILED and LOST state (not on KILLED)
if (state == TaskState.FAILED || state == TaskState.LOST) {
numFailures(index) += 1
if (numFailures(index) > MAX_TASK_FAILURES) {
logError("Task %s:%d failed more than %d times; aborting job".format(
taskSet.id, index, MAX_TASK_FAILURES))
abort("Task %s:%d failed more than %d times".format(taskSet.id, index, MAX_TASK_FAILURES))
}
}
} else {
logInfo("Ignoring task-lost event for TID " + tid +
" because task " + index + " is already finished")
}
}
def error(message: String) {
// Save the error message
abort("Error: " + message)
}
def abort(message: String) {
failed = true
causeOfFailure = message
// TODO: Kill running tasks if we were not terminated due to a Mesos error
sched.listener.taskSetFailed(taskSet, message)
decreaseRunningTasks(runningTasks)
sched.taskSetFinished(this)
}
override def increaseRunningTasks(taskNum: Int) {
runningTasks += taskNum
if (parent != null) {
parent.increaseRunningTasks(taskNum)
}
}
override def decreaseRunningTasks(taskNum: Int) {
runningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)
}
}
// TODO: for now we just find a Pool, not a TaskSetManager; we can extend this function in the future if needed
override def getSchedulableByName(name: String): Schedulable = {
return null
}
override def addSchedulable(schedulable:Schedulable) {
//nothing
}
override def removeSchedulable(schedulable:Schedulable) {
//nothing
}
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
sortedTaskSetQueue += this
return sortedTaskSetQueue
}
override def executorLost(execId: String, hostPort: String) {
logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
// If some task has preferred locations only on hostname, and there are no more executors there,
// put it in the no-prefs list to avoid the wait from delay scheduling.
// Host local tasks - should we push these to the rack local or no-pref list? For now, preserving behavior and moving them to the
// no-prefs list. Note, this was done due to implications related to 'waiting' for data local tasks, etc.
// Note: NOT checking the process local list - since the host local list is a superset of it. We need to add to no-prefs only if
// there is no host local node for the task (not if there is no process local node for the task)
for (index <- getPendingTasksForHost(Utils.parseHostPort(hostPort)._1)) {
// val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.RACK_LOCAL)
val newLocs = findPreferredLocations(tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
if (newLocs.isEmpty) {
pendingTasksWithNoPrefs += index
}
}
// Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
if (tasks(0).isInstanceOf[ShuffleMapTask]) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
if (finished(index)) {
finished(index) = false
copiesRunning(index) -= 1
tasksFinished -= 1
addPendingTask(index)
// Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
// stage finishes when a total of tasks.size tasks finish.
sched.listener.taskEnded(tasks(index), Resubmitted, null, null, info, null)
}
}
}
// Also re-enqueue any tasks that were running on the node
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
taskLost(tid, TaskState.KILLED, null)
}
}
/**
* Check for tasks to be speculated and return true if there are any. This is called periodically
* by the ClusterScheduler.
*
* TODO: To make this scale to large jobs, we need to maintain a list of running tasks, so that
* we don't scan the whole task set. It might also help to make this sorted by launch time.
*/
override def checkSpeculatableTasks(): Boolean = {
// Can't speculate if we only have one task, or if all tasks have finished.
if (numTasks == 1 || tasksFinished == numTasks) {
return false
}
var foundTasks = false
val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
if (tasksFinished >= minFinishedForSpeculation) {
val time = System.currentTimeMillis()
val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
Arrays.sort(durations)
val medianDuration = durations(min((0.5 * numTasks).round.toInt, durations.size - 1))
val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100)
// TODO: Threshold should also look at standard deviation of task durations and have a lower
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)
for ((tid, info) <- taskInfos) {
val index = info.index
if (!finished(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
!speculatableTasks.contains(index)) {
logInfo(
"Marking task %s:%d (on %s) as speculatable because it ran more than %.0f ms".format(
taskSet.id, index, info.hostPort, threshold))
speculatableTasks += index
foundTasks = true
}
}
}
return foundTasks
}
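A standalone sketch of the speculation threshold computed above, using made-up durations and a made-up multiplier in place of SPECULATION_MULTIPLIER (the SPECULATION_QUANTILE gate on how many tasks must have finished is omitted).
import java.util.Arrays
import scala.math.max
object SpeculationThresholdSketch {
  def main(args: Array[String]) {
    // Hypothetical successful task durations in ms.
    val durations = Array(120L, 150L, 90L, 400L, 130L)
    val speculationMultiplier = 1.5
    Arrays.sort(durations)
    val median = durations(durations.length / 2)
    val threshold = max(speculationMultiplier * median, 100)
    // Any still-running single copy older than `threshold` ms would be marked speculatable.
    println("median = " + median + " ms, threshold = " + threshold + " ms")
  }
}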
override def hasPendingTasks(): Boolean = {
numTasks > 0 && tasksFinished < numTasks
}
private[spark] trait TaskSetManager extends Schedulable {
def taskSet: TaskSet
def slaveOffer(execId: String, hostPort: String, availableCpus: Double,
overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription]
def numPendingTasksForHostPort(hostPort: String): Int
def numRackLocalPendingTasksForHost(hostPort :String): Int
def numPendingTasksForHost(hostPort: String): Int
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
def error(message: String)
}

View file

@ -2,19 +2,50 @@ package spark.scheduler.local
import java.io.File
import java.util.concurrent.atomic.AtomicInteger
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import spark._
import spark.TaskState.TaskState
import spark.executor.ExecutorURLClassLoader
import spark.scheduler._
import spark.scheduler.cluster.{TaskLocality, TaskInfo}
import spark.scheduler.cluster._
import akka.actor._
/**
* A simple TaskScheduler implementation that runs tasks locally in a thread pool. Optionally
* A FIFO or Fair TaskScheduler implementation that runs tasks locally in a thread pool. Optionally
* the scheduler also allows each task to fail up to maxFailures times, which is useful for
* testing fault recovery.
*/
private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkContext)
private[spark] case class LocalReviveOffers()
private[spark] case class LocalStatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
private[spark] class LocalActor(localScheduler: LocalScheduler, var freeCores: Int) extends Actor with Logging {
def receive = {
case LocalReviveOffers =>
launchTask(localScheduler.resourceOffer(freeCores))
case LocalStatusUpdate(taskId, state, serializeData) =>
freeCores += 1
localScheduler.statusUpdate(taskId, state, serializeData)
launchTask(localScheduler.resourceOffer(freeCores))
}
def launchTask(tasks : Seq[TaskDescription]) {
for (task <- tasks) {
freeCores -= 1
localScheduler.threadPool.submit(new Runnable {
def run() {
localScheduler.runTask(task.taskId,task.serializedTask)
}
})
}
}
}
private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: SparkContext)
extends TaskScheduler
with Logging {
@ -30,89 +61,127 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
val classLoader = new ExecutorURLClassLoader(Array(), Thread.currentThread.getContextClassLoader)
// TODO: Need to take into account stage priority in scheduling
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
val activeTaskSets = new HashMap[String, TaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
override def start() { }
var localActor: ActorRef = null
override def start() {
//default scheduler is FIFO
val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
//temporarily set rootPool name to empty
rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0)
schedulableBuilder = {
schedulingMode match {
case "FIFO" =>
new FIFOSchedulableBuilder(rootPool)
case "FAIR" =>
new FairSchedulableBuilder(rootPool)
}
}
schedulableBuilder.buildPools()
localActor = env.actorSystem.actorOf(Props(new LocalActor(this, threads)), "Test")
}
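Since the scheduling mode is read from a system property in start(), selecting fair scheduling from an application looks roughly like the sketch below; the property names are taken from this patch, everything else is illustrative.
object SchedulingModeSketch {
  def main(args: Array[String]) {
    // Set before the scheduler's start() reads the property; "FIFO" is the default when unset.
    System.setProperty("spark.cluster.schedulingmode", "FAIR")
    // A fair allocation file can be pointed to via "spark.fairscheduler.allocation.file"
    // (as done in the test suites further down in this commit).
    val mode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
    println("scheduling mode: " + mode)  // FAIR
  }
}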
override def setListener(listener: TaskSchedulerListener) {
this.listener = listener
}
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
val failCount = new Array[Int](tasks.size)
def submitTask(task: Task[_], idInJob: Int) {
val myAttemptId = attemptId.getAndIncrement()
threadPool.submit(new Runnable {
def run() {
runTask(task, idInJob, myAttemptId)
}
})
synchronized {
var manager = new LocalTaskSetManager(this, taskSet)
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
activeTaskSets(taskSet.id) = manager
taskSetTaskIds(taskSet.id) = new HashSet[Long]()
localActor ! LocalReviveOffers
}
}
def runTask(task: Task[_], idInJob: Int, attemptId: Int) {
logInfo("Running " + task)
val info = new TaskInfo(attemptId, idInJob, System.currentTimeMillis(), "local", "local:1", TaskLocality.NODE_LOCAL)
// Set the Spark execution environment for the worker thread
SparkEnv.set(env)
try {
Accumulators.clear()
Thread.currentThread().setContextClassLoader(classLoader)
// Serialize and deserialize the task so that accumulators are changed to thread-local ones;
// this adds a bit of unnecessary overhead but matches how the Mesos Executor works.
val ser = SparkEnv.get.closureSerializer.newInstance()
val bytes = Task.serializeWithDependencies(task, sc.addedFiles, sc.addedJars, ser)
logInfo("Size of task " + idInJob + " is " + bytes.limit + " bytes")
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(bytes)
updateDependencies(taskFiles, taskJars) // Download any files added with addFile
val deserStart = System.currentTimeMillis()
val deserializedTask = ser.deserialize[Task[_]](
taskBytes, Thread.currentThread.getContextClassLoader)
val deserTime = System.currentTimeMillis() - deserStart
// Run it
val result: Any = deserializedTask.run(attemptId)
// Serialize and deserialize the result to emulate what the Mesos
// executor does. This is useful to catch serialization errors early
// on in development (so when users move their local Spark programs
// to the cluster, they don't get surprised by serialization errors).
val serResult = ser.serialize(result)
deserializedTask.metrics.get.resultSize = serResult.limit()
val resultToReturn = ser.deserialize[Any](serResult)
val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
ser.serialize(Accumulators.values))
logInfo("Finished " + task)
info.markSuccessful()
deserializedTask.metrics.get.executorRunTime = info.duration.toInt //close enough
deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
// If the threadpool has not already been shutdown, notify DAGScheduler
if (!Thread.currentThread().isInterrupted)
listener.taskEnded(task, Success, resultToReturn, accumUpdates, info, deserializedTask.metrics.getOrElse(null))
} catch {
case t: Throwable => {
logError("Exception in task " + idInJob, t)
failCount.synchronized {
failCount(idInJob) += 1
if (failCount(idInJob) <= maxFailures) {
submitTask(task, idInJob)
} else {
// TODO: Do something nicer here to return all the way to the user
if (!Thread.currentThread().isInterrupted) {
val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
listener.taskEnded(task, failure, null, null, info, null)
}
}
}
}
def resourceOffer(freeCores: Int): Seq[TaskDescription] = {
synchronized {
var freeCpuCores = freeCores
val tasks = new ArrayBuffer[TaskDescription](freeCores)
val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
for (manager <- sortedTaskSetQueue) {
logDebug("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks))
}
}
for ((task, i) <- tasks.zipWithIndex) {
submitTask(task, i)
var launchTask = false
for (manager <- sortedTaskSetQueue) {
do {
launchTask = false
manager.slaveOffer(null,null,freeCpuCores) match {
case Some(task) =>
tasks += task
taskIdToTaskSetId(task.taskId) = manager.taskSet.id
taskSetTaskIds(manager.taskSet.id) += task.taskId
freeCpuCores -= 1
launchTask = true
case None => {}
}
} while(launchTask)
}
return tasks
}
}
def taskSetFinished(manager: TaskSetManager) {
synchronized {
activeTaskSets -= manager.taskSet.id
manager.parent.removeSchedulable(manager)
logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id, manager.parent.name))
taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
taskSetTaskIds -= manager.taskSet.id
}
}
def runTask(taskId: Long, bytes: ByteBuffer) {
logInfo("Running " + taskId)
val info = new TaskInfo(taskId, 0, System.currentTimeMillis(), "local", "local:1", TaskLocality.NODE_LOCAL)
// Set the Spark execution environment for the worker thread
SparkEnv.set(env)
val ser = SparkEnv.get.closureSerializer.newInstance()
try {
Accumulators.clear()
Thread.currentThread().setContextClassLoader(classLoader)
// Serialize and deserialize the task so that accumulators are changed to thread-local ones;
// this adds a bit of unnecessary overhead but matches how the Mesos Executor works.
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(bytes)
updateDependencies(taskFiles, taskJars) // Download any files added with addFile
val deserStart = System.currentTimeMillis()
val deserializedTask = ser.deserialize[Task[_]](
taskBytes, Thread.currentThread.getContextClassLoader)
val deserTime = System.currentTimeMillis() - deserStart
// Run it
val result: Any = deserializedTask.run(taskId)
// Serialize and deserialize the result to emulate what the Mesos
// executor does. This is useful to catch serialization errors early
// on in development (so when users move their local Spark programs
// to the cluster, they don't get surprised by serialization errors).
val serResult = ser.serialize(result)
deserializedTask.metrics.get.resultSize = serResult.limit()
val resultToReturn = ser.deserialize[Any](serResult)
val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
ser.serialize(Accumulators.values))
logInfo("Finished " + taskId)
deserializedTask.metrics.get.executorRunTime = deserTime.toInt // info.duration.toInt // close enough
deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null))
val serializedResult = ser.serialize(taskResult)
localActor ! LocalStatusUpdate(taskId, TaskState.FINISHED, serializedResult)
} catch {
case t: Throwable => {
val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
localActor ! LocalStatusUpdate(taskId, TaskState.FAILED, ser.serialize(failure))
}
}
}
@ -128,6 +197,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
currentFiles(name) = timestamp
}
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
@ -143,7 +213,16 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
}
}
override def stop() {
def statusUpdate(taskId :Long, state: TaskState, serializedData: ByteBuffer) {
synchronized {
val taskSetId = taskIdToTaskSetId(taskId)
val taskSetManager = activeTaskSets(taskSetId)
taskSetTaskIds(taskSetId) -= taskId
taskSetManager.statusUpdate(taskId, state, serializedData)
}
}
override def stop() {
threadPool.shutdownNow()
}

View file

@ -0,0 +1,172 @@
package spark.scheduler.local
import java.io.File
import java.util.concurrent.atomic.AtomicInteger
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import spark._
import spark.TaskState.TaskState
import spark.scheduler._
import spark.scheduler.cluster._
private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet) extends TaskSetManager with Logging {
var parent: Schedulable = null
var weight: Int = 1
var minShare: Int = 0
var runningTasks: Int = 0
var priority: Int = taskSet.priority
var stageId: Int = taskSet.stageId
var name: String = "TaskSet_"+taskSet.stageId.toString
var failCount = new Array[Int](taskSet.tasks.size)
val taskInfos = new HashMap[Long, TaskInfo]
val numTasks = taskSet.tasks.size
var numFinished = 0
val ser = SparkEnv.get.closureSerializer.newInstance()
val copiesRunning = new Array[Int](numTasks)
val finished = new Array[Boolean](numTasks)
val numFailures = new Array[Int](numTasks)
val MAX_TASK_FAILURES = sched.maxFailures
def increaseRunningTasks(taskNum: Int): Unit = {
runningTasks += taskNum
if (parent != null) {
parent.increaseRunningTasks(taskNum)
}
}
def decreaseRunningTasks(taskNum: Int): Unit = {
runningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)
}
}
def addSchedulable(schedulable: Schedulable): Unit = {
//nothing
}
def removeSchedulable(schedulable: Schedulable): Unit = {
//nothing
}
def getSchedulableByName(name: String): Schedulable = {
return null
}
def executorLost(executorId: String, host: String): Unit = {
//nothing
}
def checkSpeculatableTasks(): Boolean = {
return true
}
def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
sortedTaskSetQueue += this
return sortedTaskSetQueue
}
def hasPendingTasks(): Boolean = {
return true
}
def findTask(): Option[Int] = {
for (i <- 0 until numTasks) {
if (copiesRunning(i) == 0 && !finished(i)) {
return Some(i)
}
}
return None
}
def slaveOffer(execId: String, hostPort: String, availableCpus: Double, overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] = {
SparkEnv.set(sched.env)
logDebug("availableCpus:%d,numFinished:%d,numTasks:%d".format(availableCpus.toInt, numFinished, numTasks))
if (availableCpus > 0 && numFinished < numTasks) {
findTask() match {
case Some(index) =>
val taskId = sched.attemptId.getAndIncrement()
val task = taskSet.tasks(index)
val info = new TaskInfo(taskId, index, System.currentTimeMillis(), "local", "local:1", TaskLocality.NODE_LOCAL)
taskInfos(taskId) = info
val bytes = Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
logInfo("Size of task " + taskId + " is " + bytes.limit + " bytes")
val taskName = "task %s:%d".format(taskSet.id, index)
copiesRunning(index) += 1
increaseRunningTasks(1)
return Some(new TaskDescription(taskId, null, taskName, bytes))
case None => {}
}
}
return None
}
def numPendingTasksForHostPort(hostPort: String): Int = {
return 0
}
def numRackLocalPendingTasksForHost(hostPort :String): Int = {
return 0
}
def numPendingTasksForHost(hostPort: String): Int = {
return 0
}
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
state match {
case TaskState.FINISHED =>
taskEnded(tid, state, serializedData)
case TaskState.FAILED =>
taskFailed(tid, state, serializedData)
case _ => {}
}
}
def taskEnded(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
val index = info.index
val task = taskSet.tasks(index)
info.markSuccessful()
val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader)
result.metrics.resultSize = serializedData.limit()
sched.listener.taskEnded(task, Success, result.value, result.accumUpdates, info, result.metrics)
numFinished += 1
decreaseRunningTasks(1)
finished(index) = true
if (numFinished == numTasks) {
sched.taskSetFinished(this)
}
}
def taskFailed(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
val index = info.index
val task = taskSet.tasks(index)
info.markFailed()
decreaseRunningTasks(1)
val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](serializedData, getClass.getClassLoader)
if (!finished(index)) {
copiesRunning(index) -= 1
numFailures(index) += 1
val locs = reason.stackTrace.map(loc => "\tat %s".format(loc.toString))
logInfo("Loss was due to %s\n%s\n%s".format(reason.className, reason.description, locs.mkString("\n")))
if (numFailures(index) > MAX_TASK_FAILURES) {
val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format(taskSet.id, index, 4, reason.description)
decreaseRunningTasks(runningTasks)
sched.listener.taskSetFailed(taskSet, errorMessage)
// need to delete failed Taskset from schedule queue
sched.taskSetFinished(this)
}
}
}
def error(message: String) {
}
}

View file

@ -965,7 +965,7 @@ private[spark] object BlockManager extends Logging {
}
def getHeartBeatFrequencyFromSystemProperties: Long =
System.getProperty("spark.storage.blockManagerHeartBeatMs", "5000").toLong
System.getProperty("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
def getDisableHeartBeatsForTesting: Boolean =
System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean

View file

@ -39,7 +39,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
"" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
"5000").toLong
"60000").toLong
var timeoutCheckingTask: Cancellable = null
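Putting the two changes together, a small sketch of how the defaults now relate; this is pure arithmetic over the property values shown above, with a hypothetical object name.
object BlockManagerTimeoutSketch {
  def main(args: Array[String]) {
    // Heartbeat frequency is now derived from the timeout interval instead of being independent.
    val timeoutIntervalMs = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong
    val heartBeatMs = timeoutIntervalMs / 4   // 15000 ms with the default interval
    val slaveTimeoutMs = heartBeatMs * 3      // 45000 ms, the master-side default shown above
    println("heartbeat = " + heartBeatMs + " ms, slave timeout = " + slaveTimeoutMs + " ms")
  }
}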

View file

@ -82,15 +82,15 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
override def size(): Long = lastValidPosition
}
val MAX_DIR_CREATION_ATTEMPTS: Int = 10
val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt
var shuffleSender : ShuffleSender = null
private var shuffleSender : ShuffleSender = null
// Create one local directory for each path mentioned in spark.local.dir; then, inside this
// directory, create multiple subdirectories that we will hash files into, in order to avoid
// having really large inodes at the top level.
val localDirs = createLocalDirs()
val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
private val localDirs: Array[File] = createLocalDirs()
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
addShutdownHook()
@ -99,7 +99,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
new DiskBlockObjectWriter(blockId, serializer, bufferSize)
}
override def getSize(blockId: String): Long = {
getFile(blockId).length()
}
@ -232,8 +231,8 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
private def createLocalDirs(): Array[File] = {
logDebug("Creating local directories at root dirs '" + rootDirs + "'")
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
rootDirs.split(",").map(rootDir => {
var foundLocalDir: Boolean = false
rootDirs.split(",").map { rootDir =>
var foundLocalDir = false
var localDir: File = null
var localDirId: String = null
var tries = 0
@ -248,7 +247,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
}
} catch {
case e: Exception =>
logWarning("Attempt " + tries + " to create local dir failed", e)
logWarning("Attempt " + tries + " to create local dir " + localDir + " failed", e)
}
}
if (!foundLocalDir) {
@ -258,7 +257,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
}
logInfo("Created local directory at " + localDir)
localDir
})
}
}
private def addShutdownHook() {
@ -266,15 +265,16 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
override def run() {
logDebug("Shutdown hook called")
try {
localDirs.foreach { localDir =>
localDirs.foreach { localDir =>
try {
if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
} catch {
case t: Throwable =>
logError("Exception while deleting local spark dir: " + localDir, t)
}
if (shuffleSender != null) {
shuffleSender.stop
}
} catch {
case t: Throwable => logError("Exception while deleting local spark dirs", t)
}
if (shuffleSender != null) {
shuffleSender.stop
}
}
})

View file

@ -55,21 +55,21 @@ object StorageUtils {
}.mapValues(_.values.toArray)
// For each RDD, generate an RDDInfo object
val rddInfos = groupedRddBlocks.map { case(rddKey, rddBlocks) =>
val rddInfos = groupedRddBlocks.map { case (rddKey, rddBlocks) =>
// Add up memory and disk sizes
val memSize = rddBlocks.map(_.memSize).reduce(_ + _)
val diskSize = rddBlocks.map(_.diskSize).reduce(_ + _)
// Find the id of the RDD, e.g. rdd_1 => 1
val rddId = rddKey.split("_").last.toInt
// Get the friendly name for the rdd, if available.
val rdd = sc.persistentRdds(rddId)
val rddName = Option(rdd.name).getOrElse(rddKey)
val rddStorageLevel = rdd.getStorageLevel
RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, rdd.partitions.size, memSize, diskSize)
}.toArray
// Get the friendly name and storage level for the RDD, if available
sc.persistentRdds.get(rddId).map { r =>
val rddName = Option(r.name).getOrElse(rddKey)
val rddStorageLevel = r.getStorageLevel
RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size, memSize, diskSize)
}
}.flatten.toArray
scala.util.Sorting.quickSort(rddInfos)
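A sketch of the Option-based filtering the new RDDInfo code relies on: ids missing from the map are simply dropped instead of throwing; the ids and names here are hypothetical.
object OptionFlattenSketch {
  def main(args: Array[String]) {
    val persistent = Map(1 -> "rdd_one", 3 -> "rdd_three")
    val candidateIds = Seq(1, 2, 3)
    // map to Option, then flatten: absent keys (id 2) disappear from the result.
    val infos = candidateIds.map(id => persistent.get(id).map(name => (id, name))).flatten
    println(infos)  // List((1,rdd_one), (3,rdd_three))
  }
}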

View file

@ -29,7 +29,7 @@ private[spark] object AkkaUtils {
def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt
val akkaTimeout = System.getProperty("spark.akka.timeout", "60").toInt
val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
// 10 seconds is the default akka timeout, but in a cluster, we need higher by default.

View file

@ -0,0 +1,45 @@
package spark.util
import java.io.Serializable
import java.util.{PriorityQueue => JPriorityQueue}
import scala.collection.generic.Growable
import scala.collection.JavaConverters._
/**
* Bounded priority queue. This class wraps the original PriorityQueue
* class and modifies it such that only the top K elements are retained.
* The top K elements are defined by an implicit Ordering[A].
*/
class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
extends Iterable[A] with Growable[A] with Serializable {
private val underlying = new JPriorityQueue[A](maxSize, ord)
override def iterator: Iterator[A] = underlying.iterator.asScala
override def ++=(xs: TraversableOnce[A]): this.type = {
xs.foreach { this += _ }
this
}
override def +=(elem: A): this.type = {
if (size < maxSize) underlying.offer(elem)
else maybeReplaceLowest(elem)
this
}
override def +=(elem1: A, elem2: A, elems: A*): this.type = {
this += elem1 += elem2 ++= elems
}
override def clear() { underlying.clear() }
private def maybeReplaceLowest(a: A): Boolean = {
val head = underlying.peek()
if (head != null && ord.gt(a, head)) {
underlying.poll()
underlying.offer(a)
} else false
}
}
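A small usage sketch, assuming the class above is on the classpath under spark.util; it keeps only the top 3 of a handful of ints via the implicit Ordering[Int], the same way RDD.top relies on an Ordering[A].
object BoundedPriorityQueueSketch {
  def main(args: Array[String]) {
    val queue = new spark.util.BoundedPriorityQueue[Int](3)
    queue ++= Seq(5, 1, 9, 3, 7, 2)
    // Only the three largest elements survive.
    println(queue.toList.sorted)  // List(5, 7, 9)
  }
}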

View file

@ -37,17 +37,23 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
if (other == this) {
merge(other.copy()) // Avoid overwriting fields in a weird order
} else {
val delta = other.mu - mu
if (other.n * 10 < n) {
mu = mu + (delta * other.n) / (n + other.n)
} else if (n * 10 < other.n) {
mu = other.mu - (delta * n) / (n + other.n)
} else {
mu = (mu * n + other.mu * other.n) / (n + other.n)
if (n == 0) {
mu = other.mu
m2 = other.m2
n = other.n
} else if (other.n != 0) {
val delta = other.mu - mu
if (other.n * 10 < n) {
mu = mu + (delta * other.n) / (n + other.n)
} else if (n * 10 < other.n) {
mu = other.mu - (delta * n) / (n + other.n)
} else {
mu = (mu * n + other.mu * other.n) / (n + other.n)
}
m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
n += other.n
}
m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
n += other.n
this
this
}
}
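A quick sketch of the behavior the new n == 0 / other.n == 0 branches guarantee: merging an empty counter no longer corrupts the statistics. It assumes the public constructor shown in the hunk header, and the sum accessor and merge-returning-this behavior exercised elsewhere in this commit.
object StatCounterMergeSketch {
  def main(args: Array[String]) {
    val nonEmpty = new spark.util.StatCounter(Seq(1.0, 2.0, 3.0))
    val empty = new spark.util.StatCounter(Seq.empty[Double])
    // With the fix, merging an empty counter leaves the other side untouched.
    println(nonEmpty.merge(empty).sum)  // 6.0
  }
}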

View file

@ -7,6 +7,8 @@ import scala.io.Source
import com.google.common.io.Files
import org.scalatest.FunSuite
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress.{DefaultCodec, CompressionCodec, GzipCodec}
import SparkContext._
@ -26,6 +28,28 @@ class FileSuite extends FunSuite with LocalSparkContext {
assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
}
test("text files (compressed)") {
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val normalDir = new File(tempDir, "output_normal").getAbsolutePath
val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
val codec = new DefaultCodec()
val data = sc.parallelize("a" * 10000, 1)
data.saveAsTextFile(normalDir)
data.saveAsTextFile(compressedOutputDir, classOf[DefaultCodec])
val normalFile = new File(normalDir, "part-00000")
val normalContent = sc.textFile(normalDir).collect
assert(normalContent === Array.fill(10000)("a"))
val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
val compressedContent = sc.textFile(compressedOutputDir).collect
assert(compressedContent === Array.fill(10000)("a"))
assert(compressedFile.length < normalFile.length)
}
test("SequenceFiles") {
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
@ -37,6 +61,28 @@ class FileSuite extends FunSuite with LocalSparkContext {
assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
}
test("SequenceFile (compressed)") {
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val normalDir = new File(tempDir, "output_normal").getAbsolutePath
val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
val codec = new DefaultCodec()
val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x))
data.saveAsSequenceFile(normalDir)
data.saveAsSequenceFile(compressedOutputDir, Some(classOf[DefaultCodec]))
val normalFile = new File(normalDir, "part-00000")
val normalContent = sc.sequenceFile[String, String](normalDir).collect
assert(normalContent === Array.fill(100)("abc", "abc"))
val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect
assert(compressedContent === Array.fill(100)("abc", "abc"))
assert(compressedFile.length < normalFile.length)
}
test("SequenceFile with writable key") {
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()

View file

@ -8,6 +8,7 @@ import java.util.*;
import scala.Tuple2;
import com.google.common.base.Charsets;
import org.apache.hadoop.io.compress.DefaultCodec;
import com.google.common.io.Files;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
@ -473,6 +474,19 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, readRDD.collect());
}
@Test
public void textFilesCompressed() throws IOException {
File tempDir = Files.createTempDir();
String outputDir = new File(tempDir, "output").getAbsolutePath();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
rdd.saveAsTextFile(outputDir, DefaultCodec.class);
// Try reading it in as a text file RDD
List<String> expected = Arrays.asList("1", "2", "3", "4");
JavaRDD<String> readRDD = sc.textFile(outputDir);
Assert.assertEquals(expected, readRDD.collect());
}
@Test
public void sequenceFile() {
File tempDir = Files.createTempDir();
@ -619,6 +633,37 @@ public class JavaAPISuite implements Serializable {
}).collect().toString());
}
@Test
public void hadoopFileCompressed() {
File tempDir = Files.createTempDir();
String outputDir = new File(tempDir, "output_compressed").getAbsolutePath();
List<Tuple2<Integer, String>> pairs = Arrays.asList(
new Tuple2<Integer, String>(1, "a"),
new Tuple2<Integer, String>(2, "aa"),
new Tuple2<Integer, String>(3, "aaa")
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
}
}).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class,
DefaultCodec.class);
JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
SequenceFileInputFormat.class, IntWritable.class, Text.class);
Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
String>() {
@Override
public String call(Tuple2<IntWritable, Text> x) {
return x.toString();
}
}).collect().toString());
}
@Test
public void zip() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

View file

@ -1,10 +1,10 @@
package spark
import org.scalatest.FunSuite
import scala.collection.mutable.ArrayBuffer
import SparkContext._
import spark.util.StatCounter
import scala.math.abs
class PartitioningSuite extends FunSuite with LocalSparkContext {
@ -120,4 +120,21 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
assert(intercept[SparkException]{ arrPairs.reduceByKeyLocally(_ + _) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.reduceByKey(_ + _) }.getMessage.contains("array"))
}
test("Zero-length partitions should be correctly handled") {
// Create RDD with some consecutive empty partitions (including the "first" one)
sc = new SparkContext("local", "test")
val rdd: RDD[Double] = sc
.parallelize(Array(-1.0, -1.0, -1.0, -1.0, 2.0, 4.0, -1.0, -1.0), 8)
.filter(_ >= 0.0)
// Run the partitions, including the consecutive empty ones, through StatCounter
val stats: StatCounter = rdd.stats();
assert(abs(6.0 - stats.sum) < 0.01);
assert(abs(6.0/2 - rdd.mean) < 0.01);
assert(abs(1.0 - rdd.variance) < 0.01);
assert(abs(1.0 - rdd.stdev) < 0.01);
// Add other tests here for classes that should be able to handle empty partitions correctly
}
}

View file

@ -19,6 +19,45 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
assert(c(3) === "4")
}
test("advanced pipe") {
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val bl = sc.broadcast(List("0"))
val piped = nums.pipe(Seq("cat"),
Map[String, String](),
(f: String => Unit) => {bl.value.map(f(_));f("\u0001")},
(i:Int, f: String=> Unit) => f(i + "_"))
val c = piped.collect()
assert(c.size === 8)
assert(c(0) === "0")
assert(c(1) === "\u0001")
assert(c(2) === "1_")
assert(c(3) === "2_")
assert(c(4) === "0")
assert(c(5) === "\u0001")
assert(c(6) === "3_")
assert(c(7) === "4_")
val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
val d = nums1.groupBy(str=>str.split("\t")(0)).
pipe(Seq("cat"),
Map[String, String](),
(f: String => Unit) => {bl.value.map(f(_));f("\u0001")},
(i:Tuple2[String, Seq[String]], f: String=> Unit) => {for (e <- i._2){ f(e + "_")}}).collect()
assert(d.size === 8)
assert(d(0) === "0")
assert(d(1) === "\u0001")
assert(d(2) === "b\t2_")
assert(d(3) === "b\t4_")
assert(d(4) === "0")
assert(d(5) === "\u0001")
assert(d(6) === "a\t1_")
assert(d(7) === "a\t3_")
}
test("pipe with env variable") {
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)

View file

@ -317,4 +317,23 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(sample.size === checkSample.size)
for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
}
test("top with predefined ordering") {
sc = new SparkContext("local", "test")
val nums = Array.range(1, 100000)
val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2)
val topK = ints.top(5)
assert(topK.size === 5)
assert(topK.sorted === nums.sorted.takeRight(5))
}
test("top with custom ordering") {
sc = new SparkContext("local", "test")
val words = Vector("a", "b", "c", "d")
implicit val ord = implicitly[Ordering[String]].reverse
val rdd = sc.makeRDD(words, 2)
val topK = rdd.top(2)
assert(topK.size === 2)
assert(topK.sorted === Array("b", "a"))
}
}

View file

@ -16,7 +16,7 @@ class DummyTaskSetManager(
initNumTasks: Int,
clusterScheduler: ClusterScheduler,
taskSet: TaskSet)
extends TaskSetManager(clusterScheduler,taskSet) {
extends ClusterTaskSetManager(clusterScheduler,taskSet) {
parent = null
weight = 1
@ -88,7 +88,7 @@ class DummyTask(stageId: Int) extends Task[Int](stageId)
}
}
class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): DummyTaskSetManager = {
new DummyTaskSetManager(priority, stage, numTasks, cs , taskSet)
@ -96,8 +96,11 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
def resourceOffer(rootPool: Pool): Int = {
val taskSetQueue = rootPool.getSortedTaskSetQueue()
for (taskSet <- taskSetQueue)
{
/* Just for Test*/
for (manager <- taskSetQueue) {
logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
}
for (taskSet <- taskSetQueue) {
taskSet.slaveOffer("execId_1", "hostname_1", 1) match {
case Some(task) =>
return taskSet.stageId

View file

@ -0,0 +1,206 @@
package spark.scheduler
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import spark._
import spark.scheduler._
import spark.scheduler.cluster._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ConcurrentMap, HashMap}
import java.util.concurrent.Semaphore
import java.util.concurrent.CountDownLatch
import java.util.Properties
class Lock() {
var finished = false
def jobWait() = {
synchronized {
while(!finished) {
this.wait()
}
}
}
def jobFinished() = {
synchronized {
finished = true
this.notifyAll()
}
}
}
object TaskThreadInfo {
val threadToLock = HashMap[Int, Lock]()
val threadToRunning = HashMap[Int, Boolean]()
val threadToStarted = HashMap[Int, CountDownLatch]()
}
/*
* 1. Each thread contains one job.
* 2. Each job contains one stage.
* 3. Each stage contains only one task.
* 4. Each launched task must be launched in order (using threadToStarted) to make sure
* it will get a cpu core, and it will wait to finish until the user manually
* releases its "Lock", at which point the cluster will have another free cpu core.
* 5. Each pending task must use "sleep" to make sure it has been added to the taskSetManager queue,
* so it will be scheduled later when the cluster has free cpu cores.
*/
class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
def createThread(threadIndex: Int, poolName: String, sc: SparkContext, sem: Semaphore) {
TaskThreadInfo.threadToRunning(threadIndex) = false
val nums = sc.parallelize(threadIndex to threadIndex, 1)
TaskThreadInfo.threadToLock(threadIndex) = new Lock()
TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
new Thread {
if (poolName != null) {
sc.addLocalProperties("spark.scheduler.cluster.fair.pool",poolName)
}
override def run() {
val ans = nums.map(number => {
TaskThreadInfo.threadToRunning(number) = true
TaskThreadInfo.threadToStarted(number).countDown()
TaskThreadInfo.threadToLock(number).jobWait()
TaskThreadInfo.threadToRunning(number) = false
number
}).collect()
assert(ans.toList === List(threadIndex))
sem.release()
}
}.start()
}
test("Local FIFO scheduler end-to-end test") {
System.setProperty("spark.cluster.schedulingmode", "FIFO")
sc = new SparkContext("local[4]", "test")
val sem = new Semaphore(0)
createThread(1,null,sc,sem)
TaskThreadInfo.threadToStarted(1).await()
createThread(2,null,sc,sem)
TaskThreadInfo.threadToStarted(2).await()
createThread(3,null,sc,sem)
TaskThreadInfo.threadToStarted(3).await()
createThread(4,null,sc,sem)
TaskThreadInfo.threadToStarted(4).await()
// Threads 5 and 6 (stages pending) must meet the following two conditions:
// 1. The stages (taskSetManagers) of the jobs in threads 5 and 6 should be added to the taskSetManager
// queue before executing TaskThreadInfo.threadToLock(1).jobFinished().
// 2. The priority of the stage in thread 5 should be higher than the priority of the stage in thread 6.
// So I just use "sleep" for 1s here for each thread.
// TODO: any better solution?
createThread(5,null,sc,sem)
Thread.sleep(1000)
createThread(6,null,sc,sem)
Thread.sleep(1000)
assert(TaskThreadInfo.threadToRunning(1) === true)
assert(TaskThreadInfo.threadToRunning(2) === true)
assert(TaskThreadInfo.threadToRunning(3) === true)
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === false)
assert(TaskThreadInfo.threadToRunning(6) === false)
TaskThreadInfo.threadToLock(1).jobFinished()
TaskThreadInfo.threadToStarted(5).await()
assert(TaskThreadInfo.threadToRunning(1) === false)
assert(TaskThreadInfo.threadToRunning(2) === true)
assert(TaskThreadInfo.threadToRunning(3) === true)
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === true)
assert(TaskThreadInfo.threadToRunning(6) === false)
TaskThreadInfo.threadToLock(3).jobFinished()
TaskThreadInfo.threadToStarted(6).await()
assert(TaskThreadInfo.threadToRunning(1) === false)
assert(TaskThreadInfo.threadToRunning(2) === true)
assert(TaskThreadInfo.threadToRunning(3) === false)
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === true)
assert(TaskThreadInfo.threadToRunning(6) === true)
TaskThreadInfo.threadToLock(2).jobFinished()
TaskThreadInfo.threadToLock(4).jobFinished()
TaskThreadInfo.threadToLock(5).jobFinished()
TaskThreadInfo.threadToLock(6).jobFinished()
sem.acquire(6)
}
test("Local fair scheduler end-to-end test") {
sc = new SparkContext("local[8]", "LocalSchedulerSuite")
val sem = new Semaphore(0)
System.setProperty("spark.cluster.schedulingmode", "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
System.setProperty("spark.fairscheduler.allocation.file", xmlPath)
createThread(10,"1",sc,sem)
TaskThreadInfo.threadToStarted(10).await()
createThread(20,"2",sc,sem)
TaskThreadInfo.threadToStarted(20).await()
createThread(30,"3",sc,sem)
TaskThreadInfo.threadToStarted(30).await()
assert(TaskThreadInfo.threadToRunning(10) === true)
assert(TaskThreadInfo.threadToRunning(20) === true)
assert(TaskThreadInfo.threadToRunning(30) === true)
createThread(11,"1",sc,sem)
TaskThreadInfo.threadToStarted(11).await()
createThread(21,"2",sc,sem)
TaskThreadInfo.threadToStarted(21).await()
createThread(31,"3",sc,sem)
TaskThreadInfo.threadToStarted(31).await()
assert(TaskThreadInfo.threadToRunning(11) === true)
assert(TaskThreadInfo.threadToRunning(21) === true)
assert(TaskThreadInfo.threadToRunning(31) === true)
createThread(12,"1",sc,sem)
TaskThreadInfo.threadToStarted(12).await()
createThread(22,"2",sc,sem)
TaskThreadInfo.threadToStarted(22).await()
createThread(32,"3",sc,sem)
assert(TaskThreadInfo.threadToRunning(12) === true)
assert(TaskThreadInfo.threadToRunning(22) === true)
assert(TaskThreadInfo.threadToRunning(32) === false)
TaskThreadInfo.threadToLock(10).jobFinished()
TaskThreadInfo.threadToStarted(32).await()
assert(TaskThreadInfo.threadToRunning(32) === true)
// 1. Similar to the scenario above: sleep 1s so the stages for 23 and 33 are added to the taskSetManager
// queue, so that the cluster will assign a free cpu core to stage 23 after stage 11 finishes.
// 2. The priorities of 23 and 33 are meaningless here since the fair scheduler is used.
createThread(23,"2",sc,sem)
createThread(33,"3",sc,sem)
Thread.sleep(1000)
TaskThreadInfo.threadToLock(11).jobFinished()
TaskThreadInfo.threadToStarted(23).await()
assert(TaskThreadInfo.threadToRunning(23) === true)
assert(TaskThreadInfo.threadToRunning(33) === false)
TaskThreadInfo.threadToLock(12).jobFinished()
TaskThreadInfo.threadToStarted(33).await()
assert(TaskThreadInfo.threadToRunning(33) === true)
TaskThreadInfo.threadToLock(20).jobFinished()
TaskThreadInfo.threadToLock(21).jobFinished()
TaskThreadInfo.threadToLock(22).jobFinished()
TaskThreadInfo.threadToLock(23).jobFinished()
TaskThreadInfo.threadToLock(30).jobFinished()
TaskThreadInfo.threadToLock(31).jobFinished()
TaskThreadInfo.threadToLock(32).jobFinished()
TaskThreadInfo.threadToLock(33).jobFinished()
sem.acquire(11)
}
}

View file

@ -34,6 +34,41 @@
<artifactId>scalacheck_${scala.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>1.2.5</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.googlecode.concurrentlinkedhashmap</groupId>
<artifactId>concurrentlinkedhashmap-lru</artifactId>
</exclusion>
<exclusion>
<groupId>com.ning</groupId>
<artifactId>compress-lzf</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.cassandra.deps</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
@ -67,6 +102,11 @@
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.94.6</version>
</dependency>
</dependencies>
<build>
<plugins>
@ -105,6 +145,11 @@
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.94.6</version>
</dependency>
</dependencies>
<build>
<plugins>
@ -148,6 +193,11 @@
<artifactId>hadoop-yarn-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.94.6</version>
</dependency>
</dependencies>
<build>
<plugins>

View file

@ -0,0 +1,196 @@
package spark.examples
import org.apache.hadoop.mapreduce.Job
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat
import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat
import org.apache.cassandra.thrift._
import spark.SparkContext
import spark.SparkContext._
import java.nio.ByteBuffer
import java.util.SortedMap
import org.apache.cassandra.db.IColumn
import org.apache.cassandra.utils.ByteBufferUtil
import scala.collection.JavaConversions._
/*
* This example demonstrates using Spark with Cassandra through the new Hadoop API and
* Cassandra's Hadoop support.
*
* To run this example, run it with the following command-line parameters:
* <spark_master> <cassandra_node> <cassandra_port>
*
* For a Cassandra node on localhost, the parameters would be:
* local[3] localhost 9160
*
* The example makes some assumptions:
* 1. You have already created a keyspace called casDemo and it has a column family named Words
* 2. The Words column family has a column named "para" which contains the test content.
*
* You can create the content by running the following script at the bottom of this file with
* cassandra-cli.
*
*/
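// Illustrative usage sketch (not part of the original patch): assuming the `./run` launcher
// script at the repository root, a local run against a Cassandra node listening on the
// default Thrift port might look like:
//   ./run spark.examples.CassandraTest local[3] localhost 9160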
object CassandraTest {
def main(args: Array[String]) {
// Get a SparkContext
val sc = new SparkContext(args(0), "casDemo")
// Build the job configuration with ConfigHelper provided by Cassandra
val job = new Job()
job.setInputFormatClass(classOf[ColumnFamilyInputFormat])
val host: String = args(1)
val port: String = args(2)
ConfigHelper.setInputInitialAddress(job.getConfiguration(), host)
ConfigHelper.setInputRpcPort(job.getConfiguration(), port)
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host)
ConfigHelper.setOutputRpcPort(job.getConfiguration(), port)
ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words")
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount")
val predicate = new SlicePredicate()
val sliceRange = new SliceRange()
sliceRange.setStart(Array.empty[Byte])
sliceRange.setFinish(Array.empty[Byte])
predicate.setSlice_range(sliceRange)
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate)
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
// Make a new Hadoop RDD
val casRdd = sc.newAPIHadoopRDD(
job.getConfiguration(),
classOf[ColumnFamilyInputFormat],
classOf[ByteBuffer],
classOf[SortedMap[ByteBuffer, IColumn]])
// Let us first get all the paragraphs from the retrieved rows
val paraRdd = casRdd.map {
case (key, value) => {
ByteBufferUtil.string(value.get(ByteBufferUtil.bytes("para")).value())
}
}
// Let's compute the word count over the paragraphs
val counts = paraRdd.flatMap(p => p.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.collect().foreach {
case (word, count) => println(word + ":" + count)
}
counts.map {
case (word, count) => {
val colWord = new org.apache.cassandra.thrift.Column()
colWord.setName(ByteBufferUtil.bytes("word"))
colWord.setValue(ByteBufferUtil.bytes(word))
colWord.setTimestamp(System.currentTimeMillis)
val colCount = new org.apache.cassandra.thrift.Column()
colCount.setName(ByteBufferUtil.bytes("wcount"))
colCount.setValue(ByteBufferUtil.bytes(count.toLong))
colCount.setTimestamp(System.currentTimeMillis)
val outputkey = ByteBufferUtil.bytes(word + "-COUNT-" + System.currentTimeMillis)
val mutations: java.util.List[Mutation] = new Mutation() :: new Mutation() :: Nil
mutations.get(0).setColumn_or_supercolumn(new ColumnOrSuperColumn())
mutations.get(0).column_or_supercolumn.setColumn(colWord)
mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn())
mutations.get(1).column_or_supercolumn.setColumn(colCount)
(outputkey, mutations)
}
}.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]],
classOf[ColumnFamilyOutputFormat], job.getConfiguration)
}
}
/*
create keyspace casDemo;
use casDemo;
create column family WordCount with comparator = UTF8Type;
update column family WordCount with column_metadata =
[{column_name: word, validation_class: UTF8Type},
{column_name: wcount, validation_class: LongType}];
create column family Words with comparator = UTF8Type;
update column family Words with column_metadata =
[{column_name: book, validation_class: UTF8Type},
{column_name: para, validation_class: UTF8Type}];
assume Words keys as utf8;
set Words['3musk001']['book'] = 'The Three Musketeers';
set Words['3musk001']['para'] = 'On the first Monday of the month of April, 1625, the market
town of Meung, in which the author of ROMANCE OF THE ROSE was born, appeared to
be in as perfect a state of revolution as if the Huguenots had just made
a second La Rochelle of it. Many citizens, seeing the women flying
toward the High Street, leaving their children crying at the open doors,
hastened to don the cuirass, and supporting their somewhat uncertain
courage with a musket or a partisan, directed their steps toward the
hostelry of the Jolly Miller, before which was gathered, increasing
every minute, a compact group, vociferous and full of curiosity.';
set Words['3musk002']['book'] = 'The Three Musketeers';
set Words['3musk002']['para'] = 'In those times panics were common, and few days passed without
some city or other registering in its archives an event of this kind. There were
nobles, who made war against each other; there was the king, who made
war against the cardinal; there was Spain, which made war against the
king. Then, in addition to these concealed or public, secret or open
wars, there were robbers, mendicants, Huguenots, wolves, and scoundrels,
who made war upon everybody. The citizens always took up arms readily
against thieves, wolves or scoundrels, often against nobles or
Huguenots, sometimes against the king, but never against cardinal or
Spain. It resulted, then, from this habit that on the said first Monday
of April, 1625, the citizens, on hearing the clamor, and seeing neither
the red-and-yellow standard nor the livery of the Duc de Richelieu,
rushed toward the hostel of the Jolly Miller. When arrived there, the
cause of the hubbub was apparent to all';
set Words['3musk003']['book'] = 'The Three Musketeers';
set Words['3musk003']['para'] = 'You ought, I say, then, to husband the means you have, however
large the sum may be; but you ought also to endeavor to perfect yourself in
the exercises becoming a gentleman. I will write a letter today to the
Director of the Royal Academy, and tomorrow he will admit you without
any expense to yourself. Do not refuse this little service. Our
best-born and richest gentlemen sometimes solicit it without being able
to obtain it. You will learn horsemanship, swordsmanship in all its
branches, and dancing. You will make some desirable acquaintances; and
from time to time you can call upon me, just to tell me how you are
getting on, and to say whether I can be of further service to you.';
set Words['thelostworld001']['book'] = 'The Lost World';
set Words['thelostworld001']['para'] = 'She sat with that proud, delicate profile of hers outlined
against the red curtain. How beautiful she was! And yet how aloof! We had been
friends, quite good friends; but never could I get beyond the same
comradeship which I might have established with one of my
fellow-reporters upon the Gazette,--perfectly frank, perfectly kindly,
and perfectly unsexual. My instincts are all against a woman being too
frank and at her ease with me. It is no compliment to a man. Where
the real sex feeling begins, timidity and distrust are its companions,
heritage from old wicked days when love and violence went often hand in
hand. The bent head, the averted eye, the faltering voice, the wincing
figure--these, and not the unshrinking gaze and frank reply, are the
true signals of passion. Even in my short life I had learned as much
as that--or had inherited it in that race memory which we call instinct.';
set Words['thelostworld002']['book'] = 'The Lost World';
set Words['thelostworld002']['para'] = 'I always liked McArdle, the crabbed, old, round-backed,
red-headed news editor, and I rather hoped that he liked me. Of course, Beaumont was
the real boss; but he lived in the rarefied atmosphere of some Olympian
height from which he could distinguish nothing smaller than an
international crisis or a split in the Cabinet. Sometimes we saw him
passing in lonely majesty to his inner sanctum, with his eyes staring
vaguely and his mind hovering over the Balkans or the Persian Gulf. He
was above and beyond us. But McArdle was his first lieutenant, and it
was he that we knew. The old man nodded as I entered the room, and he
pushed his spectacles far up on his bald forehead.';
*/

View file

@ -0,0 +1,35 @@
package spark.examples
import spark._
import spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
object HBaseTest {
def main(args: Array[String]) {
val sc = new SparkContext(args(0), "HBaseTest",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val conf = HBaseConfiguration.create()
// Other options for configuring scan behavior are available. More information is available at
// http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html
conf.set(TableInputFormat.INPUT_TABLE, args(1))
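// Illustrative sketch (not part of the original patch): the scan can be narrowed with
// additional TableInputFormat keys, e.g. restricting it to a hypothetical column family
// "cf1" and a row range:
//   conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf1")
//   conf.set(TableInputFormat.SCAN_ROW_START, "startRow")
//   conf.set(TableInputFormat.SCAN_ROW_STOP, "stopRow")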
// Initialize the HBase table if necessary
val admin = new HBaseAdmin(conf)
if(!admin.isTableAvailable(args(1))) {
val tableDesc = new HTableDescriptor(args(1))
admin.createTable(tableDesc)
}
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD.count()
System.exit(0)
}
}

pom.xml
View file

@ -59,6 +59,9 @@
<slf4j.version>1.6.1</slf4j.version>
<cdh.version>4.1.2</cdh.version>
<log4j.version>1.2.17</log4j.version>
<PermGen>64m</PermGen>
<MaxPermGen>512m</MaxPermGen>
</properties>
<repositories>
@ -187,9 +190,9 @@
<version>0.8.4</version>
</dependency>
<dependency>
<groupId>asm</groupId>
<artifactId>asm-all</artifactId>
<version>3.3.1</version>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>4.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
@ -392,6 +395,8 @@
<jvmArgs>
<jvmArg>-Xms64m</jvmArg>
<jvmArg>-Xmx1024m</jvmArg>
<jvmArg>-XX:PermSize=${PermGen}</jvmArg>
<jvmArg>-XX:MaxPermSize=${MaxPermGen}</jvmArg>
</jvmArgs>
<javacArgs>
<javacArg>-source</javacArg>

View file

@ -148,7 +148,7 @@ object SparkBuild extends Build {
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
"commons-daemon" % "commons-daemon" % "1.0.10",
"com.ning" % "compress-lzf" % "0.8.4",
"asm" % "asm-all" % "3.3.1",
"org.ow2.asm" % "asm" % "4.0",
"com.google.protobuf" % "protobuf-java" % "2.4.1",
"de.javakaffee" % "kryo-serializers" % "0.22",
"com.typesafe.akka" % "akka-actor" % "2.0.3" excludeAll(excludeNetty),
@ -201,7 +201,21 @@ object SparkBuild extends Build {
def examplesSettings = sharedSettings ++ Seq(
name := "spark-examples",
libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.11")
resolvers ++= Seq("Apache HBase" at "https://repository.apache.org/content/repositories/releases"),
libraryDependencies ++= Seq(
"com.twitter" % "algebird-core_2.9.2" % "0.1.11",
"org.apache.hbase" % "hbase" % "0.94.6" excludeAll(excludeNetty),
"org.apache.cassandra" % "cassandra-all" % "1.2.5"
exclude("com.google.guava", "guava")
exclude("com.googlecode.concurrentlinkedhashmap", "concurrentlinkedhashmap-lru")
exclude("com.ning","compress-lzf")
exclude("io.netty", "netty")
exclude("jline","jline")
exclude("log4j","log4j")
exclude("org.apache.cassandra.deps", "avro")
)
)
def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")

View file

@ -8,7 +8,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.objectweb.asm._
import org.objectweb.asm.commons.EmptyVisitor
import org.objectweb.asm.Opcodes._
@ -83,7 +82,7 @@ extends ClassLoader(parent) {
}
class ConstructorCleaner(className: String, cv: ClassVisitor)
extends ClassAdapter(cv) {
extends ClassVisitor(ASM4, cv) {
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
val mv = cv.visitMethod(access, name, desc, sig, exceptions)