From 39b0256668c4f7806725fec751c477ea8c76cd84 Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Thu, 5 Dec 2013 18:55:59 -0800
Subject: [PATCH] Use mask for dynamic Pregel

---
 .../org/apache/spark/graph/EdgeTriplet.scala  | 17 +++---
 .../org/apache/spark/graph/GraphLab.scala     |  9 ----
 .../scala/org/apache/spark/graph/Pregel.scala | 35 +++++-------
 .../apache/spark/graph/impl/GraphImpl.scala   | 53 ++++++-------------
 .../spark/graph/impl/VTableReplicated.scala   | 39 ++++++--------
 .../spark/graph/impl/VertexPartition.scala    | 15 ++++++
 6 files changed, 71 insertions(+), 97 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
index b8df94c419..1f92233df9 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
@@ -1,33 +1,38 @@
 package org.apache.spark.graph
 
+import org.apache.spark.graph.impl.VertexPartition
+
 /**
  * An edge triplet represents two vertices and an edge along with their
  * attributes.
  *
  * @tparam VD the type of the vertex attribute.
  * @tparam ED the type of the edge attribute
- *
+ *
  * @todo specialize edge triplet for basic types, though when I last
  * tried specializing I got a warning about inheriting from a type
  * that is not a trait.
  */
-class EdgeTriplet[VD, ED] extends Edge[ED] {
+class EdgeTriplet[VD, ED](vPart: VertexPartition[VD] = null) extends Edge[ED] {
 // class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassManifest,
 //   @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] extends Edge[ED] {
-
+
   /**
    * The source vertex attribute
    */
-  var srcAttr: VD = _ //nullValue[VD]
+  var srcAttr: VD = _ //nullValue[VD]
 
   /**
    * The destination vertex attribute
    */
-  var dstAttr: VD = _ //nullValue[VD]
+  var dstAttr: VD = _ //nullValue[VD]
+
+  def srcMask: Boolean = vPart.isDefined(srcId)
+  def dstMask: Boolean = vPart.isDefined(dstId)
 
   /**
-   * Set the edge properties of this triplet.
+   * Set the edge properties of this triplet.
    */
   protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD,ED] = {
     srcId = other.srcId

diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
index bf1f4168da..856a9aca37 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
@@ -124,12 +124,3 @@ object GraphLab {
     activeGraph.mapVertices{case (vid, data) => data._2 }
   }
 }
-
-
-
-
-
-
-
-
-

diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index d38ed7da4a..ffbb6fe3ca 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -169,36 +169,29 @@ object Pregel {
       mergeMsg: (A, A) => A)
     : Graph[VD, ED] = {
 
-    def vprogFun(id: Vid, attr: (VD, Boolean), msgOpt: Option[A]): (VD, Boolean) = {
-      msgOpt match {
-        case Some(msg) => (vprog(id, attr._1, msg), true)
-        case None => (attr._1, false)
-      }
-    }
-
-    def sendMsgFun(edge: EdgeTriplet[(VD,Boolean), ED]): Iterator[(Vid, A)] = {
-      if(edge.srcAttr._2) {
-        val et = new EdgeTriplet[VD, ED]
-        et.srcId = edge.srcId
-        et.srcAttr = edge.srcAttr._1
-        et.dstId = edge.dstId
-        et.dstAttr = edge.dstAttr._1
-        et.attr = edge.attr
-        sendMsg(et)
+    def sendMsgFun(edge: EdgeTriplet[VD, ED]): Iterator[(Vid, A)] = {
+      if (edge.srcMask) {
+        sendMsg(edge)
       } else {
         Iterator.empty
       }
     }
 
-    var g = graph.mapVertices( (vid, vdata) => (vprog(vid, vdata, initialMsg), true) )
+    var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) )
 
     // compute the messages
-    var messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache
+    var messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache()
     var activeMessages = messages.count
     // Loop
     var i = 0
     while (activeMessages > 0) {
       // receive the messages
-      g = g.outerJoinVertices(messages)(vprogFun)
+      val newVerts = g.vertices.zipJoin(messages)(vprog).cache() // updating the vertices
+      val changedVerts = g.vertices.diff(newVerts)
+      println("Replicating %d changed vertices instead of %d total vertices".format(
+        changedVerts.count, newVerts.count))
+      // replicate the changed vertices
+      g = graph.deltaJoinVertices(newVerts, changedVerts)
 
+      val oldMessages = messages
       // compute the messages
       messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache
@@ -208,8 +201,8 @@
       // count the iteration
       i += 1
     }
-    // Return the final graph
-    g.mapVertices((id, attr) => attr._1)
+
+    g
   } // end of apply
 } // end of class Pregel

diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 0ceb70eecc..6e9566e060 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -23,6 +23,9 @@ import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHa
  * destinations. `vertexPlacement` specifies where each vertex will be
  * replicated. `vTableReplicated` stores the replicated vertex attributes, which
  * are co-partitioned with the relevant edges.
+ *
+ * The mask in `vertices` acts as a filter (which vertices are present), while
+ * the mask in `vTableReplicated` marks which vertices are active this iteration.
  */
 class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     @transient val vertices: VertexRDD[VD],
@@ -44,8 +47,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     val edManifest = classManifest[ED]
 
     edges.zipEdgePartitions(vTableReplicated.bothAttrs) { (edgePartition, vTableReplicatedIter) =>
-      val (_, (vidToIndex, vertexArray)) = vTableReplicatedIter.next()
-      new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition)(vdManifest, edManifest)
+      val (_, vPart) = vTableReplicatedIter.next()
+      new EdgeTripletIterator(vPart.index, vPart.values, edgePartition)(vdManifest, edManifest)
     }
   }
@@ -141,14 +144,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     val vdManifest = classManifest[VD]
     val newETable = edges.zipEdgePartitions(vTableReplicated.bothAttrs) {
       (edgePartition, vTableReplicatedIter) =>
-        val (pid, (vidToIndex, vertexArray)) = vTableReplicatedIter.next()
+        val (pid, vPart) = vTableReplicatedIter.next()
         val et = new EdgeTriplet[VD, ED]
-        val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](
-          vidToIndex, vertexArray)(classManifest[Vid], vdManifest)
         val newEdgePartition = edgePartition.map { e =>
           et.set(e)
-          et.srcAttr = vmap(e.srcId)
-          et.dstAttr = vmap(e.dstId)
+          et.srcAttr = vPart(e.srcId)
+          et.dstAttr = vPart(e.dstId)
           f(et)
         }
         Iterator((pid, newEdgePartition))
@@ -209,44 +210,22 @@
 
     // Map and combine.
     val preAgg = edges.zipEdgePartitions(vs) { (edgePartition, vTableReplicatedIter) =>
-      val (_, (vidToIndex, vertexArray)) = vTableReplicatedIter.next()
-      assert(vidToIndex.capacity == vertexArray.size)
-      val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)(
-        classManifest[Vid], vdManifest)
+      val (_, vertexPartition) = vTableReplicatedIter.next()
 
-      // Note: This doesn't allow users to send messages to arbitrary vertices.
-      val msgArray = new Array[A](vertexArray.size)
-      val msgBS = new BitSet(vertexArray.size)
       // Iterate over the partition
-      val et = new EdgeTriplet[VD, ED]
-
-      edgePartition.foreach { e =>
+      val et = new EdgeTriplet[VD, ED](vertexPartition)
+      val filteredEdges = edgePartition.iterator.flatMap { e =>
         et.set(e)
         if (mapUsesSrcAttr) {
-          et.srcAttr = vmap(e.srcId)
+          et.srcAttr = vertexPartition(e.srcId)
         }
         if (mapUsesDstAttr) {
-          et.dstAttr = vmap(e.dstId)
-        }
-        // TODO(rxin): rewrite the foreach using a simple while loop to speed things up.
-        // Also given we are only allowing zero, one, or two messages, we can completely unroll
-        // the for loop.
-        mapFunc(et).foreach { case (vid, msg) =>
-          // verify that the vid is valid
-          assert(vid == et.srcId || vid == et.dstId)
-          // Get the index of the key
-          val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
-          // Populate the aggregator map
-          if (msgBS.get(ind)) {
-            msgArray(ind) = reduceFunc(msgArray(ind), msg)
-          } else {
-            msgArray(ind) = msg
-            msgBS.set(ind)
-          }
+          et.dstAttr = vertexPartition(e.dstId)
         }
+        mapFunc(et)
       }
-      // construct an iterator of tuples. Iterator[(Vid, A)]
-      msgBS.iterator.map { ind => (vidToIndex.getValue(ind), msgArray(ind)) }
+      // Note: This doesn't allow users to send messages to arbitrary vertices.
+      vertexPartition.aggregateUsingIndex(filteredEdges, reduceFunc).iterator
     }
 
     // do the final reduction reusing the index map

diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
index be7cf516ea..3e3769b9af 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
@@ -16,19 +16,19 @@ class VTableReplicated[VD: ClassManifest](
     vertexPlacement: VertexPlacement,
     prevVTableReplicated: Option[VTableReplicated[VD]] = None) {
 
-  val bothAttrs: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
+  val bothAttrs: RDD[(Pid, VertexPartition[VD])] =
     createVTableReplicated(vTable, eTable, vertexPlacement, true, true)
 
-  val srcAttrOnly: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
+  val srcAttrOnly: RDD[(Pid, VertexPartition[VD])] =
     createVTableReplicated(vTable, eTable, vertexPlacement, true, false)
 
-  val dstAttrOnly: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
+  val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] =
     createVTableReplicated(vTable, eTable, vertexPlacement, false, true)
 
-  val noAttrs: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
+  val noAttrs: RDD[(Pid, VertexPartition[VD])] =
     createVTableReplicated(vTable, eTable, vertexPlacement, false, false)
 
-  def get(includeSrc: Boolean, includeDst: Boolean): RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] = {
+  def get(includeSrc: Boolean, includeDst: Boolean): RDD[(Pid, VertexPartition[VD])] = {
     (includeSrc, includeDst) match {
       case (true, true) => bothAttrs
       case (true, false) => srcAttrOnly
@@ -42,7 +42,7 @@ class VTableReplicated[VD: ClassManifest](
       eTable: EdgeRDD[_],
       vertexPlacement: VertexPlacement,
       includeSrcAttr: Boolean,
-      includeDstAttr: Boolean): RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] = {
+      includeDstAttr: Boolean): RDD[(Pid, VertexPartition[VD])] = {
     val placement = vertexPlacement.get(includeSrcAttr, includeDstAttr)
     val vdManifest = classManifest[VD]
@@ -55,25 +55,14 @@ class VTableReplicated[VD: ClassManifest](
 
     prevVTableReplicated match {
       case Some(vTableReplicated) =>
-        val prev: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
+        val prev: RDD[(Pid, VertexPartition[VD])] =
          vTableReplicated.get(includeSrcAttr, includeDstAttr)
 
        prev.zipPartitions(msgsByPartition) { (vTableIter, msgsIter) =>
-          val (pid, (vidToIndex, oldVertexArray)) = vTableIter.next()
-
-          val vertexArray = vdManifest.newArray(oldVertexArray.length)
-          System.arraycopy(oldVertexArray, 0, vertexArray, 0, vertexArray.length)
-
-          for ((_, block) <- msgsIter) {
-            for (i <- 0 until block.vids.size) {
-              val vid = block.vids(i)
-              val attr = block.attrs(i)
-              val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
-              vertexArray(ind) = attr
-            }
-          }
-
-          Iterator((pid, (vidToIndex, vertexArray)))
+          val (pid, vertexPartition) = vTableIter.next()
+          val (_, block) = msgsIter.next()
+          val newVPart = vertexPartition.updateUsingIndex(block.iterator)(vdManifest)
+          Iterator((pid, newVPart))
         }.cache()
 
       case None =>
@@ -107,7 +96,7 @@ class VTableReplicated[VD: ClassManifest](
             vertexArray(ind) = attr
           }
         }
-        Iterator((pid, (vidToIndex, vertexArray)))
+        Iterator((pid, new VertexPartition(vidToIndex, vertexArray, vidToIndex.getBitSet)(vdManifest)))
       }.cache()
   }
 }
@@ -131,4 +120,6 @@ object VTableReplicated {
   }
 }
 
-class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD])
+class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD]) {
+  def iterator: Iterator[(Vid, VD)] = (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
+}

diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
index ce93899ed1..c922350345 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
@@ -42,6 +42,8 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
   /** Return the vertex attribute for the given vertex ID. */
   def apply(vid: Vid): VD = values(index.getPos(vid))
 
+  def isDefined(vid: Vid): Boolean = mask.get(index.getPos(vid))
+
   /**
    * Pass each vertex attribute along with the vertex id through a map
    * function and retain the original RDD's partitioning and index.
@@ -167,6 +169,19 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
     new VertexPartition[VD2](index, newValues, newMask)
   }
 
+  def updateUsingIndex[VD2: ClassManifest](iter: Iterator[Product2[Vid, VD2]])
+    : VertexPartition[VD2] = {
+    val newMask = new BitSet(capacity)
+    val newValues = new Array[VD2](capacity)
+    System.arraycopy(values, 0, newValues, 0, newValues.length)
+    iter.foreach { case (vid, vdata) =>
+      val pos = index.getPos(vid)
+      newMask.set(pos)
+      newValues(pos) = vdata
+    }
+    new VertexPartition[VD2](index, newValues, newMask)
+  }
+
   def aggregateUsingIndex[VD2: ClassManifest](
       iter: Iterator[Product2[Vid, VD2]],
       reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = {
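
Note (editorial, not part of the patch): the new sendMsgFun gates message
generation on edge.srcMask, which reads the replicated VertexPartition's
bitmask, so messages flow only along edges whose source vertex changed in the
previous superstep. A minimal sketch of that gating, using a Set of active ids
and a hypothetical sendIfActive helper as stand-ins for the mask and for
mapReduceTriplets:

    object MaskGatingSketch {
      type Vid = Long
      case class Edge(srcId: Vid, dstId: Vid)

      // Plays the role of the edge.srcMask check in Pregel.sendMsgFun.
      def sendIfActive[A](active: Set[Vid], edges: Seq[Edge], msg: A): Seq[(Vid, A)] =
        edges.flatMap { e =>
          if (active(e.srcId)) Iterator((e.dstId, msg)) // active source: emit
          else Iterator.empty                           // inactive source: skip
        }

      def main(args: Array[String]): Unit = {
        val edges = Seq(Edge(1L, 2L), Edge(2L, 3L), Edge(3L, 1L))
        // Only vertex 2 changed last round, so only Edge(2L, 3L) fires.
        println(sendIfActive(Set(2L), edges, 1.0)) // List((3,1.0))
      }
    }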
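
The Pregel loop body now recomputes only what changed: zipJoin applies the
vertex program, diff keeps the vertices whose attribute actually changed, and
deltaJoinVertices re-replicates just those. A sketch of the diff semantics,
with plain Maps as hypothetical stand-ins for VertexRDD contents:

    object DeltaSketch {
      type Vid = Long

      // Keep only entries whose value differs from the old version.
      def diff[VD](old: Map[Vid, VD], updated: Map[Vid, VD]): Map[Vid, VD] =
        updated.filter { case (vid, attr) => old.get(vid) != Some(attr) }

      def main(args: Array[String]): Unit = {
        val before = Map(1L -> 0.15, 2L -> 0.15, 3L -> 0.15)
        val after  = Map(1L -> 0.15, 2L -> 0.32, 3L -> 0.27)
        // Vertex 1 is unchanged, so its replicas need not be reshipped.
        println(diff(before, after)) // Map(2 -> 0.32, 3 -> 0.27)
      }
    }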
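
In mapReduceTriplets, the hand-rolled msgArray/msgBS aggregation is replaced by
VertexPartition.aggregateUsingIndex. Its contract, modeled here on a mutable
Map standing in for the index-plus-array representation:

    object AggregateSketch {
      import scala.collection.mutable
      type Vid = Long

      def aggregateUsingIndex[A](msgs: Iterator[(Vid, A)],
          reduceFunc: (A, A) => A): Map[Vid, A] = {
        val acc = mutable.Map.empty[Vid, A]
        msgs.foreach { case (vid, msg) =>
          acc(vid) = acc.get(vid) match {
            case Some(prev) => reduceFunc(prev, msg) // combine with earlier message
            case None       => msg                   // first message for this vertex
          }
        }
        acc.toMap
      }

      def main(args: Array[String]): Unit = {
        val msgs = Iterator((1L, 1.0), (2L, 2.0), (1L, 3.0))
        println(aggregateUsingIndex(msgs, (a: Double, b: Double) => a + b))
        // Map(1 -> 4.0, 2 -> 2.0)
      }
    }

One caveat in the patch itself: updateUsingIndex copies the old Array[VD] into
the new Array[VD2] via System.arraycopy. That compiles because arraycopy is
untyped, but it is only sound when VD2 and VD coincide at runtime, as they do
at its single call site in VTableReplicated.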