Reindenting documentation.

This commit is contained in:
Joseph E. Gonzalez 2013-10-29 14:01:24 -07:00
parent d316cad9b1
commit 15958ca65a
5 changed files with 213 additions and 183 deletions

View file

@ -6,19 +6,20 @@ import org.apache.spark._
/**
* The Analytics object contains a collection of basic graph analytics
* algorithms that operate largely on the graph structure.
* algorithms that operate largely on the graph structure.
*
* In addition the Analytics object contains a driver `main` which can
* be used to apply the various functions to graphs in standard formats.
* In addition the Analytics object contains a driver `main` which can
* be used to apply the various functions to graphs in standard
* formats.
*/
object Analytics extends Logging {
/**
* Run PageRank for a fixed number of iterations returning a graph
* with vertex attributes containing the PageRank and edge attributes
* the normalized edge weight.
* Run PageRank for a fixed number of iterations returning a graph
* with vertex attributes containing the PageRank and edge
* attributes the normalized edge weight.
*
* The following PageRank fixed point is computed for each vertex.
* The following PageRank fixed point is computed for each vertex.
*
* {{{
* var PR = Array.fill(n)( 1.0 )
@ -31,12 +32,13 @@ object Analytics extends Logging {
* }
* }}}
*
* where `alpha` is the random reset probability (typically 0.15),
* `inNbrs[i]` is the set of neighbors whick link to `i` and `outDeg[j]`
* is the out degree of vertex `j`.
* where `alpha` is the random reset probability (typically 0.15),
* `inNbrs[i]` is the set of neighbors whick link to `i` and
* `outDeg[j]` is the out degree of vertex `j`.
*
* Note that this is not the "normalized" PageRank and as a consequence
* pages that have no inlinks will have a PageRank of alpha.
* Note that this is not the "normalized" PageRank and as a
* consequence pages that have no inlinks will have a PageRank of
* alpha.
*
* @tparam VD the original vertex attribute (not used)
* @tparam ED the original edge attribute (not used)
@ -45,8 +47,8 @@ object Analytics extends Logging {
* @param numIter the number of iterations of PageRank to run
* @param resetProb the random reset probability (alpha)
*
* @return the graph containing with each vertex containing the PageRank and
* each edge containing the normalized weight.
* @return the graph containing with each vertex containing the
* PageRank and each edge containing the normalized weight.
*
*/
def pagerank[VD: Manifest, ED: Manifest](
@ -54,8 +56,8 @@ object Analytics extends Logging {
Graph[Double, Double] = {
/**
* Initialize the pagerankGraph with each edge attribute
* having weight 1/outDegree and each vertex with attribute 1.0.
* Initialize the pagerankGraph with each edge attribute having
* weight 1/outDegree and each vertex with attribute 1.0.
*/
val pagerankGraph: Graph[Double, Double] = graph
// Associate the degree with each vertex
@ -85,10 +87,11 @@ object Analytics extends Logging {
vertexProgram, sendMessage, messageCombiner)
}
/**
* Run a dynamic version of PageRank returning a graph with vertex attributes
* containing the PageRank and edge attributes containing the normalized
* edge weight.
* Run a dynamic version of PageRank returning a graph with vertex
* attributes containing the PageRank and edge attributes containing
* the normalized edge weight.
*
* {{{
* var PR = Array.fill(n)( 1.0 )
@ -101,22 +104,24 @@ object Analytics extends Logging {
* }
* }}}
*
* where `alpha` is the random reset probability (typically 0.15),
* `inNbrs[i]` is the set of neighbors whick link to `i` and `outDeg[j]`
* is the out degree of vertex `j`.
* where `alpha` is the random reset probability (typically 0.15),
* `inNbrs[i]` is the set of neighbors whick link to `i` and
* `outDeg[j]` is the out degree of vertex `j`.
*
* Note that this is not the "normalized" PageRank and as a consequence
* pages that have no inlinks will have a PageRank of alpha.
* Note that this is not the "normalized" PageRank and as a
* consequence pages that have no inlinks will have a PageRank of
* alpha.
*
* @tparam VD the original vertex attribute (not used)
* @tparam ED the original edge attribute (not used)
*
* @param graph the graph on which to compute PageRank
* @param tol the tolerance allowed at convergence (smaller => more accurate).
* @param tol the tolerance allowed at convergence (smaller => more
* accurate).
* @param resetProb the random reset probability (alpha)
*
* @return the graph containing with each vertex containing the PageRank and
* each edge containing the normalized weight.
* @return the graph containing with each vertex containing the
* PageRank and each edge containing the normalized weight.
*/
def deltaPagerank[VD: Manifest, ED: Manifest](
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15):
@ -163,18 +168,19 @@ object Analytics extends Logging {
/**
* Compute the connected component membership of each vertex
* and return an RDD with the vertex value containing the
* lowest vertex id in the connected component containing
* that vertex.
* Compute the connected component membership of each vertex and
* return an RDD with the vertex value containing the lowest vertex
* id in the connected component containing that vertex.
*
* @tparam VD the vertex attribute type (discarded in the computation)
* @tparam VD the vertex attribute type (discarded in the
* computation)
* @tparam ED the edge attribute type (preserved in the computation)
*
* @param graph the graph for which to compute the connected components
* @param graph the graph for which to compute the connected
* components
*
* @return a graph with vertex attributes containing the smallest vertex
* in each connected component
* @return a graph with vertex attributes containing the smallest
* vertex in each connected component
*/
def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]):
Graph[Vid, ED] = {

View file

@ -8,8 +8,17 @@ package org.apache.spark.graph
* @tparam ED type of the edge attribute
*/
case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
/**
* The vertex id of the source vertex
*/
var srcId: Vid = 0,
/**
* The vertex id of the target vertex.
*/
var dstId: Vid = 0,
/**
* The attribute associated with the edge.
*/
var attr: ED = nullValue[ED]) {
/**
@ -23,10 +32,12 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
/**
* Return the relative direction of the edge to the corresponding vertex.
* Return the relative direction of the edge to the corresponding
* vertex.
*
* @param vid the id of one of the two vertices in the edge.
* @return the relative direction of the edge to the corresponding vertex.
* @return the relative direction of the edge to the corresponding
* vertex.
*/
def relativeDirection(vid: Vid): EdgeDirection =
if (vid == srcId) EdgeDirection.Out else { assert(vid == dstId); EdgeDirection.In }

View file

@ -1,14 +1,15 @@
package org.apache.spark.graph
/**
* An edge triplet represents two vertices and edge along with their attributes.
* An edge triplet represents two vertices and edge along with their
* attributes.
*
* @tparam VD the type of the vertex attribute.
* @tparam ED the type of the edge attribute
*
* @todo specialize edge triplet for basic types, though when I last tried
* specializing I got a warning about inherenting from a type that is not
* a trait.
* @todo specialize edge triplet for basic types, though when I last
* tried specializing I got a warning about inherenting from a type
* that is not a trait.
*/
class EdgeTriplet[VD, ED] extends Edge[ED] {
// class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassManifest,

View file

@ -7,9 +7,10 @@ import org.apache.spark.util.ClosureCleaner
/**
* `GraphOps` contains additional functionality (syntatic sugar) for the graph
* type and is implicitly constructed for each Graph object. All operations in
* `GraphOps` are expressed in terms of the efficient GraphX API.
* `GraphOps` contains additional functionality (syntatic sugar) for
* the graph type and is implicitly constructed for each Graph object.
* All operations in `GraphOps` are expressed in terms of the
* efficient GraphX API.
*
* @tparam VD the vertex attribute type
* @tparam ED the edge attribute type
@ -30,7 +31,8 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
/**
* Compute the in-degree of each vertex in the Graph returning an RDD.
* Compute the in-degree of each vertex in the Graph returning an
* RDD.
* @note Vertices with no in edges are not returned in the resulting RDD.
*/
lazy val inDegrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.In)
@ -44,8 +46,9 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
/**
* Compute the degrees of each vertex in the Graph returning an RDD.
* @note Vertices with no edges are not returned in the resulting RDD.
* Compute the degrees of each vertex in the Graph returning an RDD.
* @note Vertices with no edges are not returned in the resulting
* RDD.
*/
lazy val degrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.Both)
@ -53,8 +56,8 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
/**
* Compute the neighboring vertex degrees.
*
* @param edgeDirection the direction along which to collect neighboring
* vertex attributes.
* @param edgeDirection the direction along which to collect
* neighboring vertex attributes.
*/
private def degreesRDD(edgeDirection: EdgeDirection): VertexSetRDD[Int] = {
graph.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection)
@ -62,29 +65,31 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
/**
* This function is used to compute a statistic for the neighborhood of each
* vertex and returns a value for all vertices (including those without
* neighbors).
* This function is used to compute a statistic for the neighborhood
* of each vertex and returns a value for all vertices (including
* those without neighbors).
*
* @note Because the a default value is provided all vertices will have a
* corresponding entry in the returned RDD.
* @note Because the a default value is provided all vertices will
* have a corresponding entry in the returned RDD.
*
* @param mapFunc the function applied to each edge adjacent to each vertex.
* The mapFunc can optionally return None in which case it does not
* contribute to the final sum.
* @param reduceFunc the function used to merge the results of each map
* operation.
* @param default the default value to use for each vertex if it has no
* neighbors or the map function repeatedly evaluates to none
* @param direction the direction of edges to consider (e.g., In, Out, Both).
* @param mapFunc the function applied to each edge adjacent to each
* vertex. The mapFunc can optionally return None in which case it
* does not contribute to the final sum.
* @param reduceFunc the function used to merge the results of each
* map operation.
* @param default the default value to use for each vertex if it has
* no neighbors or the map function repeatedly evaluates to none
* @param direction the direction of edges to consider (e.g., In,
* Out, Both).
* @tparam VD2 The returned type of the aggregation operation.
*
* @return A Spark.RDD containing tuples of vertex identifiers and
* their resulting value. There will be exactly one entry for ever vertex in
* the original graph.
* their resulting value. There will be exactly one entry for ever
* vertex in the original graph.
*
* @example We can use this function to compute the average follower age
* for each user
* @example We can use this function to compute the average follower
* age for each user
*
* {{{
* val graph: Graph[Int,Int] = loadGraph()
* val averageFollowerAge: RDD[(Int, Int)] =
@ -138,7 +143,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
/**
* Return the Ids of the neighboring vertices.
*
* @param edgeDirection the direction along which to collect
* @param edgeDirection the direction along which to collect
* neighboring vertices
*
* @return the vertex set of neighboring ids for each vertex.
@ -164,11 +169,11 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
* graphs where high degree vertices may force a large ammount of
* information to be collected to a single location.
*
* @param edgeDirection the direction along which to collect
* @param edgeDirection the direction along which to collect
* neighboring vertices
*
* @return the vertex set of neighboring vertex attributes
* for each vertex.
* @return the vertex set of neighboring vertex attributes for each
* vertex.
*/
def collectNeighbors(edgeDirection: EdgeDirection) :
VertexSetRDD[ Array[(Vid, VD)] ] = {
@ -186,24 +191,26 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
/**
* Join the vertices with an RDD and then apply a function from the the
* vertex and RDD entry to a new vertex value. The input table should
* contain at most one entry for each vertex. If no entry is provided the
* map function is skipped and the old value is used.
* Join the vertices with an RDD and then apply a function from the
* the vertex and RDD entry to a new vertex value. The input table
* should contain at most one entry for each vertex. If no entry is
* provided the map function is skipped and the old value is used.
*
* @tparam U the type of entry in the table of updates
* @param table the table to join with the vertices in the graph. The table
* should contain at most one entry for each vertex.
* @param mapFunc the function used to compute the new vertex values. The
* map function is invoked only for vertices with a corresponding entry in
* the table otherwise the old vertex value is used.
* @param table the table to join with the vertices in the graph.
* The table should contain at most one entry for each vertex.
* @param mapFunc the function used to compute the new vertex
* values. The map function is invoked only for vertices with a
* corresponding entry in the table otherwise the old vertex value
* is used.
*
* @note for small tables this function can be much more efficient than
* leftJoinVertices
* @note for small tables this function can be much more efficient
* than leftJoinVertices
*
* @example This function is used to update the vertices with new
* values based on external data. For example we could add the out
* degree to each vertex record
*
* @example This function is used to update the vertices with new values
* based on external data. For example we could add the out degree to each
* vertex record
* {{{
* val rawGraph: Graph[Int,()] = Graph.textFile("webgraph")
* .mapVertices(v => 0)

View file

@ -5,18 +5,17 @@ import org.apache.spark.rdd.RDD
/**
* This object implements a Pregel-like bulk-synchronous
* message-passing API. However, unlike the original Pregel API
* the GraphX pregel API factors the sendMessage computation over
* edges, enables the message sending computation to read both
* vertex attributes, and finally contrains messages to the graph
* structure. These changes allow for substantially more efficient
* distributed execution while also exposing greater flexibility
* for graph based computation.
* message-passing API. However, unlike the original Pregel API the
* GraphX pregel API factors the sendMessage computation over edges,
* enables the message sending computation to read both vertex
* attributes, and finally contrains messages to the graph structure.
* These changes allow for substantially more efficient distributed
* execution while also exposing greater flexibility for graph based
* computation.
*
* This object present several variants of the bulk synchronous
* execution that differ only in the edge direction along which
* messages are sent and whether a fixed number of iterations
* is used.
* messages are sent and whether a fixed number of iterations is used.
*
* @example We can use the Pregel abstraction to implement PageRank
* {{{
@ -46,18 +45,18 @@ object Pregel {
/**
* Execute a Pregel-like iterative vertex-parallel abstraction.
* The user-defined vertex-program `vprog` is executed in parallel
* on each vertex receiving any inbound messages and computing a new
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
* each vertex receiving any inbound messages and computing a new
* value for the vertex. The `sendMsg` function is then invoked on
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* associative function used to combine messages destined to the
* same vertex.
* same vertex.
*
* On the first iteration all vertices receive the `initialMsg` and
* on subsequent iterations if a vertex does not receive a message then
* the vertex-program is not invoked.
* on subsequent iterations if a vertex does not receive a message
* then the vertex-program is not invoked.
*
* This function iterates a fixed number (`numIter`) of iterations.
*
@ -67,24 +66,26 @@ object Pregel {
*
* @param graph the input graph.
*
* @param initialMsg the message each vertex will receive at the
* on the first iteration.
* @param initialMsg the message each vertex will receive at the on
* the first iteration.
*
* @param numIter the number of iterations to run this computation.
*
* @param vprog the user-defined vertex program which runs on each vertex
* and receives the inbound message and computes a new vertex value.
* On the first iteration the vertex program is invoked on all vertices
* and is passed the default message. On subsequent iterations the
* vertex program is only invoked on those vertices that receive messages.
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value. On the first iteration the vertex program is invoked on
* all vertices and is passed the default message. On subsequent
* iterations the vertex program is only invoked on those vertices
* that receive messages.
*
* @param sendMsg a user supplied function that is applied to out edges
* of vertices that received messages in the current iteration.
* @param sendMsg a user supplied function that is applied to out
* edges of vertices that received messages in the current
* iteration.
*
* @param mergeMsg a user supplied function that takes two incoming messages
* of type A and merges them into a single message of type A.
* ''This function must be commutative and associative and ideally the
* size of A should not increase.''
* @param mergeMsg a user supplied function that takes two incoming
* messages of type A and merges them into a single message of type
* A. ''This function must be commutative and associative and
* ideally the size of A should not increase.''
*
* @return the resulting graph at the end of the computation
*
@ -100,18 +101,18 @@ object Pregel {
/**
* Execute a Pregel-like iterative vertex-parallel abstraction.
* The user-defined vertex-program `vprog` is executed in parallel
* on each vertex receiving any inbound messages and computing a new
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
* each vertex receiving any inbound messages and computing a new
* value for the vertex. The `sendMsg` function is then invoked on
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* associative function used to combine messages destined to the
* same vertex.
* same vertex.
*
* On the first iteration all vertices receive the `initialMsg` and
* on subsequent iterations if a vertex does not receive a message then
* the vertex-program is not invoked.
* on subsequent iterations if a vertex does not receive a message
* then the vertex-program is not invoked.
*
* This function iterates a fixed number (`numIter`) of iterations.
*
@ -121,28 +122,29 @@ object Pregel {
*
* @param graph the input graph.
*
* @param initialMsg the message each vertex will receive at the
* on the first iteration.
* @param initialMsg the message each vertex will receive at the on
* the first iteration.
*
* @param numIter the number of iterations to run this computation.
*
* @param sendDir the edge direction along which the `sendMsg` function
* is invoked.
* @param sendDir the edge direction along which the `sendMsg`
* function is invoked.
*
* @param vprog the user-defined vertex program which runs on each vertex
* and receives the inbound message and computes a new vertex value.
* On the first iteration the vertex program is invoked on all vertices
* and is passed the default message. On subsequent iterations the
* vertex program is only invoked on those vertices that receive messages.
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value. On the first iteration the vertex program is invoked on
* all vertices and is passed the default message. On subsequent
* iterations the vertex program is only invoked on those vertices
* that receive messages.
*
* @param sendMsg a user supplied function that is applied to each edge
* in the direction `sendDir` adjacent to vertices that received messages
* in the current iteration.
* @param sendMsg a user supplied function that is applied to each
* edge in the direction `sendDir` adjacent to vertices that
* received messages in the current iteration.
*
* @param mergeMsg a user supplied function that takes two incoming messages
* of type A and merges them into a single message of type A.
* ''This function must be commutative and associative and ideally the
* size of A should not increase.''
* @param mergeMsg a user supplied function that takes two incoming
* messages of type A and merges them into a single message of type
* A. ''This function must be commutative and associative and
* ideally the size of A should not increase.''
*
* @return the resulting graph at the end of the computation
*
@ -174,18 +176,18 @@ object Pregel {
/**
* Execute a Pregel-like iterative vertex-parallel abstraction.
* The user-defined vertex-program `vprog` is executed in parallel
* on each vertex receiving any inbound messages and computing a new
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
* each vertex receiving any inbound messages and computing a new
* value for the vertex. The `sendMsg` function is then invoked on
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* associative function used to combine messages destined to the
* same vertex.
* same vertex.
*
* On the first iteration all vertices receive the `initialMsg` and
* on subsequent iterations if a vertex does not receive a message then
* the vertex-program is not invoked.
* on subsequent iterations if a vertex does not receive a message
* then the vertex-program is not invoked.
*
* This function iterates until there are no remaining messages.
*
@ -195,24 +197,26 @@ object Pregel {
*
* @param graph the input graph.
*
* @param initialMsg the message each vertex will receive at the
* on the first iteration.
* @param initialMsg the message each vertex will receive at the on
* the first iteration.
*
* @param numIter the number of iterations to run this computation.
*
* @param vprog the user-defined vertex program which runs on each vertex
* and receives the inbound message and computes a new vertex value.
* On the first iteration the vertex program is invoked on all vertices
* and is passed the default message. On subsequent iterations the
* vertex program is only invoked on those vertices that receive messages.
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value. On the first iteration the vertex program is invoked on
* all vertices and is passed the default message. On subsequent
* iterations the vertex program is only invoked on those vertices
* that receive messages.
*
* @param sendMsg a user supplied function that is applied to out edges
* of vertices that received messages in the current iteration.
* @param sendMsg a user supplied function that is applied to out
* edges of vertices that received messages in the current
* iteration.
*
* @param mergeMsg a user supplied function that takes two incoming messages
* of type A and merges them into a single message of type A.
* ''This function must be commutative and associative and ideally the
* size of A should not increase.''
* @param mergeMsg a user supplied function that takes two incoming
* messages of type A and merges them into a single message of type
* A. ''This function must be commutative and associative and
* ideally the size of A should not increase.''
*
* @return the resulting graph at the end of the computation
*
@ -228,18 +232,18 @@ object Pregel {
/**
* Execute a Pregel-like iterative vertex-parallel abstraction.
* The user-defined vertex-program `vprog` is executed in parallel
* on each vertex receiving any inbound messages and computing a new
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
* each vertex receiving any inbound messages and computing a new
* value for the vertex. The `sendMsg` function is then invoked on
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* associative function used to combine messages destined to the
* same vertex.
* same vertex.
*
* On the first iteration all vertices receive the `initialMsg` and
* on subsequent iterations if a vertex does not receive a message then
* the vertex-program is not invoked.
* on subsequent iterations if a vertex does not receive a message
* then the vertex-program is not invoked.
*
* This function iterates until there are no remaining messages.
*
@ -249,28 +253,29 @@ object Pregel {
*
* @param graph the input graph.
*
* @param initialMsg the message each vertex will receive at the
* on the first iteration.
* @param initialMsg the message each vertex will receive at the on
* the first iteration.
*
* @param numIter the number of iterations to run this computation.
*
* @param sendDir the edge direction along which the `sendMsg` function
* is invoked.
* @param sendDir the edge direction along which the `sendMsg`
* function is invoked.
*
* @param vprog the user-defined vertex program which runs on each vertex
* and receives the inbound message and computes a new vertex value.
* On the first iteration the vertex program is invoked on all vertices
* and is passed the default message. On subsequent iterations the
* vertex program is only invoked on those vertices that receive messages.
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value. On the first iteration the vertex program is invoked on
* all vertices and is passed the default message. On subsequent
* iterations the vertex program is only invoked on those vertices
* that receive messages.
*
* @param sendMsg a user supplied function that is applied to each edge
* in the direction `sendDir` adjacent to vertices that received messages
* in the current iteration.
* @param sendMsg a user supplied function that is applied to each
* edge in the direction `sendDir` adjacent to vertices that
* received messages in the current iteration.
*
* @param mergeMsg a user supplied function that takes two incoming messages
* of type A and merges them into a single message of type A.
* ''This function must be commutative and associative and ideally the
* size of A should not increase.''
* @param mergeMsg a user supplied function that takes two incoming
* messages of type A and merges them into a single message of type
* A. ''This function must be commutative and associative and
* ideally the size of A should not increase.''
*
* @return the resulting graph at the end of the computation
*