Documenting Pregel API

This commit is contained in:
Joseph E. Gonzalez 2014-01-12 20:49:41 -08:00
parent 7a4bb863c7
commit c787ff5640

View file

@ -429,12 +429,209 @@ val joinedGraph = graph.joinVertices(uniqueCosts,
{% endhighlight %} {% endhighlight %}
## Map Reduce Triplets (mapReduceTriplets) ## Neighborhood Aggregation
A key part of graph computation is aggregating information about the neighborhood of each vertex.
For example we might want to know the number of followers each user has or the average age of the
the followers of each user. Many iterative graph algorithms (e.g., PageRank, Shortest Path, and
connected components) repeatedly aggregate properties of neighboring vertices (e.g., current
PageRank Value, shortest path to the source, and smallest reachable vertex id).
### Map Reduce Triplets (mapReduceTriplets)
<a name="mrTriplets"></a> <a name="mrTriplets"></a>
[Graph.mapReduceTriplets]: api/graphx/index.html#mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=&gt;Iterator[(org.apache.spark.graphx.VertexID,A)],reduceFunc:(A,A)=&gt;A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A]
These core (heavily optimized) aggregation primitive in GraphX is the
(`mapReduceTriplets`)[Graph.mapReduceTriplets] operator:
{% highlight scala %}
def mapReduceTriplets[A](
map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
reduce: (A, A) => A)
: VertexRDD[A]
{% endhighlight %}
The (`mapReduceTriplets`)[Graph.mapReduceTriplets] operator takes a user defined map function which
is applied to each triplet and can yield *messages* destined to either (none or both) vertices in
the triplet. We currently only support messages destined to the source or destination vertex of the
triplet to enable optimized preaggregation. The user defined `reduce` function combines the
messages destined to each vertex. The `mapReduceTriplets` operator returns a `VertexRDD[A]`
containing the aggregate message to each vertex. Vertices that do not receive a message are not
included in the returned `VertexRDD`.
> Note that `mapReduceTriplets takes an additional optional `activeSet` (see API docs) which
> restricts the map phase to edges adjacent to the vertices in the provided `VertexRDD`. Restricting
> computation to triplets adjacent to a subset of the vertices is often necessary in incremental
> iterative computation and is a key part of the GraphX implementation of Pregel.
We can use the `mapReduceTriplets` operator to collect information about adjacent vertices. For
example if we wanted to compute the average age of followers who are older that each user we could
do the following.
{% highlight scala %}
// Graph with age as the vertex property
val graph: Graph[Double, String] = getFromSomewhereElse()
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
triplet => { // Map Function
if (triplet.srcAttr > triplet.dstAttr) {
// Send message to destination vertex containing counter and age
Iterator((triplet.dstId, (1, triplet.srcAttr)))
} else {
// Don't send a message for this triplet
Iterator.empty
}
},
// Add counter and age
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOlderFollowers: VertexRDD[Double] =
olderFollowers.mapValues { case (count, totalAge) => totalAge / count }
{% endhighlight %}
> Note that the `mapReduceTriplets` operation performs optimally when the messages (and their sums)
> are constant sized (e.g., floats and addition instead of lists and concatenation). More
> precisely, the result of `mapReduceTriplets` should be sub-linear in the degree of each vertex.
Because it is often necessary to aggregate information about neighboring vertices we also provide an
alternative interface defined in [`GraphOps`][GraphOps]:
{% highlight scala %}
def aggregateNeighbors[A](
map: (VertexID, EdgeTriplet[VD, ED]) => Option[A],
reduce: (A, A) => A,
edgeDir: EdgeDirection)
: VertexRDD[A]
{% endhighlight %}
The `aggregateNeighbors` operator is implemented directly on top of `mapReduceTriplets` but allows
the user to define the logic in a more vertex centric manner. Here the `map` function is provided
the vertex to which the message is sent as well as one of the edges and returns the optional message
value. The `edgeDir` determines whether the `map` function is run on `In`, `Out`, or `All` edges
adjacent to each vertex.
### Computing Degree Information
A common aggregation task is computing the degree of each vertex: the number of edges adjacent to
each vertex. In the context of directed graphs it often necessary to know the in-degree, out-
degree, and the total degree of each vertex. The [`GraphOps`][GraphOps] class contains a
collection of operators to compute the degrees of each vertex. For example in the following we
compute the max in, out, and total degrees:
{% highlight scala %}
// Define a reduce operation to compute the highest degree vertex
def maxReduce(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(maxReduce)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(maxReduce)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(maxReduce)
{% endhighlight %}
### Collecting Neighbors
In some cases it may be easier to express computation by collecting neighboring vertices and their
attributes at each vertex. This can be easily accomplished using the `collectNeighborIds` and the
`collectNeighbors` operators.
{% highlight scala %}
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] =
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexID, VD)] ]
{% endhighlight %}
> Note that these operators can be quite costly as they duplicate information and require
> substantial communication. If possible try expressing the same computation using the
> `mapReduceTriplets` operator directly.
# Pregel API # Pregel API
<a name="pregel"></a> <a name="pregel"></a>
Graphs are inherently recursive data-structures as properties of a vertices depend on properties of
their neighbors which intern depend on properties of the neighbors of their neighbors. As a
consequence many important graph algorithms iteratively recompute the properties of each vertex
until a fixed-point condition is reached. A range of graph-parallel abstractions have been proposed
to express these iterative algorithms. GraphX exposes a Pregel operator which is a fusion of
the widely used Pregel and GraphLab abstractions.
At a high-level the GraphX variant of the Pregel abstraction is a bulk-synchronous parallel
messaging abstraction constrained to the topology of the graph. The Pregel operator executes in a
series of super-steps in which vertices receive the sum of their inbound messages from the previous
super-step, compute a new property value, and then send messages to neighboring vertices in the next
super-step. Vertices that do not receive a message are skipped within a super-step. The Pregel
operators terminates iteration and returns the final graph when there are no messages remaining.
> Note, unlike more standard Pregel implementations, vertices in GraphX can only send messages to
> neighboring vertices and the message construction is done in parallel using a user defined
> messaging function. These constraints allow additional optimization within GraphX.
The following is type signature of the Pregel operator as well as a *sketch* of its implementation
(note calls to graph.cache have been removed):
{% highlight scala %}
def pregel[A]
(initialMsg: A,
maxIter: Int = Int.MaxValue,
activeDir: EdgeDirection = EdgeDirection.Out)
(vprog: (VertexID, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
// Receive the initial message at each vertex
var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
// compute the messages
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
var activeMessages = messages.count()
// Loop until no messages remain or maxIterations is achieved
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages: -----------------------------------------------------------------------
// Run the vertex program on all vertices that receive messages
val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
// Merge the new vertex values back into the graph
g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
// Send Messages: ------------------------------------------------------------------------------
// Vertices that didn't receive a message above don't appear in newVerts and therefore don't
// get to send messages. More precisely the map phase of mapReduceTriplets is only invoked
// on edges in the activeDir of vertices in newVerts
messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
activeMessages = messages.count()
i += 1
}
g
}
{% endhighlight %}
Notice that Pregel takes two argument lists (i.e., `graph.pregel(list1)(list2)`). The first
argument list contains configuration parameters including the initial message, the maximum number of
iterations, and the edge direction in which to send messages (by default along out edges). The
second argument list contains the user defined functions for receiving messages (the vertex program
`vprog`), computing messages (`sendMsg`), and combining messages `mergeMsg`.
We can use the Pregel operator to express computation such single source shortest path in the
following example.
{% highlight scala %}
val graph: Graph[String, Double] // A graph with edge attributes containing distances
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) => if (id == shourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist) // Vertex Program
triplet => { // Send Message
if(triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a,b) => math.min(a,b) // Merge Message
)
{% endhighlight %}
# Graph Builders # Graph Builders
<a name="graph_builders"></a> <a name="graph_builders"></a>