From 8fd37adf832a7cd234dca64ea7679668b89fafa8 Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez"
Date: Sun, 18 Aug 2013 10:57:35 -0700
Subject: [PATCH] Merged with changes to zipPartitions

---
 .../src/main/scala/spark/rdd/IndexedRDD.scala | 25 +++++++++----------
 1 file changed, 12 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/spark/rdd/IndexedRDD.scala b/core/src/main/scala/spark/rdd/IndexedRDD.scala
index 6b1cb7608b..37ff11e639 100644
--- a/core/src/main/scala/spark/rdd/IndexedRDD.scala
+++ b/core/src/main/scala/spark/rdd/IndexedRDD.scala
@@ -262,7 +262,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
         // However it is possible that both RDDs are missing a value for a given key in
         // which case the returned RDD should have a null value
         val newValues =
-          valuesRDD.zipPartitions[ Seq[Seq[W]], Seq[Seq[(Seq[V], Seq[W])]] ](
+          valuesRDD.zipPartitions(other.valuesRDD)(
           (thisIter, otherIter) => {
             val thisValues: Seq[Seq[V]] = thisIter.next()
             assert(!thisIter.hasNext())
@@ -277,7 +277,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
               case (a, b) => Seq((a,b))
               }.toSeq
             List(tmp).iterator
-          }, other.valuesRDD)
+          })
         new IndexedRDD[K, (Seq[V], Seq[W])](index, newValues)
       }
       case other: IndexedRDD[_, _] if other.index.partitioner == index.partitioner => {
@@ -285,7 +285,8 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
         // then we we need to first merge the indicies and then use the merged index to
         // merge the values.
         val newIndex =
-          index.zipPartitions[JHashMap[K,Int], JHashMap[K,Int]]( (thisIter, otherIter) => {
+          index.zipPartitions(other.index)(
+            (thisIter, otherIter) => {
             val thisIndex = thisIter.next()
             assert(!thisIter.hasNext())
             val otherIndex = otherIter.next()
@@ -302,13 +303,10 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
               ctr += 1
             }
             List(newIndex).iterator
-          }, other.index).cache()
+          }).cache()
         // Use the new index along with the this and the other indices to merge the values
         val newValues =
-          newIndex.zipPartitions[
-            (JHashMap[K, Int], Seq[Seq[V]]),
-            (JHashMap[K, Int], Seq[Seq[W]]),
-            Seq[Seq[(Seq[V],Seq[W])]] ](
+          newIndex.zipPartitions(tuples, other.tuples)(
           (newIndexIter, thisTuplesIter, otherTuplesIter) => {
             // Get the new index for this partition
             val newIndex = newIndexIter.next()
@@ -334,7 +332,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
               }
             }
             List(newValues.toSeq).iterator
-          }, tuples, other.tuples)
+          })
         new IndexedRDD(newIndex, newValues)
       }
       case _ => {
@@ -351,7 +349,8 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
             new ShuffledRDD[K,W](other, partitioner)
           }
         // Join the other RDD with this RDD building a new valueset and new index on the fly
-        val groups = tuples.zipPartitions[(K, W), (JHashMap[K, Int], Seq[Seq[(Seq[V],Seq[W])]]) ](
+        val groups =
+          tuples.zipPartitions(otherShuffled)(
           (thisTuplesIter, otherTuplesIter) => {
             // Get the corresponding indicies and values for this IndexedRDD
             val (thisIndex, thisValues) = thisTuplesIter.next()
@@ -383,7 +382,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
             val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] =
               newValues.view.map{ case (s, ab) => Seq((s, ab.toSeq)) }.toSeq
             List( (newIndex, newValuesArray) ).iterator
-          }, otherShuffled).cache()
+          }).cache()
 
         // Extract the index and values from the above RDD
         val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
@@ -615,7 +614,7 @@ object IndexedRDD {
       }
 
     // Use the index to build the new values table
-    val values = index.zipPartitions[ (K, V), Seq[Seq[V]] ](
+    val values = index.zipPartitions(shuffledTbl)(
       (indexIter, tblIter) => {
         // There is only one map
         val index: JHashMap[K,Int] = indexIter.next()
@@ -633,7 +632,7 @@ object IndexedRDD {
           values(ind).asInstanceOf[ArrayBuffer[V]].append(v)
         }
        List(values.toSeq).iterator
-      }, shuffledTbl)
+      })
    new IndexedRDD[K,V](index, values)
  }
 
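
For context on the API this patch migrates to: the companion RDD(s) now go in the first argument list of zipPartitions and the per-partition function in a second, curried argument list, so the element and result types are inferred rather than written out as explicit type parameters. Below is a minimal, hedged sketch of that calling convention. It uses the current org.apache.spark package, an illustrative ZipPartitionsSketch object, and made-up local data; it is not part of the patch, which edits the pre-Apache spark.rdd.IndexedRDD.

// Minimal sketch of the curried zipPartitions style this patch adopts.
// Assumes the modern org.apache.spark package and illustrative local data.
import org.apache.spark.{SparkConf, SparkContext}

object ZipPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("zipPartitions-sketch").setMaster("local[2]"))

    // zipPartitions requires both RDDs to have the same number of partitions.
    val left  = sc.parallelize(Seq(1, 2, 3, 4), numSlices = 2)
    val right = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)

    // New style: companion RDD in the first argument list, per-partition
    // function in the second; element and result types are inferred.
    val zipped = left.zipPartitions(right) { (leftIter, rightIter) =>
      leftIter.zip(rightIter).map { case (n, s) => s * n }
    }

    // The old style removed by the patch spelled the types out and passed the
    // function first, e.g. left.zipPartitions[String, String](f, right).
    println(zipped.collect().mkString(", "))  // prints: a, bb, ccc, dddd
    sc.stop()
  }
}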