diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
index 8480ff29d3..8c7f4c25e2 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
@@ -154,11 +154,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
       (vid, edge) => Some(Array(edge.otherVertexId(vid))),
       (a, b) => a ++ b,
       edgeDirection)
-
-    graph.vertices.leftZipJoin(nbrs).mapValues{
-      case (_, Some(nbrs)) => nbrs
-      case (_, None) => Array.empty[Vid]
-    }
+    graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[Vid]) }
   } // end of collectNeighborIds

@@ -183,10 +179,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
       (a, b) => a ++ b,
       edgeDirection)
-    graph.vertices.leftZipJoin(nbrs).mapValues{
-      case (_, Some(nbrs)) => nbrs
-      case (_, None) => Array.empty[(Vid, VD)]
-    }
+    graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[(Vid, VD)]) }
   } // end of collectNeighbor

diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
index f26e286003..5cb05998aa 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
@@ -98,7 +98,7 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
  */
 class VertexSetRDD[@specialized V: ClassManifest](
     @transient val index: VertexSetIndex,
-    @transient val valuesRDD: RDD[ ( (Int => V), BitSet) ])
+    @transient val valuesRDD: RDD[ ( Array[V], BitSet) ])
   extends RDD[(Vid, V)](index.rdd.context,
     List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {

@@ -182,7 +182,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
     val cleanPred = index.rdd.context.clean(pred)
     val newValues = index.rdd.zipPartitions(valuesRDD){
       (keysIter: Iterator[VertexIdToIndexMap],
-       valuesIter: Iterator[(Int => V, BitSet)]) =>
+       valuesIter: Iterator[(Array[V], BitSet)]) =>
       val index = keysIter.next()
       assert(keysIter.hasNext() == false)
       val (oldValues, bs) = valuesIter.next()
@@ -217,11 +217,12 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * VertexSetRDD retains the same index.
    */
   def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = {
-    val newValuesRDD: RDD[ (Int => U, BitSet) ] =
+    val cleanF = index.rdd.context.clean(f)
+    val newValuesRDD: RDD[ (Array[U], BitSet) ] =
       valuesRDD.mapPartitions(iter => iter.map{
-        case (values, bs: BitSet) =>
-          val newValues: (Int => U) =
-            (ind: Int) => if (bs.get(ind)) f(values(ind)) else null.asInstanceOf[U]
+        case (values, bs: BitSet) =>
+          val newValues = new Array[U](values.size)
+          bs.iterator.foreach { ind => newValues(ind) = cleanF(values(ind)) }
           (newValues, bs)
       }, preservesPartitioning = true)
     new VertexSetRDD[U](index, newValuesRDD)
@@ -241,19 +242,18 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * VertexSetRDD retains the same index.
    */
   def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = {
-    val newValues: RDD[ (Int => U, BitSet) ] =
+    val cleanF = index.rdd.context.clean(f)
+    val newValues: RDD[ (Array[U], BitSet) ] =
       index.rdd.zipPartitions(valuesRDD){
       (keysIter: Iterator[VertexIdToIndexMap],
-       valuesIter: Iterator[(Int => V, BitSet)]) =>
+       valuesIter: Iterator[(Array[V], BitSet)]) =>
       val index = keysIter.next()
       assert(keysIter.hasNext() == false)
-      val (oldValues, bs: BitSet) = valuesIter.next()
+      val (values, bs: BitSet) = valuesIter.next()
       assert(valuesIter.hasNext() == false)
       // Cosntruct a view of the map transformation
-      val newValues: (Int => U) = (ind: Int) => {
-        if (bs.get(ind)) { f(index.getValueSafe(ind), oldValues(ind)) }
-        else { null.asInstanceOf[U] }
-      }
+      val newValues = new Array[U](index.capacity)
+      bs.iterator.foreach { ind => newValues(ind) = cleanF(index.getValueSafe(ind), values(ind)) }
       Iterator((newValues, bs))
     }
     new VertexSetRDD[U](index, newValues)
@@ -261,6 +261,8 @@ class VertexSetRDD[@specialized V: ClassManifest](

   /**
+   * @todo update docs to reflect function argument
+   *
    * Inner join this VertexSet with another VertexSet which has the
    * same Index. This function will fail if both VertexSets do not
    * share the same index. The resulting vertex set will only contain
@@ -273,20 +275,25 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * and the other VertexSet and with tuple attributes.
    *
    */
-  def zipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,W)] = {
+  def zipJoin[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V, W) => Z):
+    VertexSetRDD[Z] = {
+    val cleanF = index.rdd.context.clean(f)
     if(index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
-    val newValuesRDD: RDD[ (Int => (V,W), BitSet) ] =
-      valuesRDD.zipPartitions(other.valuesRDD){
-      (thisIter: Iterator[(Int => V, BitSet)],
-        otherIter: Iterator[(Int => W, BitSet)]) =>
+    val newValuesRDD: RDD[ (Array[Z], BitSet) ] =
+      index.rdd.zipPartitions(valuesRDD, other.valuesRDD) { (indexIter, thisIter, otherIter) =>
+        val index = indexIter.next()
+        assert(!indexIter.hasNext)
         val (thisValues, thisBS: BitSet) = thisIter.next()
         assert(!thisIter.hasNext)
         val (otherValues, otherBS: BitSet) = otherIter.next()
         assert(!otherIter.hasNext)
         val newBS: BitSet = thisBS & otherBS
-        val newValues: Int => (V,W) = (ind: Int) => (thisValues(ind), otherValues(ind))
+        val newValues = new Array[Z](index.capacity)
+        newBS.iterator.foreach { ind =>
+          newValues(ind) = cleanF(index.getValueSafe(ind), thisValues(ind), otherValues(ind))
+        }
         Iterator((newValues, newBS))
       }
     new VertexSetRDD(index, newValuesRDD)
@@ -294,6 +301,37 @@ class VertexSetRDD[@specialized V: ClassManifest](

   /**
+   * @todo document
+   *
+   * @param other
+   * @param f
+   * @tparam W
+   * @tparam Z
+   * @return
+   */
+  def zipJoinFlatMap[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V,W) => Iterator[Z]):
+    RDD[Z] = {
+    val cleanF = index.rdd.context.clean(f)
+    if(index != other.index) {
+      throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
+    }
+    index.rdd.zipPartitions(valuesRDD, other.valuesRDD) { (indexIter, thisIter, otherIter) =>
+      val index = indexIter.next()
+      assert(!indexIter.hasNext)
+      val (thisValues, thisBS: BitSet) = thisIter.next()
+      assert(!thisIter.hasNext)
+      val (otherValues, otherBS: BitSet) = otherIter.next()
+      assert(!otherIter.hasNext)
+      val newBS: BitSet = thisBS & otherBS
+      val newValues = new Array[Z](index.capacity)
+      newBS.iterator.flatMap { ind => cleanF(index.getValueSafe(ind), thisValues(ind), otherValues(ind)) }
+    }
+  }
+
+
+  /**
+   * @todo update docs to reflect function argument
+   *
    * Left join this VertexSet with another VertexSet which has the
    * same Index. This function will fail if both VertexSets do not
    * share the same index. The resulting vertex set contains an entry
@@ -308,20 +346,25 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * other VertexSet.
    *
    */
-  def leftZipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,Option[W])] = {
+  def leftZipJoin[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V, Option[W]) => Z):
+    VertexSetRDD[Z] = {
     if(index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
-    val newValuesRDD: RDD[ (Int => (V,Option[W]), BitSet) ] =
-      valuesRDD.zipPartitions(other.valuesRDD){
-      (thisIter: Iterator[(Int => V, BitSet)],
-        otherIter: Iterator[(Int => W, BitSet)]) =>
+    val cleanF = index.rdd.context.clean(f)
+    val newValuesRDD: RDD[(Array[Z], BitSet)] =
+      index.rdd.zipPartitions(valuesRDD, other.valuesRDD) { (indexIter, thisIter, otherIter) =>
+        val index = indexIter.next()
+        assert(!indexIter.hasNext)
         val (thisValues, thisBS: BitSet) = thisIter.next()
         assert(!thisIter.hasNext)
         val (otherValues, otherBS: BitSet) = otherIter.next()
         assert(!otherIter.hasNext)
-        val newValues: Int => (V, Option[W]) = (ind: Int) =>
-          (thisValues(ind), if (otherBS.get(ind)) Option(otherValues(ind)) else None)
+        val newValues = new Array[Z](index.capacity)
+        thisBS.iterator.foreach { ind =>
+          val otherV = if (otherBS.get(ind)) Option(otherValues(ind)) else None
+          newValues(ind) = cleanF(index.getValueSafe(ind), thisValues(ind), otherV)
+        }
         Iterator((newValues, thisBS))
       }
     new VertexSetRDD(index, newValuesRDD)
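Note (not part of the patch): a rough usage sketch of the new curried join signatures above. The names (`sc`, `ranks`, `degrees`) and data are illustrative; both vertex sets are assumed to be built over the same VertexSetIndex, which is exactly what zipJoin/leftZipJoin require, mirroring the constructors exercised in GraphSuite below.

    // Two hypothetical vertex sets sharing one index (Vid is an alias for Long).
    val ranks: VertexSetRDD[Double] =
      VertexSetRDD(sc.parallelize(Seq((1L, 0.15), (2L, 0.30))), (a: Double, b: Double) => a + b)
    val degrees: VertexSetRDD[Int] = VertexSetRDD(sc.parallelize(Seq((1L, 2))), ranks.index)
    // Inner join: the result keeps only vertices present in both sets.
    val ratio = ranks.zipJoin(degrees) { (vid, r, d) => r / d }
    // Left join: vertices missing from `degrees` see None and fall back to a default.
    val safe = ranks.leftZipJoin(degrees) { (vid, r, dOpt) => r / dOpt.getOrElse(1) }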
@@ -346,68 +389,29 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * other VertexSet.
    *
    */
-  def leftJoin[W: ClassManifest](
-    other: RDD[(Vid,W)], merge: (W,W) => W = (a:W, b:W) => a):
-    VertexSetRDD[(V, Option[W]) ] = {
+  def leftJoin[W: ClassManifest, Z: ClassManifest](other: RDD[(Vid,W)])
+    (f: (Vid, V, Option[W]) => Z, merge: (W,W) => W = (a:W, b:W) => a ):
+    VertexSetRDD[Z] = {
+    val cleanF = index.rdd.context.clean(f)
+    val cleanMerge = index.rdd.context.clean(merge)
     // Test if the other vertex is a VertexSetRDD to choose the optimal
     // join strategy
     other match {
       // If the other set is a VertexSetRDD and shares the same index as
       // this vertex set then we use the much more efficient leftZipJoin
       case other: VertexSetRDD[_] if index == other.index => {
-        leftZipJoin(other)
-      }
+        leftZipJoin(other)(cleanF)
+        // @todo handle case where other is a VertexSetRDD with a different index
+      }
       case _ => {
-        // Otherwise we treat the other RDD as a collectiong of
-        // vertex-attribute pairs.
-        // If necessary shuffle the other RDD using the partitioner
-        // for this VertexSet
-        val otherShuffled =
-          if (other.partitioner == partitioner) other
-          else other.partitionBy(partitioner.get)
-        // Compute the new values RDD
-        val newValuesRDD: RDD[ (Int => (V,Option[W]), BitSet) ] =
-          index.rdd.zipPartitions(valuesRDD, otherShuffled) {
-          (thisIndexIter: Iterator[VertexIdToIndexMap],
-            thisIter: Iterator[(Int => V, BitSet)],
-            tuplesIter: Iterator[(Vid,W)]) =>
-            // Get the Index and values for this RDD
-            val index = thisIndexIter.next()
-            assert(!thisIndexIter.hasNext)
-            val (thisValues, thisBS) = thisIter.next()
-            assert(!thisIter.hasNext)
-            // Create a new array to store the values in the resulting VertexSet
-            val otherValues = new Array[W](index.capacity)
-            // track which values are matched with values in other
-            val otherBS = new BitSet(index.capacity)
-            for ((k,w) <- tuplesIter) {
-              // Get the location of the key in the index
-              val pos = index.getPos(k)
-              // Only if the key is already in the index
-              if ((pos & OpenHashSet.EXISTENCE_MASK) == 0) {
-                // Get the actual index
-                val ind = pos & OpenHashSet.POSITION_MASK
-                // If this value has already been seen then merge
-                if (otherBS.get(ind)) {
-                  otherValues(ind) = merge(otherValues(ind), w)
-                } else { // otherwise just store the new value
-                  otherBS.set(ind)
-                  otherValues(ind) = w
-                }
-              }
-            }
-            // Some vertices in this vertex set may not have a corresponding
-            // tuple in the join and so a None value should be returned.
-            val newValues: Int => (V, Option[W]) = (ind: Int) =>
-              (thisValues(ind), if (otherBS.get(ind)) Option(otherValues(ind)) else None)
-            Iterator((newValues, thisBS))
-          } // end of newValues
-        new VertexSetRDD(index, newValuesRDD)
+        val indexedOther: VertexSetRDD[W] = VertexSetRDD(other, index, cleanMerge)
+        leftZipJoin(indexedOther)(cleanF)
       }
     }
   } // end of leftJoin

+
   /**
    * For each key k in `this` or `other`, return a resulting RDD that contains a
    * tuple with the list of values for that key in `this` as well as `other`.
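Note (not part of the patch): with the reworked leftJoin, an arbitrary pair RDD can be joined in directly and duplicate keys are resolved with the optional `merge` argument before the join. A sketch under the assumption that `vset` is an existing VertexSetRDD[Double] and `sc` a SparkContext:

    val updates: RDD[(Vid, Double)] = sc.parallelize(Seq((1L, 0.5), (1L, 0.25), (2L, 1.0)))
    // Vertices without an update keep their old value; duplicate updates are summed first.
    val refreshed: VertexSetRDD[Double] =
      vset.leftJoin(updates)((vid, old, upd) => upd.getOrElse(old), merge = (a, b) => a + b)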
@@ -609,29 +613,30 @@ object VertexSetRDD {
    */
   def apply[V: ClassManifest](
     rdd: RDD[(Vid,V)], reduceFunc: (V, V) => V): VertexSetRDD[V] = {
+    val cReduceFunc = rdd.context.clean(reduceFunc)
     // Preaggregate and shuffle if necessary
     val preAgg = rdd.partitioner match {
       case Some(p) => rdd
       case None =>
         val partitioner = new HashPartitioner(rdd.partitions.size)
         // Preaggregation.
-        val aggregator = new Aggregator[Vid, V, V](v => v, reduceFunc, reduceFunc)
+        val aggregator = new Aggregator[Vid, V, V](v => v, cReduceFunc, cReduceFunc)
         rdd.mapPartitions(aggregator.combineValuesByKey, true).partitionBy(partitioner)
     }

     val groups = preAgg.mapPartitions( iter => {
       val hashMap = new PrimitiveKeyOpenHashMap[Vid, V]
       for ((k,v) <- iter) {
-        hashMap.setMerge(k, v, reduceFunc)
+        hashMap.setMerge(k, v, cReduceFunc)
       }
       val index = hashMap.keySet
-      val values: Int => V = (ind: Int) => hashMap._values(ind)
+      val values = hashMap._values
       val bs = index.getBitSet
       Iterator( (index, (values, bs)) )
       }, true).cache
     // extract the index and the values
     val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true)
-    val values: RDD[(Int => V, BitSet)] =
+    val values: RDD[(Array[V], BitSet)] =
       groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
     new VertexSetRDD[V](new VertexSetIndex(index), values)
   } // end of apply

@@ -690,6 +695,9 @@ object VertexSetRDD {
       createCombiner: V => C,
       mergeValue: (C, V) => C,
       mergeCombiners: (C, C) => C): VertexSetRDD[C] = {
+    val cCreateCombiner = index.rdd.context.clean(createCombiner)
+    val cMergeValue = index.rdd.context.clean(mergeValue)
+    val cMergeCombiners = index.rdd.context.clean(mergeCombiners)
     // Get the index Partitioner
     val partitioner = index.rdd.partitioner match {
       case Some(p) => p
@@ -699,15 +707,15 @@ object VertexSetRDD {
     val partitioned =
       if (rdd.partitioner != Some(partitioner)) {
         // Preaggregation.
-        val aggregator = new Aggregator[Vid, V, C](createCombiner, mergeValue,
-          mergeCombiners)
+        val aggregator = new Aggregator[Vid, V, C](cCreateCombiner, cMergeValue,
+          cMergeCombiners)
         rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
       } else {
         rdd.mapValues(x => createCombiner(x))
       }

     // Use the index to build the new values table
-    val values: RDD[ (Int => C, BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
+    val values: RDD[ (Array[C], BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
       // There is only one map
       val index = indexIter.next()
       assert(!indexIter.hasNext())
@@ -724,14 +732,14 @@ object VertexSetRDD {
           val ind = pos & OpenHashSet.POSITION_MASK
           // If this value has already been seen then merge
           if (bs.get(ind)) {
-            values(ind) = mergeCombiners(values(ind), c)
+            values(ind) = cMergeCombiners(values(ind), c)
           } else { // otherwise just store the new value
             bs.set(ind)
             values(ind) = c
           }
         }
       }
-      Iterator(((ind: Int) => values(ind), bs))
+      Iterator((values, bs))
     })
     new VertexSetRDD(index, values)
   } // end of apply
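Note (not part of the patch): the VertexSetRDD builders above now also clean the user-supplied closures before shipping them to tasks. A small sketch of the builder that resolves duplicate vertex ids with a reduce function; the values are illustrative and `sc` is assumed to be an existing SparkContext:

    val pairs: RDD[(Vid, Int)] = sc.parallelize(Seq((1L, 2), (2L, 3), (1L, 5)))
    val vset: VertexSetRDD[Int] = VertexSetRDD(pairs, (a: Int, b: Int) => a + b)  // 1L -> 7, 2L -> 3
    val doubled = vset.mapValues(_ * 2)  // reuses the same index, only the value array changes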
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index b80713dbf4..83ff2d734c 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -315,8 +315,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2)
     : Graph[VD2, ED] = {
     ClosureCleaner.clean(updateF)
-    val newVTable = vTable.leftJoin(updates).mapValuesWithKeys(
-      (vid, vu) => updateF(vid, vu._1, vu._2) )
+    val newVTable = vTable.leftJoin(updates)(updateF)
     new GraphImpl(newVTable, vid2pid, localVidMap, eTable)
   }

@@ -437,11 +436,9 @@ object GraphImpl {
     RDD[(Pid, Array[VD])] = {
     // Join vid2pid and vTable, generate a shuffle dependency on the joined
     // result, and get the shuffle id so we can use it on the slave.
-    val msgsByPartition = vTable.zipJoin(vid2pid)
-      .flatMap { case (vid, (vdata, pids)) =>
-        pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) }
-      }
-      .partitionBy(replicationMap.partitioner.get).cache()
+    val msgsByPartition = vTable.zipJoinFlatMap(vid2pid) { (vid, vdata, pids) =>
+      pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) }
+    }.partitionBy(replicationMap.partitioner.get).cache()

     replicationMap.zipPartitions(msgsByPartition){
       (mapIter, msgsIter) =>
diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
index 8d0b2e0b02..0fb101a08c 100644
--- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
@@ -4,6 +4,7 @@ import org.scalatest.FunSuite

 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
+import org.apache.spark.rdd._

 import org.apache.spark.graph.LocalSparkContext._

@@ -58,8 +59,9 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
       val prGraph1 = Analytics.pagerank(starGraph, 1, resetProb)
       val prGraph2 = Analytics.pagerank(starGraph, 2, resetProb)

-      val notMatching = prGraph1.vertices.zipJoin(prGraph2.vertices)
-        .map{ case (vid, (pr1, pr2)) => if (pr1 != pr2) { 1 } else { 0 } }.sum
+      val notMatching = prGraph1.vertices.zipJoin(prGraph2.vertices) { (vid, pr1, pr2) =>
+        if (pr1 != pr2) { 1 } else { 0 }
+      }.map { case (vid, test) => test }.sum
       assert(notMatching === 0)
       prGraph2.vertices.foreach(println(_))
       val errors = prGraph2.vertices.map{ case (vid, pr) =>
@@ -70,10 +72,12 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
       assert(errors.sum === 0)

       val prGraph3 = Analytics.deltaPagerank(starGraph, 0, resetProb)
-      val errors2 = prGraph2.vertices.leftJoin(prGraph3.vertices).map{
-        case (_, (pr1, Some(pr2))) if(pr1 == pr2) => 0
-        case _ => 1
-      }.sum
+      val errors2 = prGraph2.vertices.leftJoin(prGraph3.vertices){ (vid, pr1, pr2Opt) =>
+        pr2Opt match {
+          case Some(pr2) if(pr1 == pr2) => 0
+          case _ => 1
+        }
+      }.map { case (vid, test) => test }.sum
       assert(errors2 === 0)
     }
   } // end of test Star PageRank
@@ -86,19 +90,17 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
       val resetProb = 0.15
       val prGraph1 = Analytics.pagerank(gridGraph, 50, resetProb).cache()
       val prGraph2 = Analytics.deltaPagerank(gridGraph, 0.0001, resetProb).cache()
-      val error = prGraph1.vertices.zipJoin(prGraph2.vertices).map {
-        case (id, (a, b)) => (a - b) * (a - b)
-      }.sum
-      prGraph1.vertices.zipJoin(prGraph2.vertices)
-        .map{ case (id, (a,b)) => (id, (a,b, a-b))}.foreach(println(_))
+      val error = prGraph1.vertices.zipJoin(prGraph2.vertices) { case (id, a, b) => (a - b) * (a - b) }
+        .map { case (id, error) => error }.sum
+      prGraph1.vertices.zipJoin(prGraph2.vertices) { (id, a, b) => (a, b, a-b) }.foreach(println(_))
       println(error)
       assert(error < 1.0e-5)
-      val pr3 = sc.parallelize(GridPageRank(10,10, 50, resetProb))
-      val error2 = prGraph1.vertices.leftJoin(pr3).map {
-        case (id, (a, Some(b))) => (a - b) * (a - b)
-        case _ => 0
-      }.sum
-      prGraph1.vertices.leftJoin(pr3).foreach(println( _ ))
+      val pr3: RDD[(Vid, Double)] = sc.parallelize(GridPageRank(10,10, 50, resetProb))
+      val error2 = prGraph1.vertices.leftJoin(pr3) { (id, a, bOpt) =>
+        val b: Double = bOpt.get
+        (a - b) * (a - b)
+      }.map { case (id, error) => error }.sum
+      prGraph1.vertices.leftJoin(pr3) { (id, a, b) => (a, b) }.foreach( println(_) )
       println(error2)
       assert(error2 < 1.0e-5)
     }
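Note (not part of the patch): since the joins now return a VertexSetRDD of (Vid, Z) pairs rather than tuple-valued vertices, the updated tests project the value out before aggregating. The same pattern in isolation, assuming `verticesA` and `verticesB` are Double vertex sets built over one index:

    val mismatches = verticesA.zipJoin(verticesB) { (vid, a, b) => if (a != b) 1 else 0 }
      .map { case (_, flag) => flag }
      .sum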
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index 2067b1613e..ec548bda16 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -78,13 +78,13 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val a = sc.parallelize((0 to 100).map(x => (x.toLong, x.toLong)), 5)
       val b = VertexSetRDD(a).mapValues(x => -x)
       assert(b.count === 101)
-      assert(b.leftJoin(a).mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
+      assert(b.leftJoin(a){ (id, a, bOpt) => a + bOpt.get }.map(x=> x._2).reduce(_+_) === 0)
       val c = VertexSetRDD(a, b.index)
-      assert(b.leftJoin(c).mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
+      assert(b.leftJoin(c){ (id, b, cOpt) => b + cOpt.get }.map(x=> x._2).reduce(_+_) === 0)
       val d = c.filter(q => ((q._2 % 2) == 0))
       val e = a.filter(q => ((q._2 % 2) == 0))
       assert(d.count === e.count)
-      assert(b.zipJoin(c).mapValues(x => x._1 + x._2).map(x => x._2).reduce(_+_) === 0)
+      assert(b.zipJoin(c)((id, b, c) => b + c).map(x => x._2).reduce(_+_) === 0)
     }
   }