diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala index 92632db491..2093eec311 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala @@ -11,43 +11,99 @@ object Analytics extends Logging { */ def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int, - resetProb: Double = 0.15) = { - // Compute the out degree of each vertex - val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){ - (vid, vdata, deg) => (deg.getOrElse(0), 1.0) - } + resetProb: Double = 0.15): Graph[Double, Double] = { + /** + * Initialize the pagerankGraph with each edge attribute + * having weight 1/outDegree and each vertex with attribute 1.0. + */ + val pagerankGraph: Graph[Double, Double] = graph + // Associate the degree with each vertex + .outerJoinVertices(graph.outDegrees){ + (vid, vdata, deg) => deg.getOrElse(0) + } + // Set the weight on the edges based on the degree + .mapTriplets( e => 1.0 / e.srcAttr ) + // Set the vertex attributes to the initial pagerank values + .mapVertices( (id, attr) => 1.0 ) + + // Display statistics about pagerank println(pagerankGraph.statistics) - - Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)( - (vid, data, a: Double) => (data._1, (resetProb + (1.0 - resetProb) * a)), // apply - (me_id, edge) => Some(edge.srcAttr._2 / edge.srcAttr._1), // gather - (a: Double, b: Double) => a + b, // merge - 1.0, - numIter).mapVertices{ case (id, (outDeg, r)) => r } + + // Define the three functions needed to implement PageRank in the GraphX + // version of Pregel + def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double = + resetProb + (1.0 - resetProb) * msgSum + def sendMessage(id: Vid, edge: EdgeTriplet[Double, Double]): Option[Double] = + Some(edge.srcAttr / edge.attr) + def messageCombiner(a: Double, b: Double): Double = a + b + // The initial message received by all vertices in PageRank + val initialMessage = 1.0 + + // Execute pregel for a fixed number of iterations. + Pregel(pagerankGraph, initialMessage, numIter)( + vertexProgram, sendMessage, messageCombiner) } /** * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD */ - def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], - tol: Float, - maxIter: Int = Integer.MAX_VALUE, - resetProb: Double = 0.15) = { - // Compute the out degree of each vertex - val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){ - (id, data, degIter) => (degIter.sum, 1.0, 1.0) + def dynamicPagerank[VD: Manifest, ED: Manifest]( + graph: Graph[VD, ED], tol: Float, resetProb: Double = 0.15): Graph[Double, Double] = { + + /** + * Initialize the pagerankGraph with each edge attribute + * having weight 1/outDegree and each vertex with attribute 1.0. + */ + val pagerankGraph: Graph[(Double, Double), Double] = graph + // Associate the degree with each vertex + .outerJoinVertices(graph.outDegrees){ + (vid, vdata, deg) => deg.getOrElse(0) + } + // Set the weight on the edges based on the degree + .mapTriplets( e => 1.0 / e.srcAttr ) + // Set the vertex attributes to (initalPR, delta = 0) + .mapVertices( (id, attr) => (resetProb, 0.0) ) + + // Display statistics about pagerank + println(pagerankGraph.statistics) + + // Define the three functions needed to implement PageRank in the GraphX + // version of Pregel + def vertexProgram(id: Vid, attr: (Double, Double), msgSum: Double): (Double, Double) = { + val (oldPR, lastDelta) = attr + val newPR = oldPR + (1.0 - resetProb) * msgSum + (newPR, newPR - oldPR) } + def sendMessage(id: Vid, edge: EdgeTriplet[(Double, Double), Double]): Option[Double] = { + if (edge.srcAttr._2 > tol) { + Some(edge.srcAttr._2 / edge.attr) + } else { None } + } + def messageCombiner(a: Double, b: Double): Double = a + b + // The initial message received by all vertices in PageRank + val initialMessage = 1.0 / (1.0 - resetProb) + + // Execute a dynamic version of Pregel. + Pregel(pagerankGraph, initialMessage)( + vertexProgram, sendMessage, messageCombiner) + .mapVertices( (vid, attr) => attr._1 ) + + + // // Compute the out degree of each vertex + // val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){ + // (id, data, degIter) => (degIter.sum, 1.0, 1.0) + // } - // Run PageRank - GraphLab.iterate(pagerankGraph)( - (me_id, edge) => edge.srcAttr._2 / edge.srcAttr._1, // gather - (a: Double, b: Double) => a + b, - (id, data, a: Option[Double]) => - (data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply - (me_id, edge) => math.abs(edge.srcAttr._3 - edge.srcAttr._2) > tol, // scatter - maxIter).mapVertices { case (vid, data) => data._2 } + // // Run PageRank + // GraphLab.iterate(pagerankGraph)( + // (me_id, edge) => edge.srcAttr._2 / edge.srcAttr._1, // gather + // (a: Double, b: Double) => a + b, + // (id, data, a: Option[Double]) => + // (data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply + // (me_id, edge) => math.abs(edge.srcAttr._3 - edge.srcAttr._2) > tol, // scatter + // maxIter).mapVertices { case (vid, data) => data._2 } }