From 61281756f225902386e8f41d12a3c58f06da3977 Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez"
Date: Thu, 15 Aug 2013 14:20:59 -0700
Subject: [PATCH] IndexedRDD passes all PairRDD Function tests

---
 .../src/main/scala/spark/rdd/IndexedRDD.scala | 337 ++++++++++++++++--
 .../test/scala/spark/IndexedRDDSuite.scala    |  40 ++-
 2 files changed, 338 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/spark/rdd/IndexedRDD.scala b/core/src/main/scala/spark/rdd/IndexedRDD.scala
index a6852f3f8a..b65efa4447 100644
--- a/core/src/main/scala/spark/rdd/IndexedRDD.scala
+++ b/core/src/main/scala/spark/rdd/IndexedRDD.scala
@@ -17,18 +17,21 @@
 package spark.rdd

+import java.nio.ByteBuffer
+
+
 import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet}

 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer

-import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
-
-import spark.PairRDDFunctions
+import spark._
+import spark.rdd._
 import spark.SparkContext._
-import spark.SparkException
-import spark.Partitioner
+import spark.Partitioner._
+
+
 // import java.io.{ObjectOutputStream, IOException}

@@ -110,7 +113,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
     val newValues = valuesRDD.mapPartitions(
       _.map{ groups: Seq[Seq[V]] =>
         groups.map{ group: Seq[V] =>
-          if(group != null && !group.isEmpty) {
+          if (group != null && !group.isEmpty) {
             val c: C = createCombiner(group.head)
             val sum: C = group.tail.foldLeft(c)(mergeValue)
             Seq(sum)
@@ -123,15 +126,129 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
   }

+  /**
+   * Simplified version of combineByKey that hash-partitions the output RDD.
+   */
+  def combineByKey[C: ClassManifest](createCombiner: V => C,
+      mergeValue: (C, V) => C,
+      mergeCombiners: (C, C) => C,
+      numPartitions: Int): IndexedRDD[K, C] = {
+    combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
+  }
+
+
+  /**
+   * Simplified version of combineByKey that hash-partitions the resulting RDD using the
+   * existing partitioner/parallelism level.
+   */
+  def combineByKey[C: ClassManifest](createCombiner: V => C, mergeValue: (C, V) => C,
+      mergeCombiners: (C, C) => C)
+    : IndexedRDD[K, C] = {
+    combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(index))
+  }
+
+
+  /**
+   * Merge the values for each key using an associative function and a neutral "zero value" which may
+   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+   * list concatenation, 0 for addition, or 1 for multiplication).
+   */
+  def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): IndexedRDD[K, V] = {
+    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
+    val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
+    val zeroArray = new Array[Byte](zeroBuffer.limit)
+    zeroBuffer.get(zeroArray)
+
+    // When deserializing, use a lazy val to create just one instance of the serializer per task
+    lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
+    def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
+    combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
+  }
+
+  /**
+   * Merge the values for each key using an associative function and a neutral "zero value" which may
+   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+   * list concatenation, 0 for addition, or 1 for multiplication).
+   */
+  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): IndexedRDD[K, V] = {
+    foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
+  }
+
+
+  /**
+   * Merge the values for each key using an associative function and a neutral "zero value" which may
+   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+   * list concatenation, 0 for addition, or 1 for multiplication).
+   */
+  def foldByKey(zeroValue: V)(func: (V, V) => V): IndexedRDD[K, V] = {
+    foldByKey(zeroValue, defaultPartitioner(index))(func)
+  }
+
+
+  /**
+   * Merge the values for each key using an associative reduce function. This will also perform
+   * the merging locally on each mapper before sending results to a reducer, similarly to a
+   * "combiner" in MapReduce.
+   */
+  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): IndexedRDD[K, V] = {
+    combineByKey[V]((v: V) => v, func, func, partitioner)
+  }
+
+  /**
+   * Merge the values for each key using an associative reduce function. This will also perform
+   * the merging locally on each mapper before sending results to a reducer, similarly to a
+   * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
+   */
+  def reduceByKey(func: (V, V) => V, numPartitions: Int): IndexedRDD[K, V] = {
+    reduceByKey(new HashPartitioner(numPartitions), func)
+  }
+
+  /**
+   * Merge the values for each key using an associative reduce function. This will also perform
+   * the merging locally on each mapper before sending results to a reducer, similarly to a
+   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
+   * parallelism level.
+   */
+  def reduceByKey(func: (V, V) => V): IndexedRDD[K, V] = {
+    reduceByKey(defaultPartitioner(index), func)
+  }
+
+
+  /**
+   * Group the values for each key in the RDD into a single sequence. Allows controlling the
+   * partitioning of the resulting key-value pair RDD by passing a Partitioner.
+   */
+  def groupByKey(partitioner: Partitioner): IndexedRDD[K, Seq[V]] = {
+    val newValues = valuesRDD.mapPartitions(_.map{ar => ar.map{s => Seq(s)} }, true)
+    new IndexedRDD[K, Seq[V]](index, newValues)
+  }
+
+
+  /**
+   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
+   * resulting RDD into `numPartitions` partitions.
+   */
+  def groupByKey(numPartitions: Int): IndexedRDD[K, Seq[V]] = {
+    groupByKey(new HashPartitioner(numPartitions))
+  }
+
+
+  /**
+   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
+   * resulting RDD with the existing partitioner/parallelism level.
+   */
+  def groupByKey(): IndexedRDD[K, Seq[V]] = {
+    groupByKey(defaultPartitioner(index))
+  }
+
   /**
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W: ClassManifest](other: RDD[(K, W)], partitionerUnused: Partitioner):
-    IndexedRDD[K, (Seq[V], Seq[W])] = {
+  def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner):
+      IndexedRDD[K, (Seq[V], Seq[W])] = {
     //RDD[(K, (Seq[V], Seq[W]))] = {
-    assert(false)
     other match {
       case other: IndexedRDD[_, _] if other.index == index => {
         // if both RDDs share exactly the same index and therefore the same super set of keys
@@ -174,7 +291,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
         newIndex.putAll(otherIndex)
         // We need to rekey the index
         var ctr = 0
-        for(e <- newIndex.entrySet) {
+        for (e <- newIndex.entrySet) {
           e.setValue(ctr)
           ctr += 1
         }
@@ -198,9 +315,9 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
           // Preallocate the new Values array
           val newValues = new Array[Seq[(Seq[V],Seq[W])]](newIndex.size)
           // Lookup the sequences in both submaps
-          for((k,ind) <- newIndex) {
-            val thisSeq = if(thisIndex.contains(k)) thisValues(thisIndex.get(k)) else null
-            val otherSeq = if(otherIndex.contains(k)) otherValues(otherIndex.get(k)) else null
+          for ((k,ind) <- newIndex) {
+            val thisSeq = if (thisIndex.contains(k)) thisValues(thisIndex.get(k)) else null
+            val otherSeq = if (otherIndex.contains(k)) otherValues(otherIndex.get(k)) else null
             // if either of the sequences is not null then the key was in one of the two tables
             // and so the value should appear in the returned table
             newValues(ind) = (thisSeq, otherSeq) match {
@@ -233,27 +350,27 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
            // Get the corresponding indicies and values for this IndexedRDD
            val (thisIndex, thisValues) = thisTuplesIter.next()
            assert(!thisTuplesIter.hasNext())
+           assert(thisIndex.size == thisValues.size)
            // Construct a new index
            val newIndex = thisIndex.clone().asInstanceOf[JHashMap[K, Int]]
+           assert(thisIndex.size == newIndex.size)
            // Construct a new array Buffer to store the values
-           val newValues = new ArrayBuffer[(Seq[V], ArrayBuffer[W])](thisValues.size)
+           val newValues = ArrayBuffer.fill[(Seq[V], ArrayBuffer[W])](thisValues.size)(null)
            // populate the newValues with the values in this IndexedRDD
-           for((k,i) <- thisIndex) {
-             if(thisValues(i) != null) {
-               newValues(i) = (thisValues(i), new ArrayBuffer[W]())
+           for ((k,i) <- thisIndex) {
+             assert(i < thisValues.size)
+             if (thisValues(i) != null) {
+               newValues(i) = (thisValues(i), ArrayBuffer.empty[W])
              }
            }
            // Now iterate through the other tuples updating the map
-           for((k,w) <- otherTuplesIter){
-             if(!newIndex.contains(k)) {
+           for ((k,w) <- otherTuplesIter){
+             if (!newIndex.contains(k)) {
               // update the index
               val ind = newIndex.size
               newIndex.put(k, ind)
-              // Create the buffer for w
-              val wbuffer = new ArrayBuffer[W]()
-              wbuffer.append(w)
              // Update the values
-              newValues.append( (Seq.empty[V], wbuffer) )
+              newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
             } else {
               val ind = newIndex.get(k)
               newValues(ind)._2.append(w)
@@ -265,17 +382,167 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
            List( (newIndex, newValuesArray) ).iterator
          }, otherShuffled).cache()

-        // Extract the index and values from the above RDD
-        val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
-        val newValues = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
+          // Extract the index and values from the above RDD
+          val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
+          val newValues = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)

-        new IndexedRDD[K, (Seq[V], Seq[W])](newIndex, newValues)
-
-      }
+          new IndexedRDD[K, (Seq[V], Seq[W])](newIndex, newValues)
       }
     }
+  }

+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def cogroup[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (Seq[V], Seq[W])] = {
+    cogroup(other, defaultPartitioner(this, other))
+  }
+
+  // /**
+  //  * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+  //  * tuple with the list of values for that key in `this`, `other1` and `other2`.
+  //  */
+  // def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+  //   : IndexedRDD[K, (Seq[V], Seq[W1], Seq[W2])] = {
+  //   cogroup(other1, other2, defaultPartitioner(this, other1, other2))
+  // }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def cogroup[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): IndexedRDD[K, (Seq[V], Seq[W])] = {
+    cogroup(other, new HashPartitioner(numPartitions))
+  }
+
+  // /**
+  //  * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+  //  * tuple with the list of values for that key in `this`, `other1` and `other2`.
+  //  */
+  // def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
+  //   : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+  //   cogroup(other1, other2, new HashPartitioner(numPartitions))
+  // }
+
+  /** Alias for cogroup. */
+  def groupWith[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (Seq[V], Seq[W])] = {
+    cogroup(other, defaultPartitioner(this, other))
+  }
+
+  // /** Alias for cogroup. */
+  // def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+  //   : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+  //   cogroup(other1, other2, defaultPartitioner(self, other1, other2))
+  // }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
+   */
+  def join[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): IndexedRDD[K, (V, W)] = {
+    cogroup(other, partitioner).flatMapValues {
+      case (vs, ws) =>
+        for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
+    }
+  }
+
+  /**
+   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
+   * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
+   * partition the output RDD.
+   */
+  def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): IndexedRDD[K, (V, Option[W])] = {
+    cogroup(other, partitioner).flatMapValues {
+      case (vs, ws) =>
+        if (ws.isEmpty) {
+          vs.iterator.map(v => (v, None))
+        } else {
+          for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
+        }
+    }
+  }
+
+  /**
+   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+   * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
+   * partition the output RDD.
+   */
+  def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner)
+    : IndexedRDD[K, (Option[V], W)] = {
+    cogroup(other, partitioner).flatMapValues {
+      case (vs, ws) =>
+        if (vs.isEmpty) {
+          ws.iterator.map(w => (None, w))
+        } else {
+          for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
+        }
+    }
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+   * (k, v2) is in `other`. Performs a hash join across the cluster.
+   */
+  def join[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (V, W)] = {
+    join(other, defaultPartitioner(this, other))
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+   * (k, v2) is in `other`. Performs a hash join across the cluster.
+   */
+  def join[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): IndexedRDD[K, (V, W)] = {
+    join(other, new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
+   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
+   * using the existing partitioner/parallelism level.
+   */
+  def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (V, Option[W])] = {
+    leftOuterJoin(other, defaultPartitioner(this, other))
+  }
+
+  /**
+   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
+   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
+   * into `numPartitions` partitions.
+   */
+  def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): IndexedRDD[K, (V, Option[W])] = {
+    leftOuterJoin(other, new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
+   * RDD using the existing partitioner/parallelism level.
+   */
+  def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (Option[V], W)] = {
+    rightOuterJoin(other, defaultPartitioner(this, other))
+  }
+
+  /**
+   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
+   * RDD into the given number of partitions.
+ */ + def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): IndexedRDD[K, (Option[V], W)] = { + rightOuterJoin(other, new HashPartitioner(numPartitions)) + } + + /** @@ -304,7 +571,7 @@ object IndexedRDD { tbl: RDD[(K,V)], existingIndex: RDD[JHashMap[K,Int]] = null ): IndexedRDD[K, V] = { - if(existingIndex == null) { + if (existingIndex == null) { // Shuffle the table (if necessary) val shuffledTbl = if (tbl.partitioner.isEmpty) { @@ -314,11 +581,11 @@ object IndexedRDD { val groups = shuffledTbl.mapPartitions( iter => { val indexMap = new JHashMap[K, Int]() val values = new ArrayBuffer[Seq[V]]() - for((k,v) <- iter){ + for ((k,v) <- iter){ if(!indexMap.contains(k)) { val ind = indexMap.size indexMap.put(k, ind) - values.append(new ArrayBuffer[V]()) + values.append(ArrayBuffer.empty[V]) } val ind = indexMap.get(k) values(ind).asInstanceOf[ArrayBuffer[V]].append(v) @@ -351,11 +618,11 @@ object IndexedRDD { val index: JHashMap[K,Int] = indexIter.next() assert(!indexIter.hasNext()) val values = new Array[Seq[V]](index.size) - for((k,v) <- tblIter) { + for ((k,v) <- tblIter) { assert(index.contains(k)) val ind = index(k) - if(values(ind) == null){ - values(ind) = new ArrayBuffer[V]() + if (values(ind) == null) { + values(ind) = ArrayBuffer.empty[V] } values(ind).asInstanceOf[ArrayBuffer[V]].append(v) } diff --git a/core/src/test/scala/spark/IndexedRDDSuite.scala b/core/src/test/scala/spark/IndexedRDDSuite.scala index aacb6423ee..f30e1f57fa 100644 --- a/core/src/test/scala/spark/IndexedRDDSuite.scala +++ b/core/src/test/scala/spark/IndexedRDDSuite.scala @@ -31,6 +31,8 @@ import scala.collection.mutable.HashSet import spark.rdd.ShuffledRDD import spark.SparkContext._ +import spark._ + class IndexedRDDSuite extends FunSuite with SharedSparkContext { @@ -165,6 +167,7 @@ class IndexedRDDSuite extends FunSuite with SharedSparkContext { val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4,-4), (4, 4) )).index() val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).index(rdd1.index) val joined = rdd1.join(rdd2).collect() + assert(joined.size === 6) assert(joined.toSet === Set( (1, (1, 'x')), (1, (2, 'x')), @@ -177,8 +180,8 @@ class IndexedRDDSuite extends FunSuite with SharedSparkContext { test("join all-to-all") { - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3))) - val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))) + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3))).index() + val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))).index(rdd1.index) val joined = rdd1.join(rdd2).collect() assert(joined.size === 6) assert(joined.toSet === Set( @@ -191,8 +194,8 @@ class IndexedRDDSuite extends FunSuite with SharedSparkContext { )) } - test("leftOuterJoin") { - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + test("leftOuterJoinIndex") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).index() val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) val joined = rdd1.leftOuterJoin(rdd2).collect() assert(joined.size === 5) @@ -205,6 +208,35 @@ class IndexedRDDSuite extends FunSuite with SharedSparkContext { )) } + test("leftOuterJoinIndextoIndex") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).index() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).index() + val joined = rdd1.leftOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (1, Some('x'))), + (1, (2, Some('x'))), + (2, (1, Some('y'))), + 
+      (2, (1, Some('z'))),
+      (3, (1, None))
+    ))
+  }
+
+  test("leftOuterJoinIndextoSharedIndex") {
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4, -4))).index()
+    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).index(rdd1.index)
+    val joined = rdd1.leftOuterJoin(rdd2).collect()
+    assert(joined.size === 6)
+    assert(joined.toSet === Set(
+      (1, (1, Some('x'))),
+      (1, (2, Some('x'))),
+      (2, (1, Some('y'))),
+      (2, (1, Some('z'))),
+      (4, (-4, Some('w'))),
+      (3, (1, None))
+    ))
+  }
+
   test("rightOuterJoin") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
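
Editor's usage sketch (not part of the patch): the suite above drives the new PairRDD-style operators through `index()`, which builds an IndexedRDD from a pair RDD, and through `rdd1.index`, which lets a second RDD reuse an existing index. The following minimal example is a hypothetical illustration of the API this patch adds; it assumes a live SparkContext named `sc` and the implicit conversions the tests bring in with `import spark._` and `import spark.SparkContext._`.

  // Build an IndexedRDD, then a second one that shares its index (as the tests do).
  val pairs = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).index()
  val chars = sc.parallelize(Array((1, 'x'), (2, 'y'))).index(pairs.index)

  // Operators added by this patch; each returns an IndexedRDD rather than a plain RDD.
  val sums   = pairs.reduceByKey(_ + _)       // (1,3), (2,1), (3,1)
  val folded = pairs.foldByKey(0)(_ + _)      // same sums, seeded with the zero value 0
  val groups = pairs.groupByKey()             // one Seq of values per key
  val joined = pairs.join(chars)              // (1,(1,'x')), (1,(2,'x')), (2,(1,'y'))
  val left   = pairs.leftOuterJoin(chars)     // unmatched key 3 yields (3, (1, None))

Because `chars` shares the index of `pairs`, the cogroup underlying these joins can match values through the common key-to-slot mapping partition by partition instead of hash-shuffling both sides, which is the fast path the shared-index join tests exercise.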