Reverting to Array based (materialized) output of all VertexSetRDD operations.

This commit is contained in:
Joseph E. Gonzalez 2013-11-05 01:15:12 -08:00
parent 99bfcc91e0
commit 2dc9ec2387
5 changed files with 121 additions and 121 deletions

View file

@ -154,11 +154,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
(vid, edge) => Some(Array(edge.otherVertexId(vid))),
(a, b) => a ++ b,
case (_, Some(nbrs)) => nbrs
case (_, None) => Array.empty[Vid]
graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[Vid]) }
} // end of collectNeighborIds
@ -183,10 +179,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
(a, b) => a ++ b,
case (_, Some(nbrs)) => nbrs
case (_, None) => Array.empty[(Vid, VD)]
graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[(Vid, VD)]) }
} // end of collectNeighbor

View file

@ -98,7 +98,7 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
class VertexSetRDD[@specialized V: ClassManifest](
@transient val index: VertexSetIndex,
@transient val valuesRDD: RDD[ ( (Int => V), BitSet) ])
@transient val valuesRDD: RDD[ ( Array[V], BitSet) ])
extends RDD[(Vid, V)](index.rdd.context,
List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
@ -182,7 +182,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
val cleanPred = index.rdd.context.clean(pred)
val newValues = index.rdd.zipPartitions(valuesRDD){
(keysIter: Iterator[VertexIdToIndexMap],
valuesIter: Iterator[(Int => V, BitSet)]) =>
valuesIter: Iterator[(Array[V], BitSet)]) =>
val index =
assert(keysIter.hasNext() == false)
val (oldValues, bs) =
@ -217,11 +217,12 @@ class VertexSetRDD[@specialized V: ClassManifest](
* VertexSetRDD retains the same index.
def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = {
val newValuesRDD: RDD[ (Int => U, BitSet) ] =
val cleanF = index.rdd.context.clean(f)
val newValuesRDD: RDD[ (Array[U], BitSet) ] =
valuesRDD.mapPartitions(iter =>{
case (values, bs: BitSet) =>
val newValues: (Int => U) =
(ind: Int) => if (bs.get(ind)) f(values(ind)) else null.asInstanceOf[U]
val newValues = new Array[U](values.size)
bs.iterator.foreach { ind => newValues(ind) = cleanF(values(ind)) }
(newValues, bs)
}, preservesPartitioning = true)
new VertexSetRDD[U](index, newValuesRDD)
@ -241,19 +242,18 @@ class VertexSetRDD[@specialized V: ClassManifest](
* VertexSetRDD retains the same index.
def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = {
val newValues: RDD[ (Int => U, BitSet) ] =
val cleanF = index.rdd.context.clean(f)
val newValues: RDD[ (Array[U], BitSet) ] =
(keysIter: Iterator[VertexIdToIndexMap],
valuesIter: Iterator[(Int => V, BitSet)]) =>
valuesIter: Iterator[(Array[V], BitSet)]) =>
val index =
assert(keysIter.hasNext() == false)
val (oldValues, bs: BitSet) =
val (values, bs: BitSet) =
assert(valuesIter.hasNext() == false)
// Cosntruct a view of the map transformation
val newValues: (Int => U) = (ind: Int) => {
if (bs.get(ind)) { f(index.getValueSafe(ind), oldValues(ind)) }
else { null.asInstanceOf[U] }
val newValues = new Array[U](index.capacity)
bs.iterator.foreach { ind => newValues(ind) = cleanF(index.getValueSafe(ind), values(ind)) }
Iterator((newValues, bs))
new VertexSetRDD[U](index, newValues)
@ -261,6 +261,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
* @todo update docs to reflect function argument
* Inner join this VertexSet with another VertexSet which has the
* same Index. This function will fail if both VertexSets do not
* share the same index. The resulting vertex set will only contain
@ -273,20 +275,25 @@ class VertexSetRDD[@specialized V: ClassManifest](
* and the other VertexSet and with tuple attributes.
def zipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,W)] = {
def zipJoin[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V, W) => Z):
VertexSetRDD[Z] = {
val cleanF = index.rdd.context.clean(f)
if(index != other.index) {
throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
val newValuesRDD: RDD[ (Int => (V,W), BitSet) ] =
(thisIter: Iterator[(Int => V, BitSet)],
otherIter: Iterator[(Int => W, BitSet)]) =>
val newValuesRDD: RDD[ (Array[Z], BitSet) ] =
index.rdd.zipPartitions(valuesRDD, other.valuesRDD) { (indexIter, thisIter, otherIter) =>
val index =
val (thisValues, thisBS: BitSet) =
val (otherValues, otherBS: BitSet) =
val newBS: BitSet = thisBS & otherBS
val newValues: Int => (V,W) = (ind: Int) => (thisValues(ind), otherValues(ind))
val newValues = new Array[Z](index.capacity)
newBS.iterator.foreach { ind =>
newValues(ind) = cleanF(index.getValueSafe(ind), thisValues(ind), otherValues(ind))
Iterator((newValues, newBS))
new VertexSetRDD(index, newValuesRDD)
@ -294,6 +301,37 @@ class VertexSetRDD[@specialized V: ClassManifest](
* @todo document
* @param other
* @param f
* @tparam W
* @tparam Z
* @return
def zipJoinFlatMap[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V,W) => Iterator[Z]):
RDD[Z] = {
val cleanF = index.rdd.context.clean(f)
if(index != other.index) {
throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
index.rdd.zipPartitions(valuesRDD, other.valuesRDD) { (indexIter, thisIter, otherIter) =>
val index =
val (thisValues, thisBS: BitSet) =
val (otherValues, otherBS: BitSet) =
val newBS: BitSet = thisBS & otherBS
val newValues = new Array[Z](index.capacity)
newBS.iterator.flatMap { ind => cleanF(index.getValueSafe(ind), thisValues(ind), otherValues(ind)) }
* @todo update docs to reflect function argument
* Left join this VertexSet with another VertexSet which has the
* same Index. This function will fail if both VertexSets do not
* share the same index. The resulting vertex set contains an entry
@ -308,20 +346,25 @@ class VertexSetRDD[@specialized V: ClassManifest](
* other VertexSet.
def leftZipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,Option[W])] = {
def leftZipJoin[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V, Option[W]) => Z):
VertexSetRDD[Z] = {
if(index != other.index) {
throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
val newValuesRDD: RDD[ (Int => (V,Option[W]), BitSet) ] =
(thisIter: Iterator[(Int => V, BitSet)],
otherIter: Iterator[(Int => W, BitSet)]) =>
val cleanF = index.rdd.context.clean(f)
val newValuesRDD: RDD[(Array[Z], BitSet)] =
index.rdd.zipPartitions(valuesRDD, other.valuesRDD) { (indexIter, thisIter, otherIter) =>
val index =
val (thisValues, thisBS: BitSet) =
val (otherValues, otherBS: BitSet) =
val newValues: Int => (V, Option[W]) = (ind: Int) =>
(thisValues(ind), if (otherBS.get(ind)) Option(otherValues(ind)) else None)
val newValues = new Array[Z](index.capacity)
thisBS.iterator.foreach { ind =>
val otherV = if (otherBS.get(ind)) Option(otherValues(ind)) else None
newValues(ind) = cleanF(index.getValueSafe(ind), thisValues(ind), otherV)
Iterator((newValues, thisBS))
new VertexSetRDD(index, newValuesRDD)
@ -346,68 +389,29 @@ class VertexSetRDD[@specialized V: ClassManifest](
* other VertexSet.
def leftJoin[W: ClassManifest](
other: RDD[(Vid,W)], merge: (W,W) => W = (a:W, b:W) => a):
VertexSetRDD[(V, Option[W]) ] = {
def leftJoin[W: ClassManifest, Z: ClassManifest](other: RDD[(Vid,W)])
(f: (Vid, V, Option[W]) => Z, merge: (W,W) => W = (a:W, b:W) => a ):
VertexSetRDD[Z] = {
val cleanF = index.rdd.context.clean(f)
val cleanMerge = index.rdd.context.clean(merge)
// Test if the other vertex is a VertexSetRDD to choose the optimal
// join strategy
other match {
// If the other set is a VertexSetRDD and shares the same index as
// this vertex set then we use the much more efficient leftZipJoin
case other: VertexSetRDD[_] if index == other.index => {
// @todo handle case where other is a VertexSetRDD with a different index
case _ => {
// Otherwise we treat the other RDD as a collectiong of
// vertex-attribute pairs.
// If necessary shuffle the other RDD using the partitioner
// for this VertexSet
val otherShuffled =
if (other.partitioner == partitioner) other
else other.partitionBy(partitioner.get)
// Compute the new values RDD
val newValuesRDD: RDD[ (Int => (V,Option[W]), BitSet) ] =
index.rdd.zipPartitions(valuesRDD, otherShuffled) {
(thisIndexIter: Iterator[VertexIdToIndexMap],
thisIter: Iterator[(Int => V, BitSet)],
tuplesIter: Iterator[(Vid,W)]) =>
// Get the Index and values for this RDD
val index =
val (thisValues, thisBS) =
// Create a new array to store the values in the resulting VertexSet
val otherValues = new Array[W](index.capacity)
// track which values are matched with values in other
val otherBS = new BitSet(index.capacity)
for ((k,w) <- tuplesIter) {
// Get the location of the key in the index
val pos = index.getPos(k)
// Only if the key is already in the index
if ((pos & OpenHashSet.EXISTENCE_MASK) == 0) {
// Get the actual index
val ind = pos & OpenHashSet.POSITION_MASK
// If this value has already been seen then merge
if (otherBS.get(ind)) {
otherValues(ind) = merge(otherValues(ind), w)
} else { // otherwise just store the new value
otherValues(ind) = w
// Some vertices in this vertex set may not have a corresponding
// tuple in the join and so a None value should be returned.
val newValues: Int => (V, Option[W]) = (ind: Int) =>
(thisValues(ind), if (otherBS.get(ind)) Option(otherValues(ind)) else None)
Iterator((newValues, thisBS))
} // end of newValues
new VertexSetRDD(index, newValuesRDD)
val indexedOther: VertexSetRDD[W] = VertexSetRDD(other, index, cleanMerge)
} // end of leftJoin
* For each key k in `this` or `other`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this` as well as `other`.
@ -609,29 +613,30 @@ object VertexSetRDD {
def apply[V: ClassManifest](
rdd: RDD[(Vid,V)], reduceFunc: (V, V) => V): VertexSetRDD[V] = {
val cReduceFunc = rdd.context.clean(reduceFunc)
// Preaggregate and shuffle if necessary
val preAgg = rdd.partitioner match {
case Some(p) => rdd
case None =>
val partitioner = new HashPartitioner(rdd.partitions.size)
// Preaggregation.
val aggregator = new Aggregator[Vid, V, V](v => v, reduceFunc, reduceFunc)
val aggregator = new Aggregator[Vid, V, V](v => v, cReduceFunc, cReduceFunc)
rdd.mapPartitions(aggregator.combineValuesByKey, true).partitionBy(partitioner)
val groups = preAgg.mapPartitions( iter => {
val hashMap = new PrimitiveKeyOpenHashMap[Vid, V]
for ((k,v) <- iter) {
hashMap.setMerge(k, v, reduceFunc)
hashMap.setMerge(k, v, cReduceFunc)
val index = hashMap.keySet
val values: Int => V = (ind: Int) => hashMap._values(ind)
val values = hashMap._values
val bs = index.getBitSet
Iterator( (index, (values, bs)) )
}, true).cache
// extract the index and the values
val index = groups.mapPartitions({ case (kMap, vAr) => kMap }, true)
val values: RDD[(Int => V, BitSet)] =
val values: RDD[(Array[V], BitSet)] =
groups.mapPartitions({ case (kMap,vAr) => vAr }, true)
new VertexSetRDD[V](new VertexSetIndex(index), values)
} // end of apply
@ -690,6 +695,9 @@ object VertexSetRDD {
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): VertexSetRDD[C] = {
val cCreateCombiner = index.rdd.context.clean(createCombiner)
val cMergeValue = index.rdd.context.clean(mergeValue)
val cMergeCombiners = index.rdd.context.clean(mergeCombiners)
// Get the index Partitioner
val partitioner = index.rdd.partitioner match {
case Some(p) => p
@ -699,15 +707,15 @@ object VertexSetRDD {
val partitioned =
if (rdd.partitioner != Some(partitioner)) {
// Preaggregation.
val aggregator = new Aggregator[Vid, V, C](createCombiner, mergeValue,
val aggregator = new Aggregator[Vid, V, C](cCreateCombiner, cMergeValue,
} else {
rdd.mapValues(x => createCombiner(x))
// Use the index to build the new values table
val values: RDD[ (Int => C, BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
val values: RDD[ (Array[C], BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
// There is only one map
val index =
@ -724,14 +732,14 @@ object VertexSetRDD {
val ind = pos & OpenHashSet.POSITION_MASK
// If this value has already been seen then merge
if (bs.get(ind)) {
values(ind) = mergeCombiners(values(ind), c)
values(ind) = cMergeCombiners(values(ind), c)
} else { // otherwise just store the new value
values(ind) = c
Iterator(((ind: Int) => values(ind), bs))
Iterator((values, bs))
new VertexSetRDD(index, values)
} // end of apply

View file

@ -315,8 +315,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
(updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2)
: Graph[VD2, ED] = {
val newVTable = vTable.leftJoin(updates).mapValuesWithKeys(
(vid, vu) => updateF(vid, vu._1, vu._2) )
val newVTable = vTable.leftJoin(updates)(updateF)
new GraphImpl(newVTable, vid2pid, localVidMap, eTable)
@ -437,11 +436,9 @@ object GraphImpl {
RDD[(Pid, Array[VD])] = {
// Join vid2pid and vTable, generate a shuffle dependency on the joined
// result, and get the shuffle id so we can use it on the slave.
val msgsByPartition = vTable.zipJoin(vid2pid)
.flatMap { case (vid, (vdata, pids)) =>
val msgsByPartition = vTable.zipJoinFlatMap(vid2pid) { (vid, vdata, pids) => { pid => MessageToPartition(pid, (vid, vdata)) }
(mapIter, msgsIter) =>

View file

@ -4,6 +4,7 @@ import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.graph.LocalSparkContext._
@ -58,8 +59,9 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
val prGraph1 = Analytics.pagerank(starGraph, 1, resetProb)
val prGraph2 = Analytics.pagerank(starGraph, 2, resetProb)
val notMatching = prGraph1.vertices.zipJoin(prGraph2.vertices)
.map{ case (vid, (pr1, pr2)) => if (pr1 != pr2) { 1 } else { 0 } }.sum
val notMatching = prGraph1.vertices.zipJoin(prGraph2.vertices) { (vid, pr1, pr2) =>
if (pr1 != pr2) { 1 } else { 0 }
}.map { case (vid, test) => test }.sum
assert(notMatching === 0)
val errors ={ case (vid, pr) =>
@ -70,10 +72,12 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
assert(errors.sum === 0)
val prGraph3 = Analytics.deltaPagerank(starGraph, 0, resetProb)
val errors2 = prGraph2.vertices.leftJoin(prGraph3.vertices).map{
case (_, (pr1, Some(pr2))) if(pr1 == pr2) => 0
val errors2 = prGraph2.vertices.leftJoin(prGraph3.vertices){ (vid, pr1, pr2Opt) =>
pr2Opt match {
case Some(pr2) if(pr1 == pr2) => 0
case _ => 1
}.map { case (vid, test) => test }.sum
assert(errors2 === 0)
} // end of test Star PageRank
@ -86,19 +90,17 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
val resetProb = 0.15
val prGraph1 = Analytics.pagerank(gridGraph, 50, resetProb).cache()
val prGraph2 = Analytics.deltaPagerank(gridGraph, 0.0001, resetProb).cache()
val error = prGraph1.vertices.zipJoin(prGraph2.vertices).map {
case (id, (a, b)) => (a - b) * (a - b)
.map{ case (id, (a,b)) => (id, (a,b, a-b))}.foreach(println(_))
val error = prGraph1.vertices.zipJoin(prGraph2.vertices) { case (id, a, b) => (a - b) * (a - b) }
.map { case (id, error) => error }.sum
prGraph1.vertices.zipJoin(prGraph2.vertices) { (id, a, b) => (a, b, a-b) }.foreach(println(_))
assert(error < 1.0e-5)
val pr3 = sc.parallelize(GridPageRank(10,10, 50, resetProb))
val error2 = prGraph1.vertices.leftJoin(pr3).map {
case (id, (a, Some(b))) => (a - b) * (a - b)
case _ => 0
prGraph1.vertices.leftJoin(pr3).foreach(println( _ ))
val pr3: RDD[(Vid, Double)] = sc.parallelize(GridPageRank(10,10, 50, resetProb))
val error2 = prGraph1.vertices.leftJoin(pr3) { (id, a, bOpt) =>
val b: Double = bOpt.get
(a - b) * (a - b)
}.map { case (id, error) => error }.sum
prGraph1.vertices.leftJoin(pr3) { (id, a, b) => (a, b) }.foreach( println(_) )
assert(error2 < 1.0e-5)

View file

@ -78,13 +78,13 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val a = sc.parallelize((0 to 100).map(x => (x.toLong, x.toLong)), 5)
val b = VertexSetRDD(a).mapValues(x => -x)
assert(b.count === 101)
assert(b.leftJoin(a).mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
assert(b.leftJoin(a){ (id, a, bOpt) => a + bOpt.get }.map(x=> x._2).reduce(_+_) === 0)
val c = VertexSetRDD(a, b.index)
assert(b.leftJoin(c).mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
assert(b.leftJoin(c){ (id, b, cOpt) => b + cOpt.get }.map(x=> x._2).reduce(_+_) === 0)
val d = c.filter(q => ((q._2 % 2) == 0))
val e = a.filter(q => ((q._2 % 2) == 0))
assert(d.count === e.count)
assert(b.zipJoin(c).mapValues(x => x._1 + x._2).map(x => x._2).reduce(_+_) === 0)
assert(b.zipJoin(c)((id, b, c) => b + c).map(x => x._2).reduce(_+_) === 0)