From 8b6b8ac87f6ffb92b3395344bf2696d5c7fb3798 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Fri, 10 Jan 2014 00:34:08 -0800 Subject: [PATCH] Unpersist previous iterations in GraphLab --- .../scala/org/apache/spark/graphx/GraphLab.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala index 437288405f..94cfa7e126 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala @@ -102,6 +102,7 @@ object GraphLab extends Logging { // Main Loop --------------------------------------------------------------------- var i = 0 var numActive = activeGraph.numVertices + var prevActiveGraph: Graph[(Boolean, VD), ED] = null while (i < numIter && numActive > 0) { // Gather @@ -109,22 +110,25 @@ object GraphLab extends Logging { activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection) // Apply - activeGraph = activeGraph.outerJoinVertices(gathered)(apply).cache() - - + val applied = activeGraph.outerJoinVertices(gathered)(apply) // Scatter is basically a gather in the opposite direction so we reverse the edge direction - // activeGraph: Graph[(Boolean, VD), ED] val scattered: RDD[(VertexID, Boolean)] = - activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse) + applied.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse) + prevActiveGraph = activeGraph activeGraph = activeGraph.outerJoinVertices(scattered)(applyActive).cache() - // Calculate the number of active vertices + // Calculate the number of active vertices. The call to reduce() materializes the vertices of + // `activeGraph`, hiding the vertices of `prevActiveGraph`. numActive = activeGraph.vertices.map{ case (vid, data) => if (data._1) 1 else 0 }.reduce(_ + _) logInfo("Number active vertices: " + numActive) + + // Unpersist the RDDs hidden by newly-materialized RDDs + prevActiveGraph.unpersistVertices(blocking=false) + i += 1 }