diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala index b80c4e32ad..86502182fb 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala @@ -278,8 +278,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { (mapFunc: (Vid, VD, Option[U]) => VD2) : Graph[VD2, ED] - def deltaJoin[VD2: ClassManifest] - (updates: VertexRDD[VD2])(updateF: (Vid, VD, VD2) => VD): Graph[VD, ED] + def deltaJoinVertices(newVerts: VertexRDD[VD], changedVerts: VertexRDD[VD]): Graph[VD, ED] // Save a copy of the GraphOps object so there is always one unique GraphOps object // for a given Graph object, and thus the lazy vals in GraphOps would work as intended. diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala index 447a97b1ad..bf32d34cbd 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala @@ -103,7 +103,12 @@ object Pregel { // compute the messages val messages = g.mapReduceTriplets(sendMsg, mergeMsg) // broadcast & aggregation // receive the messages - g = g.deltaJoin(messages)(vprog).cache() // updating the graph + val newVerts = g.vertices.zipJoin(messages)(vprog).cache() // updating the vertices + val changedVerts = g.vertices.diff(newVerts) + println("Replicating %d changed vertices instead of %d total vertices".format( + changedVerts.count, newVerts.count)) + // replicate the changed vertices + g = graph.deltaJoinVertices(newVerts, changedVerts) // count the iteration i += 1 } diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala index a6f52a6e22..5afe2df0ca 100644 --- a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala @@ -177,6 +177,12 @@ class VertexRDD[@specialized VD: ClassManifest]( def mapValues[VD2: ClassManifest](f: (Vid, VD) => VD2): VertexRDD[VD2] = this.mapVertexPartitions(_.map(f)) + def diff(other: VertexRDD[VD]): VertexRDD[VD] = { + this.zipVertexPartitions(other) { (thisPart, otherPart) => + thisPart.diff(otherPart) + } + } + /** * Inner join this VertexSet with another VertexSet which has the * same Index. This function will fail if both VertexSets do not @@ -268,13 +274,6 @@ class VertexRDD[@specialized VD: ClassManifest]( } } - def deltaJoin[VD2: ClassManifest](other: VertexRDD[VD2])(f: (Vid, VD, VD2) => VD): VertexRDD[VD] = - { - this.zipVertexPartitions(other) { (thisPart, otherPart) => - thisPart.deltaJoin(otherPart)(f) - } - } - def aggregateUsingIndex[VD2: ClassManifest]( messages: RDD[(Vid, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = { diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 73018afa4c..0ceb70eecc 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -28,25 +28,16 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( @transient val vertices: VertexRDD[VD], @transient val edges: EdgeRDD[ED], @transient val vertexPlacement: VertexPlacement, - @transient val prevVTableReplicated: Option[VTableReplicated[VD]] = None) + @transient val vTableReplicated: VTableReplicated[VD]) extends Graph[VD, ED] { - //def this() = this(null, null, null, null) - def this( - vertices: RDD[VertexPartition[VD]], - edges: RDD[(Pid, EdgePartition[ED])], + vertices: VertexRDD[VD], + edges: EdgeRDD[ED], vertexPlacement: VertexPlacement) = { - this(new VertexRDD(vertices), new EdgeRDD(edges), vertexPlacement) + this(vertices, edges, vertexPlacement, new VTableReplicated(vertices, edges, vertexPlacement)) } - @transient private val vTableReplicated: VTableReplicated[VD] = - new VTableReplicated(vertices, edges, vertexPlacement, prevVTableReplicated) - - /** Return a RDD of edges. */ -// @transient override val edges: RDD[Edge[ED]] = -// edges.mapPartitions(_.next()._2.iterator, true) - /** Return a RDD that brings edges with its source and destination vertices together. */ @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = { val vdManifest = classManifest[VD] @@ -269,11 +260,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( new GraphImpl(newVTable, edges, vertexPlacement) } - override def deltaJoin[VD2: ClassManifest] - (updates: VertexRDD[VD2])(updateF: (Vid, VD, VD2) => VD): Graph[VD, ED] = - { - val newVTable = vertices.deltaJoin(updates)(updateF) - new GraphImpl(newVTable, edges, vertexPlacement, Some(vTableReplicated)) + override def deltaJoinVertices( + newVerts: VertexRDD[VD], + changedVerts: VertexRDD[VD]): Graph[VD, ED] = { + val newVTableReplicated = new VTableReplicated( + changedVerts, edges, vertexPlacement, Some(vTableReplicated)) + new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated) } } // end of class GraphImpl diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala index f677b86ccc..ce93899ed1 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala @@ -89,6 +89,21 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( new VertexPartition(index, values, newMask) } + def diff(other: VertexPartition[VD]): VertexPartition[VD] = { + assert(index == other.index) + + val newMask = mask & other.mask + + var i = newMask.nextSetBit(0) + while (i >= 0) { + if (values(i) == other.values(i)) { + newMask.unset(i) + } + i = mask.nextSetBit(i + 1) + } + new VertexPartition[VD](index, other.values, newMask) + } + /** Inner join another VertexPartition. */ def join[VD2: ClassManifest, VD3: ClassManifest] (other: VertexPartition[VD2]) @@ -110,29 +125,6 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( } } - /** Inner join another VertexPartition, only keeping values that change. */ - def deltaJoin[VD2: ClassManifest] - (other: VertexPartition[VD2]) - (f: (Vid, VD, VD2) => VD): VertexPartition[VD] = - { - assert(index == other.index) - - val newValues = new Array[VD](capacity) - val newMask = mask & other.mask - - var i = newMask.nextSetBit(0) - while (i >= 0) { - newValues(i) = f(index.getValue(i), values(i), other.values(i)) - // Only set the mask if the value changes (we are using precise comparison here). - // TODO: Use delta comparison for double type. - if (newValues(i) == values(i)) { - newMask.unset(i) - } - i = mask.nextSetBit(i + 1) - } - new VertexPartition[VD](index, newValues, newMask) - } - /** Left outer join another VertexPartition. */ def leftJoin[VD2: ClassManifest, VD3: ClassManifest] (other: VertexPartition[VD2])