Rename rawGraph to graph in Pregel.

This commit is contained in:
Reynold Xin 2013-04-05 16:16:04 +08:00
parent d40c1d5122
commit 822d9c5b70
2 changed files with 12 additions and 10 deletions

View file

@ -6,35 +6,32 @@ import spark.RDD
object Pregel { object Pregel {
def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
rawGraph: Graph[VD, ED])(
vprog: ( Vertex[VD], A) => VD, vprog: ( Vertex[VD], A) => VD,
sendMsg: (Vid, EdgeWithVertices[VD, ED]) => Option[A], sendMsg: (Vid, EdgeWithVertices[VD, ED]) => Option[A],
mergeMsg: (A, A) => A, mergeMsg: (A, A) => A,
initialMsg: A, initialMsg: A,
numIter: Int) : Graph[VD, ED] = { numIter: Int) : Graph[VD, ED] = {
var graph = rawGraph.cache var g = graph.cache
var i = 0 var i = 0
def reverseGather(vid: Vid, edge: EdgeWithVertices[VD,ED]) = def reverseGather(vid: Vid, edge: EdgeWithVertices[VD,ED]) =
sendMsg(edge.otherVertex(vid).id, edge) sendMsg(edge.otherVertex(vid).id, edge)
var msgs: RDD[(Vid, A)] = graph.vertices.map{ v => (v.id, initialMsg) } var msgs: RDD[(Vid, A)] = g.vertices.map{ v => (v.id, initialMsg) }
while (i < numIter) { while (i < numIter) {
def runProg(v: Vertex[VD], msg: Option[A]): VD = def runProg(v: Vertex[VD], msg: Option[A]): VD = if(msg.isEmpty) v.data else vprog(v, msg.get)
if(msg.isEmpty) v.data else vprog(v, msg.get)
graph = graph.updateVertices(msgs, runProg).cache() g = g.updateVertices(msgs, runProg).cache()
msgs = graph.flatMapReduceNeighborhood(reverseGather, mergeMsg, EdgeDirection.In) msgs = g.flatMapReduceNeighborhood(reverseGather, mergeMsg, EdgeDirection.In)
i += 1 i += 1
} }
graph g
} }

View file

@ -37,5 +37,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
(v, u: Option[String]) => if (u.isDefined) v.data + u.get else v.data) (v, u: Option[String]) => if (u.isDefined) v.data + u.get else v.data)
assert(g.numVertexPartitions === 5) assert(g.numVertexPartitions === 5)
assert(g.numEdgePartitions === 8) assert(g.numEdgePartitions === 8)
g = g.reverse
assert(g.numVertexPartitions === 5)
assert(g.numEdgePartitions === 8)
} }
} }