diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala index 799c0fc901..aa35f9a746 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala @@ -89,9 +89,9 @@ object GraphLab { } // Used to set the active status of vertices for the next round - def applyActive(vid: Vid, data: (Boolean, VD), newActive: Boolean): (Boolean, VD) = { + def applyActive(vid: Vid, data: (Boolean, VD), newActiveOpt: Option[Boolean]): (Boolean, VD) = { val (prevActive, vData) = data - (newActive, vData) + (newActiveOpt.getOrElse(false), vData) } // Main Loop --------------------------------------------------------------------- @@ -113,7 +113,7 @@ object GraphLab { val scattered: RDD[(Vid, Boolean)] = activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse) - activeGraph = activeGraph.joinVertices(scattered)(applyActive).cache() + activeGraph = activeGraph.outerJoinVertices(scattered)(applyActive).cache() // Calculate the number of active vertices numActive = activeGraph.vertices.map{