diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala index 29ea38ec67..f858797df8 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala @@ -1,5 +1,8 @@ package org.apache.spark.graph +import org.apache.spark.util.hash.BitSet + + import com.esotericsoftware.kryo.Kryo import org.apache.spark.graph.impl.MessageToPartition @@ -14,7 +17,8 @@ class GraphKryoRegistrator extends KryoRegistrator { kryo.register(classOf[MessageToPartition[Object]]) kryo.register(classOf[(Vid, Object)]) kryo.register(classOf[EdgePartition[Object]]) - + kryo.register(classOf[BitSet]) + kryo.register(classOf[VertexIdToIndexMap]) // This avoids a large number of hash table lookups. kryo.setReferences(false) } diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala index b737a9d0e6..52d0baea4b 100644 --- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala @@ -20,12 +20,9 @@ package org.apache.spark.graph import java.nio.ByteBuffer -import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet} import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.BitSet - import org.apache.spark._ import org.apache.spark.rdd._ @@ -33,7 +30,7 @@ import org.apache.spark.SparkContext._ import org.apache.spark.Partitioner._ import org.apache.spark.storage.StorageLevel - +import org.apache.spark.util.hash.BitSet @@ -167,7 +164,7 @@ class VertexSetRDD[@specialized V: ClassManifest]( // Walk the index to construct the key, value pairs indexMap.iterator // Extract rows with key value pairs and indicators - .map{ case (k, ind) => (bs(ind), k, ind) } + .map{ case (k, ind) => (bs.get(ind), k, ind) } // Remove tuples that aren't actually present in the array .filter( _._1 ) // Extract the pair (removing the indicator from the tuple) @@ -188,7 +185,7 @@ class VertexSetRDD[@specialized V: ClassManifest]( * modifies the bitmap index and so no new values are allocated. */ override def filter(pred: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = { - val cleanF = index.rdd.context.clean(pred) + val cleanPred = index.rdd.context.clean(pred) val newValues = index.rdd.zipPartitions(valuesRDD){ (keysIter: Iterator[VertexIdToIndexMap], valuesIter: Iterator[(IndexedSeq[V], BitSet)]) => @@ -200,7 +197,9 @@ class VertexSetRDD[@specialized V: ClassManifest]( val newBS = new BitSet(oldValues.size) // Populate the new Values for( (k,i) <- index ) { - newBS(i) = bs(i) && cleanF( (k, oldValues(i)) ) + if( bs.get(i) && cleanPred( (k, oldValues(i)) ) ) { + newBS.set(i) + } } Array((oldValues, newBS)).iterator } @@ -224,6 +223,7 @@ class VertexSetRDD[@specialized V: ClassManifest]( val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] = valuesRDD.mapPartitions(iter => iter.map{ case (values, bs: BitSet) => + /** * @todo Consider using a view rather than creating a new * array. This is already being done for join operations. @@ -231,8 +231,17 @@ class VertexSetRDD[@specialized V: ClassManifest]( * recomputation. */ val newValues = new Array[U](values.size) - for ( ind <- bs ) { - newValues(ind) = f(values(ind)) + var ind = bs.nextSetBit(0) + while(ind >= 0) { + // if(ind >= newValues.size) { + // println(ind) + // println(newValues.size) + // bs.iterator.foreach(print(_)) + // } + // assert(ind < newValues.size) + // assert(ind < values.size) + newValues(ind) = cleanF(values(ind)) + ind = bs.nextSetBit(ind+1) } (newValues.toIndexedSeq, bs) }, preservesPartitioning = true) @@ -271,7 +280,7 @@ class VertexSetRDD[@specialized V: ClassManifest]( val newValues: Array[U] = new Array[U](oldValues.size) // Populate the new Values for( (k,i) <- index ) { - if (bs(i)) { newValues(i) = f(k, oldValues(i)) } + if (bs.get(i)) { newValues(i) = cleanF(k, oldValues(i)) } } Array((newValues.toIndexedSeq, bs)).iterator } @@ -304,7 +313,7 @@ class VertexSetRDD[@specialized V: ClassManifest]( assert(!thisIter.hasNext) val (otherValues, otherBS: BitSet) = otherIter.next() assert(!otherIter.hasNext) - val newBS = thisBS & otherBS + val newBS: BitSet = thisBS & otherBS val newValues = thisValues.view.zip(otherValues) Iterator((newValues.toIndexedSeq, newBS)) } @@ -340,7 +349,7 @@ class VertexSetRDD[@specialized V: ClassManifest]( val (otherValues, otherBS: BitSet) = otherIter.next() assert(!otherIter.hasNext) val otherOption = otherValues.view.zipWithIndex - .map{ (x: (W, Int)) => if(otherBS(x._2)) Option(x._1) else None } + .map{ (x: (W, Int)) => if(otherBS.get(x._2)) Option(x._1) else None } val newValues = thisValues.view.zip(otherOption) Iterator((newValues.toIndexedSeq, thisBS)) } @@ -406,19 +415,19 @@ class VertexSetRDD[@specialized V: ClassManifest]( val ind = index.get(k) // Not all the vertex ids in the index are in this VertexSet. // If there is a vertex in this set then record the other value - if(thisBS(ind)) { - if(wBS(ind)) { + if(thisBS.get(ind)) { + if(wBS.get(ind)) { newW(ind) = cleanMerge(newW(ind), w) } else { newW(ind) = w - wBS(ind) = true + wBS.set(ind) } } } // end of for loop over tuples // Some vertices in this vertex set may not have a corresponding // tuple in the join and so a None value should be returned. val otherOption = newW.view.zipWithIndex - .map{ (x: (W, Int)) => if(wBS(x._2)) Option(x._1) else None } + .map{ (x: (W, Int)) => if(wBS.get(x._2)) Option(x._1) else None } // the final values is the zip of the values in this RDD along with // the values in the other val newValues = thisValues.view.zip(otherOption) @@ -456,10 +465,13 @@ class VertexSetRDD[@specialized V: ClassManifest]( */ val newValues = new Array[(Seq[V], Seq[W])](thisValues.size) val newBS = thisBS | otherBS - for( ind <- newBS ) { - val a = if (thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V] - val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W] + + var ind = newBS.nextSetBit(0) + while(ind >= 0) { + val a = if (thisBS.get(ind)) Seq(thisValues(ind)) else Seq.empty[V] + val b = if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W] newValues(ind) = (a, b) + ind = newBS.nextSetBit(ind+1) } Iterator((newValues.toIndexedSeq, newBS)) } @@ -511,17 +523,17 @@ class VertexSetRDD[@specialized V: ClassManifest]( // Get the left key val a = if (thisIndex.contains(k)) { val ind = thisIndex.get(k) - if(thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V] + if(thisBS.get(ind)) Seq(thisValues(ind)) else Seq.empty[V] } else Seq.empty[V] // Get the right key val b = if (otherIndex.contains(k)) { val ind = otherIndex.get(k) - if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W] + if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W] } else Seq.empty[W] // If at least one key was present then we generate a tuple. if (!a.isEmpty || !b.isEmpty) { newValues(ind) = (a, b) - newBS(ind) = true + newBS.set(ind) } } Iterator((newValues.toIndexedSeq, newBS)) @@ -554,28 +566,28 @@ class VertexSetRDD[@specialized V: ClassManifest]( val newBS = new BitSet(thisValues.size) // populate the newValues with the values in this VertexSetRDD for ((k,i) <- thisIndex) { - if (thisBS(i)) { + if (thisBS.get(i)) { newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W]) - newBS(i) = true + newBS.set(i) } } // Now iterate through the other tuples updating the map for ((k,w) <- otherTuplesIter){ if (newIndex.contains(k)) { val ind = newIndex.get(k) - if(newBS(ind)) { + if(newBS.get(ind)) { newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w) } else { // If the other key was in the index but not in the values // of this indexed RDD then create a new values entry for it - newBS(ind) = true + newBS.set(ind) newValues(ind) = (Seq.empty[V], ArrayBuffer(w)) } } else { // update the index val ind = newIndex.size newIndex.put(k, ind) - newBS(ind) = true + newBS.set(ind) // Update the values newValues.append( (Seq.empty[V], ArrayBuffer(w) ) ) } @@ -592,6 +604,8 @@ class VertexSetRDD[@specialized V: ClassManifest]( } } } // end of cogroup + + } // End of VertexSetRDD @@ -633,18 +647,18 @@ object VertexSetRDD { val groups = preAgg.mapPartitions( iter => { val indexMap = new VertexIdToIndexMap() val values = new ArrayBuffer[V] - val bs = new BitSet for ((k,v) <- iter) { if(!indexMap.contains(k)) { val ind = indexMap.size indexMap.put(k, ind) values.append(v) - bs(ind) = true } else { val ind = indexMap.get(k) values(ind) = reduceFunc(values(ind), v) } } + val bs = new BitSet(indexMap.size) + bs.setUntil(indexMap.size) Iterator( (indexMap, (values.toIndexedSeq, bs)) ) }, true).cache // extract the index and the values @@ -732,16 +746,17 @@ object VertexSetRDD { val values = new Array[C](index.size) val bs = new BitSet(index.size) for ((k,c) <- tblIter) { + // @todo this extra check may be costing us a lot! if (!index.contains(k)) { throw new SparkException("Error: Trying to bind an external index " + "to an RDD which contains keys that are not in the index.") } val ind = index(k) - if (bs(ind)) { + if (bs.get(ind)) { values(ind) = mergeCombiners(values(ind), c) } else { values(ind) = c - bs(ind) = true + bs.set(ind) } } Iterator((values, bs))