diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala index 94dc806fc2..d45e351d6a 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala @@ -4,34 +4,87 @@ import org.apache.spark.rdd.RDD /** - * This object implements the Pregel bulk-synchronous - * message-passing API. + * This object implements a Pregel-like bulk-synchronous + * message-passing API. However, unlike the original Pregel API + * the GraphX pregel API factors the sendMessage computation over + * edges, enables the message sending computation to read both + * vertex attributes, and finally contrains messages to the graph + * structure. These changes allow for substantially more efficient + * distributed execution while also exposing greater flexibility + * for graph based computation. + * + * This object present several variants of the bulk synchronous + * execution that differ only in the edge direction along which + * messages are sent and whether a fixed number of iterations + * is used. + * + * @example We can use the Pregel abstraction to implement PageRank + * {{{ + * val pagerankGraph: Graph[Double, Double] = graph + * // Associate the degree with each vertex + * .outerJoinVertices(graph.outDegrees){ + * (vid, vdata, deg) => deg.getOrElse(0) + * } + * // Set the weight on the edges based on the degree + * .mapTriplets( e => 1.0 / e.srcAttr ) + * // Set the vertex attributes to the initial pagerank values + * .mapVertices( (id, attr) => 1.0 ) + * + * def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double = + * resetProb + (1.0 - resetProb) * msgSum + * def sendMessage(id: Vid, edge: EdgeTriplet[Double, Double]): Option[Double] = + * Some(edge.srcAttr * edge.attr) + * def messageCombiner(a: Double, b: Double): Double = a + b + * val initialMessage = 0.0 + * // Execute pregel for a fixed number of iterations. + * Pregel(pagerankGraph, initialMessage, numIter)( + * vertexProgram, sendMessage, messageCombiner) + * }}} + * */ object Pregel { - - /** - * Execute the Pregel program. + * Execute a Pregel-like iterative vertex-parallel abstraction. + * The user-defined vertex-program `vprog` is executed in parallel + * on each vertex receiving any inbound messages and computing a new + * value for the vertex. The `sendMsg` function is then invoked on + * all out-edges and is used to compute an optional message to the + * destination vertex. The `mergeMsg` function is a commutative + * associative function used to combine messages destined to the + * same vertex. + * + * On the first iteration all vertices receive the `initialMsg` and + * on subsequent iterations if a vertex does not receive a message then + * the vertex-program is not invoked. + * + * This function iterates a fixed number (`numIter`) of iterations. * * @tparam VD the vertex data type * @tparam ED the edge data type * @tparam A the Pregel message type * - * @param vprog a user supplied function that acts as the vertex program for - * the Pregel computation. It takes the vertex ID of the vertex it is running on, - * the accompanying data for that vertex, and the incoming data and returns the - * new vertex value. - * @param sendMsg a user supplied function that takes the current vertex ID and an EdgeTriplet - * between the vertex and one of its neighbors and produces a message to send - * to that neighbor. - * @param mergeMsg a user supplied function that takes two incoming messages of type A and merges - * them into a single message of type A. ''This function must be commutative and - * associative.'' - * @param initialMsg the message each vertex will receive at the beginning of the - * first iteration. - * @param numIter the number of iterations to run this computation for. + * @param graph the input graph. + * + * @param initialMsg the message each vertex will receive at the + * on the first iteration. + * + * @param numIter the number of iterations to run this computation. + * + * @param vprog the user-defined vertex program which runs on each vertex + * and receives the inbound message and computes a new vertex value. + * On the first iteration the vertex program is invoked on all vertices + * and is passed the default message. On subsequent iterations the + * vertex program is only invoked on those vertices that receive messages. + * + * @param sendMsg a user supplied function that is applied to out edges + * of vertices that received messages in the current iteration. + * + * @param mergeMsg a user supplied function that takes two incoming messages + * of type A and merges them into a single message of type A. + * ''This function must be commutative and associative and ideally the + * size of A should not increase.'' * * @return the resulting graph at the end of the computation * @@ -42,6 +95,64 @@ object Pregel { sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A], mergeMsg: (A, A) => A) : Graph[VD, ED] = { + apply(graph, initialMsg, numIter, EdgeDirection.Out)(vprog, sendMsg, mergeMsg) + } // end of Apply + + + /** + * Execute a Pregel-like iterative vertex-parallel abstraction. + * The user-defined vertex-program `vprog` is executed in parallel + * on each vertex receiving any inbound messages and computing a new + * value for the vertex. The `sendMsg` function is then invoked on + * all out-edges and is used to compute an optional message to the + * destination vertex. The `mergeMsg` function is a commutative + * associative function used to combine messages destined to the + * same vertex. + * + * On the first iteration all vertices receive the `initialMsg` and + * on subsequent iterations if a vertex does not receive a message then + * the vertex-program is not invoked. + * + * This function iterates a fixed number (`numIter`) of iterations. + * + * @tparam VD the vertex data type + * @tparam ED the edge data type + * @tparam A the Pregel message type + * + * @param graph the input graph. + * + * @param initialMsg the message each vertex will receive at the + * on the first iteration. + * + * @param numIter the number of iterations to run this computation. + * + * @param sendDir the edge direction along which the `sendMsg` function + * is invoked. + * + * @param vprog the user-defined vertex program which runs on each vertex + * and receives the inbound message and computes a new vertex value. + * On the first iteration the vertex program is invoked on all vertices + * and is passed the default message. On subsequent iterations the + * vertex program is only invoked on those vertices that receive messages. + * + * @param sendMsg a user supplied function that is applied to each edge + * in the direction `sendDir` adjacent to vertices that received messages + * in the current iteration. + * + * @param mergeMsg a user supplied function that takes two incoming messages + * of type A and merges them into a single message of type A. + * ''This function must be commutative and associative and ideally the + * size of A should not increase.'' + * + * @return the resulting graph at the end of the computation + * + */ + def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest] + (graph: Graph[VD, ED], initialMsg: A, numIter: Int, sendDir: EdgeDirection)( + vprog: (Vid, VD, A) => VD, + sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A], + mergeMsg: (A, A) => A) + : Graph[VD, ED] = { def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertexId(vid), edge) @@ -51,7 +162,7 @@ object Pregel { var i = 0 while (i < numIter) { // compute the messages - val messages = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In) + val messages = g.aggregateNeighbors(mapF, mergeMsg, sendDir.reverse) // receive the messages g = g.joinVertices(messages)(vprog) // count the iteration @@ -63,25 +174,45 @@ object Pregel { /** - * Execute the Pregel program. + * Execute a Pregel-like iterative vertex-parallel abstraction. + * The user-defined vertex-program `vprog` is executed in parallel + * on each vertex receiving any inbound messages and computing a new + * value for the vertex. The `sendMsg` function is then invoked on + * all out-edges and is used to compute an optional message to the + * destination vertex. The `mergeMsg` function is a commutative + * associative function used to combine messages destined to the + * same vertex. + * + * On the first iteration all vertices receive the `initialMsg` and + * on subsequent iterations if a vertex does not receive a message then + * the vertex-program is not invoked. + * + * This function iterates until there are no remaining messages. * * @tparam VD the vertex data type * @tparam ED the edge data type * @tparam A the Pregel message type * - * @param vprog a user supplied function that acts as the vertex program for - * the Pregel computation. It takes the vertex ID of the vertex it is running on, - * the accompanying data for that vertex, and the incoming data and returns the - * new vertex value. - * @param sendMsg a user supplied function that takes the current vertex ID and an EdgeTriplet - * between the vertex and one of its neighbors and produces a message to send - * to that neighbor. - * @param mergeMsg a user supplied function that takes two incoming messages of type A and merges - * them into a single message of type A. ''This function must be commutative and - * associative.'' - * @param initialMsg the message each vertex will receive at the beginning of the - * first iteration. - * @param numIter the number of iterations to run this computation for. + * @param graph the input graph. + * + * @param initialMsg the message each vertex will receive at the + * on the first iteration. + * + * @param numIter the number of iterations to run this computation. + * + * @param vprog the user-defined vertex program which runs on each vertex + * and receives the inbound message and computes a new vertex value. + * On the first iteration the vertex program is invoked on all vertices + * and is passed the default message. On subsequent iterations the + * vertex program is only invoked on those vertices that receive messages. + * + * @param sendMsg a user supplied function that is applied to out edges + * of vertices that received messages in the current iteration. + * + * @param mergeMsg a user supplied function that takes two incoming messages + * of type A and merges them into a single message of type A. + * ''This function must be commutative and associative and ideally the + * size of A should not increase.'' * * @return the resulting graph at the end of the computation * @@ -92,6 +223,64 @@ object Pregel { sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A], mergeMsg: (A, A) => A) : Graph[VD, ED] = { + apply(graph, initialMsg, EdgeDirection.Out)(vprog, sendMsg, mergeMsg) + } // end of apply + + + /** + * Execute a Pregel-like iterative vertex-parallel abstraction. + * The user-defined vertex-program `vprog` is executed in parallel + * on each vertex receiving any inbound messages and computing a new + * value for the vertex. The `sendMsg` function is then invoked on + * all out-edges and is used to compute an optional message to the + * destination vertex. The `mergeMsg` function is a commutative + * associative function used to combine messages destined to the + * same vertex. + * + * On the first iteration all vertices receive the `initialMsg` and + * on subsequent iterations if a vertex does not receive a message then + * the vertex-program is not invoked. + * + * This function iterates until there are no remaining messages. + * + * @tparam VD the vertex data type + * @tparam ED the edge data type + * @tparam A the Pregel message type + * + * @param graph the input graph. + * + * @param initialMsg the message each vertex will receive at the + * on the first iteration. + * + * @param numIter the number of iterations to run this computation. + * + * @param sendDir the edge direction along which the `sendMsg` function + * is invoked. + * + * @param vprog the user-defined vertex program which runs on each vertex + * and receives the inbound message and computes a new vertex value. + * On the first iteration the vertex program is invoked on all vertices + * and is passed the default message. On subsequent iterations the + * vertex program is only invoked on those vertices that receive messages. + * + * @param sendMsg a user supplied function that is applied to each edge + * in the direction `sendDir` adjacent to vertices that received messages + * in the current iteration. + * + * @param mergeMsg a user supplied function that takes two incoming messages + * of type A and merges them into a single message of type A. + * ''This function must be commutative and associative and ideally the + * size of A should not increase.'' + * + * @return the resulting graph at the end of the computation + * + */ + def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest] + (graph: Graph[VD, ED], initialMsg: A, sendDir: EdgeDirection)( + vprog: (Vid, VD, A) => VD, + sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A], + mergeMsg: (A, A) => A) + : Graph[VD, ED] = { def vprogFun(id: Vid, attr: (VD, Boolean), msgOpt: Option[A]): (VD, Boolean) = { msgOpt match { @@ -114,7 +303,7 @@ object Pregel { var g = graph.mapVertices( (vid, vdata) => (vprog(vid, vdata, initialMsg), true) ) // compute the messages - var messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, EdgeDirection.In).cache + var messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, sendDir.reverse).cache var activeMessages = messages.count // Loop var i = 0 @@ -123,7 +312,7 @@ object Pregel { g = g.outerJoinVertices(messages)(vprogFun) val oldMessages = messages // compute the messages - messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, EdgeDirection.In).cache + messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, sendDir.reverse).cache activeMessages = messages.count // after counting we can unpersist the old messages oldMessages.unpersist(blocking=false)