added replication and balance reporting

This commit is contained in:
Joseph E. Gonzalez 2013-10-10 14:48:40 -07:00
parent 5f756fb63f
commit fa2f87ca63
3 changed files with 27 additions and 0 deletions

View file

@ -47,6 +47,17 @@ object Analytics extends Logging {
val pagerankGraph = graph.leftJoinVertices[Int, (Int, Double)](graph.outDegrees,
(vertex, deg) => (deg.getOrElse(0), 1.0)
)
println("Vertex Replication: " + pagerankGraph.replication)
val edgeCounts = pagerankGraph.balance
println("Edge Balance: " + (edgeCounts.max.toDouble / edgeCounts.min ) )
println("Min edge block: " + edgeCounts.min)
println("Max edge block: " + edgeCounts.max)
Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)(
(vertex, a: Double) => (vertex.data._1, (resetProb + (1.0 - resetProb) * a)), // apply
(me_id, edge) => Some(edge.src.data._2 / edge.src.data._1), // gather

View file

@ -18,6 +18,12 @@ import org.apache.spark.rdd.RDD
*/
abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
def replication: Double
def balance: Array[Int]
/**
* Get the vertices and their data.
*

View file

@ -56,6 +56,16 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
this
}
override def replication(): Double = {
val rep = vTable.map{ case (_, (_, a)) => a.size }.sum
rep / vTable.count
}
override def balance(): Array[Int] = {
eTable.map{ case (_, epart) => epart.data.size }.collect
}
override def reverse: Graph[VD, ED] = {
newGraph(vertices, edges.map{ case Edge(s, t, e) => Edge(t, s, e) })
}