Merged with changes to zipPartitions

This commit is contained in:
Joseph E. Gonzalez 2013-08-18 10:57:35 -07:00
parent 2b568520bf
commit 8fd37adf83

View file

@ -262,7 +262,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
// However it is possible that both RDDs are missing a value for a given key in // However it is possible that both RDDs are missing a value for a given key in
// which case the returned RDD should have a null value // which case the returned RDD should have a null value
val newValues = val newValues =
valuesRDD.zipPartitions[ Seq[Seq[W]], Seq[Seq[(Seq[V], Seq[W])]] ]( valuesRDD.zipPartitions(other.valuesRDD)(
(thisIter, otherIter) => { (thisIter, otherIter) => {
val thisValues: Seq[Seq[V]] = thisIter.next() val thisValues: Seq[Seq[V]] = thisIter.next()
assert(!thisIter.hasNext()) assert(!thisIter.hasNext())
@ -277,7 +277,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
case (a, b) => Seq((a,b)) case (a, b) => Seq((a,b))
}.toSeq }.toSeq
List(tmp).iterator List(tmp).iterator
}, other.valuesRDD) })
new IndexedRDD[K, (Seq[V], Seq[W])](index, newValues) new IndexedRDD[K, (Seq[V], Seq[W])](index, newValues)
} }
case other: IndexedRDD[_, _] if other.index.partitioner == index.partitioner => { case other: IndexedRDD[_, _] if other.index.partitioner == index.partitioner => {
@ -285,7 +285,8 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
// then we we need to first merge the indicies and then use the merged index to // then we we need to first merge the indicies and then use the merged index to
// merge the values. // merge the values.
val newIndex = val newIndex =
index.zipPartitions[JHashMap[K,Int], JHashMap[K,Int]]( (thisIter, otherIter) => { index.zipPartitions(other.index)(
(thisIter, otherIter) => {
val thisIndex = thisIter.next() val thisIndex = thisIter.next()
assert(!thisIter.hasNext()) assert(!thisIter.hasNext())
val otherIndex = otherIter.next() val otherIndex = otherIter.next()
@ -302,13 +303,10 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
ctr += 1 ctr += 1
} }
List(newIndex).iterator List(newIndex).iterator
}, other.index).cache() }).cache()
// Use the new index along with the this and the other indices to merge the values // Use the new index along with the this and the other indices to merge the values
val newValues = val newValues =
newIndex.zipPartitions[ newIndex.zipPartitions(tuples, other.tuples)(
(JHashMap[K, Int], Seq[Seq[V]]),
(JHashMap[K, Int], Seq[Seq[W]]),
Seq[Seq[(Seq[V],Seq[W])]] ](
(newIndexIter, thisTuplesIter, otherTuplesIter) => { (newIndexIter, thisTuplesIter, otherTuplesIter) => {
// Get the new index for this partition // Get the new index for this partition
val newIndex = newIndexIter.next() val newIndex = newIndexIter.next()
@ -334,7 +332,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
} }
} }
List(newValues.toSeq).iterator List(newValues.toSeq).iterator
}, tuples, other.tuples) })
new IndexedRDD(newIndex, newValues) new IndexedRDD(newIndex, newValues)
} }
case _ => { case _ => {
@ -351,7 +349,8 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
new ShuffledRDD[K,W](other, partitioner) new ShuffledRDD[K,W](other, partitioner)
} }
// Join the other RDD with this RDD building a new valueset and new index on the fly // Join the other RDD with this RDD building a new valueset and new index on the fly
val groups = tuples.zipPartitions[(K, W), (JHashMap[K, Int], Seq[Seq[(Seq[V],Seq[W])]]) ]( val groups =
tuples.zipPartitions(otherShuffled)(
(thisTuplesIter, otherTuplesIter) => { (thisTuplesIter, otherTuplesIter) => {
// Get the corresponding indicies and values for this IndexedRDD // Get the corresponding indicies and values for this IndexedRDD
val (thisIndex, thisValues) = thisTuplesIter.next() val (thisIndex, thisValues) = thisTuplesIter.next()
@ -383,7 +382,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] = val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] =
newValues.view.map{ case (s, ab) => Seq((s, ab.toSeq)) }.toSeq newValues.view.map{ case (s, ab) => Seq((s, ab.toSeq)) }.toSeq
List( (newIndex, newValuesArray) ).iterator List( (newIndex, newValuesArray) ).iterator
}, otherShuffled).cache() }).cache()
// Extract the index and values from the above RDD // Extract the index and values from the above RDD
val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true) val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
@ -615,7 +614,7 @@ object IndexedRDD {
} }
// Use the index to build the new values table // Use the index to build the new values table
val values = index.zipPartitions[ (K, V), Seq[Seq[V]] ]( val values = index.zipPartitions(shuffledTbl)(
(indexIter, tblIter) => { (indexIter, tblIter) => {
// There is only one map // There is only one map
val index: JHashMap[K,Int] = indexIter.next() val index: JHashMap[K,Int] = indexIter.next()
@ -633,7 +632,7 @@ object IndexedRDD {
values(ind).asInstanceOf[ArrayBuffer[V]].append(v) values(ind).asInstanceOf[ArrayBuffer[V]].append(v)
} }
List(values.toSeq).iterator List(values.toSeq).iterator
}, shuffledTbl) })
new IndexedRDD[K,V](index, values) new IndexedRDD[K,V](index, values)
} }