Clean up Pregel.run, add logging
This commit is contained in:
parent
c0736f6f68
commit
62ef620354
|
@ -18,61 +18,58 @@ object Pregel extends Logging {
|
|||
* all vertices have voted to halt by setting their state to
|
||||
* Inactive.
|
||||
*/
|
||||
def run[V <: Vertex : Manifest, M <: Message : Manifest, C](sc: SparkContext, verts: RDD[(String, V)], msgs: RDD[(String, M)], splits: Int, messageCombiner: (C, M) => C, defaultCombined: () => C, mergeCombined: (C, C) => C, superstep: Int = 0)(compute: (V, C, Int) => (V, Iterable[M])): RDD[V] = {
|
||||
println("Starting superstep "+superstep+".")
|
||||
def run[V <: Vertex : Manifest, M <: Message : Manifest, C](sc: SparkContext, verts: RDD[(String, V)], msgs: RDD[(String, M)], splits: Int, messageCombiner: (C, M) => C, defaultCombined: () => C, mergeCombined: (C, C) => C, maxSupersteps: Option[Int] = None, superstep: Int = 0)(compute: (V, C, Int) => (V, Iterable[M])): RDD[V] = {
|
||||
logInfo("Starting superstep "+superstep+".")
|
||||
val startTime = System.currentTimeMillis
|
||||
|
||||
// Bring together vertices and messages
|
||||
println("Joining vertices and messages...")
|
||||
val combinedMsgs = msgs.combineByKey({x => messageCombiner(defaultCombined(), x)}, messageCombiner, mergeCombined, splits)
|
||||
println("verts.splits.size = " + verts.splits.size)
|
||||
println("combinedMsgs.splits.size = " + combinedMsgs.splits.size)
|
||||
println("verts.partitioner = " + verts.partitioner)
|
||||
println("combinedMsgs.partitioner = " + combinedMsgs.partitioner)
|
||||
logDebug("verts.splits.size = " + verts.splits.size)
|
||||
logDebug("combinedMsgs.splits.size = " + combinedMsgs.splits.size)
|
||||
logDebug("verts.partitioner = " + verts.partitioner)
|
||||
logDebug("combinedMsgs.partitioner = " + combinedMsgs.partitioner)
|
||||
|
||||
val joined = verts.groupWith(combinedMsgs)
|
||||
println("joined.splits.size = " + joined.splits.size)
|
||||
println("joined.partitioner = " + joined.partitioner)
|
||||
//val joined = graph.groupByKeyAsymmetrical(messageCombiner, defaultCombined, mergeCombined, splits)
|
||||
println("Done joining vertices and messages.")
|
||||
logDebug("joined.splits.size = " + joined.splits.size)
|
||||
logDebug("joined.partitioner = " + joined.partitioner)
|
||||
|
||||
// Run compute on each vertex
|
||||
println("Running compute on each vertex...")
|
||||
var messageCount = sc.accumulator(0)
|
||||
var activeVertexCount = sc.accumulator(0)
|
||||
val processed = joined.flatMapValues {
|
||||
case (Seq(), _) => None
|
||||
case (Seq(v), Seq(comb)) =>
|
||||
val (newVertex, newMessages) = compute(v, comb, superstep)
|
||||
|
||||
messageCount += newMessages.size
|
||||
if (newVertex.active)
|
||||
activeVertexCount += 1
|
||||
|
||||
Some((newVertex, newMessages))
|
||||
//val result = ArrayBuffer[(String, Either[V, M])]((newVertex.id, Left(newVertex)))
|
||||
//result ++= newMessages.map(m => (m.targetId, Right(m)))
|
||||
case (Seq(v), Seq()) =>
|
||||
val (newVertex, newMessages) = compute(v, defaultCombined(), superstep)
|
||||
|
||||
messageCount += newMessages.size
|
||||
if (newVertex.active)
|
||||
activeVertexCount += 1
|
||||
|
||||
Some((newVertex, newMessages))
|
||||
}.cache
|
||||
//MATEI: Added this
|
||||
// Force evaluation of processed RDD for accurate performance measurements
|
||||
processed.foreach(x => {})
|
||||
println("Done running compute on each vertex.")
|
||||
|
||||
println("Checking stopping condition...")
|
||||
val stop = messageCount.value == 0 && activeVertexCount.value == 0
|
||||
|
||||
val timeTaken = System.currentTimeMillis - startTime
|
||||
println("Superstep %d took %d s".format(superstep, timeTaken / 1000))
|
||||
logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
|
||||
|
||||
val newVerts = processed.mapValues(_._1)
|
||||
val newMsgs = processed.flatMap(x => x._2._2.map(m => (m.targetId, m)))
|
||||
|
||||
if (superstep >= 10)
|
||||
// Check stopping condition and recurse
|
||||
val stop = messageCount.value == 0 && activeVertexCount.value == 0
|
||||
if (stop || (maxSupersteps.isDefined && superstep >= maxSupersteps.get)) {
|
||||
processed.map { _._2._1 }
|
||||
else
|
||||
run(sc, newVerts, newMsgs, splits, messageCombiner, defaultCombined, mergeCombined, superstep + 1)(compute)
|
||||
} else {
|
||||
val newVerts = processed.mapValues(_._1)
|
||||
val newMsgs = processed.flatMap(x => x._2._2.map(m => (m.targetId, m)))
|
||||
run(sc, newVerts, newMsgs, splits, messageCombiner, defaultCombined, mergeCombined, maxSupersteps, superstep + 1)(compute)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue