package spark

import java.io.EOFException
import java.net.URL
import java.io.ObjectInputStream
import java.util.concurrent.atomic.AtomicLong
import java.util.HashSet
import java.util.Random
import java.util.Date

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Map
import scala.collection.mutable.HashMap

import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.FileOutputCommitter
import org.apache.hadoop.mapred.HadoopWriter
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputCommitter
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.hadoop.mapred.TextOutputFormat

import SparkContext._

/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents
 * an immutable, partitioned collection of elements that can be operated on in parallel.
 *
 * Each RDD is characterized by five main properties:
 *  - A list of splits (partitions)
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for HDFS)
 *
 * All the scheduling and execution in Spark is done based on these methods, allowing each
 * RDD to implement its own way of computing itself.
 *
 * This class also contains transformation methods available on all RDDs (e.g. map and filter).
 * In addition, PairRDDFunctions contains extra methods available on RDDs of key-value pairs,
 * and SequenceFileRDDFunctions contains extra methods for saving RDDs to Hadoop SequenceFiles.
 */
abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serializable {
  // Methods that must be implemented by subclasses
  def splits: Array[Split]
  def compute(split: Split): Iterator[T]
  val dependencies: List[Dependency[_]]

  // Optionally overridden by subclasses to specify how they are partitioned
  val partitioner: Option[Partitioner] = None

  // Optionally overridden by subclasses to specify placement preferences
  def preferredLocations(split: Split): Seq[String] = Nil

  def context = sc

  // Get a unique ID for this RDD
  val id = sc.newRddId()

  // Variables relating to caching
  private var shouldCache = false

  // Change this RDD's caching
  def cache(): RDD[T] = {
    shouldCache = true
    this
  }

  // Read this RDD; will read from cache if applicable, or otherwise compute
  final def iterator(split: Split): Iterator[T] = {
    if (shouldCache) {
      SparkEnv.get.cacheTracker.getOrCompute[T](this, split)
    } else {
      compute(split)
    }
  }
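
  // A small illustrative sketch (hypothetical names `data` and `transform`, defined elsewhere):
  // caching is lazy. cache() only sets the flag above; each split is computed and stored the
  // first time an action needs it, through the cacheTracker call in iterator().
  //
  //   val cached = data.map(transform).cache()
  //   cached.count()   // first action computes every split and caches it
  //   cached.count()   // later actions read the cached splits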

  // Transformations (return a new RDD)

  def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

  def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] =
    new FlatMappedRDD(this, sc.clean(f))

  def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))

  def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
    new SampledRDD(this, withReplacement, fraction, seed)

  def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))

  def ++(other: RDD[T]): RDD[T] = this.union(other)

  def glom(): RDD[Array[T]] = new GlommedRDD(this)

  def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] =
    new CartesianRDD(sc, this, other)

  def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = {
    val cleanF = sc.clean(f)
    this.map(t => (cleanF(t), t)).groupByKey(numSplits)
  }

  def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] =
    groupBy[K](f, sc.defaultParallelism)

  def pipe(command: String): RDD[String] =
    new PipedRDD(this, command)

  def pipe(command: Seq[String]): RDD[String] =
    new PipedRDD(this, command)

  def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U]): RDD[U] =
    new MapPartitionsRDD(this, sc.clean(f))

  // Actions (launch a job to return a value to the user program)

  def foreach(f: T => Unit) {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

  def collect(): Array[T] = {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

  def reduce(f: (T, T) => T): T = {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext)
        Some(iter.reduceLeft(cleanF))
      else
        None
    }
    val options = sc.runJob(this, reducePartition)
    val results = new ArrayBuffer[T]
    for (opt <- options; elem <- opt)
      results += elem
    if (results.size == 0)
      throw new UnsupportedOperationException("empty collection")
    else
      return results.reduceLeft(cleanF)
  }

  def count(): Long = {
    sc.runJob(this, (iter: Iterator[T]) => {
      var result = 0L
      while (iter.hasNext) {
        result += 1L
        iter.next
      }
      result
    }).sum
  }

  def toArray(): Array[T] = collect()

  // Take the first num elements of the RDD. This currently scans the partitions *one by
  // one*, so it will be slow if a lot of partitions are required. In that case, use
  // collect() to get the whole RDD instead.
  def take(num: Int): Array[T] = {
    if (num == 0)
      return new Array[T](0)
    val buf = new ArrayBuffer[T]
    var p = 0
    while (buf.size < num && p < splits.size) {
      val left = num - buf.size
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p), true)
      buf ++= res(0)
      if (buf.size == num)
        return buf.toArray
      p += 1
    }
    return buf.toArray
  }
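
  // Illustrative note (hypothetical `rdd` built elsewhere): because take() scans partitions
  // one at a time, each pass of the loop above launches a separate single-partition job.
  //
  //   rdd.take(5)     // one job per partition scanned, until 5 elements are gathered
  //   rdd.collect()   // one job over all partitions; use this when most elements are needed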

  def first(): T = take(1) match {
    case Array(t) => t
    case _ => throw new UnsupportedOperationException("empty collection")
  }

  def saveAsTextFile(path: String) {
    this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
  }

  def saveAsObjectFile(path: String) {
    this.glom.map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))).saveAsSequenceFile(path)
  }
}
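
// Illustrative sketch (hypothetical, not one of the classes below): a source RDD that
// overrides all five of the properties described in the class comment above. BackendSplit,
// readRecords and hostsFor are assumed stand-ins for whatever storage system is wrapped.
//
//   class BackendRDD(sc: SparkContext, part: Partitioner) extends RDD[(String, Int)](sc) {
//     override def splits = Array.tabulate[Split](4)(i => new BackendSplit(i))
//     override def compute(split: Split) = readRecords(split)
//     override val dependencies = Nil            // a source RDD has no parent RDDs
//     override val partitioner = Some(part)
//     override def preferredLocations(split: Split) = hostsFor(split)
//   }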

class MappedRDD[U: ClassManifest, T: ClassManifest](
    prev: RDD[T], f: T => U)
  extends RDD[U](prev.context) {
  override def splits = prev.splits
  override val dependencies = List(new OneToOneDependency(prev))
  override def compute(split: Split) = prev.iterator(split).map(f)
}

class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
    prev: RDD[T], f: T => Traversable[U])
  extends RDD[U](prev.context) {
  override def splits = prev.splits
  override val dependencies = List(new OneToOneDependency(prev))
  override def compute(split: Split) = prev.iterator(split).flatMap(f)
}

class FilteredRDD[T: ClassManifest](
    prev: RDD[T], f: T => Boolean)
  extends RDD[T](prev.context) {
  override def splits = prev.splits
  override val dependencies = List(new OneToOneDependency(prev))
  override def compute(split: Split) = prev.iterator(split).filter(f)
}

class GlommedRDD[T: ClassManifest](prev: RDD[T])
  extends RDD[Array[T]](prev.context) {
  override def splits = prev.splits
  override val dependencies = List(new OneToOneDependency(prev))
  override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator
}

class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
    prev: RDD[T], f: Iterator[T] => Iterator[U])
  extends RDD[U](prev.context) {
  override def splits = prev.splits
  override val dependencies = List(new OneToOneDependency(prev))
  override def compute(split: Split) = f(prev.iterator(split))
}
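
// A minimal usage sketch (illustrative only, not part of the API defined in this file): shows
// how the transformations and actions above compose. It assumes the SparkContext(master, jobName)
// constructor and its parallelize method, as found in this version of Spark.
private object RDDUsageExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "RDD usage example")
    val nums = sc.parallelize(1 to 100, 4)        // source RDD[Int] with four splits
    val evens = nums.filter(_ % 2 == 0).cache()   // transformations are lazy; cache() only marks the RDD
    println(evens.count())                        // action: launches a job, prints 50
    println(evens.map(_ * 2).reduce(_ + _))       // action: sum of the doubled even numbers
  }
}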