From cb99fc193c489ad3b481c165fd0c23cd596808df Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Thu, 4 Apr 2013 17:56:16 -0700 Subject: [PATCH] Added graphlab style implementation of Pregel and a more sophisticated version of mapReduceNeighborhoodFilter --- graph/src/main/scala/spark/graph/Graph.scala | 55 +++++++++++++++++++ .../src/main/scala/spark/graph/GraphLab.scala | 55 ++++++++++++++----- graph/src/main/scala/spark/graph/Pregel.scala | 34 ++++++++++++ 3 files changed, 130 insertions(+), 14 deletions(-) create mode 100644 graph/src/main/scala/spark/graph/Pregel.scala diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala index 1117cb5a68..b264a5e209 100644 --- a/graph/src/main/scala/spark/graph/Graph.scala +++ b/graph/src/main/scala/spark/graph/Graph.scala @@ -189,6 +189,61 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected ( } + + def mapReduceNeighborhoodFilter[VD2: ClassManifest]( + mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2], + reduceFunc: (VD2, VD2) => VD2, + gatherDirection: EdgeDirection.EdgeDirection): RDD[(Vid, VD2)] = { + + ClosureCleaner.clean(mapFunc) + ClosureCleaner.clean(reduceFunc) + + val newVTable = vTableReplicated.mapPartitions({ part => + part.map { v => (v._1, MutableTuple2(v._2, Option.empty[VD2])) } + }, preservesPartitioning = true) + + (new EdgeWithVerticesRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable)) + .mapPartitions { part => + val (vmap, edges) = part.next() + val edgeSansAcc = new EdgeWithVertices[VD, ED]() + edgeSansAcc.src = new Vertex[VD] + edgeSansAcc.dst = new Vertex[VD] + edges.foreach { edge: EdgeWithVertices[MutableTuple2[VD, Option[VD2]], ED] => + edgeSansAcc.data = edge.data + edgeSansAcc.src.data = edge.src.data._1 + edgeSansAcc.dst.data = edge.dst.data._1 + edgeSansAcc.src.id = edge.src.id + edgeSansAcc.dst.id = edge.dst.id + if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) { + edge.dst.data._2 = + if(edge.dst.data._2.isEmpty) mapFunc(edgeSansAcc.dst.id, edgeSansAcc) + else { + val tmp = mapFunc(edgeSansAcc.dst.id, edgeSansAcc) + if(!tmp.isEmpty) Some(reduceFunc(edge.dst.data._2.get, tmp.get)) + else edge.dst.data._2 + } + } + if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) { + edge.dst.data._2 = + if(edge.dst.data._2.isEmpty) mapFunc(edgeSansAcc.src.id, edgeSansAcc) + else { + val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc) + if(!tmp.isEmpty) Some(reduceFunc(edge.src.data._2.get, tmp.get)) + else edge.src.data._2 + } + } + } + vmap.int2ObjectEntrySet().fastIterator().filter{!_.getValue()._2.isEmpty}.map{ entry => + (entry.getIntKey(), entry.getValue()._2) + } + } + .map{ case (vid, aOpt) => (vid, aOpt.get) } + .combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false) + + } + + + def updateVertices[U: ClassManifest, VD2: ClassManifest]( updates: RDD[(Vid, U)], updateFunc: (Vertex[VD], Option[U]) => VD2) diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala index eb992c464a..8899d9688a 100644 --- a/graph/src/main/scala/spark/graph/GraphLab.scala +++ b/graph/src/main/scala/spark/graph/GraphLab.scala @@ -7,30 +7,57 @@ import spark.RDD object GraphLab { def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( - graph: Graph[VD, ED])( - gather: (Vid, EdgeWithVertices[VD, ED]) => A, - merge: (A, A) => A, - default: A, - apply: (Vertex[VD], A) => VD, - numIter: Int, - gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In) - : Graph[VD, ED] = { + rawGraph: Graph[VD, ED])( + gather: (Vid, EdgeWithVertices[VD, ED]) => A, + merge: (A, A) => A, + default: A, + apply: (Vertex[VD], A) => VD, + numIter: Int, + gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = { - var g = graph.cache() + var graph = rawGraph.cache() var i = 0 while (i < numIter) { - val accUpdates: RDD[(Vid, A)] = g.mapReduceNeighborhood( - gather, merge, default, gatherDirection) + val accUpdates: RDD[(Vid, A)] = + graph.mapReduceNeighborhood(gather, merge, default, gatherDirection) def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) } - g = g.updateVertices(accUpdates, applyFunc).cache() + graph = graph.updateVertices(accUpdates, applyFunc).cache() i += 1 } - - g + graph } + + def iterateGASOption[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( + rawGraph: Graph[VD, ED])( + gather: (Vid, EdgeWithVertices[VD, ED]) => A, + merge: (A, A) => A, + apply: (Vertex[VD], Option[A]) => VD, + numIter: Int, + gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = { + + var graph = rawGraph.cache() + + def someGather(vid: Vid, edge: EdgeWithVertices[VD,ED]) = Some(gather(vid, edge)) + + var i = 0 + while (i < numIter) { + + val accUpdates: RDD[(Vid, A)] = + graph.mapReduceNeighborhoodFilter(someGather, merge, gatherDirection) + + def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update) } + graph = graph.updateVertices(accUpdates, applyFunc).cache() + + i += 1 + } + graph + } + + + } diff --git a/graph/src/main/scala/spark/graph/Pregel.scala b/graph/src/main/scala/spark/graph/Pregel.scala new file mode 100644 index 0000000000..6c551c21dd --- /dev/null +++ b/graph/src/main/scala/spark/graph/Pregel.scala @@ -0,0 +1,34 @@ +package spark.graph + +import scala.collection.JavaConversions._ +import spark.RDD + + +object Pregel { + + def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( + rawGraph: Graph[VD, ED])( + vprog: ( Vertex[VD], A) => VD, + sendMsg: (Vid, EdgeWithVertices[VD, ED]) => Option[A], + mergeMsg: (A, A) => A, + numIter: Int) : Graph[VD, ED] = { + + var graph = rawGraph.cache + var i = 0 + while (i < numIter) { + + val msgs: RDD[(Vid, A)] = + graph.mapReduceNeighborhoodFilter(sendMsg, mergeMsg, EdgeDirection.In) + + def runProg(v: Vertex[VD], msg: Option[A]): VD = + if(msg.isEmpty) v.data else vprog(v, msg.get) + + graph = graph.updateVertices(msgs, runProg).cache() + + i += 1 + } + graph + + } + +}