[SPARK-9508] GraphX Pregel docs update with new Pregel code
SPARK-9436 simplifies the Pregel code. graphx-programming-guide needs to be modified accordingly since it lists the old Pregel code Author: Alexander Ulanov <nashb@yandex.ru> Closes #7831 from avulanov/SPARK-9508-pregel-doc2.
This commit is contained in:
parent
de3223872a
commit
1c843e2848
|
@ -768,16 +768,14 @@ class GraphOps[VD, ED] {
|
|||
// Loop until no messages remain or maxIterations is achieved
|
||||
var i = 0
|
||||
while (activeMessages > 0 && i < maxIterations) {
|
||||
// Receive the messages: -----------------------------------------------------------------------
|
||||
// Run the vertex program on all vertices that receive messages
|
||||
val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
|
||||
// Merge the new vertex values back into the graph
|
||||
g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
|
||||
// Send Messages: ------------------------------------------------------------------------------
|
||||
// Vertices that didn't receive a message above don't appear in newVerts and therefore don't
|
||||
// get to send messages. More precisely the map phase of mapReduceTriplets is only invoked
|
||||
// on edges in the activeDir of vertices in newVerts
|
||||
messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
|
||||
// Receive the messages and update the vertices.
|
||||
g = g.joinVertices(messages)(vprog).cache()
|
||||
val oldMessages = messages
|
||||
// Send new messages, skipping edges where neither side received a message. We must cache
|
||||
// messages so it can be materialized on the next line, allowing us to uncache the previous
|
||||
// iteration.
|
||||
messages = g.mapReduceTriplets(
|
||||
sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
|
||||
activeMessages = messages.count()
|
||||
i += 1
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue