Changing the Pregel interface slightly to better support type inference.

This commit is contained in:
Joseph E. Gonzalez 2013-10-22 15:01:20 -07:00
parent 9cf43cfeb7
commit 14a3329a11

View file

@ -10,6 +10,8 @@ import org.apache.spark.rdd.RDD
object Pregel { object Pregel {
/** /**
* Execute the Pregel program. * Execute the Pregel program.
* *
@ -34,12 +36,11 @@ object Pregel {
* @return the resulting graph at the end of the computation * @return the resulting graph at the end of the computation
* *
*/ */
def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
(graph: Graph[VD, ED], initialMsg: A, numIter: Int)(
vprog: (Vid, VD, A) => VD, vprog: (Vid, VD, A) => VD,
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A], sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
mergeMsg: (A, A) => A, mergeMsg: (A, A) => A)
initialMsg: A,
numIter: Int)
: Graph[VD, ED] = { : Graph[VD, ED] = {
var g = graph var g = graph
@ -61,5 +62,61 @@ object Pregel {
} }
// Return the final graph // Return the final graph
g g
} } // end of apply
}
/**
* Execute the Pregel program.
*
* @tparam VD the vertex data type
* @tparam ED the edge data type
* @tparam A the Pregel message type
*
* @param vprog a user supplied function that acts as the vertex program for
* the Pregel computation. It takes the vertex ID of the vertex it is running on,
* the accompanying data for that vertex, and the incoming data and returns the
* new vertex value.
* @param sendMsg a user supplied function that takes the current vertex ID and an EdgeTriplet
* between the vertex and one of its neighbors and produces a message to send
* to that neighbor.
* @param mergeMsg a user supplied function that takes two incoming messages of type A and merges
* them into a single message of type A. ''This function must be commutative and
* associative.''
* @param initialMsg the message each vertex will receive at the beginning of the
* first iteration.
* @param numIter the number of iterations to run this computation for.
*
* @return the resulting graph at the end of the computation
*
*/
def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
(graph: Graph[VD, ED], initialMsg: A)(
vprog: (Vid, VD, A) => VD,
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
var g = graph
//var g = graph.cache()
var i = 0
def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertexId(vid), edge)
// Receive the first set of messages
g.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
var activeMessages = g.numEdges
while (activeMessages > 0) {
// compute the messages
val messages = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In).cache
activeMessages = messages.count
// receive the messages
g = g.joinVertices(messages)(vprog)
// count the iteration
i += 1
}
// Return the final graph
g
} // end of apply
} // end of class Pregel