Switching from Seq to IndexedSeq

This commit is contained in:
Joseph E. Gonzalez 2013-10-17 19:55:36 -07:00
parent 9a03c5fe28
commit 3f3d28c73f
3 changed files with 30 additions and 25 deletions

View file

@ -24,7 +24,6 @@ import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.BitSet
@ -72,7 +71,7 @@ class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockI
*/
class IndexedRDD[K: ClassManifest, V: ClassManifest](
@transient val index: RDDIndex[K],
@transient val valuesRDD: RDD[ (Seq[V], BitSet) ])
@transient val valuesRDD: RDD[ (IndexedSeq[V], BitSet) ])
extends RDD[(K, V)](index.rdd.context,
List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
@ -119,13 +118,14 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
*/
def mapValues[U: ClassManifest](f: V => U): IndexedRDD[K, U] = {
val cleanF = index.rdd.context.clean(f)
val newValuesRDD = valuesRDD.mapPartitions(iter => iter.map{
val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] =
valuesRDD.mapPartitions(iter => iter.map{
case (values, bs) =>
val newValues = new Array[U](values.size)
for ( ind <- bs ) {
newValues(ind) = f(values(ind))
}
(newValues.toSeq, bs)
(newValues.toIndexedSeq, bs)
}, preservesPartitioning = true)
new IndexedRDD[K,U](index, newValuesRDD)
}
@ -137,7 +137,8 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
*/
def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): IndexedRDD[K, U] = {
val cleanF = index.rdd.context.clean(f)
val newValues = index.rdd.zipPartitions(valuesRDD){
val newValues: RDD[ (IndexedSeq[U], BitSet) ] =
index.rdd.zipPartitions(valuesRDD){
(keysIter, valuesIter) =>
val index = keysIter.next()
assert(keysIter.hasNext() == false)
@ -149,7 +150,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
for( (k,i) <- index ) {
if (bs(i)) { newValues(i) = f(k, oldValues(i)) }
}
Array((newValues.toSeq, bs)).iterator
Array((newValues.toIndexedSeq, bs)).iterator
}
new IndexedRDD[K,U](index, newValues)
}
@ -159,7 +160,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
if(index != other.index) {
throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
}
val newValuesRDD: RDD[ (Seq[(V,W)], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){
val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){
(thisIter, otherIter) =>
val (thisValues, thisBS) = thisIter.next()
assert(!thisIter.hasNext)
@ -167,7 +168,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
assert(!otherIter.hasNext)
val newBS = thisBS & otherBS
val newValues = thisValues.view.zip(otherValues)
Iterator((newValues, newBS))
Iterator((newValues.toIndexedSeq, newBS))
}
new IndexedRDD(index, newValuesRDD)
}
@ -177,7 +178,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
if(index != other.index) {
throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
}
val newValuesRDD: RDD[ (Seq[(V,Option[W])], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){
val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){
(thisIter, otherIter) =>
val (thisValues, thisBS) = thisIter.next()
assert(!thisIter.hasNext)
@ -186,7 +187,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
val otherOption = otherValues.view.zipWithIndex
.map{ (x: (W, Int)) => if(otherBS(x._2)) Option(x._1) else None }
val newValues = thisValues.view.zip(otherOption)
Iterator((newValues, thisBS))
Iterator((newValues.toIndexedSeq, thisBS))
}
new IndexedRDD(index, newValuesRDD)
}
@ -197,6 +198,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
other: RDD[(K,W)], merge: (W,W) => W = (a:W, b:W) => a):
IndexedRDD[K, (V, Option[W]) ] = {
val cleanMerge = index.rdd.context.clean(merge)
other match {
case other: IndexedRDD[_, _] if index == other.index => {
leftZipJoin(other)
@ -211,7 +213,8 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
val otherShuffled =
if (other.partitioner == Some(partitioner)) other
else other.partitionBy(partitioner)
val newValues = index.rdd.zipPartitions(valuesRDD, other) {
val newValues: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] =
index.rdd.zipPartitions(valuesRDD, other) {
(thisIndexIter, thisIter, tuplesIter) =>
val index = thisIndexIter.next()
assert(!thisIndexIter.hasNext)
@ -236,7 +239,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
.map{ (x: (W, Int)) => if(wBS(x._2)) Option(x._1) else None }
val newValues = thisValues.view.zip(otherOption)
Iterator((newValues.toSeq, thisBS))
Iterator((newValues.toIndexedSeq, thisBS))
} // end of newValues
new IndexedRDD(index, newValues)
}
@ -496,11 +499,12 @@ object IndexedRDD {
values(ind) = reduceFunc(values(ind), v)
}
}
Iterator( (indexMap, (values.toSeq, bs)) )
Iterator( (indexMap, (values.toIndexedSeq, bs)) )
}, true).cache
// extract the index and the values
val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true)
val values = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
val values: RDD[(IndexedSeq[V], BitSet)] =
groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
new IndexedRDD[K,V](new RDDIndex(index), values)
}
@ -580,7 +584,7 @@ object IndexedRDD {
}
// Use the index to build the new values table
val values: RDD[ (Seq[C], BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
val values: RDD[ (IndexedSeq[C], BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
// There is only one map
val index = indexIter.next()
assert(!indexIter.hasNext())

View file

@ -60,7 +60,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
*/
override def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K,U)] = {
val cleanF = self.index.rdd.context.clean(f)
val newValuesRDD = self.valuesRDD.mapPartitions(iter => iter.map{
val newValuesRDD: RDD[(IndexedSeq[U], BitSet)] = self.valuesRDD.mapPartitions(iter => iter.map{
case (values, bs) =>
val newValues = new Array[U](values.size)
val newBS = new BitSet(values.size)
@ -71,7 +71,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
newBS(ind) = true
}
}
(newValues.toSeq, newBS)
(newValues.toIndexedSeq, newBS)
}, preservesPartitioning = true)
new IndexedRDD[K,U](self.index, newValuesRDD)
}
@ -120,7 +120,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
// then we simply merge the value RDDs.
// However, it is possible that both RDDs are missing a value for a given key, in
// which case the returned RDD should have a null value
val newValues =
val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
self.valuesRDD.zipPartitions(other.valuesRDD){
(thisIter, otherIter) =>
val (thisValues, thisBS) = thisIter.next()
@ -136,7 +136,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W]
newValues(ind) = (a, b)
}
Iterator((newValues.toSeq, newBS))
Iterator((newValues.toIndexedSeq, newBS))
}
new IndexedRDD(self.index, newValues)
}
@ -166,7 +166,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
List(newIndex).iterator
}).cache()
// Use the new index along with this and the other indices to merge the values
val newValues =
val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
newIndex.zipPartitions(self.tuples, other.tuples)(
(newIndexIter, thisTuplesIter, otherTuplesIter) => {
// Get the new index for this partition
@ -199,7 +199,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
newBS(ind) = true
}
}
Iterator((newValues.toSeq, newBS))
Iterator((newValues.toIndexedSeq, newBS))
})
new IndexedRDD(new RDDIndex(newIndex), newValues)
}
@ -262,12 +262,13 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
// case null => null
// case (s, ab) => Seq((s, ab.toSeq))
// }.toSeq
Iterator( (newIndex, (newValues.toSeq, newBS)) )
Iterator( (newIndex, (newValues.toIndexedSeq, newBS)) )
}).cache()
// Extract the index and values from the above RDD
val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
val newValues = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
new IndexedRDD[K, (Seq[V], Seq[W])](new RDDIndex(newIndex), newValues)
}

View file

@ -545,7 +545,7 @@ object GraphImpl {
val newValuesRDD = replicationMap.valuesRDD.zipPartitions(msgsByPartition){
(mapIter, msgsIter) =>
val (Seq(vidToIndex), bs) = mapIter.next()
val (IndexedSeq(vidToIndex), bs) = mapIter.next()
assert(!mapIter.hasNext)
// Populate the vertex array using the vidToIndex map
val vertexArray = new Array[VD](vidToIndex.size)
@ -553,7 +553,7 @@ object GraphImpl {
val ind = vidToIndex(msg.data._1)
vertexArray(ind) = msg.data._2
}
Iterator((Seq(vertexArray), bs))
Iterator((IndexedSeq(vertexArray), bs))
}
new IndexedRDD(replicationMap.index, newValuesRDD)