Added graphlab style implementation of Pregel and a more sophisticated version of mapReduceNeighborhoodFilter

This commit is contained in:
Joseph E. Gonzalez 2013-04-04 17:56:16 -07:00
parent db45cf3a49
commit cb99fc193c
3 changed files with 130 additions and 14 deletions

View file

@ -189,6 +189,61 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
}
def mapReduceNeighborhoodFilter[VD2: ClassManifest](
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2],
reduceFunc: (VD2, VD2) => VD2,
gatherDirection: EdgeDirection.EdgeDirection): RDD[(Vid, VD2)] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
val newVTable = vTableReplicated.mapPartitions({ part =>
part.map { v => (v._1, MutableTuple2(v._2, Option.empty[VD2])) }
}, preservesPartitioning = true)
(new EdgeWithVerticesRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable))
.mapPartitions { part =>
val (vmap, edges) = part.next()
val edgeSansAcc = new EdgeWithVertices[VD, ED]()
edgeSansAcc.src = new Vertex[VD]
edgeSansAcc.dst = new Vertex[VD]
edges.foreach { edge: EdgeWithVertices[MutableTuple2[VD, Option[VD2]], ED] =>
edgeSansAcc.data = edge.data
edgeSansAcc.src.data = edge.src.data._1
edgeSansAcc.dst.data = edge.dst.data._1
edgeSansAcc.src.id = edge.src.id
edgeSansAcc.dst.id = edge.dst.id
if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) {
edge.dst.data._2 =
if(edge.dst.data._2.isEmpty) mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
else {
val tmp = mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
if(!tmp.isEmpty) Some(reduceFunc(edge.dst.data._2.get, tmp.get))
else edge.dst.data._2
}
}
if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) {
edge.dst.data._2 =
if(edge.dst.data._2.isEmpty) mapFunc(edgeSansAcc.src.id, edgeSansAcc)
else {
val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc)
if(!tmp.isEmpty) Some(reduceFunc(edge.src.data._2.get, tmp.get))
else edge.src.data._2
}
}
}
vmap.int2ObjectEntrySet().fastIterator().filter{!_.getValue()._2.isEmpty}.map{ entry =>
(entry.getIntKey(), entry.getValue()._2)
}
}
.map{ case (vid, aOpt) => (vid, aOpt.get) }
.combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false)
}
def updateVertices[U: ClassManifest, VD2: ClassManifest](
updates: RDD[(Vid, U)],
updateFunc: (Vertex[VD], Option[U]) => VD2)

View file

@ -7,30 +7,57 @@ import spark.RDD
object GraphLab {
def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
graph: Graph[VD, ED])(
rawGraph: Graph[VD, ED])(
gather: (Vid, EdgeWithVertices[VD, ED]) => A,
merge: (A, A) => A,
default: A,
apply: (Vertex[VD], A) => VD,
numIter: Int,
gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In)
: Graph[VD, ED] = {
gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
var g = graph.cache()
var graph = rawGraph.cache()
var i = 0
while (i < numIter) {
val accUpdates: RDD[(Vid, A)] = g.mapReduceNeighborhood(
gather, merge, default, gatherDirection)
val accUpdates: RDD[(Vid, A)] =
graph.mapReduceNeighborhood(gather, merge, default, gatherDirection)
def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) }
g = g.updateVertices(accUpdates, applyFunc).cache()
graph = graph.updateVertices(accUpdates, applyFunc).cache()
i += 1
}
g
graph
}
def iterateGASOption[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
rawGraph: Graph[VD, ED])(
gather: (Vid, EdgeWithVertices[VD, ED]) => A,
merge: (A, A) => A,
apply: (Vertex[VD], Option[A]) => VD,
numIter: Int,
gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
var graph = rawGraph.cache()
def someGather(vid: Vid, edge: EdgeWithVertices[VD,ED]) = Some(gather(vid, edge))
var i = 0
while (i < numIter) {
val accUpdates: RDD[(Vid, A)] =
graph.mapReduceNeighborhoodFilter(someGather, merge, gatherDirection)
def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update) }
graph = graph.updateVertices(accUpdates, applyFunc).cache()
i += 1
}
graph
}
}

View file

@ -0,0 +1,34 @@
package spark.graph
import scala.collection.JavaConversions._
import spark.RDD
object Pregel {
def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
rawGraph: Graph[VD, ED])(
vprog: ( Vertex[VD], A) => VD,
sendMsg: (Vid, EdgeWithVertices[VD, ED]) => Option[A],
mergeMsg: (A, A) => A,
numIter: Int) : Graph[VD, ED] = {
var graph = rawGraph.cache
var i = 0
while (i < numIter) {
val msgs: RDD[(Vid, A)] =
graph.mapReduceNeighborhoodFilter(sendMsg, mergeMsg, EdgeDirection.In)
def runProg(v: Vertex[VD], msg: Option[A]): VD =
if(msg.isEmpty) v.data else vprog(v, msg.get)
graph = graph.updateVertices(msgs, runProg).cache()
i += 1
}
graph
}
}