diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index 92632db491..dc1955a835 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -11,43 +11,99 @@ object Analytics extends Logging {
    */
   def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
                                            numIter: Int,
-                                           resetProb: Double = 0.15) = {
-    // Compute the out degree of each vertex
-    val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
-      (vid, vdata, deg) => (deg.getOrElse(0), 1.0)
-    }
+                                           resetProb: Double = 0.15): Graph[Double, Double] = {
+
+    /**
+     * Initialize the pagerankGraph with each edge attribute
+     * having weight 1/outDegree and each vertex with attribute 1.0.
+     */
+    val pagerankGraph: Graph[Double, Double] = graph
+      // Associate the degree with each vertex
+      .outerJoinVertices(graph.outDegrees){
+        (vid, vdata, deg) => deg.getOrElse(0)
+      }
+      // Set the weight on the edges based on the degree
+      .mapTriplets( e => 1.0 / e.srcAttr )
+      // Set the vertex attributes to the initial pagerank values
+      .mapVertices( (id, attr) => 1.0 )
+
+    // Display statistics about pagerank
     println(pagerankGraph.statistics)
-
-    Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)(
-      (vid, data, a: Double) => (data._1, (resetProb + (1.0 - resetProb) * a)), // apply
-      (me_id, edge) => Some(edge.srcAttr._2 / edge.srcAttr._1), // gather
-      (a: Double, b: Double) => a + b, // merge
-      1.0,
-      numIter).mapVertices{ case (id, (outDeg, r)) => r }
+
+    // Define the three functions needed to implement PageRank in the GraphX
+    // version of Pregel
+    def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
+      resetProb + (1.0 - resetProb) * msgSum
+    def sendMessage(id: Vid, edge: EdgeTriplet[Double, Double]): Option[Double] =
+      Some(edge.srcAttr * edge.attr)
+    def messageCombiner(a: Double, b: Double): Double = a + b
+    // The initial message received by all vertices in PageRank
+    val initialMessage = 0.0
+
+    // Execute pregel for a fixed number of iterations.
+    Pregel(pagerankGraph, initialMessage, numIter)(
+      vertexProgram, sendMessage, messageCombiner)
   }

   /**
    * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
    */
-  def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
-                                                  tol: Float,
-                                                  maxIter: Int = Integer.MAX_VALUE,
-                                                  resetProb: Double = 0.15) = {
-    // Compute the out degree of each vertex
-    val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
-      (id, data, degIter) => (degIter.sum, 1.0, 1.0)
+  def deltaPagerank[VD: Manifest, ED: Manifest](
+      graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = {
+
+    /**
+     * Initialize the pagerankGraph with each edge attribute
+     * having weight 1/outDegree and each vertex with attribute (0.0, 0.0).
+     */
+    val pagerankGraph: Graph[(Double, Double), Double] = graph
+      // Associate the degree with each vertex
+      .outerJoinVertices(graph.outDegrees){
+        (vid, vdata, deg) => deg.getOrElse(0)
+      }
+      // Set the weight on the edges based on the degree
+      .mapTriplets( e => 1.0 / e.srcAttr )
+      // Set the vertex attributes to (initialPR, delta = 0)
+      .mapVertices( (id, attr) => (0.0, 0.0) )
+
+    // Display statistics about pagerank
+    println(pagerankGraph.statistics)
+
+    // Define the three functions needed to implement PageRank in the GraphX
+    // version of Pregel
+    def vertexProgram(id: Vid, attr: (Double, Double), msgSum: Double): (Double, Double) = {
+      val (oldPR, lastDelta) = attr
+      val newPR = oldPR + (1.0 - resetProb) * msgSum
+      (newPR, newPR - oldPR)
     }
+    def sendMessage(id: Vid, edge: EdgeTriplet[(Double, Double), Double]): Option[Double] = {
+      if (edge.srcAttr._2 > tol) {
+        Some(edge.srcAttr._2 * edge.attr)
+      } else { None }
+    }
+    def messageCombiner(a: Double, b: Double): Double = a + b
+    // The initial message received by all vertices in PageRank
+    val initialMessage = resetProb / (1.0 - resetProb)
+
+    // Execute a dynamic version of Pregel.
+    Pregel(pagerankGraph, initialMessage)(
+      vertexProgram, sendMessage, messageCombiner)
+      .mapVertices( (vid, attr) => attr._1 )
+
+
+    // // Compute the out degree of each vertex
+    // val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
+    //   (id, data, degIter) => (degIter.sum, 1.0, 1.0)
+    // }
-    // Run PageRank
-    GraphLab.iterate(pagerankGraph)(
-      (me_id, edge) => edge.srcAttr._2 / edge.srcAttr._1, // gather
-      (a: Double, b: Double) => a + b,
-      (id, data, a: Option[Double]) =>
-        (data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply
-      (me_id, edge) => math.abs(edge.srcAttr._3 - edge.srcAttr._2) > tol, // scatter
-      maxIter).mapVertices { case (vid, data) => data._2 }
+    // // Run PageRank
+    // GraphLab.iterate(pagerankGraph)(
+    //   (me_id, edge) => edge.srcAttr._2 / edge.srcAttr._1, // gather
+    //   (a: Double, b: Double) => a + b,
+    //   (id, data, a: Option[Double]) =>
+    //     (data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply
+    //   (me_id, edge) => math.abs(edge.srcAttr._3 - edge.srcAttr._2) > tol, // scatter
+    //   maxIter).mapVertices { case (vid, data) => data._2 }
   }
diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index 7ad6fda2a4..94dc806fc2 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -10,6 +10,8 @@ import org.apache.spark.rdd.RDD

 object Pregel {
+
+
   /**
    * Execute the Pregel program.
    *
@@ -34,23 +36,19 @@ object Pregel {
    * @return the resulting graph at the end of the computation
    *
    */
-  def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
+  def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
+    (graph: Graph[VD, ED], initialMsg: A, numIter: Int)(
       vprog: (Vid, VD, A) => VD,
       sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
-      mergeMsg: (A, A) => A,
-      initialMsg: A,
-      numIter: Int)
+      mergeMsg: (A, A) => A)
     : Graph[VD, ED] = {

-    var g = graph
-    //var g = graph.cache()
-    var i = 0
     def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertexId(vid), edge)

     // Receive the first set of messages
-    g.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
-
+    var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
+
+    var i = 0
     while (i < numIter) {
       // compute the messages
       val messages = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In)
@@ -61,5 +59,79 @@ object Pregel {
     }
     // Return the final graph
     g
-  }
-}
+  } // end of apply
+
+
+  /**
+   * Execute the Pregel program until no messages remain.
+   *
+   * @tparam VD the vertex data type
+   * @tparam ED the edge data type
+   * @tparam A the Pregel message type
+   *
+   * @param vprog a user supplied function that acts as the vertex program for
+   * the Pregel computation. It takes the vertex ID of the vertex it is running on,
+   * the accompanying data for that vertex, and the incoming data and returns the
+   * new vertex value.
+   * @param sendMsg a user supplied function that takes the current vertex ID and an EdgeTriplet
+   * between the vertex and one of its neighbors and produces a message to send
+   * to that neighbor.
+   * @param mergeMsg a user supplied function that takes two incoming messages of type A and merges
+   * them into a single message of type A. ''This function must be commutative and
+   * associative.''
+   * @param initialMsg the message each vertex will receive at the beginning of the
+   * first iteration.
+   *
+   * @return the resulting graph at the end of the computation
+   *
+   */
+  def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
+    (graph: Graph[VD, ED], initialMsg: A)(
+      vprog: (Vid, VD, A) => VD,
+      sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
+      mergeMsg: (A, A) => A)
+    : Graph[VD, ED] = {
+
+    def vprogFun(id: Vid, attr: (VD, Boolean), msgOpt: Option[A]): (VD, Boolean) = {
+      msgOpt match {
+        case Some(msg) => (vprog(id, attr._1, msg), true)
+        case None => (attr._1, false)
+      }
+    }
+
+    def sendMsgFun(vid: Vid, edge: EdgeTriplet[(VD,Boolean), ED]): Option[A] = {
+      if (edge.srcAttr._2) {
+        val et = new EdgeTriplet[VD, ED]
+        et.srcId = edge.srcId
+        et.srcAttr = edge.srcAttr._1
+        et.dstId = edge.dstId
+        et.dstAttr = edge.dstAttr._1
+        et.attr = edge.attr
+        sendMsg(edge.otherVertexId(vid), et)
+      } else { None }
+    }
+
+    var g = graph.mapVertices( (vid, vdata) => (vprog(vid, vdata, initialMsg), true) )
+    // compute the messages
+    var messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, EdgeDirection.In).cache
+    var activeMessages = messages.count
+    // Loop
+    var i = 0
+    while (activeMessages > 0) {
+      // receive the messages
+      g = g.outerJoinVertices(messages)(vprogFun)
+      val oldMessages = messages
+      // compute the messages
+      messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, EdgeDirection.In).cache
+      activeMessages = messages.count
+      // after counting we can unpersist the old messages
+      oldMessages.unpersist(blocking=false)
+      // count the iteration
+      i += 1
+    }
+    // Return the final graph
+    g.mapVertices((id, attr) => attr._1)
+  } // end of apply
+
+} // end of object Pregel
diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
index 895c65c14c..1bbcce5039 100644
--- a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
@@ -236,7 +236,51 @@
     }
   }
-}
+
+
+  /**
+   * Create a `rows` by `cols` grid graph with each vertex connected to its
+   * row+1 and col+1 neighbors. Vertex ids are assigned in row major
+   * order.
+   *
+   * @param sc the spark context in which to construct the graph
+   * @param rows the number of rows
+   * @param cols the number of columns
+   *
+   * @return A graph containing vertices with the row and column ids
+   * as their attributes and all edge attributes set to 1.0.
+   */
+  def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = {
+    // Convert row column address into vertex ids (row major order)
+    def sub2ind(r: Int, c: Int): Vid = r * cols + c
+
+    val vertices: RDD[(Vid, (Int,Int))] =
+      sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) )
+    val edges: RDD[Edge[Double]] =
+      vertices.flatMap{ case (vid, (r,c)) =>
+        (if (r+1 < rows) { Seq( (sub2ind(r, c), sub2ind(r+1, c))) } else { Seq.empty }) ++
+        (if (c+1 < cols) { Seq( (sub2ind(r, c), sub2ind(r, c+1))) } else { Seq.empty })
+      }.map{ case (src, dst) => Edge(src, dst, 1.0) }
+    Graph(vertices, edges)
+  } // end of gridGraph
+
+  /**
+   * Create a star graph with vertex 0 being the center.
+   *
+   * @param sc the spark context in which to construct the graph
+   * @param nverts the number of vertices in the star
+   *
+   * @return A star graph containing `nverts` vertices with vertex 0
+   * being the center vertex.
+   */
+  def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = {
+    val edges: RDD[(Vid, Vid)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
+    Graph(edges, false)
+  } // end of starGraph
+
+
+
+} // end of GraphGenerators
diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
new file mode 100644
index 0000000000..f4a8c6b4c9
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
@@ -0,0 +1,107 @@
+package org.apache.spark.graph
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+
+import org.apache.spark.graph.LocalSparkContext._
+
+import org.apache.spark.graph.util.GraphGenerators
+
+
+object GridPageRank {
+  def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
+    val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
+    val outDegree = Array.fill(nRows * nCols)(0)
+    // Convert row column address into vertex ids (row major order)
+    def sub2ind(r: Int, c: Int): Int = r * nCols + c
+    // Make the grid graph
+    for (r <- 0 until nRows; c <- 0 until nCols) {
+      val ind = sub2ind(r,c)
+      if (r+1 < nRows) {
+        outDegree(ind) += 1
+        inNbrs(sub2ind(r+1,c)) += ind
+      }
+      if (c+1 < nCols) {
+        outDegree(ind) += 1
+        inNbrs(sub2ind(r,c+1)) += ind
+      }
+    }
+    // compute the pagerank
+    var pr = Array.fill(nRows * nCols)(resetProb)
+    for (iter <- 0 until nIter) {
+      val oldPr = pr
+      pr = new Array[Double](nRows * nCols)
+      for (ind <- 0 until (nRows * nCols)) {
+        pr(ind) = resetProb + (1.0 - resetProb) *
+          inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum
+      }
+    }
+    (0L until (nRows * nCols)).zip(pr)
+  }
+
+}
+
+
+class AnalyticsSuite extends FunSuite with LocalSparkContext {
+
+  System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+  System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
+
+
+  test("Star PageRank") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val nVertices = 100
+      val starGraph = GraphGenerators.starGraph(sc, nVertices)
+      val resetProb = 0.15
+      val prGraph1 = Analytics.pagerank(starGraph, 1, resetProb)
+      val prGraph2 = Analytics.pagerank(starGraph, 2, resetProb)
+
+      val notMatching = prGraph1.vertices.zipJoin(prGraph2.vertices)
+        .map{ case (vid, (pr1, pr2)) => if (pr1 != pr2) { 1 } else { 0 } }.sum
+      assert(notMatching === 0)
+      prGraph2.vertices.foreach(println(_))
+      val errors = prGraph2.vertices.map{ case (vid, pr) =>
+        val correct = (vid > 0 && pr == resetProb) ||
+          (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
+        if ( !correct ) { 1 } else { 0 }
+      }
+      assert(errors.sum === 0)
+
+      val prGraph3 = Analytics.deltaPagerank(starGraph, 0, resetProb)
+      val errors2 = prGraph2.vertices.leftJoin(prGraph3.vertices).map{
+        case (_, (pr1, Some(pr2))) if (pr1 == pr2) => 0
+        case _ => 1
+      }.sum
+      assert(errors2 === 0)
+    }
+  } // end of test Star PageRank
+
+
+  test("Grid PageRank") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val gridGraph = GraphGenerators.gridGraph(sc, 10, 10)
+      val resetProb = 0.15
+      val prGraph1 = Analytics.pagerank(gridGraph, 50, resetProb).cache()
+      val prGraph2 = Analytics.deltaPagerank(gridGraph, 0.0001, resetProb).cache()
+      val error = prGraph1.vertices.zipJoin(prGraph2.vertices).map {
+        case (id, (a, b)) => (a - b) * (a - b)
+      }.sum
+      prGraph1.vertices.zipJoin(prGraph2.vertices)
+        .map{ case (id, (a,b)) => (id, (a,b, a-b))}.foreach(println(_))
+      println(error)
+      assert(error < 1.0e-5)
+      val pr3 = sc.parallelize(GridPageRank(10,10, 50, resetProb))
+      val error2 = prGraph1.vertices.leftJoin(pr3).map {
+        case (id, (a, Some(b))) => (a - b) * (a - b)
+        case _ => 0
+      }.sum
+      prGraph1.vertices.leftJoin(pr3).foreach(println(_))
+      println(error2)
+      assert(error2 < 1.0e-5)
+    }
+  } // end of test Grid PageRank
+
+
+} // end of AnalyticsSuite
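
Note on the Star PageRank expectations above: every leaf of the star has
out-degree 1 and no in-edges, so (assuming vertices that receive no messages
keep their value, as the Pregel loop's join implies) a leaf's rank settles at
resetProb after the first vertex-program application, while the hub (vertex 0)
sums one message of resetProb * 1.0 per leaf. A minimal sketch of the
arithmetic the test asserts, reusing the test's constants:

    // Sketch of the arithmetic behind the Star PageRank assertions.
    val resetProb = 0.15
    val nVertices = 100
    // Leaves have no in-edges, so their rank stays at resetProb.
    val leafRank = resetProb
    // The hub receives (nVertices - 1) messages of resetProb * 1.0 each.
    val hubRank = resetProb + (1.0 - resetProb) * (nVertices - 1) * resetProb
    // hubRank = 0.15 + 0.85 * 99 * 0.15 = 12.7725

Because the leaf ranks never change, the hub value is already stationary after
one iteration, which is why the test expects the one- and two-iteration runs
to agree exactly.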
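
To illustrate the new Pregel.apply interface on a problem other than PageRank,
here is a minimal sketch (hypothetical code, not part of this patch) that
labels every vertex with the smallest vertex id that can reach it along edge
direction, using the fixed-iteration variant. Messages are aggregated over
in-edges (EdgeDirection.In), so labels flow from source to destination only;
a full connected-components computation would also need messages sent against
edge direction.

    import org.apache.spark.graph._

    // Hypothetical example: min-id label propagation via Pregel.apply.
    object LabelPropagationSketch {
      def minIdLabels[VD: Manifest, ED: Manifest](
          graph: Graph[VD, ED], numIter: Int): Graph[Vid, ED] = {
        // Start each vertex with its own id as its label.
        val labeled = graph.mapVertices( (vid, attr) => vid )
        // Keep the smallest label seen so far.
        def vertexProgram(id: Vid, attr: Vid, msg: Vid): Vid = math.min(attr, msg)
        // Only send a label when it would improve the destination.
        def sendMessage(id: Vid, edge: EdgeTriplet[Vid, ED]): Option[Vid] =
          if (edge.srcAttr < edge.dstAttr) Some(edge.srcAttr) else None
        def messageCombiner(a: Vid, b: Vid): Vid = math.min(a, b)
        // Long.MaxValue is an identity for min, so the first superstep
        // leaves each vertex holding its own id.
        Pregel(labeled, Long.MaxValue, numIter)(
          vertexProgram, sendMessage, messageCombiner)
      }
    }

The dynamic apply variant could be substituted for the fixed-iteration call to
run until the labels stop changing, in the same way deltaPagerank uses it above.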