diff --git a/graph/src/main/scala/spark/graph/Analytics.scala b/graph/src/main/scala/spark/graph/Analytics.scala index 18abbe4f2f..c5ac94e6bf 100644 --- a/graph/src/main/scala/spark/graph/Analytics.scala +++ b/graph/src/main/scala/spark/graph/Analytics.scala @@ -40,6 +40,22 @@ object Analytics { } + /** + * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD + */ + def pregelPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { + // Compute the out degree of each vertex + val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees, + (vertex, degIter) => (degIter.sum, 1.0F) + ) + Pregel.iterate[(Int, Float), ED, Float](pagerankGraph)( + (vertex, a: Float) => (vertex.data._1, (0.15F + 0.85F * a)), // apply + (me_id, edge) => Some(edge.src.data._2 / edge.src.data._1), // gather + (a: Float, b: Float) => a + b, // merge + numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) } + } + + /** * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD */