Remove GraphLab

This commit is contained in:
Ankur Dave 2014-01-11 11:49:21 -08:00
parent 0b5c49ebad
commit 732333d78e
4 changed files with 40 additions and 170 deletions

View file

@ -18,13 +18,12 @@ title: GraphX Programming Guide
GraphX is the new (alpha) Spark API for graphs and graph-parallel
computation. At a high-level, GraphX extends the Spark
[RDD](api/core/index.html#org.apache.spark.rdd.RDD) by
introducing the [Resilient Distributed property Graph (RDG)](#property_graph):
a directed graph with properties attached to each vertex and edge.
To support graph computation, GraphX exposes a set of functions
(e.g., [mapReduceTriplets](#mrTriplets)) as well as optimized variants of the
[Pregel]( and [GraphLab](
APIs. In addition, GraphX includes a growing collection of graph
[RDD](api/core/index.html#org.apache.spark.rdd.RDD) by introducing the
[Resilient Distributed property Graph (RDG)](#property_graph): a directed graph
with properties attached to each vertex and edge. To support graph computation,
GraphX exposes a set of functions (e.g., [mapReduceTriplets](#mrTriplets)) as
well as an optimized variant of the [Pregel]( API. In
addition, GraphX includes a growing collection of graph
[algorithms](#graph_algorithms) and [builders](#graph_builders) to simplify
graph analytics tasks.

View file

@ -1,138 +0,0 @@
package org.apache.spark.graphx
import scala.reflect.ClassTag
import org.apache.spark.Logging
import scala.collection.JavaConversions._
import org.apache.spark.rdd.RDD
* Implements the GraphLab gather-apply-scatter API.
object GraphLab extends Logging {
* Executes the GraphLab Gather-Apply-Scatter API.
* @param graph the graph on which to execute the GraphLab API
* @param gatherFunc executed on each edge triplet
* adjacent to a vertex. Returns an accumulator which
* is then merged using the merge function.
* @param mergeFunc an accumulative associative operation on the result of
* the gather type.
* @param applyFunc takes a vertex and the final result of the merge operations
* on the adjacent edges and returns a new vertex value.
* @param scatterFunc executed after the apply function. Takes
* a triplet and signals whether the neighboring vertex program
* must be recomputed.
* @param startVertices a predicate to determine which vertices to start the computation on.
* These will be the active vertices in the first iteration.
* @param numIter the maximum number of iterations to run
* @param gatherDirection the direction of edges to consider during the gather phase
* @param scatterDirection the direction of edges to consider during the scatter phase
* @tparam VD the graph vertex attribute type
* @tparam ED the graph edge attribute type
* @tparam A the type accumulated during the gather phase
* @return the resulting graph after the algorithm converges
* @note Unlike [[Pregel]], this implementation of [[GraphLab]] does not unpersist RDDs from
* previous iterations. As a result, long-running iterative GraphLab programs will eventually fill
* the Spark cache. Though Spark will evict RDDs from old iterations eventually, garbage
* collection will take longer than necessary since it must examine the entire cache. This will be
* fixed in a future update.
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED], numIter: Int,
gatherDirection: EdgeDirection = EdgeDirection.In,
scatterDirection: EdgeDirection = EdgeDirection.Out)
(gatherFunc: (VertexID, EdgeTriplet[VD, ED]) => A,
mergeFunc: (A, A) => A,
applyFunc: (VertexID, VD, Option[A]) => VD,
scatterFunc: (VertexID, EdgeTriplet[VD, ED]) => Boolean,
startVertices: (VertexID, VD) => Boolean = (vid: VertexID, data: VD) => true)
: Graph[VD, ED] = {
// Add an active attribute to all vertices to track convergence.
var activeGraph: Graph[(Boolean, VD), ED] = graph.mapVertices {
case (id, data) => (startVertices(id, data), data)
// The gather function wrapper strips the active attribute and
// only invokes the gather function on active vertices
def gather(vid: VertexID, e: EdgeTriplet[(Boolean, VD), ED]): Option[A] = {
if (e.vertexAttr(vid)._1) {
val edgeTriplet = new EdgeTriplet[VD,ED]
edgeTriplet.srcAttr = e.srcAttr._2
edgeTriplet.dstAttr = e.dstAttr._2
Some(gatherFunc(vid, edgeTriplet))
} else {
// The apply function wrapper strips the vertex of the active attribute
// and only invokes the apply function on active vertices
def apply(vid: VertexID, data: (Boolean, VD), accum: Option[A]): (Boolean, VD) = {
val (active, vData) = data
if (active) (true, applyFunc(vid, vData, accum))
else (false, vData)
// The scatter function wrapper strips the vertex of the active attribute
// and only invokes the scatter function on active vertices
def scatter(rawVertexID: VertexID, e: EdgeTriplet[(Boolean, VD), ED]): Option[Boolean] = {
val vid = e.otherVertexId(rawVertexID)
if (e.vertexAttr(vid)._1) {
val edgeTriplet = new EdgeTriplet[VD,ED]
edgeTriplet.srcAttr = e.srcAttr._2
edgeTriplet.dstAttr = e.dstAttr._2
Some(scatterFunc(vid, edgeTriplet))
} else {
// Used to set the active status of vertices for the next round
def applyActive(
vid: VertexID, data: (Boolean, VD), newActiveOpt: Option[Boolean]): (Boolean, VD) = {
val (prevActive, vData) = data
(newActiveOpt.getOrElse(false), vData)
// Main Loop ---------------------------------------------------------------------
var i = 0
var numActive = activeGraph.numVertices
var prevActiveGraph: Graph[(Boolean, VD), ED] = null
while (i < numIter && numActive > 0) {
// Gather
val gathered: RDD[(VertexID, A)] =
activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection)
// Apply
val applied = activeGraph.outerJoinVertices(gathered)(apply).cache()
// Scatter is basically a gather in the opposite direction so we reverse the edge direction
val scattered: RDD[(VertexID, Boolean)] =
applied.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
prevActiveGraph = activeGraph
activeGraph = applied.outerJoinVertices(scattered)(applyActive).cache()
// Calculate the number of active vertices.
numActive ={
case (vid, data) => if (data._1) 1 else 0
}.reduce(_ + _)
logInfo("Number active vertices: " + numActive)
i += 1
// Remove the active attribute from the vertex data before returning the graph
activeGraph.mapVertices{case (vid, data) => data._2 }

View file

@ -65,6 +65,10 @@ object Pregel {
* @param maxIterations the maximum number of iterations to run for
* @param activeDirection the direction of edges incident to a vertex that received a message in
* the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
* out-edges of vertices that received a message in the previous round will run.
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value. On the first iteration the vertex program is invoked on
@ -85,7 +89,8 @@ object Pregel {
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue)(
(graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Out)(
vprog: (VertexID, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
mergeMsg: (A, A) => A)
@ -110,7 +115,7 @@ object Pregel {
// Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
// get to send messages. We must cache messages so it can be materialized on the next line,
// allowing us to uncache the previous iteration.
messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, EdgeDirection.Out))).cache()
messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
// The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
// hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
// vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).

View file

@ -53,34 +53,38 @@ object StronglyConnectedComponents {
// collect min of all my neighbor's scc values, update if it's smaller than mine
// then notify any neighbors with scc values larger than mine
sccWorkGraph = GraphLab[(VertexID, Boolean), ED, VertexID](sccWorkGraph, Integer.MAX_VALUE)(
(vid, e) => e.otherVertexAttr(vid)._1,
(vid1, vid2) => math.min(vid1, vid2),
(vid, scc, optScc) =>
(math.min(scc._1, optScc.getOrElse(scc._1)), scc._2),
(vid, e) => e.vertexAttr(vid)._1 < e.otherVertexAttr(vid)._1
sccWorkGraph = Pregel[(VertexID, Boolean), ED, VertexID](sccWorkGraph, Long.MaxValue)(
(vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2),
e => {
if (e.srcId < e.dstId) {
Iterator((e.dstId, e.srcAttr._1))
} else {
(vid1, vid2) => math.min(vid1, vid2))
// start at root of SCCs. Traverse values in reverse, notify all my neighbors
// do not propagate if colors do not match!
sccWorkGraph = GraphLab[(VertexID, Boolean), ED, Boolean](
sccWorkGraph = Pregel[(VertexID, Boolean), ED, Boolean](
sccWorkGraph, false, activeDirection = EdgeDirection.In)(
// vertex is final if it is the root of a color
// or it has the same color as a neighbor that is final
(vid, e) => (vid == e.vertexAttr(vid)._1) || (e.vertexAttr(vid)._1 == e.otherVertexAttr(vid)._1),
(final1, final2) => final1 || final2,
(vid, scc, optFinal) =>
(scc._1, scc._2 || optFinal.getOrElse(false)),
// activate neighbor if they are not final, you are, and you have the same color
(vid, e) => e.vertexAttr(vid)._2 &&
!e.otherVertexAttr(vid)._2 && (e.vertexAttr(vid)._1 == e.otherVertexAttr(vid)._1),
// start at root of colors
(vid, data) => vid == data._1
(vid, myScc, existsSameColorFinalNeighbor) => {
val isColorRoot = vid == myScc._1
(myScc._1, myScc._2 || isColorRoot || existsSameColorFinalNeighbor)
// activate neighbor if they are not final, you are, and you have the same color
e => {
val sameColor = e.dstAttr._1 == e.srcAttr._1
val onlyDstIsFinal = e.dstAttr._2 && !e.srcAttr._2
if (sameColor && onlyDstIsFinal) {
Iterator((e.srcId, e.dstAttr._2))
} else {
(final1, final2) => final1 || final2)