IndexedRDD passes all PairRDD Function tests

Joseph E. Gonzalez 2013-08-15 14:20:59 -07:00
parent 54b54903c3
commit 61281756f2
2 changed files with 338 additions and 39 deletions


@@ -17,18 +17,21 @@
package spark.rdd
import java.nio.ByteBuffer
import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
import spark.PairRDDFunctions
import spark._
import spark.rdd._
import spark.SparkContext._
import spark.SparkException
import spark.Partitioner
import spark.Partitioner._
// import java.io.{ObjectOutputStream, IOException}
@@ -123,15 +126,129 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
}
/**
* Simplified version of combineByKey that hash-partitions the output RDD.
*/
def combineByKey[C: ClassManifest](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): IndexedRDD[K, C] = {
combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
}
/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level.
*/
def combineByKey[C: ClassManifest](createCombiner: V => C, mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C)
: IndexedRDD[K, C] = {
combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(index))
}
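A minimal usage sketch of this combineByKey primitive (not part of the diff; it assumes a SparkContext `sc` and the `index()` conversion exercised in the test suite below): (sum, count) combiners give a per-key average.

val scores = sc.parallelize(Array(("a", 1.0), ("a", 3.0), ("b", 2.0))).index()
val sumCount = scores.combineByKey[(Double, Int)](
  (v: Double) => (v, 1),                                                     // createCombiner
  (c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1),                     // mergeValue
  (c1: (Double, Int), c2: (Double, Int)) => (c1._1 + c2._1, c1._2 + c2._2))  // mergeCombiners
val avgs = sumCount.collect().map { case (k, (sum, n)) => (k, sum / n) }     // ("a", 2.0), ("b", 2.0) in some order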
/**
* Merge the values for each key using an associative function and a neutral "zero value" which may
* be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
* list concatenation, 0 for addition, or 1 for multiplication).
*/
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): IndexedRDD[K, V] = {
// Serialize the zero value to a byte array so that we can get a new clone of it on each key
val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
val zeroArray = new Array[Byte](zeroBuffer.limit)
zeroBuffer.get(zeroArray)
// When deserializing, use a lazy val to create just one instance of the serializer per task
lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
}
/**
* Merge the values for each key using an associative function and a neutral "zero value" which may
* be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
* list concatenation, 0 for addition, or 1 for multiplication).
*/
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): IndexedRDD[K, V] = {
foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
}
/**
* Merge the values for each key using an associative function and a neutral "zero value" which may
* be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
* list concatenation, 0 for addition, or 1 for multiplication).
*/
def foldByKey(zeroValue: V)(func: (V, V) => V): IndexedRDD[K, V] = {
foldByKey(zeroValue, defaultPartitioner(index))(func)
}
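The serialize-then-deserialize dance in the first foldByKey overload gives every key a fresh copy of the zero value, so a mutable zero is never shared across keys. A stand-alone sketch of that trick (not part of the diff; plain Java serialization stands in for Spark's closureSerializer):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Serialize `zero` once, then return a factory that deserializes an independent copy per call.
def cloneBySerialization[T <: Serializable](zero: T): () => T = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(zero)
  oos.close()
  val bytes = bos.toByteArray
  () => new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject().asInstanceOf[T]
}

val createZero = cloneBySerialization(scala.collection.mutable.ArrayBuffer.empty[Int])
assert(!(createZero() eq createZero()))  // each call yields a distinct instance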
/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): IndexedRDD[K, V] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
*/
def reduceByKey(func: (V, V) => V, numPartitions: Int): IndexedRDD[K, V] = {
reduceByKey(new HashPartitioner(numPartitions), func)
}
/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
* parallelism level.
*/
def reduceByKey(func: (V, V) => V): IndexedRDD[K, V] = {
reduceByKey(defaultPartitioner(index), func)
}
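A quick usage sketch (not part of the diff; assumes `sc` and `index()` as above): summing per key with reduceByKey, and the equivalent foldByKey with 0 as the neutral element.

val pairs = sc.parallelize(Array(("a", 1), ("a", 2), ("b", 1))).index()
val summed = pairs.reduceByKey(_ + _)   // ("a", 3), ("b", 1)
val folded = pairs.foldByKey(0)(_ + _)  // same result
assert(summed.collect().toSet == folded.collect().toSet)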
/**
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
*/
def groupByKey(partitioner: Partitioner): IndexedRDD[K, Seq[V]] = {
val newValues = valuesRDD.mapPartitions(_.map{ar => ar.map{s => Seq(s)} }, true)
new IndexedRDD[K, Seq[V]](index, newValues)
}
/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD into `numPartitions` partitions.
*/
def groupByKey(numPartitions: Int): IndexedRDD[K, Seq[V]] = {
groupByKey(new HashPartitioner(numPartitions))
}
/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with the existing partitioner/parallelism level.
*/
def groupByKey(): IndexedRDD[K, Seq[V]] = {
groupByKey(defaultPartitioner(index))
}
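The corresponding sketch for groupByKey (not part of the diff): each key's values are collected into a Seq[V] while the existing index is reused.

val grouped = sc.parallelize(Array((1, "x"), (1, "y"), (2, "z"))).index().groupByKey()
// grouped: IndexedRDD[Int, Seq[String]]; collect() gives e.g. (1, Seq("x", "y")) and (2, Seq("z"))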
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W: ClassManifest](other: RDD[(K, W)], partitionerUnused: Partitioner):
def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner):
IndexedRDD[K, (Seq[V], Seq[W])] = {
//RDD[(K, (Seq[V], Seq[W]))] = {
assert(false)
other match {
case other: IndexedRDD[_, _] if other.index == index => {
// if both RDDs share exactly the same index and therefore the same super set of keys
@@ -233,14 +350,17 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
// Get the corresponding indices and values for this IndexedRDD
val (thisIndex, thisValues) = thisTuplesIter.next()
assert(!thisTuplesIter.hasNext())
assert(thisIndex.size == thisValues.size)
// Construct a new index
val newIndex = thisIndex.clone().asInstanceOf[JHashMap[K, Int]]
assert(thisIndex.size == newIndex.size)
// Construct a new array Buffer to store the values
val newValues = new ArrayBuffer[(Seq[V], ArrayBuffer[W])](thisValues.size)
val newValues = ArrayBuffer.fill[(Seq[V], ArrayBuffer[W])](thisValues.size)(null)
// populate the newValues with the values in this IndexedRDD
for ((k,i) <- thisIndex) {
assert(i < thisValues.size)
if (thisValues(i) != null) {
newValues(i) = (thisValues(i), new ArrayBuffer[W]())
newValues(i) = (thisValues(i), ArrayBuffer.empty[W])
}
}
// Now iterate through the other tuples updating the map
@@ -249,11 +369,8 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
// update the index
val ind = newIndex.size
newIndex.put(k, ind)
// Create the buffer for w
val wbuffer = new ArrayBuffer[W]()
wbuffer.append(w)
// Update the values
newValues.append( (Seq.empty[V], wbuffer) )
newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
} else {
val ind = newIndex.get(k)
newValues(ind)._2.append(w)
@@ -270,9 +387,159 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
val newValues = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
new IndexedRDD[K, (Seq[V], Seq[W])](newIndex, newValues)
}
}
}
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (Seq[V], Seq[W])] = {
cogroup(other, defaultPartitioner(this, other))
}
// /**
// * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
// * tuple with the list of values for that key in `this`, `other1` and `other2`.
// */
// def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
// : IndexedRDD[K, (Seq[V], Seq[W1], Seq[W2])] = {
// cogroup(other1, other2, defaultPartitioner(this, other1, other2))
// }
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): IndexedRDD[K, (Seq[V], Seq[W])] = {
cogroup(other, new HashPartitioner(numPartitions))
}
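A short sketch (not part of the diff) of the shared-index fast path handled earlier in cogroup's `other.index == index` case: building the second RDD against the first one's index makes the indices identical, so cogroup can reuse the index rather than re-partition.

val rdd1 = sc.parallelize(Array((1, 1), (2, 1), (3, 1))).index()
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'))).index(rdd1.index)  // shares rdd1's index
val cg = rdd1.cogroup(rdd2)  // IndexedRDD[Int, (Seq[Int], Seq[Char])], e.g. (3, (Seq(1), Seq()))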
// /**
// * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
// * tuple with the list of values for that key in `this`, `other1` and `other2`.
// */
// def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
// : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
// cogroup(other1, other2, new HashPartitioner(numPartitions))
// }
/** Alias for cogroup. */
def groupWith[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (Seq[V], Seq[W])] = {
cogroup(other, defaultPartitioner(this, other))
}
// /** Alias for cogroup. */
// def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
// : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
// cogroup(other1, other2, defaultPartitioner(self, other1, other2))
// }
/**
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
*/
def join[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): IndexedRDD[K, (V, W)] = {
cogroup(other, partitioner).flatMapValues {
case (vs, ws) =>
for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
}
}
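The for-comprehension above emits the full cross product of the two value sequences for each key; in isolation:

val vs = Seq(1, 2)
val ws = Seq('x')
val crossed = for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
assert(crossed.toList == List((1, 'x'), (2, 'x')))  // cf. the (1, (1, 'x')), (1, (2, 'x')) rows asserted in the tests below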
/**
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
* pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
* partition the output RDD.
*/
def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): IndexedRDD[K, (V, Option[W])] = {
cogroup(other, partitioner).flatMapValues {
case (vs, ws) =>
if (ws.isEmpty) {
vs.iterator.map(v => (v, None))
} else {
for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
}
}
}
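When a key has no match in `other`, ws is empty and the first branch above pads the value with None instead of dropping the key; the same logic in isolation:

val vs = Seq(1)
val ws = Seq.empty[Char]
val padded: Iterator[(Int, Option[Char])] =
  if (ws.isEmpty) vs.iterator.map(v => (v, None))
  else for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
assert(padded.toList == List((1, None)))  // cf. the (3, (1, None)) row in the leftOuterJoin tests below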
/**
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
* resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
* pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
* partition the output RDD.
*/
def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner)
: IndexedRDD[K, (Option[V], W)] = {
cogroup(other, partitioner).flatMapValues {
case (vs, ws) =>
if (vs.isEmpty) {
ws.iterator.map(w => (None, w))
} else {
for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
}
}
}
/**
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
def join[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (V, W)] = {
join(other, defaultPartitioner(this, other))
}
/**
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
def join[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): IndexedRDD[K, (V, W)] = {
join(other, new HashPartitioner(numPartitions))
}
/**
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
* using the existing partitioner/parallelism level.
*/
def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (V, Option[W])] = {
leftOuterJoin(other, defaultPartitioner(this, other))
}
/**
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
* into `numPartitions` partitions.
*/
def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): IndexedRDD[K, (V, Option[W])] = {
leftOuterJoin(other, new HashPartitioner(numPartitions))
}
/**
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
* resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD using the existing partitioner/parallelism level.
*/
def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (Option[V], W)] = {
rightOuterJoin(other, defaultPartitioner(this, other))
}
/**
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
* resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD into the given number of partitions.
*/
def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): IndexedRDD[K, (Option[V], W)] = {
rightOuterJoin(other, new HashPartitioner(numPartitions))
}
@@ -318,7 +585,7 @@ object IndexedRDD {
if(!indexMap.contains(k)) {
val ind = indexMap.size
indexMap.put(k, ind)
values.append(new ArrayBuffer[V]())
values.append(ArrayBuffer.empty[V])
}
val ind = indexMap.get(k)
values(ind).asInstanceOf[ArrayBuffer[V]].append(v)
@@ -355,7 +622,7 @@ object IndexedRDD {
assert(index.contains(k))
val ind = index(k)
if (values(ind) == null) {
values(ind) = new ArrayBuffer[V]()
values(ind) = ArrayBuffer.empty[V]
}
values(ind).asInstanceOf[ArrayBuffer[V]].append(v)
}
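Both hunks above follow the same grouping pattern: the index assigns each key a slot and values are appended to that slot's buffer. A stand-alone sketch of the pattern (not part of the diff; a Scala HashMap stands in for the JHashMap used above):

import scala.collection.mutable.{ArrayBuffer, HashMap}

def groupWithIndex[K, V](pairs: Iterator[(K, V)]): (HashMap[K, Int], ArrayBuffer[ArrayBuffer[V]]) = {
  val index = HashMap.empty[K, Int]
  val values = ArrayBuffer.empty[ArrayBuffer[V]]
  for ((k, v) <- pairs) {
    // The first time a key is seen, give it the next free slot and an empty buffer.
    val ind = index.getOrElseUpdate(k, { values.append(ArrayBuffer.empty[V]); values.size - 1 })
    values(ind).append(v)
  }
  (index, values)
}

val (idx, vals) = groupWithIndex(Iterator(("a", 1), ("b", 2), ("a", 3)))
assert(vals(idx("a")) == ArrayBuffer(1, 3))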


@@ -31,6 +31,8 @@ import scala.collection.mutable.HashSet
import spark.rdd.ShuffledRDD
import spark.SparkContext._
import spark._
class IndexedRDDSuite extends FunSuite with SharedSparkContext {
@@ -165,6 +167,7 @@ class IndexedRDDSuite extends FunSuite with SharedSparkContext {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4,-4), (4, 4) )).index()
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).index(rdd1.index)
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 6)
assert(joined.toSet === Set(
(1, (1, 'x')),
(1, (2, 'x')),
@@ -177,8 +180,8 @@ class IndexedRDDSuite extends FunSuite with SharedSparkContext {
test("join all-to-all") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3)))
val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y')))
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3))).index()
val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))).index(rdd1.index)
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 6)
assert(joined.toSet === Set(
@@ -191,8 +194,8 @@ class IndexedRDDSuite extends FunSuite with SharedSparkContext {
))
}
test("leftOuterJoin") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
test("leftOuterJoinIndex") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).index()
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.leftOuterJoin(rdd2).collect()
assert(joined.size === 5)
@@ -205,6 +208,35 @@ class IndexedRDDSuite extends FunSuite with SharedSparkContext {
))
}
test("leftOuterJoinIndextoIndex") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).index()
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).index()
val joined = rdd1.leftOuterJoin(rdd2).collect()
assert(joined.size === 5)
assert(joined.toSet === Set(
(1, (1, Some('x'))),
(1, (2, Some('x'))),
(2, (1, Some('y'))),
(2, (1, Some('z'))),
(3, (1, None))
))
}
test("leftOuterJoinIndextoSharedIndex") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4, -4))).index()
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).index(rdd1.index)
val joined = rdd1.leftOuterJoin(rdd2).collect()
assert(joined.size === 6)
assert(joined.toSet === Set(
(1, (1, Some('x'))),
(1, (2, Some('x'))),
(2, (1, Some('y'))),
(2, (1, Some('z'))),
(4, (-4, Some('w'))),
(3, (1, None))
))
}
test("rightOuterJoin") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))