Merge branch 'indexed_rdd' of /Users/jegonzal/Documents/amplab/spark

commit 9335aff946
Author: Joseph E. Gonzalez
Date: 2013-09-18 11:58:33 -07:00
7 changed files with 1098 additions and 36 deletions

View file

@ -893,7 +893,7 @@ object SparkContext {
// TODO: Add AccumulatorParams for other types, e.g. lists and strings
implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
new PairRDDFunctions(rdd)
rdd.pairRDDFunctions
implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](
rdd: RDD[(K, V)]) =
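
This indirection is the hook the rest of the commit relies on: instead of constructing a PairRDDFunctions directly, the implicit conversion now asks the RDD instance for one, so a subclass can substitute its own implementation. A minimal sketch of the effect, assuming an existing SparkContext sc and invented sample data (neither is part of this diff):

import org.apache.spark.rdd.{RDD, IndexedRDD}

val pairs: RDD[(Int, String)] = sc.parallelize(Seq((1, "a"), (2, "b")))
// A plain pair RDD still hands back the default implementation.
val plainOps = pairs.pairRDDFunctions
// IndexedRDD overrides pairRDDFunctions (see the RDD.scala and IndexedRDD.scala hunks below),
// so pair operations reached through the implicit conversion run against the index instead.
val indexedOps = IndexedRDD(pairs).pairRDDFunctions   // an IndexedRDDFunctions at runtime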

View file

@ -264,8 +264,11 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce.
*/
def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] =
def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(rdd.join(other, partitioner))
}
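
Each of the wm / w1m / w2m bindings added in this file follows one recipe: a Java caller cannot supply a ClassManifest[W], so one is forged from ClassManifest[AnyRef] and cast, which is enough to satisfy the new W: ClassManifest bounds on the Scala-side PairRDDFunctions. Condensed as a hypothetical helper (the name fakeClassManifest is not part of this commit):

// Hypothetical helper mirroring the inline bindings in this file: forge a ClassManifest
// by erasing to AnyRef and casting, since a Java caller cannot provide the real one.
def fakeClassManifest[W]: ClassManifest[W] =
  implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]

Inside each wrapper below this reads as implicit val wm: ClassManifest[W] = fakeClassManifest[W].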
/**
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
@ -275,6 +278,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
*/
def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (V, Optional[W])] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.leftOuterJoin(other, partitioner)
fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
}
@ -287,6 +292,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
*/
def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (Optional[V], W)] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.rightOuterJoin(other, partitioner)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}
@ -325,16 +332,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] =
def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(rdd.join(other))
}
/**
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] =
def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(rdd.join(other, numPartitions))
}
/**
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
@ -343,6 +356,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* using the existing partitioner/parallelism level.
*/
def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.leftOuterJoin(other)
fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
}
@ -354,6 +369,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* into `numPartitions` partitions.
*/
def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.leftOuterJoin(other, numPartitions)
fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
}
@ -365,6 +382,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* RDD using the existing partitioner/parallelism level.
*/
def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.rightOuterJoin(other)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}
@ -376,6 +395,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* RDD into the given number of partitions.
*/
def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.rightOuterJoin(other, numPartitions)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}
@ -412,55 +433,86 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (JList[V], JList[W])] =
: JavaPairRDD[K, (JList[V], JList[W])] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))
}
/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner)
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
implicit val w1m: ClassManifest[W1] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]]
implicit val w2m: ClassManifest[W2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]]
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
}
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(cogroupResultToJava(rdd.cogroup(other)))
}
/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
implicit val w1m: ClassManifest[W1] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]]
implicit val w2m: ClassManifest[W2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]]
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))
}
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])]
= fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
}
/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int)
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
implicit val w1m: ClassManifest[W1] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]]
implicit val w2m: ClassManifest[W2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]]
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))
}
/** Alias for cogroup. */
def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = {
implicit val wm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(cogroupResultToJava(rdd.groupWith(other)))
}
/** Alias for cogroup. */
def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
implicit val w1m: ClassManifest[W1] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]]
implicit val w2m: ClassManifest[W2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]]
fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))
}
/**
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the

View file

@ -68,7 +68,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* In addition, users can control the partitioning of the output RDD, and whether to perform
* map-side aggregation (if a mapper can produce multiple items with the same key).
*/
def combineByKey[C](createCombiner: V => C,
def combineByKey[C: ClassManifest](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
@ -102,7 +102,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
/**
* Simplified version of combineByKey that hash-partitions the output RDD.
*/
def combineByKey[C](createCombiner: V => C,
def combineByKey[C: ClassManifest](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)] = {
@ -247,7 +247,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
def join[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
}
@ -259,7 +259,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
* partition the output RDD.
*/
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner):
RDD[(K, (V, Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
if (ws.isEmpty) {
vs.iterator.map(v => (v, None))
@ -275,7 +277,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
* partition the output RDD.
*/
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Option[V], W))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
if (vs.isEmpty) {
@ -290,7 +292,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* Simplified version of combineByKey that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level.
*/
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
def combineByKey[C: ClassManifest](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
: RDD[(K, C)] = {
combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}
@ -318,7 +320,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
def join[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
join(other, defaultPartitioner(self, other))
}
@ -327,7 +329,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = {
def join[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = {
join(other, new HashPartitioner(numPartitions))
}
@ -337,7 +339,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
* using the existing partitioner/parallelism level.
*/
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
leftOuterJoin(other, defaultPartitioner(self, other))
}
@ -347,7 +349,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
* into `numPartitions` partitions.
*/
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = {
def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = {
leftOuterJoin(other, new HashPartitioner(numPartitions))
}
@ -357,7 +359,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD using the existing partitioner/parallelism level.
*/
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
rightOuterJoin(other, defaultPartitioner(self, other))
}
@ -367,7 +369,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD into the given number of partitions.
*/
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = {
def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = {
rightOuterJoin(other, new HashPartitioner(numPartitions))
}
@ -386,7 +388,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* Pass each value in the key-value pair RDD through a map function without changing the keys;
* this also retains the original RDD's partitioning.
*/
def mapValues[U](f: V => U): RDD[(K, U)] = {
def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new MappedValuesRDD(self, cleanF)
}
@ -395,7 +397,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* Pass each value in the key-value pair RDD through a flatMap function without changing the
* keys; this also retains the original RDD's partitioning.
*/
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new FlatMappedValuesRDD(self, cleanF)
}
@ -404,7 +406,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
@ -419,7 +421,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
@ -435,7 +437,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
def cogroup[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner(self, other))
}
@ -443,7 +445,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}
@ -452,7 +454,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
def cogroup[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
cogroup(other, new HashPartitioner(numPartitions))
}
@ -460,18 +462,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
cogroup(other1, other2, new HashPartitioner(numPartitions))
}
/** Alias for cogroup. */
def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
def groupWith[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner(self, other))
}
/** Alias for cogroup. */
def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
def groupWith[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}
@ -692,6 +694,17 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
*/
def values: RDD[V] = self.map(_._2)
def indexed(numPartitions: Int): IndexedRDD[K,V] =
IndexedRDD(self.partitionBy(new HashPartitioner(numPartitions)))
def indexed(partitioner: Partitioner): IndexedRDD[K,V] =
IndexedRDD(self.partitionBy(partitioner))
def indexed(existingIndex: RDDIndex[K] = null): IndexedRDD[K,V] =
IndexedRDD(self, existingIndex)
private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure
private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
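
Illustrative usage of the three indexed overloads added above, assuming an existing SparkContext sc (the sample data is invented):

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // brings the RDD -> PairRDDFunctions implicit into scope

val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c")))
val byCount = pairs.indexed(4)                       // hash-partition into 4 partitions, then index
val byPart  = pairs.indexed(new HashPartitioner(4))  // index using an explicit partitioner
val shared  = pairs.indexed(byCount.index)           // reuse an existing RDDIndex rather than building one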

View file

@ -774,6 +774,23 @@ abstract class RDD[T: ClassManifest](
return buf.toArray
}
/**
* For an RDD[(K,V)], this function returns a PairRDDFunctions object for this RDD
*/
def pairRDDFunctions[K, V](
implicit t: T <:< (K, V), k: ClassManifest[K], v: ClassManifest[V]):
PairRDDFunctions[K, V] = {
new PairRDDFunctions(this.asInstanceOf[RDD[(K,V)]])
}
def makeIndex(partitioner: Option[Partitioner] = None): RDDIndex[T] =
IndexedRDD.makeIndex(this, partitioner)
/**
* Return the first element in this RDD.
*/

View file

@ -0,0 +1,269 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.rdd
import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K,V])
extends PairRDDFunctions[K,V](self) {
/**
* Construct a new IndexedRDD that is indexed by only the keys in the RDD
*/
def reindex(): IndexedRDD[K,V] = IndexedRDD(self)
/**
* Pass each value in the key-value pair RDD through a map function without changing the keys;
* this also retains the original RDD's partitioning.
*/
override def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = {
val cleanF = self.index.rdd.context.clean(f)
val newValues = self.valuesRDD.mapPartitions(_.map(values => values.map{
case null => null
case row => row.map(x => cleanF(x))
}), true)
new IndexedRDD[K,U](self.index, newValues)
}
/**
* Pass each value in the key-value pair RDD through a flatMap function without changing the
* keys; this also retains the original RDD's partitioning.
*/
override def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K,U)] = {
val cleanF = self.index.rdd.context.clean(f)
val newValues = self.valuesRDD.mapPartitions(_.map(values => values.map{
case null => null
case row => row.flatMap(x => cleanF(x))
}), true)
new IndexedRDD[K,U](self.index, newValues)
}
/**
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
* Note that V and C can be different -- for example, one might group an RDD of type
* (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
*
* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
*/
override def combineByKey[C: ClassManifest](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializerClass: String = null): RDD[(K, C)] = {
val newValues = self.valuesRDD.mapPartitions(
_.map{ groups: Seq[Seq[V]] =>
groups.map{ group: Seq[V] =>
if (group != null && !group.isEmpty) {
val c: C = createCombiner(group.head)
val sum: C = group.tail.foldLeft(c)(mergeValue)
Seq(sum)
} else {
null
}
}
}, true)
new IndexedRDD[K,C](self.index, newValues)
}
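
A hedged usage sketch of the three combinators described in the comment above, turning an RDD[(Int, Int)] into an RDD[(Int, Seq[Int])]; sc and the data are assumptions, not part of this diff:

import org.apache.spark.SparkContext._

val pairs = sc.parallelize(Seq((1, 1), (1, 2), (2, 5))).indexed()
val grouped = pairs.combineByKey[Seq[Int]](
  (v: Int) => Seq(v),                         // createCombiner: start a one-element sequence
  (c: Seq[Int], v: Int) => c :+ v,            // mergeValue: append a value to the combiner
  (c1: Seq[Int], c2: Seq[Int]) => c1 ++ c2)   // mergeCombiners: concatenate two combiners
// Per the override above, the fold happens slot-by-slot inside the existing value
// partitions, so grouped keeps pairs' index and no shuffle is introduced.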
/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with the existing partitioner/parallelism level.
*/
override def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
val newValues = self.valuesRDD.mapPartitions(_.map{ar => ar.map{s => Seq(s)} }, true)
new IndexedRDD[K, Seq[V]](self.index, newValues)
}
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
override def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner):
IndexedRDD[K, (Seq[V], Seq[W])] = {
//RDD[(K, (Seq[V], Seq[W]))] = {
other match {
case other: IndexedRDD[_, _] if self.index == other.index => {
// If both RDDs share exactly the same index, and therefore the same superset of keys,
// then we simply merge the value RDDs.
// However, it is possible that both RDDs are missing a value for a given key, in
// which case the returned RDD should have a null value.
val newValues =
self.valuesRDD.zipPartitions(other.valuesRDD)(
(thisIter, otherIter) => {
val thisValues: Seq[Seq[V]] = thisIter.next()
assert(!thisIter.hasNext)
val otherValues: Seq[Seq[W]] = otherIter.next()
assert(!otherIter.hasNext)
// Zip the values and if both arrays are null then the key is not present and
// so the resulting value must be null (not a tuple of empty sequences)
val tmp: Seq[Seq[(Seq[V], Seq[W])]] = thisValues.view.zip(otherValues).map{
case (null, null) => null // The key is not present in either RDD
case (a, null) => Seq((a, Seq.empty[W]))
case (null, b) => Seq((Seq.empty[V], b))
case (a, b) => Seq((a,b))
}.toSeq
List(tmp).iterator
})
new IndexedRDD[K, (Seq[V], Seq[W])](self.index, newValues)
}
case other: IndexedRDD[_, _]
if self.index.rdd.partitioner == other.index.rdd.partitioner => {
// If both RDDs are indexed using different indices but with the same partitioners
// then we need to first merge the indices and then use the merged index to
// merge the values.
val newIndex =
self.index.rdd.zipPartitions(other.index.rdd)(
(thisIter, otherIter) => {
val thisIndex = thisIter.next()
assert(!thisIter.hasNext)
val otherIndex = otherIter.next()
assert(!otherIter.hasNext)
val newIndex = new BlockIndex[K]()
// @todo Merge only the keys that correspond to non-null values
// Merge the keys
newIndex.putAll(thisIndex)
newIndex.putAll(otherIndex)
// We need to rekey the index
var ctr = 0
for (e <- newIndex.entrySet) {
e.setValue(ctr)
ctr += 1
}
List(newIndex).iterator
}).cache()
// Use the new index along with this and the other indices to merge the values
val newValues =
newIndex.zipPartitions(self.tuples, other.tuples)(
(newIndexIter, thisTuplesIter, otherTuplesIter) => {
// Get the new index for this partition
val newIndex = newIndexIter.next()
assert(!newIndexIter.hasNext)
// Get the corresponding indices and values for this and the other IndexedRDD
val (thisIndex, thisValues) = thisTuplesIter.next()
assert(!thisTuplesIter.hasNext)
val (otherIndex, otherValues) = otherTuplesIter.next()
assert(!otherTuplesIter.hasNext)
// Preallocate the new Values array
val newValues = new Array[Seq[(Seq[V],Seq[W])]](newIndex.size)
// Lookup the sequences in both submaps
for ((k,ind) <- newIndex) {
val thisSeq = if (thisIndex.contains(k)) thisValues(thisIndex.get(k)) else null
val otherSeq = if (otherIndex.contains(k)) otherValues(otherIndex.get(k)) else null
// if either of the sequences is not null then the key was in one of the two tables
// and so the value should appear in the returned table
newValues(ind) = (thisSeq, otherSeq) match {
case (null, null) => null
case (a, null) => Seq( (a, Seq.empty[W]) )
case (null, b) => Seq( (Seq.empty[V], b) )
case (a, b) => Seq( (a,b) )
}
}
List(newValues.toSeq).iterator
})
new IndexedRDD(new RDDIndex(newIndex), newValues)
}
case _ => {
// Get the partitioner from the index
val partitioner = self.index.rdd.partitioner match {
case Some(p) => p
case None => throw new SparkException("An index must have a partitioner.")
}
// Shuffle the other RDD using the partitioner for this index
val otherShuffled =
if (other.partitioner == Some(partitioner)) {
other
} else {
new ShuffledRDD[K, W, (K,W)](other, partitioner)
}
// Join the other RDD with this RDD, building a new value set and a new index on the fly
val groups =
self.tuples.zipPartitions(otherShuffled)(
(thisTuplesIter, otherTuplesIter) => {
// Get the corresponding indices and values for this IndexedRDD
val (thisIndex, thisValues) = thisTuplesIter.next()
assert(!thisTuplesIter.hasNext())
// Construct a new index
val newIndex = thisIndex.clone().asInstanceOf[BlockIndex[K]]
// Construct a new ArrayBuffer to store the values
val newValues = ArrayBuffer.fill[(Seq[V], Seq[W])](thisValues.size)(null)
// populate the newValues with the values in this IndexedRDD
for ((k,i) <- thisIndex) {
if (thisValues(i) != null) {
newValues(i) = (thisValues(i), ArrayBuffer.empty[W])
}
}
// Now iterate through the other tuples updating the map
for ((k,w) <- otherTuplesIter){
if (!newIndex.contains(k)) {
// update the index
val ind = newIndex.size
newIndex.put(k, ind)
// Update the values
newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
} else {
val ind = newIndex.get(k)
if(newValues(ind) == null) {
// If the other key was in the index but not in the values
// of this indexed RDD then create a new values entry for it
newValues(ind) = (Seq.empty[V], ArrayBuffer(w))
} else {
newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w)
}
}
}
// Finalize the new values array
val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] =
newValues.view.map{
case null => null
case (s, ab) => Seq((s, ab.toSeq))
}.toSeq
List( (newIndex, newValuesArray) ).iterator
}).cache()
// Extract the index and values from the above RDD
val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
val newValues = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
new IndexedRDD[K, (Seq[V], Seq[W])](new RDDIndex(newIndex), newValues)
}
}
}
}
//(self: IndexedRDD[K, V]) extends PairRDDFunctions(self) { }
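
To see how the three cogroup cases above surface to users: when both sides share the same RDDIndex, the first case applies and a join only zips value partitions. A sketch, assuming an existing SparkContext sc and invented data:

import org.apache.spark.SparkContext._

val left  = sc.parallelize(Seq((1, 10), (2, 20), (3, 30))).indexed(4)
val right = sc.parallelize(Seq((1, "a"), (3, "c"))).indexed(left.index)  // share left's index
val joined = left.join(right)   // dispatches through the overridden cogroup above
// Sharing the index hits the first case: the value partitions are zipped and no shuffle runs.
// Two IndexedRDDs with merely equal partitioners fall into the second case (indices merged);
// a plain pair RDD on the right falls into the third case and is shuffled by left's partitioner.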

View file

@ -0,0 +1,251 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.rdd
import java.nio.ByteBuffer
import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.Partitioner._
import org.apache.spark.storage.StorageLevel
/**
* The BlockIndex is the internal map structure used inside the index
* of the IndexedRDD.
*/
class BlockIndex[@specialized K: ClassManifest] extends JHashMap[K,Int]
/**
* The RDDIndex is an opaque type used to represent the organization
* of values in an RDD
*/
class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockIndex[K]]) {
def persist(newLevel: StorageLevel): RDDIndex[K] = {
rdd.persist(newLevel)
return this
}
}
/**
* An IndexedRDD[K,V] extends the RDD[(K,V)] by pre-indexing the keys and
* organizing the values to enable faster join operations.
*
* In addition to providing the basic RDD[(K,V)] functionality, the IndexedRDD
* exposes an index member which can be used to "key" other IndexedRDDs.
*
*/
class IndexedRDD[K: ClassManifest, V: ClassManifest](
@transient val index: RDDIndex[K],
@transient val valuesRDD: RDD[ Seq[Seq[V]] ])
extends RDD[(K, V)](index.rdd.context,
List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
/**
* An internal representation which joins the block indices with the values
*/
protected[spark] val tuples = new ZippedRDD(index.rdd.context, index.rdd, valuesRDD)
/**
* The partitioner is defined by the index
*/
override val partitioner = index.rdd.partitioner
/**
* The actual partitions are defined by the tuples.
*/
override def getPartitions: Array[Partition] = tuples.getPartitions
/**
* The preferred locations are computed based on the preferred locations of the tuples.
*/
override def getPreferredLocations(s: Partition): Seq[String] =
tuples.getPreferredLocations(s)
/**
* Caching an IndexedRDD causes the index and values to be cached separately.
*/
override def persist(newLevel: StorageLevel): RDD[(K,V)] = {
index.persist(newLevel)
valuesRDD.persist(newLevel)
return this
}
override def pairRDDFunctions[K1, V1](
implicit t: (K, V) <:< (K1,V1), k: ClassManifest[K1], v: ClassManifest[V1]):
PairRDDFunctions[K1, V1] = {
new IndexedRDDFunctions[K1,V1](this.asInstanceOf[IndexedRDD[K1,V1]])
}
/**
* Provide the RDD[(K,V)] equivalent output.
*/
override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
tuples.compute(part, context).flatMap { case (indexMap, values) =>
// Walk the index to construct the key, value pairs
indexMap.iterator
// Extract rows with key value pairs and indicators
.map{ case (k, ind) => (k, values(ind)) }
// Remove tuples that aren't actually present in the array
.filter{ case (_, valar) => valar != null && !valar.isEmpty()}
// Extract the pair (removing the indicator from the tuple)
.flatMap{ case (k, valar) => valar.map(v => (k,v))}
}
}
} // End of IndexedRDD
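
To make the Seq[Seq[V]] layout that compute walks concrete, here is a Spark-free sketch (the indexMap and values below are invented for illustration): slot i of values holds every value for the key that the index maps to i, and a null or empty slot means the key is absent.

val indexMap = Map("a" -> 0, "b" -> 1, "c" -> 2)
val values: Seq[Seq[Int]] = Seq(Seq(1, 2), null, Seq())
val pairs = indexMap.iterator
  .map { case (k, ind) => (k, values(ind)) }                       // key -> its value slot
  .filter { case (_, valar) => valar != null && !valar.isEmpty }   // drop absent keys
  .flatMap { case (k, valar) => valar.map(v => (k, v)) }           // expand slots into pairs
  .toList
// pairs == List(("a", 1), ("a", 2)); "b" and "c" contribute nothing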
object IndexedRDD {
def apply[K: ClassManifest, V: ClassManifest](
tbl: RDD[(K,V)],
existingIndex: RDDIndex[K] = null ): IndexedRDD[K, V] = {
if (existingIndex == null) {
// Shuffle the table (if necessary)
val shuffledTbl =
if (tbl.partitioner.isEmpty) {
new ShuffledRDD[K, V, (K,V)](tbl, Partitioner.defaultPartitioner(tbl))
} else { tbl }
val groups = shuffledTbl.mapPartitions( iter => {
val indexMap = new BlockIndex[K]()
val values = new ArrayBuffer[Seq[V]]()
for ((k,v) <- iter){
if(!indexMap.contains(k)) {
val ind = indexMap.size
indexMap.put(k, ind)
values.append(ArrayBuffer.empty[V])
}
val ind = indexMap.get(k)
values(ind).asInstanceOf[ArrayBuffer[V]].append(v)
}
List((indexMap, values.toSeq)).iterator
}, true).cache
// extract the index and the values
val index = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
val values = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
new IndexedRDD[K,V](new RDDIndex(index), values)
} else {
val index = existingIndex
val partitioner = index.rdd.partitioner match {
case Some(p) => p
case None => throw new SparkException("An index must have a partitioner.")
}
// Shuffle the table according to the index (if necessary)
val shuffledTbl =
if (tbl.partitioner == Some(partitioner)) {
tbl
} else {
new ShuffledRDD[K, V, (K,V)](tbl, partitioner)
}
// Use the index to build the new values table
val values = index.rdd.zipPartitions(shuffledTbl)(
(indexIter, tblIter) => {
// There is only one map
val index = indexIter.next()
assert(!indexIter.hasNext())
val values = new Array[Seq[V]](index.size)
for ((k,v) <- tblIter) {
if (!index.contains(k)) {
throw new SparkException("Error: Trying to bind an external index " +
"to an RDD which contains keys that are not in the index.")
}
val ind = index(k)
if (values(ind) == null) {
values(ind) = ArrayBuffer.empty[V]
}
values(ind).asInstanceOf[ArrayBuffer[V]].append(v)
}
List(values.toSeq).iterator
})
new IndexedRDD[K,V](index, values)
}
}
/**
* Construct an index of the unique values in a given RDD.
*/
def makeIndex[K: ClassManifest](keys: RDD[K],
partitioner: Option[Partitioner] = None): RDDIndex[K] = {
// @todo: I don't need the boolean; it's only there to be the second type since I want to shuffle a single RDD
// Ugly hack :-(. In order to partition the keys they must have values.
val tbl = keys.mapPartitions(_.map(k => (k, false)), true)
// Shuffle the table (if necessary)
val shuffledTbl = partitioner match {
case None => {
if (tbl.partitioner.isEmpty) {
// @todo: I don't need the boolean; it's only there to be the second type of the shuffle.
new ShuffledRDD[K, Boolean, (K, Boolean)](tbl, Partitioner.defaultPartitioner(tbl))
} else { tbl }
}
case Some(partitioner) =>
tbl.partitionBy(partitioner)
// new ShuffledRDD[K, Boolean](tbl, partitioner)
}
val index = shuffledTbl.mapPartitions( iter => {
val indexMap = new BlockIndex[K]()
for ( (k,_) <- iter ){
if(!indexMap.contains(k)){
val ind = indexMap.size
indexMap.put(k, ind)
}
}
List(indexMap).iterator
}, true).cache
new RDDIndex(index)
}
}
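
Illustrative use of makeIndex together with the existingIndex branch of apply above (a sketch; sc and the data are assumptions):

import org.apache.spark.rdd.IndexedRDD

val index = IndexedRDD.makeIndex(sc.parallelize(1 to 6))
val a = IndexedRDD(sc.parallelize(Seq((1, "a"), (2, "b"))), index)
val b = IndexedRDD(sc.parallelize(Seq((2, "x"), (3, "y"))), index)
// a and b are co-partitioned and share one key layout, so joins between them can take the
// shared-index path; binding a table whose keys are not covered by the index throws the
// SparkException raised in the zipPartitions loop above.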

View file

@ -0,0 +1,460 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import org.scalatest.FunSuite
import org.scalatest.prop.Checkers
import org.scalacheck.Arbitrary._
import org.scalacheck.Gen
import org.scalacheck.Prop._
import com.google.common.io.Files
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashSet
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.rdd.IndexedRDD
import org.apache.spark.SparkContext._
import org.apache.spark._
class IndexedRDDSuite extends FunSuite with SharedSparkContext {
def lineage(rdd: RDD[_]): collection.mutable.HashSet[RDD[_]] = {
val set = new collection.mutable.HashSet[RDD[_]]
def visit(rdd: RDD[_]) {
for (dep <- rdd.dependencies) {
set += dep.rdd
visit(dep.rdd)
}
}
visit(rdd)
set
}
test("groupByKey") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))).indexed()
val groups = pairs.groupByKey().collect()
assert(groups.size === 2)
val valuesFor1 = groups.find(_._1 == 1).get._2
assert(valuesFor1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
}
test("groupByKey with duplicates") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed()
val groups = pairs.groupByKey().collect()
assert(groups.size === 2)
val valuesFor1 = groups.find(_._1 == 1).get._2
assert(valuesFor1.toList.sorted === List(1, 1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
}
test("groupByKey with negative key hash codes") {
val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1))).indexed()
val groups = pairs.groupByKey().collect()
assert(groups.size === 2)
val valuesForMinus1 = groups.find(_._1 == -1).get._2
assert(valuesForMinus1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
}
test("groupByKey with many output partitions") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))).indexed(10)
val groups = pairs.groupByKey().collect()
assert(groups.size === 2)
val valuesFor1 = groups.find(_._1 == 1).get._2
assert(valuesFor1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
}
test("reduceByKey") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed()
val sums = pairs.reduceByKey(_+_).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
}
test("reduceByKey with collectAsMap") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed()
val sums = pairs.reduceByKey(_+_).collectAsMap()
assert(sums.size === 2)
assert(sums(1) === 7)
assert(sums(2) === 1)
}
test("reduceByKey with many output partitions") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(10)
val sums = pairs.reduceByKey(_+_).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
}
test("reduceByKey with partitioner") {
val p = new Partitioner() {
def numPartitions = 2
def getPartition(key: Any) = key.asInstanceOf[Int]
}
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).indexed(p)
val sums = pairs.reduceByKey(_+_)
assert(sums.collect().toSet === Set((1, 4), (0, 1)))
assert(sums.partitioner === Some(p))
// count the dependencies to make sure there is only 1 ShuffledRDD
val deps = lineage(sums)
assert(deps.filter(_.isInstanceOf[ShuffledRDD[_,_,_]]).size === 1) // ShuffledRDD, ParallelCollection
}
test("joinIndexVsPair") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed()
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 4)
assert(joined.toSet === Set(
(1, (1, 'x')),
(1, (2, 'x')),
(2, (1, 'y')),
(2, (1, 'z'))
))
}
test("joinIndexVsIndex") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed()
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed()
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 4)
assert(joined.toSet === Set(
(1, (1, 'x')),
(1, (2, 'x')),
(2, (1, 'y')),
(2, (1, 'z'))
))
}
test("joinSharedIndex") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4,-4), (4, 4) )).indexed()
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(rdd1.index)
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 6)
assert(joined.toSet === Set(
(1, (1, 'x')),
(1, (2, 'x')),
(2, (1, 'y')),
(2, (1, 'z')),
(4, (-4, 'w')),
(4, (4, 'w'))
))
}
test("join all-to-all") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3))).indexed()
val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))).indexed(rdd1.index)
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 6)
assert(joined.toSet === Set(
(1, (1, 'x')),
(1, (1, 'y')),
(1, (2, 'x')),
(1, (2, 'y')),
(1, (3, 'x')),
(1, (3, 'y'))
))
}
test("leftOuterJoinIndex") {
val index = sc.parallelize( 1 to 6 ).makeIndex()
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.leftOuterJoin(rdd2).collect()
assert(joined.size === 5)
assert(joined.toSet === Set(
(1, (1, Some('x'))),
(1, (2, Some('x'))),
(2, (1, Some('y'))),
(2, (1, Some('z'))),
(3, (1, None))
))
}
test("leftOuterJoinIndextoIndex") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed()
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed()
val joined = rdd1.leftOuterJoin(rdd2).collect()
assert(joined.size === 5)
assert(joined.toSet === Set(
(1, (1, Some('x'))),
(1, (2, Some('x'))),
(2, (1, Some('y'))),
(2, (1, Some('z'))),
(3, (1, None))
))
}
test("leftOuterJoinIndextoSharedIndex") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4, -4))).indexed()
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(rdd1.index)
val joined = rdd1.leftOuterJoin(rdd2).collect()
assert(joined.size === 6)
assert(joined.toSet === Set(
(1, (1, Some('x'))),
(1, (2, Some('x'))),
(2, (1, Some('y'))),
(2, (1, Some('z'))),
(4, (-4, Some('w'))),
(3, (1, None))
))
}
test("leftOuterJoinIndextoIndexExternal") {
val index = sc.parallelize( 1 to 6 ).makeIndex()
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index)
val joined = rdd1.leftOuterJoin(rdd2).collect()
assert(joined.size === 5)
assert(joined.toSet === Set(
(1, (1, Some('x'))),
(1, (2, Some('x'))),
(2, (1, Some('y'))),
(2, (1, Some('z'))),
(3, (1, None))
))
}
test("rightOuterJoin") {
val index = sc.parallelize( 1 to 6 ).makeIndex()
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.rightOuterJoin(rdd2).collect()
assert(joined.size === 5)
assert(joined.toSet === Set(
(1, (Some(1), 'x')),
(1, (Some(2), 'x')),
(2, (Some(1), 'y')),
(2, (Some(1), 'z')),
(4, (None, 'w'))
))
}
test("rightOuterJoinIndex2Index") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed()
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed()
val joined = rdd1.rightOuterJoin(rdd2).collect()
assert(joined.size === 5)
assert(joined.toSet === Set(
(1, (Some(1), 'x')),
(1, (Some(2), 'x')),
(2, (Some(1), 'y')),
(2, (Some(1), 'z')),
(4, (None, 'w'))
))
}
test("rightOuterJoinIndex2Indexshared") {
val index = sc.parallelize( 1 to 6 ).makeIndex()
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index)
val joined = rdd1.rightOuterJoin(rdd2).collect()
assert(joined.size === 5)
assert(joined.toSet === Set(
(1, (Some(1), 'x')),
(1, (Some(2), 'x')),
(2, (Some(1), 'y')),
(2, (Some(1), 'z')),
(4, (None, 'w'))
))
}
test("join with no matches index") {
val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 0)
}
test("join with no matches shared index") {
val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))).indexed(index)
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 0)
}
test("join with many output partitions") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(10)
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 4)
assert(joined.toSet === Set(
(1, (1, 'x')),
(1, (2, 'x')),
(2, (1, 'y')),
(2, (1, 'z'))
))
}
test("join with many output partitions and two indices") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(10)
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(20)
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 4)
assert(joined.toSet === Set(
(1, (1, 'x')),
(1, (2, 'x')),
(2, (1, 'y')),
(2, (1, 'z'))
))
}
test("groupWith") {
val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index)
val joined = rdd1.groupWith(rdd2).collect()
assert(joined.size === 4)
assert(joined.toSet === Set(
(1, (ArrayBuffer(1, 2), ArrayBuffer('x'))),
(2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))),
(3, (ArrayBuffer(1), ArrayBuffer())),
(4, (ArrayBuffer(), ArrayBuffer('w')))
))
}
test("zero-partition RDD") {
val emptyDir = Files.createTempDir()
val file = sc.textFile(emptyDir.getAbsolutePath)
assert(file.partitions.size == 0)
assert(file.collect().toList === Nil)
// Test that a shuffle on the file works, because this used to be a bug
assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
}
test("keys and values") {
val rdd = sc.parallelize(Array((1, "a"), (2, "b"))).indexed()
assert(rdd.keys.collect().toList === List(1, 2))
assert(rdd.values.collect().toList === List("a", "b"))
}
test("default partitioner uses partition size") {
// specify 2000 partitions
val a = sc.makeRDD(Array(1, 2, 3, 4), 2000)
// do a map, which loses the partitioner
val b = a.map(a => (a, (a * 2).toString))
// then a group by, and see we didn't revert to 2 partitions
val c = b.groupByKey()
assert(c.partitions.size === 2000)
}
// test("default partitioner uses largest partitioner indexed to indexed") {
// val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2).indexed()
// val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000).indexed()
// val c = a.join(b)
// assert(c.partitions.size === 2000)
// }
test("subtract") {
val a = sc.parallelize(Array(1, 2, 3), 2)
val b = sc.parallelize(Array(2, 3, 4), 4)
val c = a.subtract(b)
assert(c.collect().toSet === Set(1))
assert(c.partitions.size === a.partitions.size)
}
test("subtract with narrow dependency") {
// use a deterministic partitioner
val p = new Partitioner() {
def numPartitions = 5
def getPartition(key: Any) = key.asInstanceOf[Int]
}
// partitionBy so we have a narrow dependency
val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).indexed(p)
// more partitions/no partitioner so a shuffle dependency
val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
val c = a.subtract(b)
assert(c.collect().toSet === Set((1, "a"), (3, "c")))
// Ideally we could keep the original partitioner...
assert(c.partitioner === None)
}
test("subtractByKey") {
val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2).indexed()
val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4)
val c = a.subtractByKey(b)
assert(c.collect().toSet === Set((1, "a"), (1, "a")))
assert(c.partitions.size === a.partitions.size)
}
// test("subtractByKey with narrow dependency") {
// // use a deterministic partitioner
// val p = new Partitioner() {
// def numPartitions = 5
// def getPartition(key: Any) = key.asInstanceOf[Int]
// }
// val index = sc.parallelize( 1 to 6 ).makeIndex(Some(p))
// // partitionBy so we have a narrow dependency
// val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).indexed(index)
// // more partitions/no partitioner so a shuffle dependency
// val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4).indexed(index)
// val c = a.subtractByKey(b)
// assert(c.collect().toSet === Set((1, "a"), (1, "a")))
// assert(c.partitioner.get === p)
// }
test("foldByKey") {
val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(index)
val sums = pairs.foldByKey(0)(_+_).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
}
test("foldByKey with mutable result type") {
val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(index)
val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache()
// Fold the values using in-place mutation
val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect()
assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1))))
// Check that the mutable objects in the original RDD were not changed
assert(bufs.collect().toSet === Set(
(1, ArrayBuffer(1)),
(1, ArrayBuffer(2)),
(1, ArrayBuffer(3)),
(1, ArrayBuffer(1)),
(2, ArrayBuffer(1))))
}
}