More work on new RDD design
This commit is contained in:
parent
f38f86d59e
commit
9e59afd710
8
core/src/main/scala/spark/Aggregator.scala
Normal file
8
core/src/main/scala/spark/Aggregator.scala
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
package spark
|
||||||
|
|
||||||
|
@serializable
|
||||||
|
class Aggregator[K, V, C] (
|
||||||
|
val createCombiner: V => C,
|
||||||
|
val mergeValue: (C, V) => C,
|
||||||
|
val mergeCombiners: (C, C) => C
|
||||||
|
)
|
44
core/src/main/scala/spark/CartesianRDD.scala
Normal file
44
core/src/main/scala/spark/CartesianRDD.scala
Normal file
|
@ -0,0 +1,44 @@
|
||||||
|
package spark
|
||||||
|
|
||||||
|
@serializable class CartesianSplit(idx: Int, val s1: Split, val s2: Split)
|
||||||
|
extends Split {
|
||||||
|
override val index = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
@serializable
|
||||||
|
class CartesianRDD[T: ClassManifest, U:ClassManifest](
|
||||||
|
sc: SparkContext, rdd1: RDD[T], rdd2: RDD[U])
|
||||||
|
extends RDD[Pair[T, U]](sc) {
|
||||||
|
val numSplitsInRdd2 = rdd2.splits.size
|
||||||
|
|
||||||
|
@transient val splits_ = {
|
||||||
|
// create the cross product split
|
||||||
|
val array = new Array[Split](rdd1.splits.size * rdd2.splits.size)
|
||||||
|
for (s1 <- rdd1.splits; s2 <- rdd2.splits) {
|
||||||
|
val idx = s1.index * numSplitsInRdd2 + s2.index
|
||||||
|
array(idx) = new CartesianSplit(idx, s1, s2)
|
||||||
|
}
|
||||||
|
array
|
||||||
|
}
|
||||||
|
|
||||||
|
override def splits = splits_.asInstanceOf[Array[Split]]
|
||||||
|
|
||||||
|
override def preferredLocations(split: Split) = {
|
||||||
|
val currSplit = split.asInstanceOf[CartesianSplit]
|
||||||
|
rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)
|
||||||
|
}
|
||||||
|
|
||||||
|
override def compute(split: Split) = {
|
||||||
|
val currSplit = split.asInstanceOf[CartesianSplit]
|
||||||
|
for (x <- rdd1.iterator(currSplit.s1); y <- rdd2.iterator(currSplit.s2)) yield (x, y)
|
||||||
|
}
|
||||||
|
|
||||||
|
override val dependencies = List(
|
||||||
|
new NarrowDependency(rdd1) {
|
||||||
|
def getParents(id: Int): Seq[Int] = List(id / numSplitsInRdd2)
|
||||||
|
},
|
||||||
|
new NarrowDependency(rdd2) {
|
||||||
|
def getParents(id: Int): Seq[Int] = List(id % numSplitsInRdd2)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
30
core/src/main/scala/spark/Dependency.scala
Normal file
30
core/src/main/scala/spark/Dependency.scala
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
package spark
|
||||||
|
|
||||||
|
@serializable
|
||||||
|
abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean)
|
||||||
|
|
||||||
|
abstract class NarrowDependency[T](rdd: RDD[T])
|
||||||
|
extends Dependency(rdd, false) {
|
||||||
|
def getParents(outputPartition: Int): Seq[Int]
|
||||||
|
}
|
||||||
|
|
||||||
|
class ShuffleDependency[K, V, C](
|
||||||
|
val shuffleId: Int,
|
||||||
|
rdd: RDD[(K, V)],
|
||||||
|
val aggregator: Aggregator[K, V, C],
|
||||||
|
val partitioner: Partitioner[K]
|
||||||
|
) extends Dependency(rdd, true)
|
||||||
|
|
||||||
|
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
|
||||||
|
override def getParents(partitionId: Int) = List(partitionId)
|
||||||
|
}
|
||||||
|
|
||||||
|
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
|
||||||
|
extends NarrowDependency[T](rdd) {
|
||||||
|
override def getParents(partitionId: Int) = {
|
||||||
|
if (partitionId >= outStart && partitionId < outStart + length)
|
||||||
|
List(partitionId - outStart + inStart)
|
||||||
|
else
|
||||||
|
Nil
|
||||||
|
}
|
||||||
|
}
|
|
@ -25,6 +25,8 @@ class Executor extends mesos.Executor with Logging {
|
||||||
// Initialize cache and broadcast system (uses some properties read above)
|
// Initialize cache and broadcast system (uses some properties read above)
|
||||||
Cache.initialize()
|
Cache.initialize()
|
||||||
Broadcast.initialize(false)
|
Broadcast.initialize(false)
|
||||||
|
MapOutputTracker.initialize(false)
|
||||||
|
RDDCache.initialize(false)
|
||||||
|
|
||||||
// Create our ClassLoader (using spark properties) and set it on this thread
|
// Create our ClassLoader (using spark properties) and set it on this thread
|
||||||
classLoader = createClassLoader()
|
classLoader = createClassLoader()
|
||||||
|
|
|
@ -14,12 +14,13 @@ import org.apache.hadoop.mapred.Reporter
|
||||||
import org.apache.hadoop.util.ReflectionUtils
|
import org.apache.hadoop.util.ReflectionUtils
|
||||||
|
|
||||||
/** A Spark split class that wraps around a Hadoop InputSplit */
|
/** A Spark split class that wraps around a Hadoop InputSplit */
|
||||||
@serializable class HadoopSplit(@transient s: InputSplit)
|
@serializable class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
|
||||||
extends Split {
|
extends Split {
|
||||||
val inputSplit = new SerializableWritable[InputSplit](s)
|
val inputSplit = new SerializableWritable[InputSplit](s)
|
||||||
|
|
||||||
// Hadoop gives each split a unique toString value, so use this as our ID
|
override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
|
||||||
override def getId() = "HadoopSplit(" + inputSplit.toString + ")"
|
|
||||||
|
override val index = idx
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -39,7 +40,10 @@ extends RDD[(K, V)](sc) {
|
||||||
FileInputFormat.setInputPaths(conf, path)
|
FileInputFormat.setInputPaths(conf, path)
|
||||||
val inputFormat = createInputFormat(conf)
|
val inputFormat = createInputFormat(conf)
|
||||||
val inputSplits = inputFormat.getSplits(conf, sc.numCores)
|
val inputSplits = inputFormat.getSplits(conf, sc.numCores)
|
||||||
inputSplits.map(x => new HadoopSplit(x): Split).toArray
|
val array = new Array[Split] (inputSplits.size)
|
||||||
|
for (i <- 0 until inputSplits.size)
|
||||||
|
array(i) = new HadoopSplit(id, i, inputSplits(i))
|
||||||
|
array
|
||||||
}
|
}
|
||||||
|
|
||||||
def createInputFormat(conf: JobConf): InputFormat[K, V] = {
|
def createInputFormat(conf: JobConf): InputFormat[K, V] = {
|
||||||
|
@ -49,7 +53,7 @@ extends RDD[(K, V)](sc) {
|
||||||
|
|
||||||
override def splits = splits_
|
override def splits = splits_
|
||||||
|
|
||||||
override def iterator(theSplit: Split) = new Iterator[(K, V)] {
|
override def compute(theSplit: Split) = new Iterator[(K, V)] {
|
||||||
val split = theSplit.asInstanceOf[HadoopSplit]
|
val split = theSplit.asInstanceOf[HadoopSplit]
|
||||||
var reader: RecordReader[K, V] = null
|
var reader: RecordReader[K, V] = null
|
||||||
|
|
||||||
|
@ -99,6 +103,8 @@ extends RDD[(K, V)](sc) {
|
||||||
val hadoopSplit = split.asInstanceOf[HadoopSplit]
|
val hadoopSplit = split.asInstanceOf[HadoopSplit]
|
||||||
hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
|
hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override val dependencies: List[Dependency[_]] = Nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,34 @@ package spark
|
||||||
|
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
|
||||||
|
import scala.actors._
|
||||||
|
import scala.actors.Actor._
|
||||||
|
import scala.actors.remote._
|
||||||
|
|
||||||
|
class MapOutputTracker extends DaemonActor with Logging {
|
||||||
|
def act() {
|
||||||
|
val port = System.getProperty("spark.master.port", "50501").toInt
|
||||||
|
RemoteActor.alive(port)
|
||||||
|
RemoteActor.register('MapOutputTracker, self)
|
||||||
|
logInfo("Started on port " + port)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
object MapOutputTracker {
|
object MapOutputTracker {
|
||||||
|
var trackerActor: AbstractActor = null
|
||||||
|
|
||||||
|
def initialize(isMaster: Boolean) {
|
||||||
|
if (isMaster) {
|
||||||
|
val tracker = new MapOutputTracker
|
||||||
|
tracker.start
|
||||||
|
trackerActor = tracker
|
||||||
|
} else {
|
||||||
|
val host = System.getProperty("spark.master.host")
|
||||||
|
val port = System.getProperty("spark.master.port").toInt
|
||||||
|
trackerActor = RemoteActor.select(Node(host, port), 'MapOutputTracker)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private val serverUris = new ConcurrentHashMap[Int, Array[String]]
|
private val serverUris = new ConcurrentHashMap[Int, Array[String]]
|
||||||
|
|
||||||
def registerMapOutput(shuffleId: Int, numMaps: Int, mapId: Int, serverUri: String) {
|
def registerMapOutput(shuffleId: Int, numMaps: Int, mapId: Int, serverUri: String) {
|
||||||
|
|
|
@ -17,8 +17,7 @@ extends Split {
|
||||||
case _ => false
|
case _ => false
|
||||||
}
|
}
|
||||||
|
|
||||||
override def getId() =
|
override val index = slice
|
||||||
"ParallelArraySplit(arrayId %d, slice %d)".format(arrayId, slice)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class ParallelArray[T: ClassManifest](
|
class ParallelArray[T: ClassManifest](
|
||||||
|
@ -28,8 +27,6 @@ extends RDD[T](sc) {
|
||||||
// the RDD chain it gets cached. It might be worthwhile to write the data to
|
// the RDD chain it gets cached. It might be worthwhile to write the data to
|
||||||
// a file in the DFS and read it in the split instead.
|
// a file in the DFS and read it in the split instead.
|
||||||
|
|
||||||
val id = ParallelArray.newId()
|
|
||||||
|
|
||||||
@transient val splits_ = {
|
@transient val splits_ = {
|
||||||
val slices = ParallelArray.slice(data, numSlices).toArray
|
val slices = ParallelArray.slice(data, numSlices).toArray
|
||||||
slices.indices.map(i => new ParallelArraySplit(id, i, slices(i))).toArray
|
slices.indices.map(i => new ParallelArraySplit(id, i, slices(i))).toArray
|
||||||
|
@ -37,9 +34,11 @@ extends RDD[T](sc) {
|
||||||
|
|
||||||
override def splits = splits_.asInstanceOf[Array[Split]]
|
override def splits = splits_.asInstanceOf[Array[Split]]
|
||||||
|
|
||||||
override def iterator(s: Split) = s.asInstanceOf[ParallelArraySplit[T]].iterator
|
override def compute(s: Split) = s.asInstanceOf[ParallelArraySplit[T]].iterator
|
||||||
|
|
||||||
override def preferredLocations(s: Split): Seq[String] = Nil
|
override def preferredLocations(s: Split): Seq[String] = Nil
|
||||||
|
|
||||||
|
override val dependencies: List[Dependency[_]] = Nil
|
||||||
}
|
}
|
||||||
|
|
||||||
private object ParallelArray {
|
private object ParallelArray {
|
||||||
|
|
22
core/src/main/scala/spark/Partitioner.scala
Normal file
22
core/src/main/scala/spark/Partitioner.scala
Normal file
|
@ -0,0 +1,22 @@
|
||||||
|
package spark
|
||||||
|
|
||||||
|
@serializable
|
||||||
|
abstract class Partitioner[K] {
|
||||||
|
def numPartitions: Int
|
||||||
|
def getPartition(key: K): Int
|
||||||
|
}
|
||||||
|
|
||||||
|
class HashPartitioner[K](partitions: Int) extends Partitioner[K] {
|
||||||
|
def numPartitions = partitions
|
||||||
|
|
||||||
|
def getPartition(key: K) = {
|
||||||
|
val mod = key.hashCode % partitions
|
||||||
|
if (mod < 0) mod + partitions else mod // Guard against negative hash codes
|
||||||
|
}
|
||||||
|
|
||||||
|
override def equals(other: Any): Boolean = other match {
|
||||||
|
case h: HashPartitioner[_] =>
|
||||||
|
h.numPartitions == numPartitions
|
||||||
|
case _ => false
|
||||||
|
}
|
||||||
|
}
|
|
@ -15,85 +15,79 @@ import SparkContext._
|
||||||
|
|
||||||
import mesos._
|
import mesos._
|
||||||
|
|
||||||
@serializable
|
|
||||||
abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean)
|
|
||||||
|
|
||||||
abstract class NarrowDependency[T](rdd: RDD[T])
|
|
||||||
extends Dependency(rdd, false) {
|
|
||||||
def getParents(outputPartition: Int): Seq[Int]
|
|
||||||
}
|
|
||||||
|
|
||||||
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
|
|
||||||
override def getParents(partitionId: Int) = List(partitionId)
|
|
||||||
}
|
|
||||||
|
|
||||||
class ShuffleDependency[K, V, C](
|
|
||||||
val shuffleId: Int,
|
|
||||||
rdd: RDD[(K, V)],
|
|
||||||
val aggregator: Aggregator[K, V, C],
|
|
||||||
val partitioner: Partitioner[K]
|
|
||||||
) extends Dependency(rdd, true)
|
|
||||||
|
|
||||||
@serializable
|
|
||||||
class Aggregator[K, V, C] (
|
|
||||||
val createCombiner: V => C,
|
|
||||||
val mergeValue: (C, V) => C,
|
|
||||||
val mergeCombiners: (C, C) => C
|
|
||||||
)
|
|
||||||
|
|
||||||
@serializable
|
|
||||||
abstract class Partitioner[K] {
|
|
||||||
def numPartitions: Int
|
|
||||||
def getPartition(key: K): Int
|
|
||||||
}
|
|
||||||
|
|
||||||
class HashPartitioner[K](partitions: Int) extends Partitioner[K] {
|
|
||||||
def numPartitions = partitions
|
|
||||||
|
|
||||||
def getPartition(key: K) = {
|
|
||||||
val mod = key.hashCode % partitions
|
|
||||||
if (mod < 0) mod + partitions else mod // Careful of negative hash codes
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@serializable
|
@serializable
|
||||||
abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
|
abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
|
||||||
|
// Methods that must be implemented by subclasses
|
||||||
def splits: Array[Split]
|
def splits: Array[Split]
|
||||||
def iterator(split: Split): Iterator[T]
|
def compute(split: Split): Iterator[T]
|
||||||
def preferredLocations(split: Split): Seq[String]
|
def preferredLocations(split: Split): Seq[String]
|
||||||
|
val dependencies: List[Dependency[_]]
|
||||||
|
|
||||||
val dependencies: List[Dependency[_]] = Nil
|
// Optionally overridden by subclasses to specify how they are partitioned
|
||||||
val partitioner: Option[Partitioner[_]] = None
|
val partitioner: Option[Partitioner[_]] = None
|
||||||
|
|
||||||
def sparkContext = sc
|
def context = sc
|
||||||
|
|
||||||
|
// Get a unique ID for this RDD
|
||||||
|
val id = sc.newRddId()
|
||||||
|
|
||||||
|
// Variables relating to caching
|
||||||
|
private var shouldCache = false
|
||||||
|
|
||||||
|
// Change this RDD's caching
|
||||||
|
def cache(): RDD[T] = {
|
||||||
|
shouldCache = true
|
||||||
|
this
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read this RDD; will read from cache if applicable, or otherwise compute
|
||||||
|
final def iterator(split: Split): Iterator[T] = {
|
||||||
|
if (shouldCache) {
|
||||||
|
RDDCache.getOrCompute[T](this, split)
|
||||||
|
} else {
|
||||||
|
compute(split)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Transformations
|
||||||
|
|
||||||
def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
|
def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
|
||||||
|
|
||||||
def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] =
|
def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] =
|
||||||
new FlatMappedRDD(this, sc.clean(f))
|
new FlatMappedRDD(this, sc.clean(f))
|
||||||
|
|
||||||
/*
|
|
||||||
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
|
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
|
||||||
|
|
||||||
def cache() = new CachedRDD(this)
|
|
||||||
|
|
||||||
def sample(withReplacement: Boolean, frac: Double, seed: Int): RDD[T] =
|
def sample(withReplacement: Boolean, frac: Double, seed: Int): RDD[T] =
|
||||||
new SampledRDD(this, withReplacement, frac, seed)
|
new SampledRDD(this, withReplacement, frac, seed)
|
||||||
|
|
||||||
|
def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))
|
||||||
|
|
||||||
|
def ++(other: RDD[T]): RDD[T] = this.union(other)
|
||||||
|
|
||||||
|
def glom(): RDD[Array[T]] = new SplitRDD(this)
|
||||||
|
|
||||||
|
def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] =
|
||||||
|
new CartesianRDD(sc, this, other)
|
||||||
|
|
||||||
|
def groupBy[K](func: T => K, numSplits: Int): RDD[(K, Seq[T])] =
|
||||||
|
this.map(t => (func(t), t)).groupByKey(numSplits)
|
||||||
|
|
||||||
|
def groupBy[K](func: T => K): RDD[(K, Seq[T])] =
|
||||||
|
groupBy[K](func, sc.numCores)
|
||||||
|
|
||||||
|
// Parallel operations
|
||||||
|
|
||||||
def foreach(f: T => Unit) {
|
def foreach(f: T => Unit) {
|
||||||
val cleanF = sc.clean(f)
|
val cleanF = sc.clean(f)
|
||||||
val tasks = splits.map(s => new ForeachTask(this, s, cleanF)).toArray
|
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
|
||||||
sc.runTaskObjects(tasks)
|
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
def collect(): Array[T] = {
|
def collect(): Array[T] = {
|
||||||
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
|
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
|
||||||
Array.concat(results: _*)
|
Array.concat(results: _*)
|
||||||
}
|
}
|
||||||
|
|
||||||
def toArray(): Array[T] = collect()
|
|
||||||
|
|
||||||
def reduce(f: (T, T) => T): T = {
|
def reduce(f: (T, T) => T): T = {
|
||||||
val cleanF = sc.clean(f)
|
val cleanF = sc.clean(f)
|
||||||
val reducePartition: Iterator[T] => Option[T] = iter => {
|
val reducePartition: Iterator[T] => Option[T] = iter => {
|
||||||
|
@ -112,6 +106,14 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
|
||||||
return results.reduceLeft(f)
|
return results.reduceLeft(f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def count(): Long = {
|
||||||
|
sc.runJob(this, (iter: Iterator[T]) => iter.size.toLong).sum
|
||||||
|
}
|
||||||
|
|
||||||
|
def toArray(): Array[T] = collect()
|
||||||
|
|
||||||
|
// TODO: Reimplement these to properly build any shuffle dependencies on
|
||||||
|
// the cluster rather than attempting to compute a partiton on the master
|
||||||
/*
|
/*
|
||||||
def take(num: Int): Array[T] = {
|
def take(num: Int): Array[T] = {
|
||||||
if (num == 0)
|
if (num == 0)
|
||||||
|
@ -130,280 +132,44 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
|
||||||
case _ => throw new UnsupportedOperationException("empty collection")
|
case _ => throw new UnsupportedOperationException("empty collection")
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
def count(): Long = {
|
|
||||||
try {
|
|
||||||
map(x => 1L).reduce(_+_)
|
|
||||||
} catch {
|
|
||||||
case e: UnsupportedOperationException => 0L // No elements in RDD
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))
|
|
||||||
|
|
||||||
def ++(other: RDD[T]): RDD[T] = this.union(other)
|
|
||||||
|
|
||||||
//def splitRdd(): RDD[Array[T]] = new SplitRDD(this)
|
|
||||||
|
|
||||||
//def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] =
|
|
||||||
// new CartesianRDD(sc, this, other)
|
|
||||||
|
|
||||||
def groupBy[K](func: T => K, numSplits: Int): RDD[(K, Seq[T])] =
|
|
||||||
this.map(t => (func(t), t)).groupByKey(numSplits)
|
|
||||||
|
|
||||||
def groupBy[K](func: T => K): RDD[(K, Seq[T])] =
|
|
||||||
groupBy[K](func, sc.numCores)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class MappedRDD[U: ClassManifest, T: ClassManifest](
|
class MappedRDD[U: ClassManifest, T: ClassManifest](
|
||||||
prev: RDD[T], f: T => U)
|
prev: RDD[T], f: T => U)
|
||||||
extends RDD[U](prev.sparkContext) {
|
extends RDD[U](prev.context) {
|
||||||
override def splits = prev.splits
|
override def splits = prev.splits
|
||||||
override def preferredLocations(split: Split) = prev.preferredLocations(split)
|
override def preferredLocations(split: Split) = prev.preferredLocations(split)
|
||||||
override val dependencies = List(new OneToOneDependency(prev))
|
override val dependencies = List(new OneToOneDependency(prev))
|
||||||
override def iterator(split: Split) = prev.iterator(split).map(f)
|
override def compute(split: Split) = prev.iterator(split).map(f)
|
||||||
}
|
}
|
||||||
|
|
||||||
class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
|
class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
|
||||||
prev: RDD[T], f: T => Traversable[U])
|
prev: RDD[T], f: T => Traversable[U])
|
||||||
extends RDD[U](prev.sparkContext) {
|
extends RDD[U](prev.context) {
|
||||||
override def splits = prev.splits
|
override def splits = prev.splits
|
||||||
override def preferredLocations(split: Split) = prev.preferredLocations(split)
|
override def preferredLocations(split: Split) = prev.preferredLocations(split)
|
||||||
override val dependencies = List(new OneToOneDependency(prev))
|
override val dependencies = List(new OneToOneDependency(prev))
|
||||||
override def iterator(split: Split) = prev.iterator(split).toStream.flatMap(f).iterator
|
override def compute(split: Split) = prev.iterator(split).toStream.flatMap(f).iterator
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
class FilteredRDD[T: ClassManifest](
|
class FilteredRDD[T: ClassManifest](
|
||||||
prev: RDD[T], f: T => Boolean)
|
prev: RDD[T], f: T => Boolean)
|
||||||
extends RDD[T](prev.sparkContext) {
|
extends RDD[T](prev.context) {
|
||||||
override def splits = prev.splits
|
override def splits = prev.splits
|
||||||
override def preferredLocations(split: Split) = prev.preferredLocations(split)
|
override def preferredLocations(split: Split) = prev.preferredLocations(split)
|
||||||
override def iterator(split: Split) = prev.iterator(split).filter(f)
|
override val dependencies = List(new OneToOneDependency(prev))
|
||||||
|
override def compute(split: Split) = prev.iterator(split).filter(f)
|
||||||
}
|
}
|
||||||
|
|
||||||
class SplitRDD[T: ClassManifest](prev: RDD[T])
|
class SplitRDD[T: ClassManifest](prev: RDD[T])
|
||||||
extends RDD[Array[T]](prev.sparkContext) {
|
extends RDD[Array[T]](prev.context) {
|
||||||
override def splits = prev.splits
|
override def splits = prev.splits
|
||||||
override def preferredLocations(split: Split) = prev.preferredLocations(split)
|
override def preferredLocations(split: Split) = prev.preferredLocations(split)
|
||||||
override def iterator(split: Split) = Iterator.fromArray(Array(prev.iterator(split).toArray))
|
override val dependencies = List(new OneToOneDependency(prev))
|
||||||
|
override def compute(split: Split) = Iterator.fromArray(Array(prev.iterator(split).toArray))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@serializable class SeededSplit(val prev: Split, val seed: Int) extends Split {
|
|
||||||
override def getId() =
|
|
||||||
"SeededSplit(" + prev.getId() + ", seed " + seed + ")"
|
|
||||||
}
|
|
||||||
|
|
||||||
class SampledRDD[T: ClassManifest](
|
|
||||||
prev: RDD[T], withReplacement: Boolean, frac: Double, seed: Int)
|
|
||||||
extends RDD[T](prev.sparkContext) {
|
|
||||||
|
|
||||||
@transient val splits_ = { val rg = new Random(seed); prev.splits.map(x => new SeededSplit(x, rg.nextInt)) }
|
|
||||||
|
|
||||||
override def splits = splits_.asInstanceOf[Array[Split]]
|
|
||||||
|
|
||||||
override def preferredLocations(split: Split) = prev.preferredLocations(split.asInstanceOf[SeededSplit].prev)
|
|
||||||
|
|
||||||
override def iterator(splitIn: Split) = {
|
|
||||||
val split = splitIn.asInstanceOf[SeededSplit]
|
|
||||||
val rg = new Random(split.seed);
|
|
||||||
// Sampling with replacement (TODO: use reservoir sampling to make this more efficient?)
|
|
||||||
if (withReplacement) {
|
|
||||||
val oldData = prev.iterator(split.prev).toArray
|
|
||||||
val sampleSize = (oldData.size * frac).ceil.toInt
|
|
||||||
val sampledData = for (i <- 1 to sampleSize) yield oldData(rg.nextInt(oldData.size)) // all of oldData's indices are candidates, even if sampleSize < oldData.size
|
|
||||||
sampledData.iterator
|
|
||||||
}
|
|
||||||
// Sampling without replacement
|
|
||||||
else {
|
|
||||||
prev.iterator(split.prev).filter(x => (rg.nextDouble <= frac))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class CachedRDD[T](
|
|
||||||
prev: RDD[T])(implicit m: ClassManifest[T])
|
|
||||||
extends RDD[T](prev.sparkContext) with Logging {
|
|
||||||
val id = CachedRDD.newId()
|
|
||||||
@transient val cacheLocs = Map[Split, List[String]]()
|
|
||||||
|
|
||||||
override def splits = prev.splits
|
|
||||||
|
|
||||||
override def preferredLocations(split: Split) = {
|
|
||||||
if (cacheLocs.contains(split))
|
|
||||||
cacheLocs(split)
|
|
||||||
else
|
|
||||||
prev.preferredLocations(split)
|
|
||||||
}
|
|
||||||
|
|
||||||
override def iterator(split: Split): Iterator[T] = {
|
|
||||||
val key = id + "::" + split.getId()
|
|
||||||
logInfo("CachedRDD split key is " + key)
|
|
||||||
val cache = CachedRDD.cache
|
|
||||||
val loading = CachedRDD.loading
|
|
||||||
val cachedVal = cache.get(key)
|
|
||||||
if (cachedVal != null) {
|
|
||||||
// Split is in cache, so just return its values
|
|
||||||
return Iterator.fromArray(cachedVal.asInstanceOf[Array[T]])
|
|
||||||
} else {
|
|
||||||
// Mark the split as loading (unless someone else marks it first)
|
|
||||||
loading.synchronized {
|
|
||||||
if (loading.contains(key)) {
|
|
||||||
while (loading.contains(key)) {
|
|
||||||
try {loading.wait()} catch {case _ =>}
|
|
||||||
}
|
|
||||||
return Iterator.fromArray(cache.get(key).asInstanceOf[Array[T]])
|
|
||||||
} else {
|
|
||||||
loading.add(key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// If we got here, we have to load the split
|
|
||||||
logInfo("Loading and caching " + split)
|
|
||||||
val array = prev.iterator(split).toArray(m)
|
|
||||||
cache.put(key, array)
|
|
||||||
loading.synchronized {
|
|
||||||
loading.remove(key)
|
|
||||||
loading.notifyAll()
|
|
||||||
}
|
|
||||||
return Iterator.fromArray(array)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override def taskStarted(split: Split, slot: SlaveOffer) {
|
|
||||||
val oldList = cacheLocs.getOrElse(split, Nil)
|
|
||||||
val host = slot.getHost
|
|
||||||
if (!oldList.contains(host))
|
|
||||||
cacheLocs(split) = host :: oldList
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private object CachedRDD {
|
|
||||||
val nextId = new AtomicLong(0) // Generates IDs for cached RDDs (on master)
|
|
||||||
def newId() = nextId.getAndIncrement()
|
|
||||||
|
|
||||||
// Stores map results for various splits locally (on workers)
|
|
||||||
val cache = Cache.newKeySpace()
|
|
||||||
|
|
||||||
// Remembers which splits are currently being loaded (on workers)
|
|
||||||
val loading = new HashSet[String]
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
@serializable
|
|
||||||
class UnionSplit[T: ClassManifest](rdd: RDD[T], index: Int, split: Split)
|
|
||||||
extends Split {
|
|
||||||
def iterator() = rdd.iterator(split)
|
|
||||||
def preferredLocations() = rdd.preferredLocations(split)
|
|
||||||
override def getId() = "UnionSplit(" + index + ", " + split.getId() + ")"
|
|
||||||
}
|
|
||||||
|
|
||||||
@serializable
|
|
||||||
class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]])
|
|
||||||
extends RDD[T](sc) {
|
|
||||||
@transient val splits_ : Array[Split] = {
|
|
||||||
val splits: Seq[Split] =
|
|
||||||
for ((rdd, index) <- rdds.zipWithIndex; split <- rdd.splits)
|
|
||||||
yield new UnionSplit(rdd, index, split)
|
|
||||||
splits.toArray
|
|
||||||
}
|
|
||||||
|
|
||||||
override def splits = splits_
|
|
||||||
|
|
||||||
override def iterator(s: Split): Iterator[T] =
|
|
||||||
s.asInstanceOf[UnionSplit[T]].iterator()
|
|
||||||
|
|
||||||
override def preferredLocations(s: Split): Seq[String] =
|
|
||||||
s.asInstanceOf[UnionSplit[T]].preferredLocations()
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
@serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split {
|
|
||||||
override def getId() =
|
|
||||||
"CartesianSplit(" + s1.getId() + ", " + s2.getId() + ")"
|
|
||||||
}
|
|
||||||
|
|
||||||
@serializable
|
|
||||||
class CartesianRDD[T: ClassManifest, U:ClassManifest](
|
|
||||||
sc: SparkContext, rdd1: RDD[T], rdd2: RDD[U])
|
|
||||||
extends RDD[Pair[T, U]](sc) {
|
|
||||||
@transient val splits_ = {
|
|
||||||
// create the cross product split
|
|
||||||
rdd2.splits.map(y => rdd1.splits.map(x => new CartesianSplit(x, y))).flatten
|
|
||||||
}
|
|
||||||
|
|
||||||
override def splits = splits_.asInstanceOf[Array[Split]]
|
|
||||||
|
|
||||||
override def preferredLocations(split: Split) = {
|
|
||||||
val currSplit = split.asInstanceOf[CartesianSplit]
|
|
||||||
rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)
|
|
||||||
}
|
|
||||||
|
|
||||||
override def iterator(split: Split) = {
|
|
||||||
val currSplit = split.asInstanceOf[CartesianSplit]
|
|
||||||
for (x <- rdd1.iterator(currSplit.s1); y <- rdd2.iterator(currSplit.s2)) yield (x, y)
|
|
||||||
}
|
|
||||||
|
|
||||||
override def taskStarted(split: Split, slot: SlaveOffer) = {
|
|
||||||
val currSplit = split.asInstanceOf[CartesianSplit]
|
|
||||||
rdd1.taskStarted(currSplit.s1, slot)
|
|
||||||
rdd2.taskStarted(currSplit.s2, slot)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
class ShuffledRDDSplit(val id: Int) extends Split {
|
|
||||||
override def getId() = "ShuffleRDDSplit(" + id + ")"
|
|
||||||
}
|
|
||||||
|
|
||||||
class ShuffledRDD[K, V, C](
|
|
||||||
parent: RDD[(K, V)],
|
|
||||||
aggregator: Aggregator[K, V, C],
|
|
||||||
partitioner: Partitioner[K])
|
|
||||||
extends RDD[(K, C)](parent.sparkContext) {
|
|
||||||
@transient val splits_ =
|
|
||||||
Array.tabulate[Split](partitioner.numPartitions)(i => new ShuffledRDDSplit(i))
|
|
||||||
|
|
||||||
val dep = new ShuffleDependency(sparkContext.newShuffleId, parent, aggregator, partitioner)
|
|
||||||
|
|
||||||
override def splits = splits_
|
|
||||||
|
|
||||||
override def preferredLocations(split: Split) = Nil
|
|
||||||
|
|
||||||
override def iterator(split: Split): Iterator[(K, C)] = {
|
|
||||||
val shuffleId = dep.shuffleId
|
|
||||||
val splitId = split.asInstanceOf[ShuffledRDDSplit].id
|
|
||||||
val splitsByUri = new HashMap[String, ArrayBuffer[Int]]
|
|
||||||
val serverUris = MapOutputTracker.getServerUris(shuffleId)
|
|
||||||
for ((serverUri, index) <- serverUris.zipWithIndex) {
|
|
||||||
splitsByUri.getOrElseUpdate(serverUri, ArrayBuffer()) += index
|
|
||||||
}
|
|
||||||
val combiners = new HashMap[K, C]
|
|
||||||
for ((serverUri, inputIds) <- Utils.shuffle(splitsByUri)) {
|
|
||||||
for (i <- inputIds) {
|
|
||||||
val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, splitId)
|
|
||||||
val inputStream = new ObjectInputStream(new URL(url).openStream())
|
|
||||||
try {
|
|
||||||
while (true) {
|
|
||||||
val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
|
|
||||||
combiners(k) = combiners.get(k) match {
|
|
||||||
case Some(oldC) => aggregator.mergeCombiners(oldC, c)
|
|
||||||
case None => c
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
case e: EOFException => {}
|
|
||||||
}
|
|
||||||
inputStream.close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
combiners.iterator
|
|
||||||
}
|
|
||||||
|
|
||||||
override val dependencies = List(dep)
|
|
||||||
}
|
|
||||||
|
|
||||||
@serializable class PairRDDExtras[K, V](self: RDD[(K, V)]) {
|
@serializable class PairRDDExtras[K, V](self: RDD[(K, V)]) {
|
||||||
def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
|
def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
|
||||||
def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = {
|
def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = {
|
||||||
|
@ -427,13 +193,6 @@ extends RDD[(K, C)](parent.sparkContext) {
|
||||||
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
|
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
|
||||||
val partitioner = new HashPartitioner[K](numSplits)
|
val partitioner = new HashPartitioner[K](numSplits)
|
||||||
new ShuffledRDD(self, aggregator, partitioner)
|
new ShuffledRDD(self, aggregator, partitioner)
|
||||||
// TODO
|
|
||||||
/*
|
|
||||||
val shufClass = Class.forName(System.getProperty(
|
|
||||||
"spark.shuffle.class", "spark.LocalFileShuffle"))
|
|
||||||
val shuf = shufClass.newInstance().asInstanceOf[Shuffle[K, V, C]]
|
|
||||||
shuf.compute(self, numSplits, createCombiner, mergeValue, mergeCombiners)
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
|
def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
|
||||||
|
@ -484,7 +243,7 @@ extends RDD[(K, C)](parent.sparkContext) {
|
||||||
join(other, numCores)
|
join(other, numCores)
|
||||||
}
|
}
|
||||||
|
|
||||||
def numCores = self.sparkContext.numCores
|
def numCores = self.context.numCores
|
||||||
|
|
||||||
def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
|
def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
|
||||||
}
|
}
|
||||||
|
|
92
core/src/main/scala/spark/RDDCache.scala
Normal file
92
core/src/main/scala/spark/RDDCache.scala
Normal file
|
@ -0,0 +1,92 @@
|
||||||
|
package spark
|
||||||
|
|
||||||
|
import scala.actors._
|
||||||
|
import scala.actors.Actor._
|
||||||
|
import scala.actors.remote._
|
||||||
|
|
||||||
|
sealed trait CacheMessage
|
||||||
|
case class CacheEntryAdded(rddId: Int, partition: Int, host: String)
|
||||||
|
case class CacheEntryRemoved(rddId: Int, partition: Int, host: String)
|
||||||
|
|
||||||
|
class RDDCacheTracker extends DaemonActor with Logging {
|
||||||
|
def act() {
|
||||||
|
val port = System.getProperty("spark.master.port", "50501").toInt
|
||||||
|
RemoteActor.alive(port)
|
||||||
|
RemoteActor.register('RDDCacheTracker, self)
|
||||||
|
logInfo("Started on port " + port)
|
||||||
|
|
||||||
|
loop {
|
||||||
|
react {
|
||||||
|
case CacheEntryAdded(rddId, partition, host) =>
|
||||||
|
logInfo("Cache entry added: %s, %s, %s".format(rddId, partition, host))
|
||||||
|
|
||||||
|
case CacheEntryRemoved(rddId, partition, host) =>
|
||||||
|
logInfo("Cache entry removed: %s, %s, %s".format(rddId, partition, host))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
import scala.collection.mutable.HashSet
|
||||||
|
private object RDDCache extends Logging {
|
||||||
|
// Stores map results for various splits locally
|
||||||
|
val cache = Cache.newKeySpace()
|
||||||
|
|
||||||
|
// Remembers which splits are currently being loaded
|
||||||
|
val loading = new HashSet[(Int, Int)]
|
||||||
|
|
||||||
|
// Tracker actor on the master, or remote reference to it on workers
|
||||||
|
var trackerActor: AbstractActor = null
|
||||||
|
|
||||||
|
def initialize(isMaster: Boolean) {
|
||||||
|
if (isMaster) {
|
||||||
|
val tracker = new RDDCacheTracker
|
||||||
|
tracker.start
|
||||||
|
trackerActor = tracker
|
||||||
|
} else {
|
||||||
|
val host = System.getProperty("spark.master.host")
|
||||||
|
val port = System.getProperty("spark.master.port").toInt
|
||||||
|
trackerActor = RemoteActor.select(Node(host, port), 'RDDCacheTracker)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gets or computes an RDD split
|
||||||
|
def getOrCompute[T](rdd: RDD[T], split: Split)(implicit m: ClassManifest[T])
|
||||||
|
: Iterator[T] = {
|
||||||
|
val key = (rdd.id, split.index)
|
||||||
|
logInfo("CachedRDD split key is " + key)
|
||||||
|
val cache = RDDCache.cache
|
||||||
|
val loading = RDDCache.loading
|
||||||
|
val cachedVal = cache.get(key)
|
||||||
|
if (cachedVal != null) {
|
||||||
|
// Split is in cache, so just return its values
|
||||||
|
return Iterator.fromArray(cachedVal.asInstanceOf[Array[T]])
|
||||||
|
} else {
|
||||||
|
// Mark the split as loading (unless someone else marks it first)
|
||||||
|
loading.synchronized {
|
||||||
|
if (loading.contains(key)) {
|
||||||
|
while (loading.contains(key)) {
|
||||||
|
try {loading.wait()} catch {case _ =>}
|
||||||
|
}
|
||||||
|
return Iterator.fromArray(cache.get(key).asInstanceOf[Array[T]])
|
||||||
|
} else {
|
||||||
|
loading.add(key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
val host = System.getProperty("spark.hostname", Utils.localHostName)
|
||||||
|
trackerActor ! CacheEntryAdded(rdd.id, split.index, host)
|
||||||
|
// If we got here, we have to load the split
|
||||||
|
// TODO: fetch any remote copy of the split that may be available
|
||||||
|
// TODO: also notify the master that we're loading it
|
||||||
|
// TODO: also register a listener for when it unloads
|
||||||
|
logInfo("Computing and caching " + split)
|
||||||
|
val array = rdd.compute(split).toArray(m)
|
||||||
|
cache.put(key, array)
|
||||||
|
loading.synchronized {
|
||||||
|
loading.remove(key)
|
||||||
|
loading.notifyAll()
|
||||||
|
}
|
||||||
|
return Iterator.fromArray(array)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
36
core/src/main/scala/spark/SampledRDD.scala
Normal file
36
core/src/main/scala/spark/SampledRDD.scala
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
package spark
|
||||||
|
|
||||||
|
import java.util.Random
|
||||||
|
|
||||||
|
@serializable class SampledRDDSplit(val prev: Split, val seed: Int) extends Split {
|
||||||
|
override val index = prev.index
|
||||||
|
}
|
||||||
|
|
||||||
|
class SampledRDD[T: ClassManifest](
|
||||||
|
prev: RDD[T], withReplacement: Boolean, frac: Double, seed: Int)
|
||||||
|
extends RDD[T](prev.context) {
|
||||||
|
|
||||||
|
@transient val splits_ = { val rg = new Random(seed); prev.splits.map(x => new SampledRDDSplit(x, rg.nextInt)) }
|
||||||
|
|
||||||
|
override def splits = splits_.asInstanceOf[Array[Split]]
|
||||||
|
|
||||||
|
override val dependencies = List(new OneToOneDependency(prev))
|
||||||
|
|
||||||
|
override def preferredLocations(split: Split) = prev.preferredLocations(split.asInstanceOf[SampledRDDSplit].prev)
|
||||||
|
|
||||||
|
override def compute(splitIn: Split) = {
|
||||||
|
val split = splitIn.asInstanceOf[SampledRDDSplit]
|
||||||
|
val rg = new Random(split.seed);
|
||||||
|
// Sampling with replacement (TODO: use reservoir sampling to make this more efficient?)
|
||||||
|
if (withReplacement) {
|
||||||
|
val oldData = prev.iterator(split.prev).toArray
|
||||||
|
val sampleSize = (oldData.size * frac).ceil.toInt
|
||||||
|
val sampledData = for (i <- 1 to sampleSize) yield oldData(rg.nextInt(oldData.size)) // all of oldData's indices are candidates, even if sampleSize < oldData.size
|
||||||
|
sampledData.iterator
|
||||||
|
}
|
||||||
|
// Sampling without replacement
|
||||||
|
else {
|
||||||
|
prev.iterator(split.prev).filter(x => (rg.nextDouble <= frac))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
60
core/src/main/scala/spark/ShuffledRDD.scala
Normal file
60
core/src/main/scala/spark/ShuffledRDD.scala
Normal file
|
@ -0,0 +1,60 @@
|
||||||
|
package spark
|
||||||
|
|
||||||
|
import java.net.URL
|
||||||
|
import java.io.EOFException
|
||||||
|
import java.io.ObjectInputStream
|
||||||
|
import scala.collection.mutable.ArrayBuffer
|
||||||
|
import scala.collection.mutable.HashMap
|
||||||
|
|
||||||
|
class ShuffledRDDSplit(val idx: Int) extends Split {
|
||||||
|
override val index = idx
|
||||||
|
override def hashCode(): Int = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
class ShuffledRDD[K, V, C](
|
||||||
|
parent: RDD[(K, V)],
|
||||||
|
aggregator: Aggregator[K, V, C],
|
||||||
|
part : Partitioner[K])
|
||||||
|
extends RDD[(K, C)](parent.context) {
|
||||||
|
override val partitioner = Some(part)
|
||||||
|
|
||||||
|
@transient val splits_ =
|
||||||
|
Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
|
||||||
|
|
||||||
|
override def splits = splits_
|
||||||
|
|
||||||
|
override def preferredLocations(split: Split) = Nil
|
||||||
|
|
||||||
|
val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part)
|
||||||
|
override val dependencies = List(dep)
|
||||||
|
|
||||||
|
override def compute(split: Split): Iterator[(K, C)] = {
|
||||||
|
val shuffleId = dep.shuffleId
|
||||||
|
val splitId = split.index
|
||||||
|
val splitsByUri = new HashMap[String, ArrayBuffer[Int]]
|
||||||
|
val serverUris = MapOutputTracker.getServerUris(shuffleId)
|
||||||
|
for ((serverUri, index) <- serverUris.zipWithIndex) {
|
||||||
|
splitsByUri.getOrElseUpdate(serverUri, ArrayBuffer()) += index
|
||||||
|
}
|
||||||
|
val combiners = new HashMap[K, C]
|
||||||
|
for ((serverUri, inputIds) <- Utils.shuffle(splitsByUri)) {
|
||||||
|
for (i <- inputIds) {
|
||||||
|
val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, splitId)
|
||||||
|
val inputStream = new ObjectInputStream(new URL(url).openStream())
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
|
||||||
|
combiners(k) = combiners.get(k) match {
|
||||||
|
case Some(oldC) => aggregator.mergeCombiners(oldC, c)
|
||||||
|
case None => c
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
case e: EOFException => {}
|
||||||
|
}
|
||||||
|
inputStream.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
combiners.iterator
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
package spark
|
package spark
|
||||||
|
|
||||||
import java.io._
|
import java.io._
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
import scala.collection.mutable.ArrayBuffer
|
import scala.collection.mutable.ArrayBuffer
|
||||||
|
|
||||||
|
@ -34,6 +35,8 @@ extends Logging {
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
Cache.initialize()
|
Cache.initialize()
|
||||||
Broadcast.initialize(true)
|
Broadcast.initialize(true)
|
||||||
|
MapOutputTracker.initialize(true)
|
||||||
|
RDDCache.initialize(true)
|
||||||
|
|
||||||
// Methods for creating RDDs
|
// Methods for creating RDDs
|
||||||
|
|
||||||
|
@ -43,6 +46,12 @@ extends Logging {
|
||||||
def parallelize[T: ClassManifest](seq: Seq[T]): RDD[T] =
|
def parallelize[T: ClassManifest](seq: Seq[T]): RDD[T] =
|
||||||
parallelize(seq, numCores)
|
parallelize(seq, numCores)
|
||||||
|
|
||||||
|
def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int): RDD[T] =
|
||||||
|
parallelize(seq, numSlices)
|
||||||
|
|
||||||
|
def makeRDD[T: ClassManifest](seq: Seq[T]): RDD[T] =
|
||||||
|
parallelize(seq, numCores)
|
||||||
|
|
||||||
def textFile(path: String): RDD[String] =
|
def textFile(path: String): RDD[String] =
|
||||||
new HadoopTextFile(this, path)
|
new HadoopTextFile(this, path)
|
||||||
|
|
||||||
|
@ -158,12 +167,16 @@ extends Logging {
|
||||||
// Get the number of cores available to run tasks (as reported by Scheduler)
|
// Get the number of cores available to run tasks (as reported by Scheduler)
|
||||||
def numCores = scheduler.numCores
|
def numCores = scheduler.numCores
|
||||||
|
|
||||||
private var nextShuffleId: Int = 0
|
private var nextShuffleId = new AtomicInteger(0)
|
||||||
|
|
||||||
private[spark] def newShuffleId(): Int = {
|
private[spark] def newShuffleId(): Int = {
|
||||||
val id = nextShuffleId
|
nextShuffleId.getAndIncrement()
|
||||||
nextShuffleId += 1
|
}
|
||||||
id
|
|
||||||
|
private var nextRddId = new AtomicInteger(0)
|
||||||
|
|
||||||
|
private[spark] def newRddId(): Int = {
|
||||||
|
nextRddId.getAndIncrement()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,9 +5,10 @@ package spark
|
||||||
*/
|
*/
|
||||||
@serializable trait Split {
|
@serializable trait Split {
|
||||||
/**
|
/**
|
||||||
* Get a unique ID for this split which can be used, for example, to
|
* Get the split's index within its parent RDD
|
||||||
* set up caches based on it. The ID should stay the same if we serialize
|
|
||||||
* and then deserialize the split.
|
|
||||||
*/
|
*/
|
||||||
def getId(): String
|
val index: Int
|
||||||
|
|
||||||
|
// A better default implementation of HashCode
|
||||||
|
override def hashCode(): Int = index
|
||||||
}
|
}
|
||||||
|
|
43
core/src/main/scala/spark/UnionRDD.scala
Normal file
43
core/src/main/scala/spark/UnionRDD.scala
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
package spark
|
||||||
|
|
||||||
|
import scala.collection.mutable.ArrayBuffer
|
||||||
|
|
||||||
|
@serializable
|
||||||
|
class UnionSplit[T: ClassManifest](idx: Int, rdd: RDD[T], split: Split)
|
||||||
|
extends Split {
|
||||||
|
def iterator() = rdd.iterator(split)
|
||||||
|
def preferredLocations() = rdd.preferredLocations(split)
|
||||||
|
override val index = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
@serializable
|
||||||
|
class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]])
|
||||||
|
extends RDD[T](sc) {
|
||||||
|
@transient val splits_ : Array[Split] = {
|
||||||
|
val array = new Array[Split](rdds.map(_.splits.size).sum)
|
||||||
|
var pos = 0
|
||||||
|
for (rdd <- rdds; split <- rdd.splits) {
|
||||||
|
array(pos) = new UnionSplit(pos, rdd, split)
|
||||||
|
pos += 1
|
||||||
|
}
|
||||||
|
array
|
||||||
|
}
|
||||||
|
|
||||||
|
override def splits = splits_
|
||||||
|
|
||||||
|
override val dependencies = {
|
||||||
|
val deps = new ArrayBuffer[Dependency[_]]
|
||||||
|
var pos = 0
|
||||||
|
for ((rdd, index) <- rdds.zipWithIndex) {
|
||||||
|
deps += new RangeDependency(rdd, 0, pos, rdd.splits.size)
|
||||||
|
pos += rdd.splits.size
|
||||||
|
}
|
||||||
|
deps.toList
|
||||||
|
}
|
||||||
|
|
||||||
|
override def compute(s: Split): Iterator[T] =
|
||||||
|
s.asInstanceOf[UnionSplit[T]].iterator()
|
||||||
|
|
||||||
|
override def preferredLocations(s: Split): Seq[String] =
|
||||||
|
s.asInstanceOf[UnionSplit[T]].preferredLocations()
|
||||||
|
}
|
|
@ -124,4 +124,11 @@ object Utils {
|
||||||
// and join them into a string
|
// and join them into a string
|
||||||
return bytes.map(b => (b.toInt + 256) % 256).mkString(".")
|
return bytes.map(b => (b.toInt + 256) % 256).mkString(".")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the local machine's hostname
|
||||||
|
*/
|
||||||
|
def localHostName(): String = {
|
||||||
|
return InetAddress.getLocalHost().getHostName
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue