diff --git a/graph/src/main/scala/spark/graph/Pregel.scala b/graph/src/main/scala/spark/graph/Pregel.scala index 6c551c21dd..9ddae64b03 100644 --- a/graph/src/main/scala/spark/graph/Pregel.scala +++ b/graph/src/main/scala/spark/graph/Pregel.scala @@ -15,10 +15,14 @@ object Pregel { var graph = rawGraph.cache var i = 0 + + def reverseGather(vid: Vid, edge: EdgeWithVertices[VD,ED]) = + sendMsg(edge.otherVertex(vid).id, edge) + while (i < numIter) { val msgs: RDD[(Vid, A)] = - graph.mapReduceNeighborhoodFilter(sendMsg, mergeMsg, EdgeDirection.In) + graph.mapReduceNeighborhoodFilter(reverseGather, mergeMsg, EdgeDirection.In) def runProg(v: Vertex[VD], msg: Option[A]): VD = if(msg.isEmpty) v.data else vprog(v, msg.get)