diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala index 9a1ebbcdcc..ee368ebb41 100644 --- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala @@ -45,20 +45,18 @@ class EdgeRDD[@specialized ED: ClassManifest]( def mapEdgePartitions[ED2: ClassManifest](f: EdgePartition[ED] => EdgePartition[ED2]) : EdgeRDD[ED2]= { - val cleanF = sparkContext.clean(f) new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter => val (pid, ep) = iter.next() - Iterator(Tuple2(pid, cleanF(ep))) + Iterator(Tuple2(pid, f(ep))) }, preservesPartitioning = true)) } def zipEdgePartitions[T: ClassManifest, U: ClassManifest] (other: RDD[T]) (f: (EdgePartition[ED], Iterator[T]) => Iterator[U]): RDD[U] = { - val cleanF = sparkContext.clean(f) partitionsRDD.zipPartitions(other, preservesPartitioning = true) { (ePartIter, otherIter) => val (_, edgePartition) = ePartIter.next() - cleanF(edgePartition, otherIter) + f(edgePartition, otherIter) } } diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala index 1f92233df9..76768489ee 100644 --- a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala +++ b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala @@ -13,7 +13,7 @@ import org.apache.spark.graph.impl.VertexPartition * tried specializing I got a warning about inherenting from a type * that is not a trait. */ -class EdgeTriplet[VD, ED](vPart: VertexPartition[VD] = null) extends Edge[ED] { +class EdgeTriplet[VD, ED] extends Edge[ED] { // class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassManifest, // @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] extends Edge[ED] { @@ -28,8 +28,8 @@ class EdgeTriplet[VD, ED](vPart: VertexPartition[VD] = null) extends Edge[ED] { */ var dstAttr: VD = _ //nullValue[VD] - def srcMask: Boolean = vPart.isDefined(srcId) - def dstMask: Boolean = vPart.isDefined(dstId) + var srcStale: Boolean = false + var dstStale: Boolean = false /** * Set the edge properties of this triplet. diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala index d31d9dead0..0dc5ec8b24 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala @@ -227,6 +227,11 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * be commutative and assosciative and is used to combine the output * of the map phase. * + * @param activeSet optionally, a set of "active" vertices and a direction of edges to consider + * when running `mapFunc`. For example, if the direction is Out, `mapFunc` will only be run on + * edges originating from vertices in the active set. `activeSet` must have the same index as the + * graph's vertices. 
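An illustrative sketch of how the new `activeSetOpt` parameter might be used (assuming a hypothetical `graph: Graph[Int, Int]`; with `EdgeDirection.Out`, `mapFunc` runs only on edges whose source vertex appears in the active set):

    // Vertices with even attributes form the active set; filtering the graph's own
    // vertices keeps the same index, as required by activeSetOpt.
    val active = graph.vertices.filter { case (vid, attr) => attr % 2 == 0 }
    // counts(v) = number of active in-neighbors of v, because messages are only
    // generated for edges originating from active vertices.
    val counts = graph.mapReduceTriplets[Int](
      et => Iterator((et.dstId, 1)),
      (a, b) => a + b,
      Some((active, EdgeDirection.Out)))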
+ * * @example We can use this function to compute the inDegree of each * vertex * {{{ @@ -244,7 +249,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { */ def mapReduceTriplets[A: ClassManifest]( mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)], - reduceFunc: (A, A) => A) + reduceFunc: (A, A) => A, + activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) : VertexRDD[A] /** @@ -280,8 +286,6 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { (mapFunc: (Vid, VD, Option[U]) => VD2) : Graph[VD2, ED] - def deltaJoinVertices(changedVerts: VertexRDD[VD]): Graph[VD, ED] - // Save a copy of the GraphOps object so there is always one unique GraphOps object // for a given Graph object, and thus the lazy vals in GraphOps would work as intended. val ops = new GraphOps(this) diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala index ee74a36c21..7b9eb88b7b 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala @@ -2,7 +2,6 @@ package org.apache.spark.graph import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ -import org.apache.spark.util.ClosureCleaner import org.apache.spark.SparkException @@ -116,9 +115,6 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { dir: EdgeDirection) : VertexRDD[A] = { - ClosureCleaner.clean(mapFunc) - ClosureCleaner.clean(reduceFunc) - // Define a new map function over edge triplets val mf = (et: EdgeTriplet[VD,ED]) => { // Compute the message to the dst vertex @@ -140,7 +136,6 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { } } - ClosureCleaner.clean(mf) graph.mapReduceTriplets(mf, reduceFunc) } // end of aggregateNeighbors @@ -233,7 +228,6 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { */ def joinVertices[U: ClassManifest](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD) : Graph[VD, ED] = { - ClosureCleaner.clean(mapFunc) val uf = (id: Vid, data: VD, o: Option[U]) => { o match { case Some(u) => mapFunc(id, data, u) diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala index 9fb1d3fd8c..285e857b69 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala @@ -91,29 +91,22 @@ object Pregel { mergeMsg: (A, A) => A) : Graph[VD, ED] = { - def sendMsgFun(edge: EdgeTriplet[VD, ED]): Iterator[(Vid, A)] = { - if (edge.srcMask) { - sendMsg(edge) - } else { - Iterator.empty - } - } - var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ) // compute the messages - var messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache() + var messages = g.mapReduceTriplets(sendMsg, mergeMsg).cache() var activeMessages = messages.count() // Loop var i = 0 while (activeMessages > 0 && i < maxIterations) { - // receive the messages - val changedVerts = g.vertices.deltaJoin(messages)(vprog).cache() // updating the vertices - // replicate the changed vertices - g = g.deltaJoinVertices(changedVerts) + // Receive the messages. Vertices that didn't get any messages do not appear in newVerts. + val newVerts = g.vertices.innerJoin(messages)(vprog).cache() + // Update the graph with the new vertices. 
+ g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) } val oldMessages = messages - // compute the messages - messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache() + // Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't + // get to send messages. + messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, EdgeDirection.Out))).cache() activeMessages = messages.count() // after counting we can unpersist the old messages oldMessages.unpersist(blocking=false) diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala index 33e0b892fb..90ac6dc61d 100644 --- a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala @@ -60,7 +60,7 @@ class VertexRDD[@specialized VD: ClassManifest]( /** * Construct a new VertexRDD that is indexed by only the keys in the RDD. - * The resulting VertexSet will be based on a different index and can + * The resulting VertexRDD will be based on a different index and can * no longer be quickly joined with this RDD. */ def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex())) @@ -113,8 +113,7 @@ class VertexRDD[@specialized VD: ClassManifest]( */ def mapVertexPartitions[VD2: ClassManifest](f: VertexPartition[VD] => VertexPartition[VD2]) : VertexRDD[VD2] = { - val cleanF = sparkContext.clean(f) - val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(cleanF), preservesPartitioning = true) + val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true) new VertexRDD(newPartitionsRDD) } @@ -122,16 +121,15 @@ class VertexRDD[@specialized VD: ClassManifest]( * Return a new VertexRDD by applying a function to corresponding * VertexPartitions of this VertexRDD and another one. */ - private def zipVertexPartitions[VD2: ClassManifest, VD3: ClassManifest] + def zipVertexPartitions[VD2: ClassManifest, VD3: ClassManifest] (other: VertexRDD[VD2]) (f: (VertexPartition[VD], VertexPartition[VD2]) => VertexPartition[VD3]): VertexRDD[VD3] = { - val cleanF = sparkContext.clean(f) val newPartitionsRDD = partitionsRDD.zipPartitions( other.partitionsRDD, preservesPartitioning = true ) { (thisIter, otherIter) => val thisPart = thisIter.next() val otherPart = otherIter.next() - Iterator(cleanF(thisPart, otherPart)) + Iterator(f(thisPart, otherPart)) } new VertexRDD(newPartitionsRDD) } @@ -159,7 +157,7 @@ class VertexRDD[@specialized VD: ClassManifest]( * * @param f the function applied to each value in the RDD * @return a new VertexRDD with values obtained by applying `f` to - * each of the entries in the original VertexSet. The resulting + * each of the entries in the original VertexRDD. The resulting * VertexRDD retains the same index. */ def mapValues[VD2: ClassManifest](f: VD => VD2): VertexRDD[VD2] = @@ -173,12 +171,16 @@ class VertexRDD[@specialized VD: ClassManifest]( * * @param f the function applied to each value in the RDD * @return a new VertexRDD with values obtained by applying `f` to - * each of the entries in the original VertexSet. The resulting + * each of the entries in the original VertexRDD. The resulting * VertexRDD retains the same index. */ def mapValues[VD2: ClassManifest](f: (Vid, VD) => VD2): VertexRDD[VD2] = this.mapVertexPartitions(_.map(f)) + /** + * Hides vertices that are the same between this and other. For vertices that are different, keeps + * the values from `other`. 
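A small worked example of `diff` (hypothetical values, assuming `verts: VertexRDD[String]` holding (1L, "a"), (2L, "b"), (3L, "c")):

    // mapValues preserves the index, so the two VertexRDDs can be diffed cheaply.
    val updated = verts.mapValues((vid, v) => if (vid == 2L) "B" else v)
    val changed = verts.diff(updated)
    // changed contains only (2L, "B"): the vertex whose value differs, taken from `updated`.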
+ */ def diff(other: VertexRDD[VD]): VertexRDD[VD] = { this.zipVertexPartitions(other) { (thisPart, otherPart) => thisPart.diff(otherPart) @@ -199,24 +201,14 @@ class VertexRDD[@specialized VD: ClassManifest]( * this and the other vertex set to a new vertex attribute. * @return a VertexRDD containing only the vertices in both this * and the other VertexSet and with tuple attributes. - * */ def zipJoin[VD2: ClassManifest, VD3: ClassManifest] - (other: VertexRDD[VD2])(f: (Vid, VD, VD2) => VD3): VertexRDD[VD3] = - { + (other: VertexRDD[VD2])(f: (Vid, VD, VD2) => VD3): VertexRDD[VD3] = { this.zipVertexPartitions(other) { (thisPart, otherPart) => thisPart.join(otherPart)(f) } } - def deltaJoin[VD2: ClassManifest] - (other: VertexRDD[VD2])(f: (Vid, VD, VD2) => VD): VertexRDD[VD] = - { - this.zipVertexPartitions(other) { (thisPart, otherPart) => - thisPart.deltaJoin(otherPart)(f) - } - } - /** * Left join this VertexSet with another VertexSet which has the * same Index. This function will fail if both VertexSets do not @@ -236,31 +228,30 @@ class VertexRDD[@specialized VD: ClassManifest]( * */ def leftZipJoin[VD2: ClassManifest, VD3: ClassManifest] - (other: VertexRDD[VD2])(f: (Vid, VD, Option[VD2]) => VD3): VertexRDD[VD3] = - { + (other: VertexRDD[VD2])(f: (Vid, VD, Option[VD2]) => VD3): VertexRDD[VD3] = { this.zipVertexPartitions(other) { (thisPart, otherPart) => thisPart.leftJoin(otherPart)(f) } } /** - * Left join this VertexSet with an RDD containing vertex attribute - * pairs. If the other RDD is backed by a VertexSet with the same + * Left join this VertexRDD with an RDD containing vertex attribute + * pairs. If the other RDD is backed by a VertexRDD with the same * index than the efficient leftZipJoin implementation is used. The * resulting vertex set contains an entry for each vertex in this - * set. If the other VertexSet is missing any vertex in this - * VertexSet then a `None` attribute is generated. + * set. If the other VertexRDD is missing any vertex in this + * VertexRDD then a `None` attribute is generated. * * If there are duplicates, the vertex is picked at random. * - * @tparam VD2 the attribute type of the other VertexSet - * @tparam VD3 the attribute type of the resulting VertexSet + * @tparam VD2 the attribute type of the other VertexRDD + * @tparam VD3 the attribute type of the resulting VertexRDD * - * @param other the other VertexSet with which to join. + * @param other the other VertexRDD with which to join. * @param f the function mapping a vertex id and its attributes in * this and the other vertex set to a new vertex attribute. * @return a VertexRDD containing all the vertices in this - * VertexSet with the attribute emitted by f. + * VertexRDD with the attribute emitted by f. */ def leftJoin[VD2: ClassManifest, VD3: ClassManifest] (other: RDD[(Vid, VD2)]) @@ -284,13 +275,47 @@ class VertexRDD[@specialized VD: ClassManifest]( } } + /** + * Same effect as leftJoin(other) { (vid, a, bOpt) => bOpt.getOrElse(a) }, but `this` and `other` + * must have the same index. + */ + def innerZipJoin[U: ClassManifest, VD2: ClassManifest](other: VertexRDD[U]) + (f: (Vid, VD, U) => VD2): VertexRDD[VD2] = { + this.zipVertexPartitions(other) { (thisPart, otherPart) => + thisPart.innerJoin(otherPart)(f) + } + } + + /** + * Replace vertices with corresponding vertices in `other`, and drop vertices without a + * corresponding vertex in `other`. 
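For illustration, the contract of `innerJoin` on hypothetical data (assuming `verts: VertexRDD[String]` holding (1L, "a"), (2L, "b"), (3L, "c") and `msgs: RDD[(Vid, Int)]` holding (1L, 10), (3L, 30)):

    // Only vertices present on both sides survive; f combines the two values.
    val joined = verts.innerJoin(msgs) { (vid, attr, msg) => attr + msg }
    // joined contains (1L, "a10") and (3L, "c30"); vertex 2L is dropped.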
+ */ + def innerJoin[U: ClassManifest, VD2: ClassManifest](other: RDD[(Vid, U)]) + (f: (Vid, VD, U) => VD2): VertexRDD[VD2] = { + // Test if the other vertex is a VertexRDD to choose the optimal join strategy. + // If the other set is a VertexRDD then we use the much more efficient innerZipJoin + other match { + case other: VertexRDD[_] => + innerZipJoin(other)(f) + case _ => + new VertexRDD( + partitionsRDD.zipPartitions( + other.partitionBy(this.partitioner.get), preservesPartitioning = true) + { (part, msgs) => + val vertexPartition: VertexPartition[VD] = part.next() + Iterator(vertexPartition.innerJoin(msgs)(f)) + } + ) + } + } + def aggregateUsingIndex[VD2: ClassManifest]( messages: RDD[(Vid, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = { val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get) val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) => - val vertextPartition: VertexPartition[VD] = thisIter.next() - Iterator(vertextPartition.aggregateUsingIndex(msgIter, reduceFunc)) + val vertexPartition: VertexPartition[VD] = thisIter.next() + Iterator(vertexPartition.aggregateUsingIndex(msgIter, reduceFunc)) } new VertexRDD[VD2](parts) } @@ -299,7 +324,7 @@ class VertexRDD[@specialized VD: ClassManifest]( /** - * The VertexRDD singleton is used to construct VertexSets + * The VertexRDD singleton is used to construct VertexRDDs */ object VertexRDD { diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala index bb92e7c767..f77dffd7b4 100644 --- a/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala +++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala @@ -166,24 +166,29 @@ object PageRank extends Logging { .mapVertices((vid, degree) => resetProb).cache() var numDeltas: Long = ranks.count() + var prevDeltas: Option[VertexRDD[Double]] = None + var i = 0 val weight = (1.0 - resetProb) while (numDeltas > 0) { - // Compute new deltas + // Compute new deltas. Only deltas that existed in the last round (i.e., were greater than + // `tol`) get to send messages; those that were less than `tol` would send messages less than + // `tol` as well. val deltas = deltaGraph .mapReduceTriplets[Double]( - et => { - if (et.srcMask) Iterator((et.dstId, et.srcAttr * et.attr * weight)) - else Iterator.empty - }, - _ + _) + et => Iterator((et.dstId, et.srcAttr * et.attr * weight)), + _ + _, + prevDeltas.map((_, EdgeDirection.Out))) .filter { case (vid, delta) => delta > tol } .cache() + prevDeltas = Some(deltas) numDeltas = deltas.count() logInfo("Standalone PageRank: iter %d has %d deltas".format(i, numDeltas)) - // Apply deltas. Sets the mask for each vertex to false if it does not appear in deltas. 
- deltaGraph = deltaGraph.deltaJoinVertices(deltas).cache() + // Update deltaGraph with the deltas + deltaGraph = deltaGraph.outerJoinVertices(deltas) { (vid, old, newOpt) => + newOpt.getOrElse(old) + }.cache() // Update ranks ranks = ranks.leftZipJoin(deltas) { (vid, oldRank, deltaOpt) => diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index ba2ebe6497..08bef82150 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -47,21 +47,20 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( this(vertices, edges, new VertexPlacement(edges, vertices)) } - /** Return a RDD that brings edges with its source and destination vertices together. */ + /** Return a RDD that brings edges together with their source and destination vertices. */ @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = { val vdManifest = classManifest[VD] val edManifest = classManifest[ED] - edges.zipEdgePartitions(vTableReplicated.bothAttrs) { (edgePartition, vTableReplicatedIter) => - val (_, vPart) = vTableReplicatedIter.next() - new EdgeTripletIterator(vPart.index, vPart.values, edgePartition)(vdManifest, edManifest) + edges.zipEdgePartitions(vTableReplicated.get(true, true)) { (ePart, vPartIter) => + val (_, vPart) = vPartIter.next() + new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdManifest, edManifest) } } override def persist(newLevel: StorageLevel): Graph[VD, ED] = { vertices.persist(newLevel) edges.persist(newLevel) - vertexPlacement.persist(newLevel) this } @@ -149,29 +148,38 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( println("\n\nvertexPlacement.bothAttrs -------------------------------") traverseLineage(vertexPlacement.bothAttrs, " ", visited) visited += (vertexPlacement.bothAttrs.id -> "vertexPlacement.bothAttrs") - println("\n\nvTableReplicated.bothAttrs ----------------") - traverseLineage(vTableReplicated.bothAttrs, " ", visited) - visited += (vTableReplicated.bothAttrs.id -> "vTableReplicated.bothAttrs") println("\n\ntriplets ----------------------------------------") traverseLineage(triplets, " ", visited) println(visited) } // end of printLineage override def reverse: Graph[VD, ED] = - new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), vertexPlacement) + new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), vertexPlacement, vTableReplicated) - override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = - new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, vertexPlacement) + override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = { + if (classManifest[VD] equals classManifest[VD2]) { + // The map preserves type, so we can use incremental replication + val newVerts = vertices.mapVertexPartitions(_.map(f)) + val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts) + val newVTableReplicated = new VTableReplicated[VD2]( + changedVerts, edges, vertexPlacement, + Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]])) + new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated) + } else { + // The map does not preserve type, so we must re-replicate all vertices + new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, vertexPlacement) + } + } override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = - new GraphImpl(vertices, 
edges.mapEdgePartitions(_.map(f)), vertexPlacement) + new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), vertexPlacement, vTableReplicated) override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = { // Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit // manifest from GraphImpl (which would require serializing GraphImpl). val vdManifest = classManifest[VD] val newETable = - edges.zipEdgePartitions(vTableReplicated.bothAttrs) { (edgePartition, vTableReplicatedIter) => + edges.zipEdgePartitions(vTableReplicated.get(true, true)) { (edgePartition, vTableReplicatedIter) => val (pid, vPart) = vTableReplicatedIter.next() val et = new EdgeTriplet[VD, ED] val newEdgePartition = edgePartition.map { e => @@ -182,7 +190,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( } Iterator((pid, newEdgePartition)) } - new GraphImpl(vertices, new EdgeRDD(newETable), vertexPlacement) + new GraphImpl(vertices, new EdgeRDD(newETable), vertexPlacement, vTableReplicated) } override def subgraph( @@ -210,7 +218,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = { ClosureCleaner.clean(merge) val newETable = edges.mapEdgePartitions(_.groupEdges(merge)) - new GraphImpl(vertices, newETable, vertexPlacement) + new GraphImpl(vertices, newETable, vertexPlacement, vTableReplicated) } ////////////////////////////////////////////////////////////////////////////////////////////////// @@ -219,7 +227,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( override def mapReduceTriplets[A: ClassManifest]( mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)], - reduceFunc: (A, A) => A): VertexRDD[A] = { + reduceFunc: (A, A) => A, + activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = { ClosureCleaner.clean(mapFunc) ClosureCleaner.clean(reduceFunc) @@ -228,23 +237,42 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( // in the relevant position in an edge. val mapUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr") val mapUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr") - val vs = vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr) + val vs = activeSetOpt match { + case Some((activeSet, _)) => vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet) + case None => vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr) + } + val activeDirectionOpt = activeSetOpt.map(_._2) // Map and combine. 
val preAgg = edges.zipEdgePartitions(vs) { (edgePartition, vTableReplicatedIter) => val (_, vertexPartition) = vTableReplicatedIter.next() // Iterate over the partition - val et = new EdgeTriplet[VD, ED](vertexPartition) + val et = new EdgeTriplet[VD, ED] val filteredEdges = edgePartition.iterator.flatMap { e => - et.set(e) - if (mapUsesSrcAttr) { - et.srcAttr = vertexPartition(e.srcId) + // Ensure the edge is adjacent to a vertex in activeSet if necessary + val adjacent = activeDirectionOpt match { + case Some(EdgeDirection.In) => + vertexPartition.isActive(e.dstId) + case Some(EdgeDirection.Out) => + vertexPartition.isActive(e.srcId) + case Some(EdgeDirection.Both) => + vertexPartition.isActive(e.srcId) && vertexPartition.isActive(e.dstId) + case None => + true } - if (mapUsesDstAttr) { - et.dstAttr = vertexPartition(e.dstId) + if (adjacent) { + et.set(e) + if (mapUsesSrcAttr) { + et.srcAttr = vertexPartition(e.srcId) + } + if (mapUsesDstAttr) { + et.dstAttr = vertexPartition(e.dstId) + } + mapFunc(et) + } else { + Iterator.empty } - mapFunc(et) } // Note: This doesn't allow users to send messages to arbitrary vertices. vertexPartition.aggregateUsingIndex(filteredEdges, reduceFunc).iterator @@ -255,22 +283,20 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( } // end of mapReduceTriplets override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest] - (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2): Graph[VD2, ED] = { - ClosureCleaner.clean(updateF) - val newVTable = vertices.leftJoin(updates)(updateF) - new GraphImpl(newVTable, edges, vertexPlacement) - } - - override def deltaJoinVertices(changedVerts: VertexRDD[VD]): Graph[VD, ED] = { - val newVerts = vertices.leftZipJoin(changedVerts) { (vid, oldAttr, newAttrOpt) => - newAttrOpt match { - case Some(newAttr) => newAttr - case None => oldAttr - } + (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2): Graph[VD2, ED] = { + if (classManifest[VD] equals classManifest[VD2]) { + // updateF preserves type, so we can use incremental replication + val newVerts = vertices.leftJoin(updates)(updateF) + val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts) + val newVTableReplicated = new VTableReplicated[VD2]( + changedVerts, edges, vertexPlacement, + Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]])) + new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated) + } else { + // updateF does not preserve type, so we must re-replicate all vertices + val newVerts = vertices.leftJoin(updates)(updateF) + new GraphImpl(newVerts, edges, vertexPlacement) } - val newVTableReplicated = new VTableReplicated( - changedVerts, edges, vertexPlacement, Some(vTableReplicated)) - new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated) } private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = { diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala index 6cbeb50186..161c98f158 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala @@ -7,26 +7,47 @@ import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet} import org.apache.spark.graph._ /** - * Stores the vertex attribute values after they are replicated. + * A view of the vertices after they are shipped to the join sites specified in + * `vertexPlacement`. 
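The per-edge activeness check performed in `mapReduceTriplets` above can be summarized as the following predicate (a restatement for clarity; the helper name is illustrative):

    def edgeIsActive(dir: Option[EdgeDirection], srcActive: Boolean, dstActive: Boolean): Boolean =
      dir match {
        case Some(EdgeDirection.In)   => dstActive              // edge points into an active vertex
        case Some(EdgeDirection.Out)  => srcActive              // edge originates from an active vertex
        case Some(EdgeDirection.Both) => srcActive && dstActive // both endpoints must be active
        case None                     => true                   // no active set: every edge qualifies
      }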
The resulting view is co-partitioned with `edges`. If `prevVTableReplicated` + * is specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise, + * a fresh view is created. + * + * The view is always cached (i.e., once it is created, it remains materialized). This avoids + * constructing it twice if the user calls graph.triplets followed by graph.mapReduceTriplets, for + * example. */ private[impl] class VTableReplicated[VD: ClassManifest]( - vTable: VertexRDD[VD], - eTable: EdgeRDD[_], + updatedVerts: VertexRDD[VD], + edges: EdgeRDD[_], vertexPlacement: VertexPlacement, prevVTableReplicated: Option[VTableReplicated[VD]] = None) { - val bothAttrs: RDD[(Pid, VertexPartition[VD])] = - createVTableReplicated(vTable, eTable, vertexPlacement, true, true) + /** + * Within each edge partition, create a local map from vid to an index into the attribute + * array. Each map contains a superset of the vertices that it will receive, because it stores + * vids from both the source and destination of edges. It must always include both source and + * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this. + */ + private val localVidMap: RDD[(Int, VertexIdToIndexMap)] = prevVTableReplicated match { + case Some(prev) => + prev.localVidMap + case None => + edges.partitionsRDD.mapPartitions(_.map { + case (pid, epart) => + val vidToIndex = new VertexIdToIndexMap + epart.foreach { e => + vidToIndex.add(e.srcId) + vidToIndex.add(e.dstId) + } + (pid, vidToIndex) + }, preservesPartitioning = true).cache().setName("VTableReplicated localVidMap") + } - val srcAttrOnly: RDD[(Pid, VertexPartition[VD])] = - createVTableReplicated(vTable, eTable, vertexPlacement, true, false) - - val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] = - createVTableReplicated(vTable, eTable, vertexPlacement, false, true) - - val noAttrs: RDD[(Pid, VertexPartition[VD])] = - createVTableReplicated(vTable, eTable, vertexPlacement, false, false) + private lazy val bothAttrs: RDD[(Pid, VertexPartition[VD])] = create(true, true) + private lazy val srcAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(true, false) + private lazy val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(false, true) + private lazy val noAttrs: RDD[(Pid, VertexPartition[VD])] = create(false, false) def get(includeSrc: Boolean, includeDst: Boolean): RDD[(Pid, VertexPartition[VD])] = { (includeSrc, includeDst) match { @@ -37,57 +58,60 @@ class VTableReplicated[VD: ClassManifest]( } } - private def createVTableReplicated( - vTable: VertexRDD[VD], - eTable: EdgeRDD[_], - vertexPlacement: VertexPlacement, - includeSrcAttr: Boolean, - includeDstAttr: Boolean): RDD[(Pid, VertexPartition[VD])] = { + def get( + includeSrc: Boolean, + includeDst: Boolean, + actives: VertexRDD[_]): RDD[(Pid, VertexPartition[VD])] = { - val placement = vertexPlacement.get(includeSrcAttr, includeDstAttr) + // Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and + // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be + // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is + // also shipped there. 
+ val shippedActives = vertexPlacement.get(true, true) + .zipPartitions(actives.partitionsRDD)(VTableReplicated.buildActiveBuffer(_, _)) + .partitionBy(edges.partitioner.get) + // Update vTableReplicated with shippedActives, setting activeness flags in the resulting + // VertexPartitions + get(includeSrc, includeDst).zipPartitions(shippedActives) { (viewIter, shippedActivesIter) => + val (pid, vPart) = viewIter.next() + val newPart = vPart.replaceActives(shippedActivesIter.flatMap(_._2.iterator)) + Iterator((pid, newPart)) + } + } + + private def create(includeSrc: Boolean, includeDst: Boolean) + : RDD[(Pid, VertexPartition[VD])] = { val vdManifest = classManifest[VD] - // Send each edge partition the vertex attributes it wants, as specified in - // vertexPlacement - val msgsByPartition = placement.zipPartitions(vTable.partitionsRDD)(VTableReplicated.buildBuffer(_, _)(vdManifest)) - .partitionBy(eTable.partitioner.get).cache() + // Ship vertex attributes to edge partitions according to vertexPlacement + val verts = updatedVerts.partitionsRDD + val shippedVerts = vertexPlacement.get(includeSrc, includeDst) + .zipPartitions(verts)(VTableReplicated.buildBuffer(_, _)(vdManifest)) + .partitionBy(edges.partitioner.get) // TODO: Consider using a specialized shuffler. prevVTableReplicated match { case Some(vTableReplicated) => - val prev: RDD[(Pid, VertexPartition[VD])] = - vTableReplicated.get(includeSrcAttr, includeDstAttr) + val prevView: RDD[(Pid, VertexPartition[VD])] = + vTableReplicated.get(includeSrc, includeDst) - prev.zipPartitions(msgsByPartition) { (vTableIter, msgsIter) => - val (pid, vertexPartition) = vTableIter.next() - val newVPart = vertexPartition.updateUsingIndex(msgsIter.flatMap(_._2.iterator))(vdManifest) + // Update vTableReplicated with shippedVerts, setting staleness flags in the resulting + // VertexPartitions + prevView.zipPartitions(shippedVerts) { (prevViewIter, shippedVertsIter) => + val (pid, prevVPart) = prevViewIter.next() + val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator)) Iterator((pid, newVPart)) - }.cache().setName("VTableReplicated delta %s %s".format(includeSrcAttr, includeDstAttr)) + }.cache().setName("VTableReplicated delta %s %s".format(includeSrc, includeDst)) case None => - // Within each edge partition, create a local map from vid to an index into - // the attribute array. Each map contains a superset of the vertices that it - // will receive, because it stores vids from both the source and destination - // of edges. It must always include both source and destination vids because - // some operations, such as GraphImpl.mapReduceTriplets, rely on this. 
- val localVidMap = eTable.partitionsRDD.mapPartitions(_.map { - case (pid, epart) => - val vidToIndex = new VertexIdToIndexMap - epart.foreach { e => - vidToIndex.add(e.srcId) - vidToIndex.add(e.dstId) - } - (pid, vidToIndex) - }, preservesPartitioning = true).cache() - - // Within each edge partition, place the vertex attributes received from - // msgsByPartition into the correct locations specified in localVidMap - localVidMap.zipPartitions(msgsByPartition) { (mapIter, msgsIter) => + // Within each edge partition, place the shipped vertex attributes into the correct + // locations specified in localVidMap + localVidMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) => val (pid, vidToIndex) = mapIter.next() assert(!mapIter.hasNext) // Populate the vertex array using the vidToIndex map val vertexArray = vdManifest.newArray(vidToIndex.capacity) - for ((_, block) <- msgsIter) { + for ((_, block) <- shippedVertsIter) { for (i <- 0 until block.vids.size) { val vid = block.vids(i) val attr = block.attrs(i) @@ -95,17 +119,18 @@ class VTableReplicated[VD: ClassManifest]( vertexArray(ind) = attr } } - Iterator((pid, new VertexPartition(vidToIndex, vertexArray, vidToIndex.getBitSet)(vdManifest))) - }.cache().setName("VTableReplicated %s %s".format(includeSrcAttr, includeDstAttr)) + val newVPart = new VertexPartition( + vidToIndex, vertexArray, vidToIndex.getBitSet)(vdManifest) + Iterator((pid, newVPart)) + }.cache().setName("VTableReplicated %s %s".format(includeSrc, includeDst)) } } - } - object VTableReplicated { - - def buildBuffer[VD: ClassManifest](pid2vidIter: Iterator[Array[Array[Vid]]], vertexPartIter: Iterator[VertexPartition[VD]]) = { + protected def buildBuffer[VD: ClassManifest]( + pid2vidIter: Iterator[Array[Array[Vid]]], + vertexPartIter: Iterator[VertexPartition[VD]]) = { val pid2vid: Array[Array[Vid]] = pid2vidIter.next() val vertexPart: VertexPartition[VD] = vertexPartIter.next() @@ -126,6 +151,29 @@ object VTableReplicated { (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array)) } } + + protected def buildActiveBuffer( + pid2vidIter: Iterator[Array[Array[Vid]]], + activePartIter: Iterator[VertexPartition[_]]) + : Iterator[(Int, Array[Vid])] = { + val pid2vid: Array[Array[Vid]] = pid2vidIter.next() + val activePart: VertexPartition[_] = activePartIter.next() + + Iterator.tabulate(pid2vid.size) { pid => + val vidsCandidate = pid2vid(pid) + val size = vidsCandidate.length + val actives = new PrimitiveVector[Vid](vidsCandidate.size) + var i = 0 + while (i < size) { + val vid = vidsCandidate(i) + if (activePart.isDefined(vid)) { + actives += vid + } + i += 1 + } + (pid, actives.trim().array) + } + } } class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD]) { diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala index fe005c8723..1c589c9b72 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala @@ -32,7 +32,9 @@ private[graph] class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( val index: VertexIdToIndexMap, val values: Array[VD], - val mask: BitSet) + val mask: BitSet, + /** A set of vids of active vertices. May contain vids not in index due to join rewrite. 
*/ + private val activeSet: Option[VertexSet] = None) extends Logging { val capacity: Int = index.capacity @@ -47,6 +49,11 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( pos >= 0 && mask.get(pos) } + /** Look up vid in activeSet, throwing an exception if it is None. */ + def isActive(vid: Vid): Boolean = { + activeSet.get.contains(vid) + } + /** * Pass each vertex attribute along with the vertex id through a map * function and retain the original RDD's partitioning and index. @@ -57,7 +64,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( * attribute in the RDD * * @return a new VertexPartition with values obtained by applying `f` to - * each of the entries in the original VertexSet. The resulting + * each of the entries in the original VertexRDD. The resulting * VertexPartition retains the same index. */ def map[VD2: ClassManifest](f: (Vid, VD) => VD2): VertexPartition[VD2] = { @@ -94,19 +101,25 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( new VertexPartition(index, values, newMask) } + /** + * Hides vertices that are the same between this and other. For vertices that are different, keeps + * the values from `other`. The indices of `this` and `other` must be the same. + */ def diff(other: VertexPartition[VD]): VertexPartition[VD] = { - assert(index == other.index) - - val newMask = mask & other.mask - - var i = newMask.nextSetBit(0) - while (i >= 0) { - if (values(i) == other.values(i)) { - newMask.unset(i) + if (index != other.index) { + logWarning("Diffing two VertexPartitions with different indexes is slow.") + diff(createUsingIndex(other.iterator)) + } else { + val newMask = mask & other.mask + var i = newMask.nextSetBit(0) + while (i >= 0) { + if (values(i) == other.values(i)) { + newMask.unset(i) + } + i = newMask.nextSetBit(i + 1) } - i = mask.nextSetBit(i + 1) + new VertexPartition(index, other.values, newMask) } - new VertexPartition[VD](index, other.values, newMask) } /** Inner join another VertexPartition. */ @@ -130,30 +143,6 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( } } - /** Inner join another VertexPartition. */ - def deltaJoin[VD2: ClassManifest, VD3: ClassManifest] - (other: VertexPartition[VD2]) - (f: (Vid, VD, VD2) => VD3): VertexPartition[VD3] = - { - if (index != other.index) { - logWarning("Joining two VertexPartitions with different indexes is slow.") - join(createUsingIndex(other.iterator))(f) - } else { - val newValues = new Array[VD3](capacity) - val newMask = mask & other.mask - - var i = newMask.nextSetBit(0) - while (i >= 0) { - newValues(i) = f(index.getValue(i), values(i), other.values(i)) - if (newValues(i) == values(i)) { - newMask.unset(i) - } - i = mask.nextSetBit(i + 1) - } - new VertexPartition(index, newValues, newMask) - } - } - /** Left outer join another VertexPartition. */ def leftJoin[VD2: ClassManifest, VD3: ClassManifest] (other: VertexPartition[VD2]) @@ -181,6 +170,32 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( leftJoin(createUsingIndex(other))(f) } + /** Inner join another VertexPartition. 
*/ + def innerJoin[U: ClassManifest, VD2: ClassManifest](other: VertexPartition[U]) + (f: (Vid, VD, U) => VD2): VertexPartition[VD2] = { + if (index != other.index) { + logWarning("Joining two VertexPartitions with different indexes is slow.") + innerJoin(createUsingIndex(other.iterator))(f) + } + val newMask = mask & other.mask + val newValues = new Array[VD2](capacity) + var i = newMask.nextSetBit(0) + while (i >= 0) { + newValues(i) = f(index.getValue(i), values(i), other.values(i)) + i = newMask.nextSetBit(i + 1) + } + new VertexPartition(index, newValues, newMask) + } + + /** + * Inner join an iterator of messages. + */ + def innerJoin[U: ClassManifest, VD2: ClassManifest] + (iter: Iterator[Product2[Vid, U]]) + (f: (Vid, VD, U) => VD2): VertexPartition[VD2] = { + innerJoin(createUsingIndex(iter))(f) + } + /** * Similar effect as aggregateUsingIndex((a, b) => a) */ @@ -196,17 +211,20 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( new VertexPartition[VD2](index, newValues, newMask) } - def updateUsingIndex[VD2: ClassManifest](iter: Iterator[Product2[Vid, VD2]]) - : VertexPartition[VD2] = { + /** + * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in + * the partition, hidden by the bitmask. + */ + def innerJoinKeepLeft(iter: Iterator[Product2[Vid, VD]]): VertexPartition[VD] = { val newMask = new BitSet(capacity) - val newValues = new Array[VD2](capacity) + val newValues = new Array[VD](capacity) System.arraycopy(values, 0, newValues, 0, newValues.length) iter.foreach { case (vid, vdata) => val pos = index.getPos(vid) newMask.set(pos) newValues(pos) = vdata } - new VertexPartition[VD2](index, newValues, newMask) + new VertexPartition(index, newValues, newMask) } def aggregateUsingIndex[VD2: ClassManifest]( @@ -228,6 +246,12 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( new VertexPartition[VD2](index, newValues, newMask) } + def replaceActives(iter: Iterator[Vid]): VertexPartition[VD] = { + val newActiveSet = new VertexSet + iter.foreach(newActiveSet.add(_)) + new VertexPartition(index, values, mask, Some(newActiveSet)) + } + /** * Construct a new VertexPartition whose index contains only the vertices in the mask. */ @@ -241,4 +265,6 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( } def iterator: Iterator[(Vid, VD)] = mask.iterator.map(ind => (index.getValue(ind), values(ind))) + + def vidIterator: Iterator[Vid] = mask.iterator.map(ind => index.getValue(ind)) } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala index 24fdf0db45..44a0a05f74 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala @@ -7,8 +7,10 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.collection.PrimitiveVector /** - * Stores the layout of replicated vertex attributes for GraphImpl. Tells each - * partition of the vertex data where it should go. + * Stores the locations of edge-partition join sites for each vertex attribute in `vTable`; that is, + * the routing information for shipping vertex attributes to edge partitions. This is always cached + * because it may be used multiple times in VTableReplicated -- once to ship the vertex attributes + * and (possibly) once to ship the active-set information. 
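For intuition, each element of the routing RDD returned by `get` has the following shape (hypothetical values; one entry per vertex partition, indexed by edge-partition id):

    // The routing entry for a single vertex partition: pid2vid(pid) lists the vertex
    // ids that edge partition `pid` needs from this vertex partition.
    val pid2vid: Array[Array[Vid]] = Array(
      Array(1L, 4L),   // edge partition 0 needs vertices 1 and 4
      Array(2L),       // edge partition 1 needs vertex 2
      Array[Vid]())    // edge partition 2 needs nothing from this vertex partition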
*/ class VertexPlacement(eTable: EdgeRDD[_], vTable: VertexRDD[_]) { @@ -25,13 +27,6 @@ class VertexPlacement(eTable: EdgeRDD[_], vTable: VertexRDD[_]) { case (false, false) => noAttrs } - def persist(newLevel: StorageLevel) { - bothAttrs.persist(newLevel) - srcAttrOnly.persist(newLevel) - dstAttrOnly.persist(newLevel) - noAttrs.persist(newLevel) - } - private def createPid2Vid( includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = { // Determine which vertices each edge partition needs by creating a mapping from vid to pid. diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala index 514d20b76c..68a171b12f 100644 --- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala @@ -35,12 +35,38 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("core operations") { + withSpark(new SparkContext("local", "test")) { sc => + val n = 5 + val star = Graph.fromEdgeTuples( + sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v") + // triplets + assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet === + (1 to n).map(x => (0: Vid, x: Vid, "v", "v")).toSet) + // reverse + val reverseStar = star.reverse + assert(reverseStar.outDegrees.collect.toSet === (1 to n).map(x => (x: Vid, 1)).toSet) + // outerJoinVertices + val reverseStarDegrees = + reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) } + val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets( + et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)), + (a: Int, b: Int) => a + b).collect.toSet + assert(neighborDegreeSums === Set((0: Vid, n)) ++ (1 to n).map(x => (x: Vid, 0))) + // mapVertices preserving type + val mappedVAttrs = reverseStar.mapVertices((vid, attr) => attr + "2") + assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: Vid, "v2")).toSet) + // mapVertices changing type + val mappedVAttrs2 = reverseStar.mapVertices((vid, attr) => attr.length) + assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: Vid, 1)).toSet) + } + } + test("mapEdges") { withSpark(new SparkContext("local", "test")) { sc => val n = 3 val star = Graph.fromEdgeTuples( - sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), - "defaultValue") + sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), "v") val starWithEdgeAttrs = star.mapEdges(e => e.dstId) // map(_.copy()) is a workaround for https://github.com/amplab/graphx/issues/25 @@ -52,13 +78,42 @@ class GraphSuite extends FunSuite with LocalSparkContext { test("mapReduceTriplets") { withSpark(new SparkContext("local", "test")) { sc => - val n = 3 + val n = 5 val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 0) val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg } val neighborDegreeSums = starDeg.mapReduceTriplets( edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)), (a: Int, b: Int) => a + b) assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet) + + // activeSetOpt + val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: Vid, y: Vid) + val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0) + val vids = complete.mapVertices((vid, attr) => vid).cache() + val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 } + val numEvenNeighbors = vids.mapReduceTriplets(et => { + // 
Map function should only run on edges with destination in the active set + if (et.dstId % 2 != 0) { + throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId)) + } + Iterator((et.srcId, 1)) + }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet + assert(numEvenNeighbors === (1 to n).map(x => (x: Vid, n / 2)).toSet) + + // outerJoinVertices followed by mapReduceTriplets(activeSetOpt) + val ring = Graph.fromEdgeTuples(sc.parallelize((0 until n).map(x => (x: Vid, (x+1) % n: Vid)), 3), 0) + .mapVertices((vid, attr) => vid).cache() + val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_) + val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) } + val numOddNeighbors = changedGraph.mapReduceTriplets(et => { + // Map function should only run on edges with source in the active set + if (et.srcId % 2 != 1) { + throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId)) + } + Iterator((et.dstId, 1)) + }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet + assert(numOddNeighbors === (2 to n by 2).map(x => (x: Vid, 1)).toSet) + } } @@ -151,34 +206,4 @@ class GraphSuite extends FunSuite with LocalSparkContext { assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet) } } - - test("deltaJoinVertices") { - withSpark(new SparkContext("local", "test")) { sc => - // Create a star graph of 10 vertices - val n = 10 - val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), "v1").cache() - - // Modify only vertices whose vids are even - val changedVerts = star.vertices.filter(_._1 % 2 == 0).mapValues((vid, attr) => "v2") - - // Apply the modification to the graph - val changedStar = star.deltaJoinVertices(changedVerts) - - val newVertices = star.vertices.leftZipJoin(changedVerts) { (vid, oldVd, newVdOpt) => - newVdOpt match { - case Some(newVd) => newVd - case None => oldVd - } - } - - // The graph's vertices should be correct - assert(changedStar.vertices.collect().toSet === newVertices.collect().toSet) - - // Send the leaf attributes to the center - val sums = changedStar.mapReduceTriplets( - edge => Iterator((edge.srcId, Set(edge.dstAttr))), - (a: Set[String], b: Set[String]) => a ++ b) - assert(sums.collect().toSet === Set((0, Set("v1", "v2")))) - } - } } diff --git a/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala b/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala new file mode 100644 index 0000000000..0897d9783e --- /dev/null +++ b/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala @@ -0,0 +1,43 @@ +package org.apache.spark.graph + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.graph.LocalSparkContext._ +import org.apache.spark.rdd._ + +class PregelSuite extends FunSuite with LocalSparkContext { + + System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator") + + test("1 iteration") { + withSpark(new SparkContext("local", "test")) { sc => + val n = 5 + val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v") + val result = Pregel(star, 0)( + (vid, attr, msg) => attr, + et => Iterator.empty, + (a: Int, b: Int) => throw new Exception("mergeMsg run unexpectedly")) + assert(result.vertices.collect.toSet === 
star.vertices.collect.toSet) + } + } + + test("chain propagation") { + withSpark(new SparkContext("local", "test")) { sc => + val n = 5 + val chain = Graph.fromEdgeTuples( + sc.parallelize((1 until n).map(x => (x: Vid, x + 1: Vid)), 3), + 0).cache() + assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: Vid, 0)).toSet) + val chainWithSeed = chain.mapVertices { (vid, attr) => if (vid == 1) 1 else 0 } + assert(chainWithSeed.vertices.collect.toSet === Set((1: Vid, 1)) ++ (2 to n).map(x => (x: Vid, 0)).toSet) + val result = Pregel(chainWithSeed, 0)( + (vid, attr, msg) => math.max(msg, attr), + et => Iterator((et.dstId, et.srcAttr)), + (a: Int, b: Int) => math.max(a, b)) + assert(result.vertices.collect.toSet === + chain.vertices.mapValues { (vid, attr) => attr + 1 }.collect.toSet) + } + } +}
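As a usage-level sketch of the updated Pregel semantics (assuming a hypothetical `graph: Graph[Int, Double]` with edge weights), single-source shortest paths benefits directly from the active-set optimization: in each superstep, `sendMsg` runs only on out-edges of vertices that received a message in the previous superstep.

    val sourceId: Vid = 0L
    val initialGraph = graph.mapVertices((vid, attr) => if (vid == sourceId) 0.0 else Double.PositiveInfinity)
    val sssp = Pregel(initialGraph, Double.PositiveInfinity)(
      (vid, dist, newDist) => math.min(dist, newDist),   // vprog: keep the shorter distance
      et =>                                              // sendMsg: relax the edge if it improves dst
        if (et.srcAttr + et.attr < et.dstAttr) Iterator((et.dstId, et.srcAttr + et.attr))
        else Iterator.empty,
      (a, b) => math.min(a, b))                          // mergeMsg: take the minimum incoming distance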