Fixed a bug in VTableReplicated that we only process the first block.

This commit is contained in:
Reynold Xin 2013-12-06 00:51:12 -08:00
parent 3b0ee53eda
commit 41721b1494
3 changed files with 6 additions and 5 deletions

View file

@ -60,8 +60,7 @@ class VTableReplicated[VD: ClassManifest](
prev.zipPartitions(msgsByPartition) { (vTableIter, msgsIter) =>
val (pid, vertexPartition) = vTableIter.next()
val (_, block) = msgsIter.next()
val newVPart = vertexPartition.updateUsingIndex(block.iterator)(vdManifest)
val newVPart = vertexPartition.updateUsingIndex(msgsIter.flatMap(_._2.iterator))(vdManifest)
Iterator((pid, newVPart))
}.cache()

View file

@ -42,7 +42,10 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
/** Return the vertex attribute for the given vertex ID. */
def apply(vid: Vid): VD = values(index.getPos(vid))
def isDefined(vid: Vid): Boolean = mask.get(index.getPos(vid))
def isDefined(vid: Vid): Boolean = {
val pos = index.getPos(vid)
pos >= 0 && mask.get(pos)
}
/**
* Pass each vertex attribute along with the vertex id through a map

View file

@ -64,7 +64,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
if (pr1 != pr2) { 1 } else { 0 }
}.map { case (vid, test) => test }.sum
assert(notMatching === 0)
prGraph2.vertices.foreach(println(_))
//prGraph2.vertices.foreach(println(_))
val errors = prGraph2.vertices.map { case (vid, pr) =>
val correct = (vid > 0 && pr == resetProb) ||
(vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
@ -141,7 +141,6 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
else { assert(cc === 10) }
}
val ccMap = vertices.toMap
println(ccMap)
for (id <- 0 until 20) {
if (id < 10) {
assert(ccMap(id) === 0)