Return Graph from aggregateNeighbors; update callers

This commit only affects the Graph API, not GraphImpl.
This commit is contained in:
Ankur Dave 2013-09-02 20:52:03 -07:00
parent 9335aff946
commit 9ff783599b
4 changed files with 56 additions and 34 deletions

View file

@ -227,12 +227,12 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* }}} * }}}
* *
*/ */
def aggregateNeighbors[VD2: ClassManifest]( def aggregateNeighbors[A: ClassManifest](
mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[VD2], mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
mergeFunc: (VD2, VD2) => VD2, mergeFunc: (A, A) => A,
direction: EdgeDirection) direction: EdgeDirection)
: RDD[(Vid, VD2)] : Graph[(VD, Option[A]), ED]
// TODO: consider a version that doesn't preserve the original VD
/** /**
* This function is used to compute a statistic for the neighborhood of each * This function is used to compute a statistic for the neighborhood of each

View file

@ -44,13 +44,13 @@ object GraphLab {
// Add an active attribute to all vertices to track convergence. // Add an active attribute to all vertices to track convergence.
var activeGraph = graph.mapVertices { var activeGraph: Graph[(Boolean, VD), ED] = graph.mapVertices {
case Vertex(id, data) => (true, data) case Vertex(id, data) => (true, data)
}.cache() }.cache()
// The gather function wrapper strips the active attribute and // The gather function wrapper strips the active attribute and
// only invokes the gather function on active vertices // only invokes the gather function on active vertices
def gather(vid: Vid, e: EdgeTriplet[(Boolean, VD), ED]) = { def gather(vid: Vid, e: EdgeTriplet[(Boolean, VD), ED]): Option[A] = {
if (e.vertex(vid).data._1) { if (e.vertex(vid).data._1) {
val edge = new EdgeTriplet[VD,ED] val edge = new EdgeTriplet[VD,ED]
edge.src = Vertex(e.src.id, e.src.data._2) edge.src = Vertex(e.src.id, e.src.data._2)
@ -64,14 +64,15 @@ object GraphLab {
// The apply function wrapper strips the vertex of the active attribute // The apply function wrapper strips the vertex of the active attribute
// and only invokes the apply function on active vertices // and only invokes the apply function on active vertices
def apply(v: Vertex[(Boolean, VD)], accum: Option[A]) = { def apply(v: Vertex[((Boolean, VD), Option[A])]): (Boolean, VD) = {
if (v.data._1) (true, applyFunc(Vertex(v.id, v.data._2), accum)) val ((active, vData), accum) = v.data
else (false, v.data._2) if (active) (true, applyFunc(Vertex(v.id, vData), accum))
else (false, vData)
} }
// The scatter function wrapper strips the vertex of the active attribute // The scatter function wrapper strips the vertex of the active attribute
// and only invokes the scatter function on active vertices // and only invokes the scatter function on active vertices
def scatter(rawVid: Vid, e: EdgeTriplet[(Boolean, VD), ED]) = { def scatter(rawVid: Vid, e: EdgeTriplet[(Boolean, VD), ED]): Option[Boolean] = {
val vid = e.otherVertex(rawVid).id val vid = e.otherVertex(rawVid).id
if (e.vertex(vid).data._1) { if (e.vertex(vid).data._1) {
val edge = new EdgeTriplet[VD,ED] val edge = new EdgeTriplet[VD,ED]
@ -88,24 +89,31 @@ object GraphLab {
} }
// Used to set the active status of vertices for the next round // Used to set the active status of vertices for the next round
def applyActive(v: Vertex[(Boolean, VD)], accum: Option[Boolean]) = def applyActive(v: Vertex[((Boolean, VD), Option[Boolean])]): (Boolean, VD) = {
(accum.getOrElse(false), v.data._2) val ((prevActive, vData), newActive) = v.data
(newActive.getOrElse(false), vData)
}
// Main Loop --------------------------------------------------------------------- // Main Loop ---------------------------------------------------------------------
var i = 0 var i = 0
var numActive = activeGraph.numVertices var numActive = activeGraph.numVertices
while (i < numIter && numActive > 0) { while (i < numIter && numActive > 0) {
val accUpdates: RDD[(Vid, A)] = val gathered: Graph[((Boolean, VD), Option[A]), ED] =
activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection) activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection)
activeGraph = activeGraph.leftJoinVertices(accUpdates, apply).cache() val applied: Graph[(Boolean, VD), ED] = gathered.mapVertices(apply).cache()
activeGraph = applied.cache()
// Scatter is basically a gather in the opposite direction so we reverse the edge direction // Scatter is basically a gather in the opposite direction so we reverse the edge direction
val activeVertices: RDD[(Vid, Boolean)] = // activeGraph: Graph[(Boolean, VD), ED]
val scattered: Graph[((Boolean, VD), Option[Boolean]), ED] =
activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse) activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
val newActiveGraph: Graph[(Boolean, VD), ED] =
scattered.mapVertices(applyActive)
activeGraph = activeGraph.leftJoinVertices(activeVertices, applyActive).cache() activeGraph = newActiveGraph.cache()
numActive = activeGraph.vertices.map(v => if (v.data._1) 1 else 0).reduce(_ + _) numActive = activeGraph.vertices.map(v => if (v.data._1) 1 else 0).reduce(_ + _)
println("Number active vertices: " + numActive) println("Number active vertices: " + numActive)

View file

@ -9,22 +9,29 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) {
lazy val numVertices: Long = g.vertices.count() lazy val numVertices: Long = g.vertices.count()
lazy val inDegrees: RDD[(Vid, Int)] = { lazy val inDegrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.In)
g.aggregateNeighbors((vid, edge) => Some(1), _+_, EdgeDirection.In)
}
lazy val outDegrees: RDD[(Vid, Int)] = { lazy val outDegrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.Out)
g.aggregateNeighbors((vid, edge) => Some(1), _+_, EdgeDirection.Out)
}
lazy val degrees: RDD[(Vid, Int)] = { lazy val degrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.Both)
g.aggregateNeighbors((vid, edge) => Some(1), _+_, EdgeDirection.Both)
}
def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = { def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = {
g.aggregateNeighbors( val graph: Graph[(VD, Option[Array[Vid]]), ED] = g.aggregateNeighbors(
(vid, edge) => Some(Array(edge.otherVertex(vid).id)), (vid, edge) => Some(Array(edge.otherVertex(vid).id)),
(a, b) => a ++ b, (a, b) => a ++ b,
edgeDirection) edgeDirection)
graph.vertices.map(v => {
val (_, neighborIds) = v.data
(v.id, neighborIds.getOrElse(Array()))
})
}
private def degreesRDD(edgeDirection: EdgeDirection): RDD[(Vid, Int)] = {
val degreeGraph: Graph[(VD, Option[Int]), ED] =
g.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection)
degreeGraph.vertices.map(v => {
val (_, degree) = v.data
(v.id, degree.getOrElse(0))
})
} }
} }

View file

@ -19,18 +19,25 @@ object Pregel {
def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertex(vid).id, edge) def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertex(vid).id, edge)
def runProg(v: Vertex[VD], msg: Option[A]): VD = { def runProg(vertexWithMsgs: Vertex[(VD, Option[A])]): VD = {
if (msg.isEmpty) v.data else vprog(v, msg.get) val (vData, msg) = vertexWithMsgs.data
val v = Vertex(vertexWithMsgs.id, vData)
msg match {
case Some(m) => vprog(v, m)
case None => v.data
}
} }
var msgs: RDD[(Vid, A)] = g.vertices.map{ v => (v.id, initialMsg) } var graphWithMsgs: Graph[(VD, Option[A]), ED] =
g.mapVertices(v => (v.data, Some(initialMsg)))
while (i < numIter) { while (i < numIter) {
g = g.leftJoinVertices(msgs, runProg).cache() val newGraph: Graph[VD, ED] = graphWithMsgs.mapVertices(runProg).cache()
msgs = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In) graphWithMsgs = newGraph.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In)
i += 1 i += 1
} }
g graphWithMsgs.mapVertices(vertexWithMsgs => vertexWithMsgs.data match {
case (vData, _) => vData
})
} }
} }