package spark

import java.util.{Date, HashMap => JHashMap}
import java.text.SimpleDateFormat

import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.FileOutputCommitter
import org.apache.hadoop.mapred.FileOutputFormat
import org.apache.hadoop.mapred.HadoopWriter
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputFormat

import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil, TaskAttemptID, TaskAttemptContext}

import spark.partial.BoundedDouble
import spark.partial.PartialResult
import spark.rdd._
import spark.SparkContext._
import spark.Partitioner._

/**
 * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
 * Import `spark.SparkContext._` at the top of your program to use these functions.
 */
class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
    self: RDD[(K, V)])
  extends Logging
  with HadoopMapReduceUtil
  with Serializable {

  /**
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C.
   * Note that V and C can be different -- for example, one might group an RDD of type
   * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
   *
   * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
   * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
   * - `mergeCombiners`, to combine two C's into a single one.
   *
   * In addition, users can control the partitioning of the output RDD, and whether to perform
   * map-side aggregation (if a mapper can produce multiple items with the same key).
   */
  def combineByKey[C](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true): RDD[(K, C)] = {
    if (getKeyClass().isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
    }
    val aggregator =
      new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(aggregator.combineValuesByKey(_), true)
    } else if (mapSideCombine) {
      val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey(_), true)
      val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner)
      partitioned.mapPartitions(aggregator.combineCombinersByKey(_), true)
    } else {
      // Don't apply map-side combiner.
      // A sanity check to make sure mergeCombiners is not defined.
      assert(mergeCombiners == null)
      val values = new ShuffledRDD[K, V](self, partitioner)
      values.mapPartitions(aggregator.combineValuesByKey(_), true)
    }
  }
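
  // Illustrative sketch (not part of this class): a common use of combineByKey is computing a
  // per-key average. The names below (`sc`, `scores`) are assumed, not defined here; `sc` is a
  // SparkContext and `spark.SparkContext._` is imported for the implicit conversion.
  //
  //   val scores = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 4.0)))
  //   val sumCounts = scores.combineByKey(
  //     (v: Double) => (v, 1),                                   // createCombiner
  //     (c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1),   // mergeValue
  //     (c1: (Double, Int), c2: (Double, Int)) => (c1._1 + c2._1, c1._2 + c2._2), // mergeCombiners
  //     new HashPartitioner(2))
  //   val averages = sumCounts.mapValues { case (sum, count) => sum / count }
  //   // averages.collect() => Array(("a", 2.0), ("b", 4.0)) (ordering may vary)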

  /**
   * Simplified version of combineByKey that hash-partitions the output RDD.
   */
  def combineByKey[C](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int): RDD[(K, C)] = {
    combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
  }

  /**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
    combineByKey[V]((v: V) => v, func, func, partitioner)
  }

  /**
   * Merge the values for each key using an associative reduce function, but return the results
   * immediately to the master as a Map. This will also perform the merging locally on each mapper
   * before sending results to a reducer, similarly to a "combiner" in MapReduce.
   */
  def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {

    if (getKeyClass().isArray) {
      throw new SparkException("reduceByKeyLocally() does not support array keys")
    }

    def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
      val map = new JHashMap[K, V]
      for ((k, v) <- iter) {
        val old = map.get(k)
        map.put(k, if (old == null) v else func(old, v))
      }
      Iterator(map)
    }

    def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
      for ((k, v) <- m2) {
        val old = m1.get(k)
        m1.put(k, if (old == null) v else func(old, v))
      }
      return m1
    }

    self.mapPartitions(reducePartition).reduce(mergeMaps)
  }
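
  // Illustrative sketch (assumed names, not part of this class): unlike reduceByKey, which
  // returns an RDD, reduceByKeyLocally merges everything and hands a scala.collection.Map back
  // to the driver, so it should only be used when the set of distinct keys is small.
  //
  //   val pairs = sc.parallelize(Seq(("x", 1), ("y", 2), ("x", 5)))
  //   val totals: Map[String, Int] = pairs.reduceByKeyLocally(_ + _)
  //   // totals("x") == 6; totals("y") == 2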

  /** Alias for reduceByKeyLocally */
  def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func)

  /** Count the number of elements for each key, and return the result to the master as a Map. */
  def countByKey(): Map[K, Long] = self.map(_._1).countByValue()

  /**
   * (Experimental) Approximate version of countByKey that can return a partial result if it does
   * not finish within a timeout.
   */
  def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
      : PartialResult[Map[K, BoundedDouble]] = {
    self.map(_._1).countByValueApprox(timeout, confidence)
  }

  /**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
   */
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }

  /**
   * Group the values for each key in the RDD into a single sequence. Allows controlling the
   * partitioning of the resulting key-value pair RDD by passing a Partitioner.
   */
  def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
    def createCombiner(v: V) = ArrayBuffer(v)
    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
    def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
    val bufs = combineByKey[ArrayBuffer[V]](
      createCombiner _, mergeValue _, mergeCombiners _, partitioner)
    bufs.asInstanceOf[RDD[(K, Seq[V])]]
  }
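
  // Illustrative sketch (assumed names, not part of this class): groupByKey collects all values
  // per key into a single sequence, which can be memory-hungry for skewed keys; prefer
  // reduceByKey/combineByKey when a per-key aggregate is all that is needed.
  //
  //   val pets = sc.parallelize(Seq(("cat", "tom"), ("dog", "rex"), ("cat", "felix")))
  //   val byKind = pets.groupByKey(new HashPartitioner(4))
  //   // byKind.collect() => Array(("cat", Seq("tom", "felix")), ("dog", Seq("rex")))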

  /**
   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
   * resulting RDD into `numPartitions` partitions.
   */
  def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = {
    groupByKey(new HashPartitioner(numPartitions))
  }

  /**
   * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
   * is true, Spark will group values of the same key together on the map side before the
   * repartitioning, to only send each key over the network once. If a large number of
   * duplicated keys is expected and the keys themselves are large, `mapSideCombine` should
   * be set to true.
   */
  def partitionBy(partitioner: Partitioner, mapSideCombine: Boolean = false): RDD[(K, V)] = {
    if (getKeyClass().isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
    }
    if (mapSideCombine) {
      def createCombiner(v: V) = ArrayBuffer(v)
      def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
      def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
      val bufs = combineByKey[ArrayBuffer[V]](
        createCombiner _, mergeValue _, mergeCombiners _, partitioner)
      bufs.flatMapValues(buf => buf)
    } else {
      new ShuffledRDD[K, V](self, partitioner)
    }
  }
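
  // Illustrative sketch (assumed names, not part of this class): partitionBy just redistributes
  // the pairs according to a Partitioner without changing the values, which is useful before a
  // series of joins or lookups that should reuse the same partitioning.
  //
  //   val partitioned = pairs.partitionBy(new HashPartitioner(8))
  //   // partitioned now has a known partitioner, so a later join against another RDD
  //   // partitioned the same way can avoid re-shuffling this side.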

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
   */
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
    this.cogroup(other, partitioner).flatMapValues {
      case (vs, ws) =>
        for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
    }
  }
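
  // Illustrative sketch (assumed names, not part of this class): join is built on cogroup and
  // yields one output pair per matching (v, w) combination, i.e. an inner join.
  //
  //   val ages  = sc.parallelize(Seq(("alice", 30), ("bob", 25)))
  //   val towns = sc.parallelize(Seq(("alice", "Oslo"), ("carol", "Lima")))
  //   val joined = ages.join(towns, new HashPartitioner(4))
  //   // joined.collect() => Array(("alice", (30, "Oslo")))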

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
   * partition the output RDD.
   */
  def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
    this.cogroup(other, partitioner).flatMapValues {
      case (vs, ws) =>
        if (ws.isEmpty) {
          vs.iterator.map(v => (v, None))
        } else {
          for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
        }
    }
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
   * partition the output RDD.
   */
  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Option[V], W))] = {
    this.cogroup(other, partitioner).flatMapValues {
      case (vs, ws) =>
        if (vs.isEmpty) {
          ws.iterator.map(w => (None, w))
        } else {
          for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
        }
    }
  }
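
  // Illustrative sketch (assumed names, not part of this class): the outer joins keep keys that
  // are missing on one side, wrapping the possibly-absent value in an Option.
  //
  //   val ages  = sc.parallelize(Seq(("alice", 30), ("bob", 25)))
  //   val towns = sc.parallelize(Seq(("alice", "Oslo"), ("carol", "Lima")))
  //   ages.leftOuterJoin(towns).collect()
  //   // => Array(("alice", (30, Some("Oslo"))), ("bob", (25, None)))  (ordering may vary)
  //   ages.rightOuterJoin(towns).collect()
  //   // => Array(("alice", (Some(30), "Oslo")), ("carol", (None, "Lima")))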

  /**
   * Simplified version of combineByKey that hash-partitions the resulting RDD using the
   * existing partitioner/parallelism level.
   */
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
      : RDD[(K, C)] = {
    combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
  }

  /**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
    reduceByKey(defaultPartitioner(self), func)
  }
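
  // Illustrative sketch (assumed names, not part of this class): the classic word count uses
  // this default-partitioner overload.
  //
  //   val words  = sc.textFile("input.txt").flatMap(_.split(" "))
  //   val counts = words.map(word => (word, 1)).reduceByKey(_ + _)
  //   counts.collect()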

  /**
   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
   * resulting RDD with the existing partitioner/parallelism level.
   */
  def groupByKey(): RDD[(K, Seq[V])] = {
    groupByKey(defaultPartitioner(self))
  }

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Performs a hash join across the cluster.
   */
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
    join(other, defaultPartitioner(self, other))
  }

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Performs a hash join across the cluster.
   */
  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = {
    join(other, new HashPartitioner(numPartitions))
  }

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
   * using the existing partitioner/parallelism level.
   */
  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
    leftOuterJoin(other, defaultPartitioner(self, other))
  }

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
   * into `numPartitions` partitions.
   */
  def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = {
    leftOuterJoin(other, new HashPartitioner(numPartitions))
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
   * RDD using the existing partitioner/parallelism level.
   */
  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
    rightOuterJoin(other, defaultPartitioner(self, other))
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
   * RDD into the given number of partitions.
   */
  def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = {
    rightOuterJoin(other, new HashPartitioner(numPartitions))
  }

  /**
   * Return the key-value pairs in this RDD to the master as a Map.
   */
  def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)

  /**
   * Pass each value in the key-value pair RDD through a map function without changing the keys;
   * this also retains the original RDD's partitioning.
   */
  def mapValues[U](f: V => U): RDD[(K, U)] = {
    val cleanF = self.context.clean(f)
    new MappedValuesRDD(self, cleanF)
  }

  /**
   * Pass each value in the key-value pair RDD through a flatMap function without changing the
   * keys; this also retains the original RDD's partitioning.
   */
  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
    val cleanF = self.context.clean(f)
    new FlatMappedValuesRDD(self, cleanF)
  }
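
  // Illustrative sketch (assumed names, not part of this class): because only the values change,
  // both operations preserve the parent's partitioner, unlike a plain map over the pairs.
  //
  //   val pets = sc.parallelize(Seq(("cat", "tom"), ("dog", "rex")))
  //   pets.mapValues(_.length).collect()       // => Array(("cat", 3), ("dog", 3))
  //   pets.flatMapValues(_.toSeq).collect()    // => Array(("cat", 't'), ("cat", 'o'), ...)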

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
    if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](
      Seq(self.asInstanceOf[RDD[(K, _)]], other.asInstanceOf[RDD[(K, _)]]),
      partitioner)
    val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
    prfs.mapValues {
      case Seq(vs, ws) =>
        (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
    }
  }
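
  // Illustrative sketch (assumed names, not part of this class): cogroup is the primitive the
  // join variants above are built on; it returns both value sequences per key, even when one
  // side has no entries for that key.
  //
  //   val a = sc.parallelize(Seq(("k1", 1), ("k1", 2), ("k2", 3)))
  //   val b = sc.parallelize(Seq(("k1", "x")))
  //   a.cogroup(b, new HashPartitioner(2)).collect()
  //   // => Array(("k1", (Seq(1, 2), Seq("x"))), ("k2", (Seq(3), Seq())))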

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
    if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](
      Seq(self.asInstanceOf[RDD[(K, _)]],
          other1.asInstanceOf[RDD[(K, _)]],
          other2.asInstanceOf[RDD[(K, _)]]),
      partitioner)
    val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
    prfs.mapValues {
      case Seq(vs, w1s, w2s) =>
        (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
    }
  }

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
    cogroup(other, defaultPartitioner(self, other))
  }

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
  }

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
    cogroup(other, new HashPartitioner(numPartitions))
  }

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
    cogroup(other1, other2, new HashPartitioner(numPartitions))
  }

  /** Alias for cogroup. */
  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
    cogroup(other, defaultPartitioner(self, other))
  }

  /** Alias for cogroup. */
  def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
  }

  /**
   * Return an RDD with the pairs from `this` whose keys are not in `other`.
   *
   * Uses `this` RDD's partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be no larger than `this`.
   */
  def subtractByKey(other: RDD[(K, V)]): RDD[(K, V)] =
    subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size)))

  /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
  def subtractByKey(other: RDD[(K, V)], numPartitions: Int): RDD[(K, V)] =
    subtractByKey(other, new HashPartitioner(numPartitions))

  /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
  def subtractByKey(other: RDD[(K, V)], p: Partitioner): RDD[(K, V)] =
    new SubtractedRDD[K, V](self, other, p)
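
  // Illustrative sketch (assumed names, not part of this class): subtractByKey only compares
  // keys, so the values in `other` are irrelevant apart from their type.
  //
  //   val all       = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
  //   val blacklist = sc.parallelize(Seq(("b", 0)))
  //   all.subtractByKey(blacklist).collect()   // => Array(("a", 1), ("c", 3)) (ordering may vary)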

  /**
   * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
   * RDD has a known partitioner by only searching the partition that the key maps to.
   */
  def lookup(key: K): Seq[V] = {
    self.partitioner match {
      case Some(p) =>
        val index = p.getPartition(key)
        def process(it: Iterator[(K, V)]): Seq[V] = {
          val buf = new ArrayBuffer[V]
          for ((k, v) <- it if k == key) {
            buf += v
          }
          buf
        }
        val res = self.context.runJob(self, process _, Array(index), false)
        res(0)
      case None =>
        self.filter(_._1 == key).map(_._2).collect()
    }
  }
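
  // Illustrative sketch (assumed names, not part of this class): after partitionBy, lookup only
  // runs a job on the single partition the key maps to instead of scanning the whole RDD.
  //
  //   val indexed = pairs.partitionBy(new HashPartitioner(16))
  //   indexed.lookup("alice")   // => Seq of all values stored under key "alice"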

  /**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD.
   */
  def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
    saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
  }
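
  // Illustrative sketch (assumed names and output path, not part of this class): the type
  // parameter picks the Hadoop OutputFormat; writing pairs as plain text might look like this,
  // assuming org.apache.hadoop.mapred.TextOutputFormat is imported at the call site.
  //
  //   counts.saveAsHadoopFile[TextOutputFormat[String, Int]]("hdfs://namenode:9000/output/counts")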

  /**
   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
   */
  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
    saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
  }

  /**
   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
   */
  def saveAsNewAPIHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
      conf: Configuration = self.context.hadoopConfiguration) {
    val job = new NewAPIHadoopJob(conf)
    job.setOutputKeyClass(keyClass)
    job.setOutputValueClass(valueClass)
    val wrappedConf = new SerializableWritable(job.getConfiguration)
    NewFileOutputFormat.setOutputPath(job, new Path(path))
    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    val jobtrackerID = formatter.format(new Date())
    val stageId = self.id
    def writeShard(context: spark.TaskContext, iter: Iterator[(K,V)]): Int = {
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
      /* "reduce task" <split #> <attempt # = spark task #> */
      val attemptId = new TaskAttemptID(jobtrackerID,
        stageId, false, context.splitId, attemptNumber)
      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
      val format = outputFormatClass.newInstance
      val committer = format.getOutputCommitter(hadoopContext)
      committer.setupTask(hadoopContext)
      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
      while (iter.hasNext) {
        val (k, v) = iter.next
        writer.write(k, v)
      }
      writer.close(hadoopContext)
      committer.commitTask(hadoopContext)
      return 1
    }
    val jobFormat = outputFormatClass.newInstance
    /* apparently we need a TaskAttemptID to construct an OutputCommitter;
     * however we're only going to use this local OutputCommitter for
     * setupJob/commitJob, so we just use a dummy "map" task.
     */
    val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, true, 0, 0)
    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
    jobCommitter.setupJob(jobTaskContext)
    val count = self.context.runJob(self, writeShard _).sum
    jobCommitter.cleanupJob(jobTaskContext)
  }

  /**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD.
   */
  def saveAsHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      conf: JobConf = new JobConf(self.context.hadoopConfiguration)) {
    conf.setOutputKeyClass(keyClass)
    conf.setOutputValueClass(valueClass)
    // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
    conf.set("mapred.output.format.class", outputFormatClass.getName)
    conf.setOutputCommitter(classOf[FileOutputCommitter])
    FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf))
    saveAsHadoopDataset(conf)
  }

  /**
   * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
   * that storage system. The JobConf should set an OutputFormat and any output paths required
   * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
   * MapReduce job.
   */
  def saveAsHadoopDataset(conf: JobConf) {
    val outputFormatClass = conf.getOutputFormat
    val keyClass = conf.getOutputKeyClass
    val valueClass = conf.getOutputValueClass
    if (outputFormatClass == null) {
      throw new SparkException("Output format class not set")
    }
    if (keyClass == null) {
      throw new SparkException("Output key class not set")
    }
    if (valueClass == null) {
      throw new SparkException("Output value class not set")
    }

    logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + valueClass.getSimpleName + ")")

    val writer = new HadoopWriter(conf)
    writer.preSetup()

    def writeToFile(context: TaskContext, iter: Iterator[(K,V)]) {
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val attemptNumber = (context.attemptId % Int.MaxValue).toInt

      writer.setup(context.stageId, context.splitId, attemptNumber)
      writer.open()

      var count = 0
      while(iter.hasNext) {
        val record = iter.next()
        count += 1
        writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
      }

      writer.close()
      writer.commit()
    }

    self.context.runJob(self, writeToFile _)
    writer.cleanup()
  }

  /**
   * Return an RDD with the keys of each tuple.
   */
  def keys: RDD[K] = self.map(_._1)

  /**
   * Return an RDD with the values of each tuple.
   */
  def values: RDD[V] = self.map(_._2)

  private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure

  private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
}

/**
 * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
 * an implicit conversion. Import `spark.SparkContext._` at the top of your program to use these
 * functions. They will work with any key type that has a `scala.math.Ordered` implementation.
 */
class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
    self: RDD[(K, V)])
  extends Logging
  with Serializable {

  /**
   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
   * order of the keys).
   */
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[(K,V)] = {
    val shuffled =
      new ShuffledRDD[K, V](self, new RangePartitioner(numPartitions, self, ascending))
    shuffled.mapPartitions(iter => {
      val buf = iter.toArray
      if (ascending) {
        buf.sortWith((x, y) => x._1 < y._1).iterator
      } else {
        buf.sortWith((x, y) => x._1 > y._1).iterator
      }
    }, true)
  }
}
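
// Illustrative sketch (assumed names, not part of this class): sortByKey range-partitions the
// data so that partition i holds keys smaller than those in partition i + 1, then sorts within
// each partition, giving a totally ordered output.
//
//   val pairs  = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))
//   val sorted = pairs.sortByKey()   // ascending by default
//   // sorted.collect() => Array(("a", 1), ("b", 2), ("c", 3))
//   pairs.sortByKey(ascending = false, numPartitions = 1)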

private[spark]
class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev) {
  override def getPartitions = firstParent[(K, V)].partitions
  override val partitioner = firstParent[(K, V)].partitioner
  override def compute(split: Partition, context: TaskContext) =
    firstParent[(K, V)].iterator(split, context).map { case (k, v) => (k, f(v)) }
}

private[spark]
class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U])
  extends RDD[(K, U)](prev) {

  override def getPartitions = firstParent[(K, V)].partitions
  override val partitioner = firstParent[(K, V)].partitioner
  override def compute(split: Partition, context: TaskContext) = {
    firstParent[(K, V)].iterator(split, context).flatMap { case (k, v) => f(v).map(x => (k, x)) }
  }
}

private[spark] object Manifests {
  val seqSeqManifest = classManifest[Seq[Seq[_]]]
}