package spark

import java.io.EOFException
import java.net.URL
import java.io.ObjectInputStream
import java.util.concurrent.atomic.AtomicLong
import java.util.HashSet
import java.util.Random
import java.util.Date

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Map
import scala.collection.mutable.HashMap

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.HadoopFileWriter
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.hadoop.mapred.OutputCommitter
import org.apache.hadoop.mapred.FileOutputCommitter
import org.apache.hadoop.io.Writable
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.Text

import SparkContext._

import mesos._

@serializable
abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
  // Methods that must be implemented by subclasses
  def splits: Array[Split]
  def compute(split: Split): Iterator[T]
  val dependencies: List[Dependency[_]]

  // Optionally overridden by subclasses to specify how they are partitioned
  val partitioner: Option[Partitioner] = None

  // Optionally overridden by subclasses to specify placement preferences
  def preferredLocations(split: Split): Seq[String] = Nil

  def context = sc

  // Get a unique ID for this RDD
  val id = sc.newRddId()

  // Variables relating to caching
  private var shouldCache = false

  // Change this RDD's caching
  def cache(): RDD[T] = {
    shouldCache = true
    this
  }

  // Read this RDD; will read from cache if applicable, or otherwise compute
  final def iterator(split: Split): Iterator[T] = {
    if (shouldCache) {
      SparkEnv.get.cacheTracker.getOrCompute[T](this, split)
    } else {
      compute(split)
    }
  }
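
  // Usage sketch (illustrative comment, not part of the API): cache() only sets
  // a flag; the actual caching happens in iterator(), which asks the CacheTracker
  // for each split the first time it is computed. Assuming a SparkContext `sc`:
  //
  //   val lines = sc.textFile("hdfs://...")   // placeholder path
  //   val cached = lines.cache()
  //   cached.count()   // first action computes and caches each split
  //   cached.count()   // later actions read the splits from the cache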

  // Transformations

  def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

  def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] =
    new FlatMappedRDD(this, sc.clean(f))

  def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))

  def sample(withReplacement: Boolean, frac: Double, seed: Int): RDD[T] =
    new SampledRDD(this, withReplacement, frac, seed)

  def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))

  def ++(other: RDD[T]): RDD[T] = this.union(other)

  def glom(): RDD[Array[T]] = new SplitRDD(this)

  def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] =
    new CartesianRDD(sc, this, other)

  def groupBy[K: ClassManifest](func: T => K, numSplits: Int): RDD[(K, Seq[T])] =
    this.map(t => (func(t), t)).groupByKey(numSplits)

  def groupBy[K: ClassManifest](func: T => K): RDD[(K, Seq[T])] =
    groupBy[K](func, sc.numCores)

  def pipe(command: String): RDD[String] =
    new PipedRDD(this, command)

  def pipe(command: Seq[String]): RDD[String] =
    new PipedRDD(this, command)
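
  // Usage sketch (illustrative comment): transformations are lazy and only build
  // new RDD objects wrapping this one; nothing runs until an action is called.
  // Assuming an RDD[String] named `lines`:
  //
  //   val words    = lines.flatMap(_.split(" "))     // RDD[String]
  //   val longOnes = words.filter(_.length > 3)      // RDD[String]
  //   val byFirst  = words.groupBy(_.charAt(0))      // RDD[(Char, Seq[String])]
  //   val sorted   = lines.pipe(Seq("sort"))         // pipe through an external command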

  // Parallel operations

  def foreach(f: T => Unit) {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

  def collect(): Array[T] = {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

  def reduce(f: (T, T) => T): T = {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext)
        Some(iter.reduceLeft(cleanF))
      else
        None
    }
    val options = sc.runJob(this, reducePartition)
    val results = new ArrayBuffer[T]
    for (opt <- options; elem <- opt)
      results += elem
    if (results.size == 0)
      throw new UnsupportedOperationException("empty collection")
    else
      return results.reduceLeft(f)
  }

  def count(): Long = {
    sc.runJob(this, (iter: Iterator[T]) => {
      var result = 0L
      while (iter.hasNext) {
        result += 1L
        iter.next
      }
      result
    }).sum
  }
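
  // Usage sketch (illustrative comment): unlike the transformations above, these
  // operations launch a job through sc.runJob and return a result to the driver.
  // Assuming SparkContext.parallelize is available on `sc`:
  //
  //   val nums = sc.parallelize(1 to 1000)
  //   nums.count()                        // 1000
  //   nums.reduce(_ + _)                  // 500500
  //   nums.filter(_ % 2 == 0).collect()   // Array(2, 4, 6, ...)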

  def toArray(): Array[T] = collect()

  override def toString(): String = {
    "%s(%d)".format(getClass.getSimpleName, id)
  }

  // Take the first num elements of the RDD. This currently scans the partitions
  // *one by one*, so it will be slow if a lot of partitions are required. In that
  // case, use collect() to get the whole RDD instead.
  def take(num: Int): Array[T] = {
    if (num == 0)
      return new Array[T](0)
    val buf = new ArrayBuffer[T]
    var p = 0
    while (buf.size < num && p < splits.size) {
      val left = num - buf.size
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p))
      buf ++= res(0)
      if (buf.size == num)
        return buf.toArray
      p += 1
    }
    return buf.toArray
  }

  def first: T = take(1) match {
    case Array(t) => t
    case _ => throw new UnsupportedOperationException("empty collection")
  }

  def saveAsTextFile(path: String) {
    this.map(x => (NullWritable.get(), new Text(x.toString)))
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text], FileOutputCommitter](path)
  }

  def saveAsObjectFile(path: String) {
    this.glom
      .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
      .saveAsSequenceFile(path)
  }
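
  // Usage sketch (illustrative comment): saveAsTextFile goes through saveAsHadoopFile
  // (PairRDDExtras) and saveAsObjectFile through saveAsSequenceFile
  // (SequencePairRDDExtras), both reached via the implicit conversions imported
  // from SparkContext._. Paths are placeholders.
  //
  //   lines.saveAsTextFile("hdfs://.../out-text")        // one line of text per element
  //   lines.saveAsObjectFile("hdfs://.../out-objects")   // serialized partitions in a SequenceFile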
}

class MappedRDD[U: ClassManifest, T: ClassManifest](
  prev: RDD[T], f: T => U)
extends RDD[U](prev.context) {
  override def splits = prev.splits
  override val dependencies = List(new OneToOneDependency(prev))
  override def compute(split: Split) = prev.iterator(split).map(f)
}

class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
  prev: RDD[T], f: T => Traversable[U])
extends RDD[U](prev.context) {
  override def splits = prev.splits
  override val dependencies = List(new OneToOneDependency(prev))
  override def compute(split: Split) = prev.iterator(split).toStream.flatMap(f).iterator
}

class FilteredRDD[T: ClassManifest](
  prev: RDD[T], f: T => Boolean)
extends RDD[T](prev.context) {
  override def splits = prev.splits
  override val dependencies = List(new OneToOneDependency(prev))
  override def compute(split: Split) = prev.iterator(split).filter(f)
}

class SplitRDD[T: ClassManifest](prev: RDD[T])
extends RDD[Array[T]](prev.context) {
  override def splits = prev.splits
  override val dependencies = List(new OneToOneDependency(prev))
  override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator
}

@serializable class PairRDDExtras[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging {
  def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
    def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = {
      for ((k, v) <- m2) {
        m1.get(k) match {
          case None => m1(k) = v
          case Some(w) => m1(k) = func(w, v)
        }
      }
      return m1
    }
    self.map(pair => HashMap(pair)).reduce(mergeMaps)
  }

  def combineByKey[C](createCombiner: V => C,
                      mergeValue: (C, V) => C,
                      mergeCombiners: (C, C) => C,
                      numSplits: Int)
  : RDD[(K, C)] = {
    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
    val partitioner = new HashPartitioner(numSplits)
    new ShuffledRDD(self, aggregator, partitioner)
  }

  def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
    combineByKey[V]((v: V) => v, func, func, numSplits)
  }

  def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = {
    def createCombiner(v: V) = ArrayBuffer(v)
    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
    def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
    val bufs = combineByKey[ArrayBuffer[V]](
      createCombiner _, mergeValue _, mergeCombiners _, numSplits)
    bufs.asInstanceOf[RDD[(K, Seq[V])]]
  }
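
  // Usage sketch (illustrative comment): reduceByKey and groupByKey are thin
  // wrappers around combineByKey, which shuffles through a ShuffledRDD with a
  // HashPartitioner. Assuming an RDD[String] named `words`:
  //
  //   val counts  = words.map(w => (w, 1)).reduceByKey(_ + _)   // RDD[(String, Int)]
  //   val grouped = words.map(w => (w, 1)).groupByKey(8)        // RDD[(String, Seq[Int])] with 8 splits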

  def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = {
    val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
    val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
    (vs ++ ws).groupByKey(numSplits).flatMap {
      case (k, seq) => {
        val vbuf = new ArrayBuffer[V]
        val wbuf = new ArrayBuffer[W]
        seq.foreach(_ match {
          case Left(v) => vbuf += v
          case Right(w) => wbuf += w
        })
        for (v <- vbuf; w <- wbuf) yield (k, (v, w))
      }
    }
  }
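
  // Usage sketch (illustrative comment): join tags each side with Either, groups
  // by key, and yields the cross product of values sharing a key; the outer joins
  // below fill in None for the missing side. Example data is hypothetical:
  //
  //   val ages   = sc.parallelize(Seq(("alice", 30), ("bob", 25)))
  //   val cities = sc.parallelize(Seq(("alice", "SF"), ("carol", "NYC")))
  //   ages.join(cities).collect()            // contains ("alice", (30, "SF"))
  //   ages.leftOuterJoin(cities).collect()   // also contains ("bob", (25, None))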

  def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = {
    val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
    val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
    (vs ++ ws).groupByKey(numSplits).flatMap {
      case (k, seq) => {
        val vbuf = new ArrayBuffer[V]
        val wbuf = new ArrayBuffer[Option[W]]
        seq.foreach(_ match {
          case Left(v) => vbuf += v
          case Right(w) => wbuf += Some(w)
        })
        if (wbuf.isEmpty) {
          wbuf += None
        }
        for (v <- vbuf; w <- wbuf) yield (k, (v, w))
      }
    }
  }

  def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = {
    val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
    val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
    (vs ++ ws).groupByKey(numSplits).flatMap {
      case (k, seq) => {
        val vbuf = new ArrayBuffer[Option[V]]
        val wbuf = new ArrayBuffer[W]
        seq.foreach(_ match {
          case Left(v) => vbuf += Some(v)
          case Right(w) => wbuf += w
        })
        if (vbuf.isEmpty) {
          vbuf += None
        }
        for (v <- vbuf; w <- wbuf) yield (k, (v, w))
      }
    }
  }

  def combineByKey[C](createCombiner: V => C,
                      mergeValue: (C, V) => C,
                      mergeCombiners: (C, C) => C)
  : RDD[(K, C)] = {
    combineByKey(createCombiner, mergeValue, mergeCombiners, numCores)
  }

  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
    reduceByKey(func, numCores)
  }

  def groupByKey(): RDD[(K, Seq[V])] = {
    groupByKey(numCores)
  }

  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
    join(other, numCores)
  }

  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
    leftOuterJoin(other, numCores)
  }

  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
    rightOuterJoin(other, numCores)
  }

  def numCores = self.context.numCores

  def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)

  def mapValues[U](f: V => U): RDD[(K, U)] = {
    val cleanF = self.context.clean(f)
    new MappedValuesRDD(self, cleanF)
  }

  def flatMapValues[U](f: V => Traversable[U]): RDD[(K, U)] = {
    val cleanF = self.context.clean(f)
    new FlatMappedValuesRDD(self, cleanF)
  }

  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
    val part = self.partitioner match {
      case Some(p) => p
      case None => new HashPartitioner(numCores)
    }
    new CoGroupedRDD[K](Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), part).map {
      case (k, Seq(vs, ws)) =>
        (k, (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]))
    }
  }

  def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
  : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
    val part = self.partitioner match {
      case Some(p) => p
      case None => new HashPartitioner(numCores)
    }
    new CoGroupedRDD[K](
      Seq(self.asInstanceOf[RDD[(_, _)]],
          other1.asInstanceOf[RDD[(_, _)]],
          other2.asInstanceOf[RDD[(_, _)]]),
      part).map {
      case (k, Seq(vs, w1s, w2s)) =>
        (k, (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]))
    }
  }
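
  // Usage sketch (illustrative comment): mapValues and flatMapValues keep the
  // partitioner of `self` (see MappedValuesRDD below), and groupWith cogroups two
  // or three pair RDDs by key. Assuming pair RDDs `links: RDD[(K, V)]` and
  // `ranks: RDD[(K, Double)]`:
  //
  //   val initial   = links.mapValues(_ => 1.0)   // same partitioning as links
  //   val cogrouped = links.groupWith(ranks)      // RDD[(K, (Seq[V], Seq[Double]))]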

  def saveAsHadoopFile(path: String, jobConf: JobConf) {
    saveAsHadoopFile(path,
      jobConf.getOutputKeyClass,
      jobConf.getOutputValueClass,
      jobConf.getOutputFormat().getClass.asInstanceOf[Class[OutputFormat[AnyRef, AnyRef]]],
      jobConf.getOutputCommitter().getClass.asInstanceOf[Class[OutputCommitter]],
      jobConf)
  }

  def saveAsHadoopFile[F <: OutputFormat[K, V], C <: OutputCommitter](path: String)
      (implicit fm: ClassManifest[F], cm: ClassManifest[C]) {
    saveAsHadoopFile(path, fm.erasure.asInstanceOf[Class[F]], cm.erasure.asInstanceOf[Class[C]])
  }

  def saveAsHadoopFile(path: String,
                       outputFormatClass: Class[_ <: OutputFormat[K, V]],
                       outputCommitterClass: Class[_ <: OutputCommitter]) {
    saveAsHadoopFile(path, implicitly[ClassManifest[K]].erasure, implicitly[ClassManifest[V]].erasure,
      outputFormatClass, outputCommitterClass)
  }

  def saveAsHadoopFile(path: String,
                       keyClass: Class[_],
                       valueClass: Class[_],
                       outputFormatClass: Class[_ <: OutputFormat[_, _]],
                       outputCommitterClass: Class[_ <: OutputCommitter]) {
    saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, outputCommitterClass, null)
  }

  private def saveAsHadoopFile(path: String,
                               keyClass: Class[_],
                               valueClass: Class[_],
                               outputFormatClass: Class[_ <: OutputFormat[_, _]],
                               outputCommitterClass: Class[_ <: OutputCommitter],
                               jobConf: JobConf) {
    logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")")
    val writer = new HadoopFileWriter(path,
      keyClass,
      valueClass,
      outputFormatClass.asInstanceOf[Class[OutputFormat[AnyRef, AnyRef]]],
      outputCommitterClass.asInstanceOf[Class[OutputCommitter]],
      null)
    writer.preSetup()

    def writeToFile(context: TaskContext, iter: Iterator[(K, V)]): HadoopFileWriter = {
      writer.setup(context.stageId, context.splitId, context.attemptId)
      writer.open()

      var count = 0
      while (iter.hasNext) {
        val record = iter.next
        count += 1
        writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
      }

      writer.close()
      return writer
    }

    self.context.runJob(self, writeToFile _).foreach(_.commit())
    writer.cleanup()
  }

  def getKeyClass() = implicitly[ClassManifest[K]].erasure

  def getValueClass() = implicitly[ClassManifest[V]].erasure
}

@serializable
class SequencePairRDDExtras[K <% Writable: ClassManifest, V <% Writable: ClassManifest](self: RDD[(K, V)]) extends Logging {

  def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
    val c = {
      if (classOf[Writable].isAssignableFrom(classManifest[T].erasure))
        classManifest[T].erasure
      else
        implicitly[T => Writable].getClass.getMethods()(0).getReturnType
    }
    c.asInstanceOf[Class[_ <: Writable]]
  }

  def saveAsSequenceFile(path: String) {
    def anyToWritable[U <% Writable](u: U): Writable = u

    val keyClass = getWritableClass[K]
    val valueClass = getWritableClass[V]
    val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
    val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)

    logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")")
    if (!convertKey && !convertValue) {
      self.saveAsHadoopFile(path, keyClass, valueClass,
        classOf[SequenceFileOutputFormat[Writable, Writable]], classOf[FileOutputCommitter])
    } else if (!convertKey && convertValue) {
      self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass,
        classOf[SequenceFileOutputFormat[Writable, Writable]], classOf[FileOutputCommitter])
    } else if (convertKey && !convertValue) {
      self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(path, keyClass, valueClass,
        classOf[SequenceFileOutputFormat[Writable, Writable]], classOf[FileOutputCommitter])
    } else if (convertKey && convertValue) {
      self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass,
        classOf[SequenceFileOutputFormat[Writable, Writable]], classOf[FileOutputCommitter])
    }
  }
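
  // Usage sketch (illustrative comment): saveAsSequenceFile relies on the
  // K <% Writable and V <% Writable view bounds, so keys and values that are not
  // already Writable are wrapped (via anyToWritable) before being written out with
  // saveAsHadoopFile. Assuming the implicit Writable conversions from SparkContext
  // are in scope; the path is a placeholder:
  //
  //   val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
  //   pairs.saveAsSequenceFile("hdfs://.../pairs-seq")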

  def lookup(key: K): Seq[V] = {
    self.partitioner match {
      case Some(p) =>
        val index = p.getPartition(key)
        def process(it: Iterator[(K, V)]): Seq[V] = {
          val buf = new ArrayBuffer[V]
          for ((k, v) <- it if k == key)
            buf += v
          buf
        }
        val res = self.context.runJob(self, process, Array(index))
        res(0)
      case None =>
        throw new UnsupportedOperationException("lookup() called on an RDD without a partitioner")
    }
  }
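
  // Usage sketch (illustrative comment): lookup() only works on a pair RDD that
  // carries a partitioner (for example the output of reduceByKey), because it uses
  // the partitioner to run the job on just the one split that can hold the key.
  //
  //   val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
  //   counts.lookup("spark")   // Seq of the values stored under the key "spark"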
}

class MappedValuesRDD[K, V, U](
  prev: RDD[(K, V)], f: V => U)
extends RDD[(K, U)](prev.context) {
  override def splits = prev.splits
  override val dependencies = List(new OneToOneDependency(prev))
  override val partitioner = prev.partitioner
  override def compute(split: Split) =
    prev.iterator(split).map { case (k, v) => (k, f(v)) }
}

class FlatMappedValuesRDD[K, V, U](
  prev: RDD[(K, V)], f: V => Traversable[U])
extends RDD[(K, U)](prev.context) {
  override def splits = prev.splits
  override val dependencies = List(new OneToOneDependency(prev))
  override val partitioner = prev.partitioner
  override def compute(split: Split) = {
    prev.iterator(split).toStream.flatMap {
      case (k, v) => f(v).map(x => (k, x))
    }.iterator
  }
}