Unpersist previous iterations in GraphLab

This commit is contained in:
Ankur Dave 2014-01-10 00:34:08 -08:00
parent 2578332f97
commit 8b6b8ac87f

View file

@ -102,6 +102,7 @@ object GraphLab extends Logging {
// Main Loop ---------------------------------------------------------------------
var i = 0
var numActive = activeGraph.numVertices
var prevActiveGraph: Graph[(Boolean, VD), ED] = null
while (i < numIter && numActive > 0) {
// Gather
@ -109,22 +110,25 @@ object GraphLab extends Logging {
activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection)
// Apply
activeGraph = activeGraph.outerJoinVertices(gathered)(apply).cache()
val applied = activeGraph.outerJoinVertices(gathered)(apply)
// Scatter is basically a gather in the opposite direction, so we reverse the edge direction
// activeGraph: Graph[(Boolean, VD), ED]
val scattered: RDD[(VertexID, Boolean)] =
activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
applied.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
prevActiveGraph = activeGraph
activeGraph = activeGraph.outerJoinVertices(scattered)(applyActive).cache()
// Calculate the number of active vertices
// Calculate the number of active vertices. The call to reduce() materializes the vertices of
// `activeGraph`, so the cached vertices of `prevActiveGraph` are no longer needed.
numActive = activeGraph.vertices.map{
case (vid, data) => if (data._1) 1 else 0
}.reduce(_ + _)
logInfo("Number active vertices: " + numActive)
// Unpersist the RDDs hidden by newly-materialized RDDs
prevActiveGraph.unpersistVertices(blocking=false)
i += 1
}