Switching to the @rxin BitSet implementation for VertexSet Value tables.

This commit is contained in:
Joseph E. Gonzalez 2013-10-31 01:44:24 -07:00
parent 51aff8ddcf
commit d6b5122532
2 changed files with 51 additions and 32 deletions

View file

@ -1,5 +1,8 @@
package org.apache.spark.graph
import org.apache.spark.util.hash.BitSet
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.graph.impl.MessageToPartition
@ -14,7 +17,8 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[MessageToPartition[Object]])
kryo.register(classOf[(Vid, Object)])
kryo.register(classOf[EdgePartition[Object]])
kryo.register(classOf[BitSet])
kryo.register(classOf[VertexIdToIndexMap])
// This avoids a large number of hash table lookups.
kryo.setReferences(false)
}

View file

@ -20,12 +20,9 @@ package org.apache.spark.graph
import java.nio.ByteBuffer
import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.BitSet
import org.apache.spark._
import org.apache.spark.rdd._
@ -33,7 +30,7 @@ import org.apache.spark.SparkContext._
import org.apache.spark.Partitioner._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.hash.BitSet
@ -167,7 +164,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
// Walk the index to construct the key, value pairs
indexMap.iterator
// Extract rows with key value pairs and indicators
.map{ case (k, ind) => (bs(ind), k, ind) }
.map{ case (k, ind) => (bs.get(ind), k, ind) }
// Remove tuples that aren't actually present in the array
.filter( _._1 )
// Extract the pair (removing the indicator from the tuple)
@ -188,7 +185,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
* modifies the bitmap index and so no new values are allocated.
*/
override def filter(pred: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = {
val cleanF = index.rdd.context.clean(pred)
val cleanPred = index.rdd.context.clean(pred)
val newValues = index.rdd.zipPartitions(valuesRDD){
(keysIter: Iterator[VertexIdToIndexMap],
valuesIter: Iterator[(IndexedSeq[V], BitSet)]) =>
@ -200,7 +197,9 @@ class VertexSetRDD[@specialized V: ClassManifest](
val newBS = new BitSet(oldValues.size)
// Populate the new Values
for( (k,i) <- index ) {
newBS(i) = bs(i) && cleanF( (k, oldValues(i)) )
if( bs.get(i) && cleanPred( (k, oldValues(i)) ) ) {
newBS.set(i)
}
}
Array((oldValues, newBS)).iterator
}
@ -224,6 +223,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] =
valuesRDD.mapPartitions(iter => iter.map{
case (values, bs: BitSet) =>
/**
* @todo Consider using a view rather than creating a new
* array. This is already being done for join operations.
@ -231,8 +231,17 @@ class VertexSetRDD[@specialized V: ClassManifest](
* recomputation.
*/
val newValues = new Array[U](values.size)
for ( ind <- bs ) {
newValues(ind) = f(values(ind))
var ind = bs.nextSetBit(0)
while(ind >= 0) {
// if(ind >= newValues.size) {
// println(ind)
// println(newValues.size)
// bs.iterator.foreach(print(_))
// }
// assert(ind < newValues.size)
// assert(ind < values.size)
newValues(ind) = cleanF(values(ind))
ind = bs.nextSetBit(ind+1)
}
(newValues.toIndexedSeq, bs)
}, preservesPartitioning = true)
@ -271,7 +280,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
val newValues: Array[U] = new Array[U](oldValues.size)
// Populate the new Values
for( (k,i) <- index ) {
if (bs(i)) { newValues(i) = f(k, oldValues(i)) }
if (bs.get(i)) { newValues(i) = cleanF(k, oldValues(i)) }
}
Array((newValues.toIndexedSeq, bs)).iterator
}
@ -304,7 +313,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
assert(!thisIter.hasNext)
val (otherValues, otherBS: BitSet) = otherIter.next()
assert(!otherIter.hasNext)
val newBS = thisBS & otherBS
val newBS: BitSet = thisBS & otherBS
val newValues = thisValues.view.zip(otherValues)
Iterator((newValues.toIndexedSeq, newBS))
}
@ -340,7 +349,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
val (otherValues, otherBS: BitSet) = otherIter.next()
assert(!otherIter.hasNext)
val otherOption = otherValues.view.zipWithIndex
.map{ (x: (W, Int)) => if(otherBS(x._2)) Option(x._1) else None }
.map{ (x: (W, Int)) => if(otherBS.get(x._2)) Option(x._1) else None }
val newValues = thisValues.view.zip(otherOption)
Iterator((newValues.toIndexedSeq, thisBS))
}
@ -406,19 +415,19 @@ class VertexSetRDD[@specialized V: ClassManifest](
val ind = index.get(k)
// Not all the vertex ids in the index are in this VertexSet.
// If there is a vertex in this set then record the other value
if(thisBS(ind)) {
if(wBS(ind)) {
if(thisBS.get(ind)) {
if(wBS.get(ind)) {
newW(ind) = cleanMerge(newW(ind), w)
} else {
newW(ind) = w
wBS(ind) = true
wBS.set(ind)
}
}
} // end of for loop over tuples
// Some vertices in this vertex set may not have a corresponding
// tuple in the join and so a None value should be returned.
val otherOption = newW.view.zipWithIndex
.map{ (x: (W, Int)) => if(wBS(x._2)) Option(x._1) else None }
.map{ (x: (W, Int)) => if(wBS.get(x._2)) Option(x._1) else None }
// the final values is the zip of the values in this RDD along with
// the values in the other
val newValues = thisValues.view.zip(otherOption)
@ -456,10 +465,13 @@ class VertexSetRDD[@specialized V: ClassManifest](
*/
val newValues = new Array[(Seq[V], Seq[W])](thisValues.size)
val newBS = thisBS | otherBS
for( ind <- newBS ) {
val a = if (thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V]
val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W]
var ind = newBS.nextSetBit(0)
while(ind >= 0) {
val a = if (thisBS.get(ind)) Seq(thisValues(ind)) else Seq.empty[V]
val b = if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W]
newValues(ind) = (a, b)
ind = newBS.nextSetBit(ind+1)
}
Iterator((newValues.toIndexedSeq, newBS))
}
@ -511,17 +523,17 @@ class VertexSetRDD[@specialized V: ClassManifest](
// Get the left key
val a = if (thisIndex.contains(k)) {
val ind = thisIndex.get(k)
if(thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V]
if(thisBS.get(ind)) Seq(thisValues(ind)) else Seq.empty[V]
} else Seq.empty[V]
// Get the right key
val b = if (otherIndex.contains(k)) {
val ind = otherIndex.get(k)
if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W]
if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W]
} else Seq.empty[W]
// If at least one key was present then we generate a tuple.
if (!a.isEmpty || !b.isEmpty) {
newValues(ind) = (a, b)
newBS(ind) = true
newBS.set(ind)
}
}
Iterator((newValues.toIndexedSeq, newBS))
@ -554,28 +566,28 @@ class VertexSetRDD[@specialized V: ClassManifest](
val newBS = new BitSet(thisValues.size)
// populate the newValues with the values in this VertexSetRDD
for ((k,i) <- thisIndex) {
if (thisBS(i)) {
if (thisBS.get(i)) {
newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W])
newBS(i) = true
newBS.set(i)
}
}
// Now iterate through the other tuples updating the map
for ((k,w) <- otherTuplesIter){
if (newIndex.contains(k)) {
val ind = newIndex.get(k)
if(newBS(ind)) {
if(newBS.get(ind)) {
newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w)
} else {
// If the other key was in the index but not in the values
// of this indexed RDD then create a new values entry for it
newBS(ind) = true
newBS.set(ind)
newValues(ind) = (Seq.empty[V], ArrayBuffer(w))
}
} else {
// update the index
val ind = newIndex.size
newIndex.put(k, ind)
newBS(ind) = true
newBS.set(ind)
// Update the values
newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
}
@ -592,6 +604,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
}
}
} // end of cogroup
} // End of VertexSetRDD
@ -633,18 +647,18 @@ object VertexSetRDD {
val groups = preAgg.mapPartitions( iter => {
val indexMap = new VertexIdToIndexMap()
val values = new ArrayBuffer[V]
val bs = new BitSet
for ((k,v) <- iter) {
if(!indexMap.contains(k)) {
val ind = indexMap.size
indexMap.put(k, ind)
values.append(v)
bs(ind) = true
} else {
val ind = indexMap.get(k)
values(ind) = reduceFunc(values(ind), v)
}
}
val bs = new BitSet(indexMap.size)
bs.setUntil(indexMap.size)
Iterator( (indexMap, (values.toIndexedSeq, bs)) )
}, true).cache
// extract the index and the values
@ -732,16 +746,17 @@ object VertexSetRDD {
val values = new Array[C](index.size)
val bs = new BitSet(index.size)
for ((k,c) <- tblIter) {
// @todo this extra check may be costing us a lot!
if (!index.contains(k)) {
throw new SparkException("Error: Trying to bind an external index " +
"to an RDD which contains keys that are not in the index.")
}
val ind = index(k)
if (bs(ind)) {
if (bs.get(ind)) {
values(ind) = mergeCombiners(values(ind), c)
} else {
values(ind) = c
bs(ind) = true
bs.set(ind)
}
}
Iterator((values, bs))