diff --git a/core/src/main/scala/spark/DaemonThreadFactory.scala b/core/src/main/scala/spark/DaemonThreadFactory.scala new file mode 100644 index 0000000000..cb30cb2ac8 --- /dev/null +++ b/core/src/main/scala/spark/DaemonThreadFactory.scala @@ -0,0 +1,14 @@ +package spark + +import java.util.concurrent.ThreadFactory + +/** + * A ThreadFactory that creates daemon threads + */ +private object DaemonThreadFactory extends ThreadFactory { + override def newThread(r: Runnable): Thread = { + val t = new Thread(r); + t.setDaemon(true) + return t + } +} \ No newline at end of file diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala index 20954a1224..8518229856 100644 --- a/core/src/main/scala/spark/LocalScheduler.scala +++ b/core/src/main/scala/spark/LocalScheduler.scala @@ -2,6 +2,8 @@ package spark import java.util.concurrent._ +import scala.collection.mutable.HashMap +import scala.collection.mutable.HashSet import scala.collection.mutable.Map /** @@ -14,28 +16,27 @@ private class LocalScheduler(threads: Int) extends Scheduler with Logging { override def start() {} override def waitForRegister() {} - - override def runTasks[T](tasks: Array[Task[T]])(implicit m: ClassManifest[T]) - : Array[T] = { - val futures = new Array[Future[TaskResult[T]]](tasks.length) - - for (i <- 0 until tasks.length) { - futures(i) = threadPool.submit(new Callable[TaskResult[T]]() { - def call(): TaskResult[T] = { + + val completionEvents = new LinkedBlockingQueue[CompletionEvent] + + def submitTasks(tasks: Seq[Task[_]]) { + tasks.zipWithIndex.foreach { case (task, i) => + threadPool.submit(new Runnable { + def run() { logInfo("Running task " + i) try { // Serialize and deserialize the task so that accumulators are // changed to thread-local ones; this adds a bit of unnecessary - // overhead but matches how the Nexus Executor works + // overhead but matches how the Mesos Executor works Accumulators.clear val bytes = Utils.serialize(tasks(i)) logInfo("Size of task " + i + " is " + bytes.size + " bytes") - val task = Utils.deserialize[Task[T]]( + val task = Utils.deserialize[Task[_]]( bytes, currentThread.getContextClassLoader) - val value = task.run + val result: Any = task.run val accumUpdates = Accumulators.values logInfo("Finished task " + i) - new TaskResult[T](value, accumUpdates) + completionEvents.put(CompletionEvent(task, true, result, accumUpdates)) } catch { case e: Exception => { // TODO: Do something nicer here @@ -47,26 +48,233 @@ private class LocalScheduler(threads: Int) extends Scheduler with Logging { } }) } - - val taskResults = futures.map(_.get) - for (result <- taskResults) - Accumulators.add(currentThread, result.accumUpdates) - return taskResults.map(_.value).toArray(m) } override def stop() {} override def numCores() = threads -} + var nextStageId = 0 -/** - * A ThreadFactory that creates daemon threads - */ -private object DaemonThreadFactory extends ThreadFactory { - override def newThread(r: Runnable): Thread = { - val t = new Thread(r); - t.setDaemon(true) - return t + def newStageId() = { + var res = nextStageId + nextStageId += 1 + res + } + + val idToStage = new HashMap[Int, Stage] + + val shuffleToMapStage = new HashMap[ShuffleDependency[_,_,_], Stage] + + val cacheLocs = new HashMap[RDD[_], Array[List[String]]] + + def getCacheLocs(rdd: RDD[_]): Array[List[String]] = { + cacheLocs.getOrElseUpdate(rdd, Array.fill[List[String]](rdd.splits.size)(Nil)) + } + + def addCacheLoc(rdd: RDD[_], partition: Int, host: String) { + val locs = 
getCacheLocs(rdd) + locs(partition) = host :: locs(partition) + } + + def removeCacheLoc(rdd: RDD[_], partition: Int, host: String) { + val locs = getCacheLocs(rdd) + locs(partition) -= host + } + + def getShuffleMapStage(shuf: ShuffleDependency[_,_,_]): Stage = { + shuffleToMapStage.get(shuf) match { + case Some(stage) => stage + case None => + val stage = newStage( + true, shuf.rdd, shuf.spec.partitioner.numPartitions) + shuffleToMapStage(shuf) = stage + stage + } + } + + def newStage(isShuffleMap: Boolean, rdd: RDD[_], numPartitions: Int): Stage = { + val id = newStageId() + val parents = getParentStages(rdd) + val stage = new Stage(id, isShuffleMap, rdd, parents, numPartitions) + idToStage(id) = stage + stage + } + + def getParentStages(rdd: RDD[_]): List[Stage] = { + val parents = new HashSet[Stage] + val visited = new HashSet[RDD[_]] + def visit(r: RDD[_]) { + if (!visited(r)) { + visited += r + for (dep <- r.dependencies) { + dep match { + case shufDep: ShuffleDependency[_,_,_] => + parents += getShuffleMapStage(shufDep) + case _ => + visit(dep.rdd) + } + } + } + } + visit(rdd) + parents.toList + } + + def getMissingParentStages(stage: Stage): List[Stage] = { + val missing = new HashSet[Stage] + val visited = new HashSet[RDD[_]] + def visit(rdd: RDD[_]) { + if (!visited(rdd)) { + visited += rdd + val locs = getCacheLocs(rdd) + for (p <- 0 until rdd.splits.size) { + if (locs(p) == Nil) { + for (dep <- rdd.dependencies) { + dep match { + case shufDep: ShuffleDependency[_,_,_] => + val stage = getShuffleMapStage(shufDep) + if (!stage.isAvailable) + missing += stage + case narrowDep: NarrowDependency[_] => + visit(narrowDep.rdd) + } + } + } + } + } + } + visit(stage.rdd) + missing.toList + } + + override def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U]) + : Array[U] = { + val numOutputParts: Int = rdd.splits.size + val finalStage = newStage(false, rdd, numOutputParts) + val results = new Array[U](numOutputParts) + val finished = new Array[Boolean](numOutputParts) + var numFinished = 0 + + val waiting = new HashSet[Stage] + val running = new HashSet[Stage] + val pendingTasks = new HashMap[Stage, HashSet[Task[_]]] + + def submitStage(stage: Stage) { + if (!waiting(stage) && !running(stage)) { + val missing = getMissingParentStages(stage) + if (missing == Nil) { + logInfo("Submitting " + stage + ", which has no missing parents") + submitMissingTasks(stage) + running += stage + } else { + for (parent <- missing) + submitStage(parent) + waiting += stage + } + } + } + + def submitMissingTasks(stage: Stage) { + var tasks: List[Task[_]] = Nil + if (stage == finalStage) { + for (p <- 0 until numOutputParts if (!finished(p))) { + val locs = getPreferredLocs(rdd, p) + tasks = new ResultTask(rdd, func, p, locs) :: tasks + } + } + submitTasks(tasks) + } + + submitStage(finalStage) + + while (numFinished != numOutputParts) { + val evt = completionEvents.take() + if (evt.successful) { + evt.task match { + case rt: ResultTask[_, _] => + results(rt.partition) = evt.result.asInstanceOf[U] + finished(rt.partition) = true + numFinished += 1 + // case smt: ShuffleMapTask + } + } else { + throw new SparkException("Task failed: " + evt.task) + // TODO: Kill the running job + } + } + + return results + } + + def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = { + // If the partition is cached, return the cache locations + val cached = getCacheLocs(rdd)(partition) + if (cached != Nil) { + return cached + } + // If the RDD has some placement preferences (as is the 
case for input RDDs), get those + val rddPrefs = rdd.preferredLocations(rdd.splits(partition)).toList + if (rddPrefs != Nil) { + return rddPrefs + } + // If the RDD has narrow dependencies, pick the first partition of the first narrow dep + // that has any placement preferences. Ideally we would choose based on transfer sizes, + // but this will do for now. + rdd.dependencies.foreach(_ match { + case n: NarrowDependency[_] => + for (inPart <- n.getParents(partition)) { + val locs = getPreferredLocs(n.rdd, inPart) + if (locs != Nil) + return locs; + } + }) + return Nil } } + +case class CompletionEvent(task: Task[_], successful: Boolean, result: Any, accumUpdates: Map[Long, Any]) + +class ResultTask[T, U](rdd: RDD[T], func: Iterator[T] => U, val partition: Int, locs: Seq[String]) +extends Task[U] { + val split = rdd.splits(partition) + + override def run: U = { + func(rdd.iterator(split)) + } + + override def preferredLocations: Seq[String] = locs + + override def toString = "ResultTask " + partition +} + +class Stage(val id: Int, val isShuffleMap: Boolean, val rdd: RDD[_], val parents: List[Stage], val numPartitions: Int) { + val outputLocs = Array.fill[List[String]](numPartitions)(Nil) + var numAvailableOutputs = 0 + + def isAvailable: Boolean = { + if (parents.size == 0 && !isShuffleMap) + true + else + numAvailableOutputs == numPartitions + } + + def addOutputLoc(partition: Int, host: String) { + val prevList = outputLocs(partition) + outputLocs(partition) = host :: prevList + if (prevList == Nil) + numAvailableOutputs += 1 + } + + def removeOutputLoc(partition: Int, host: String) { + val prevList = outputLocs(partition) + val newList = prevList - host + outputLocs(partition) = newList + if (prevList != Nil && newList == Nil) + numAvailableOutputs -= 1 + } + + override def toString = "Stage " + id + + override def hashCode(): Int = id +} \ No newline at end of file diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala index 6a592d13c3..35ad552775 100644 --- a/core/src/main/scala/spark/MesosScheduler.scala +++ b/core/src/main/scala/spark/MesosScheduler.scala @@ -105,7 +105,7 @@ extends MScheduler with spark.Scheduler with Logging * The primary means to submit a job to the scheduler. Given a list of tasks, * runs them and returns an array of the results. 
*/ - override def runTasks[T: ClassManifest](tasks: Array[Task[T]]): Array[T] = { + def runTasks[T: ClassManifest](tasks: Array[Task[T]]): Array[T] = { waitForRegister() val jobId = newJobId() val myJob = new SimpleJob(this, tasks, jobId) @@ -291,4 +291,9 @@ extends MScheduler with spark.Scheduler with Logging // Serialize the map as an array of (String, String) pairs return Utils.serialize(props.toArray) } + + override def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U]) + : Array[U] = { + new Array[U](0) + } } diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 39f2dc4458..391b54f4eb 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -12,25 +12,51 @@ import SparkContext._ import mesos._ +abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean) + +abstract class NarrowDependency[T](rdd: RDD[T]) +extends Dependency(rdd, false) { + def getParents(outputPartition: Int): Seq[Int] +} + +class ShuffleDependency[K, V, C]( + rdd: RDD[(K, V)], + val spec: ShuffleSpec[K, V, C] +) extends Dependency(rdd, true) + +class ShuffleSpec[K, V, C] ( + val createCombiner: V => C, + val mergeValue: (C, V) => C, + val mergeCombiners: (C, C) => C, + val partitioner: Partitioner[K] +) + +abstract class Partitioner[K] { + def numPartitions: Int + def getPartition(key: K): Int +} @serializable abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { def splits: Array[Split] def iterator(split: Split): Iterator[T] def preferredLocations(split: Split): Seq[String] + + def dependencies: List[Dependency[_]] = Nil + def partitioner: Option[Partitioner[_]] = None def taskStarted(split: Split, slot: SlaveOffer) {} def sparkContext = sc - def map[U: ClassManifest](f: T => U) = new MappedRDD(this, sc.clean(f)) - def filter(f: T => Boolean) = new FilteredRDD(this, sc.clean(f)) + def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f)) + def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f)) def cache() = new CachedRDD(this) - def sample(withReplacement: Boolean, frac: Double, seed: Int) = + def sample(withReplacement: Boolean, frac: Double, seed: Int): RDD[T] = new SampledRDD(this, withReplacement, frac, seed) - def flatMap[U: ClassManifest](f: T => Traversable[U]) = + def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] = new FlatMappedRDD(this, sc.clean(f)) def foreach(f: T => Unit) { @@ -40,8 +66,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { } def collect(): Array[T] = { - val tasks = splits.map(s => new CollectTask(this, s)) - val results = sc.runTaskObjects(tasks) + val results = sc.scheduler.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) } @@ -49,9 +74,15 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { def reduce(f: (T, T) => T): T = { val cleanF = sc.clean(f) - val tasks = splits.map(s => new ReduceTask(this, s, f)) + val reducePartition: Iterator[T] => Option[T] = iter => { + if (iter.hasNext) + Some(iter.reduceLeft(f)) + else + None + } + val options = sc.scheduler.runJob(this, reducePartition) val results = new ArrayBuffer[T] - for (option <- sc.runTaskObjects(tasks); elem <- option) + for (opt <- options; elem <- opt) results += elem if (results.size == 0) throw new UnsupportedOperationException("empty collection") @@ -77,20 +108,20 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { } def count(): Long = { - try { + try { map(x => 
1L).reduce(_+_) } catch { case e: UnsupportedOperationException => 0L // No elements in RDD } } - def union(other: RDD[T]) = new UnionRDD(sc, Array(this, other)) + def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other)) - def ++(other: RDD[T]) = this.union(other) + def ++(other: RDD[T]): RDD[T] = this.union(other) - def splitRdd() = new SplitRDD(this) + def splitRdd(): RDD[Array[T]] = new SplitRDD(this) - def cartesian[U: ClassManifest](other: RDD[U]) = + def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other) def groupBy[K](func: T => K, numSplits: Int): RDD[(K, Seq[T])] = diff --git a/core/src/main/scala/spark/Scheduler.scala b/core/src/main/scala/spark/Scheduler.scala index b9f3128c82..fbcbb3e935 100644 --- a/core/src/main/scala/spark/Scheduler.scala +++ b/core/src/main/scala/spark/Scheduler.scala @@ -4,7 +4,8 @@ package spark private trait Scheduler { def start() def waitForRegister() - def runTasks[T](tasks: Array[Task[T]])(implicit m: ClassManifest[T]): Array[T] + //def runTasks[T](tasks: Array[Task[T]])(implicit m: ClassManifest[T]): Array[T] + def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U]): Array[U] def stop() def numCores(): Int } diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index bf70b5fcb1..7c30587928 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -14,7 +14,7 @@ class SparkContext( val sparkHome: String = null, val jars: Seq[String] = Nil) extends Logging { - private var scheduler: Scheduler = { + private[spark] var scheduler: Scheduler = { // Regular expression used for local[N] master format val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r master match { @@ -126,19 +126,17 @@ extends Logging { None } - // Submit an array of tasks (passed as functions) to the scheduler - def runTasks[T: ClassManifest](tasks: Array[() => T]): Array[T] = { - runTaskObjects(tasks.map(f => new FunctionTask(f))) - } - // Run an array of spark.Task objects private[spark] def runTaskObjects[T: ClassManifest](tasks: Seq[Task[T]]) : Array[T] = { + return null; + /* logInfo("Running " + tasks.length + " tasks in parallel") val start = System.nanoTime val result = scheduler.runTasks(tasks.toArray) logInfo("Tasks finished in " + (System.nanoTime - start) / 1e9 + " s") return result + */ } // Clean a closure to make it ready to serialized and send to tasks diff --git a/examples/src/main/scala/spark/examples/CpuHog.scala b/examples/src/main/scala/spark/examples/CpuHog.scala deleted file mode 100644 index 94b3709850..0000000000 --- a/examples/src/main/scala/spark/examples/CpuHog.scala +++ /dev/null @@ -1,26 +0,0 @@ -package spark.examples - -import spark._ - -object CpuHog { - def main(args: Array[String]) { - if (args.length != 3) { - System.err.println("Usage: CpuHog "); - System.exit(1) - } - val sc = new SparkContext(args(0), "CPU hog") - val tasks = args(1).toInt - val threads = args(2).toInt - def task { - for (i <- 0 until threads-1) { - new Thread() { - override def run { - while(true) {} - } - }.start() - } - while(true) {} - } - sc.runTasks(Array.make(tasks, () => task)) - } -} diff --git a/examples/src/main/scala/spark/examples/SleepJob.scala b/examples/src/main/scala/spark/examples/SleepJob.scala deleted file mode 100644 index 02673a5f88..0000000000 --- a/examples/src/main/scala/spark/examples/SleepJob.scala +++ /dev/null @@ -1,21 +0,0 @@ -package spark.examples - -import spark._ - 
-object SleepJob {
-  def main(args: Array[String]) {
-    if (args.length != 3) {
-      System.err.println("Usage: SleepJob <master> <tasks> <duration>");
-      System.exit(1)
-    }
-    val sc = new SparkContext(args(0), "Sleep job")
-    val tasks = args(1).toInt
-    val duration = args(2).toInt
-    def task {
-      val start = System.currentTimeMillis
-      while (System.currentTimeMillis - start < duration * 1000L)
-        Thread.sleep(200)
-    }
-    sc.runTasks(Array.make(tasks, () => task))
-  }
-}
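
Note on DaemonThreadFactory: it is now a top-level, package-private singleton rather than a nested helper of LocalScheduler. A minimal sketch of the assumed wiring follows; the pool construction itself is outside these hunks, the class name below is hypothetical, and the code sits in package spark because the factory is package-private.

    package spark

    import java.util.concurrent.{ExecutorService, Executors}

    // Sketch of the assumed wiring inside LocalScheduler: a fixed-size pool whose
    // threads are daemons, so idle local-mode worker threads never keep the JVM alive.
    class LocalThreadPoolSketch(threads: Int) {
      val threadPool: ExecutorService =
        Executors.newFixedThreadPool(threads, DaemonThreadFactory)
    }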
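
Note on the new Scheduler.runJob contract: it replaces per-task submission with a per-partition function — callers hand over an RDD and an Iterator[T] => U and receive one U per partition, which is exactly how collect() and reduce() are rewritten in RDD.scala above. A sketch of a caller-side helper (the helper itself is hypothetical and not part of the patch; it lives in package spark because SparkContext.scheduler is private[spark]):

    package spark

    object RunJobExample {
      // Hypothetical helper: counts the elements of each partition through the
      // new per-partition runJob API.
      def countPerPartition[T](sc: SparkContext, rdd: RDD[T]): Array[Long] =
        sc.scheduler.runJob(rdd, (iter: Iterator[T]) => {
          var n = 0L
          while (iter.hasNext) { iter.next(); n += 1 }
          n
        })
    }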
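
Note on the new Dependency / ShuffleSpec / Partitioner types in RDD.scala: within this patch's hunks there is no concrete Partitioner and no RDD that declares a shuffle dependency yet. The sketch below shows how the pieces are expected to fit together; HashPartitioner, sumSpec, and dependencyFor are illustrative assumptions, not part of the patch.

    package spark

    // Illustrative hash partitioner; the patch only defines the abstract Partitioner.
    class HashPartitioner[K](val numPartitions: Int) extends Partitioner[K] {
      def getPartition(key: K): Int = {
        val mod = key.hashCode % numPartitions
        if (mod < 0) mod + numPartitions else mod  // keep the bucket index non-negative
      }
    }

    object ShuffleSpecExample {
      // A combine-by-key spec that sums Int values per key.
      def sumSpec[K]: ShuffleSpec[K, Int, Int] =
        new ShuffleSpec[K, Int, Int](
          (v: Int) => v,                  // createCombiner
          (c: Int, v: Int) => c + v,      // mergeValue
          (c1: Int, c2: Int) => c1 + c2,  // mergeCombiners
          new HashPartitioner[K](8))

      // A shuffle dependency on a hypothetical pair RDD, declared with that spec.
      def dependencyFor(pairs: RDD[(String, Int)]): ShuffleDependency[String, Int, Int] =
        new ShuffleDependency(pairs, sumSpec[String])
    }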