diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 75248d515e..47c358f491 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -137,12 +137,12 @@ object Pregel extends Logging { val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)]( checkpointInterval, graph.vertices.sparkContext) messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]]) - var activeMessages = messages.count() + var isActiveMessagesNonEmpty = !messages.isEmpty() // Loop var prevG: Graph[VD, ED] = null var i = 0 - while (activeMessages > 0 && i < maxIterations) { + while (isActiveMessagesNonEmpty && i < maxIterations) { // Receive the messages and update the vertices. prevG = g g = g.joinVertices(messages)(vprog) @@ -158,7 +158,7 @@ object Pregel extends Logging { // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages // and the vertices of g). messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]]) - activeMessages = messages.count() + isActiveMessagesNonEmpty = !messages.isEmpty() logInfo("Pregel finished iteration " + i)