diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala index 0c50ad09c7..8ee631bdba 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala @@ -60,8 +60,7 @@ class VTableReplicated[VD: ClassManifest]( prev.zipPartitions(msgsByPartition) { (vTableIter, msgsIter) => val (pid, vertexPartition) = vTableIter.next() - val (_, block) = msgsIter.next() - val newVPart = vertexPartition.updateUsingIndex(block.iterator)(vdManifest) + val newVPart = vertexPartition.updateUsingIndex(msgsIter.flatMap(_._2.iterator))(vdManifest) Iterator((pid, newVPart)) }.cache() diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala index 0af445fa7d..fe005c8723 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala @@ -42,7 +42,10 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( /** Return the vertex attribute for the given vertex ID. */ def apply(vid: Vid): VD = values(index.getPos(vid)) - def isDefined(vid: Vid): Boolean = mask.get(index.getPos(vid)) + def isDefined(vid: Vid): Boolean = { + val pos = index.getPos(vid) + pos >= 0 && mask.get(pos) + } /** * Pass each vertex attribute along with the vertex id through a map diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala index e719ca0872..a3ac7470a5 100644 --- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala @@ -64,7 +64,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext { if (pr1 != pr2) { 1 } else { 0 } }.map { case (vid, test) => test }.sum assert(notMatching === 0) - prGraph2.vertices.foreach(println(_)) + //prGraph2.vertices.foreach(println(_)) val errors = prGraph2.vertices.map { case (vid, pr) => val correct = (vid > 0 && pr == resetProb) || (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5) @@ -141,7 +141,6 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext { else { assert(cc === 10) } } val ccMap = vertices.toMap - println(ccMap) for (id <- 0 until 20) { if (id < 10) { assert(ccMap(id) === 0)