From 14a3329a11d7b38e0fd28807aa434dae19ca52f6 Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez"
Date: Tue, 22 Oct 2013 15:01:20 -0700
Subject: [PATCH 1/6] Changing the Pregel interface slightly to better support type inference.

---
 .../scala/org/apache/spark/graph/Pregel.scala | 69 +++++++++++++++++--
 1 file changed, 63 insertions(+), 6 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index 7ad6fda2a4..d1f5513f6a 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -10,6 +10,8 @@ import org.apache.spark.rdd.RDD
 
 object Pregel {
 
+
+
   /**
    * Execute the Pregel program.
    *
@@ -34,12 +36,11 @@ object Pregel {
    * @return the resulting graph at the end of the computation
    *
    */
-  def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
+  def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
+    (graph: Graph[VD, ED], initialMsg: A, numIter: Int)(
       vprog: (Vid, VD, A) => VD,
       sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
-      mergeMsg: (A, A) => A,
-      initialMsg: A,
-      numIter: Int)
+      mergeMsg: (A, A) => A)
     : Graph[VD, ED] = {
 
     var g = graph
@@ -61,5 +62,61 @@
     }
     // Return the final graph
     g
-  }
-}
+  } // end of apply
+
+
+  /**
+   * Execute the Pregel program.
+   *
+   * @tparam VD the vertex data type
+   * @tparam ED the edge data type
+   * @tparam A the Pregel message type
+   *
+   * @param vprog a user supplied function that acts as the vertex program for
+   * the Pregel computation. It takes the vertex ID of the vertex it is running on,
+   * the accompanying data for that vertex, and the incoming data and returns the
+   * new vertex value.
+   * @param sendMsg a user supplied function that takes the current vertex ID and an EdgeTriplet
+   * between the vertex and one of its neighbors and produces a message to send
+   * to that neighbor.
+   * @param mergeMsg a user supplied function that takes two incoming messages of type A and merges
+   * them into a single message of type A. ''This function must be commutative and
+   * associative.''
+   * @param initialMsg the message each vertex will receive at the beginning of the
+   * first iteration.
+   *
+   * @return the resulting graph at the end of the computation
+   *
+   */
+  def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
+    (graph: Graph[VD, ED], initialMsg: A)(
+      vprog: (Vid, VD, A) => VD,
+      sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
+      mergeMsg: (A, A) => A)
+    : Graph[VD, ED] = {
+
+    var g = graph
+    //var g = graph.cache()
+    var i = 0
+
+    def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertexId(vid), edge)
+
+    // Receive the first set of messages
+    g.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
+
+    var activeMessages = g.numEdges
+    while (activeMessages > 0) {
+      // compute the messages
+      val messages = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In).cache
+      activeMessages = messages.count
+      // receive the messages
+      g = g.joinVertices(messages)(vprog)
+      // count the iteration
+      i += 1
+    }
+    // Return the final graph
+    g
+  } // end of apply
+
+} // end of object Pregel
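The practical effect of the reordering in patch 1/6 is easiest to see at a call site. Scala infers type parameters one argument list at a time, so once `graph`, `initialMsg`, and `numIter` sit in the first list, the message type `A` is already fixed when the compiler reaches the closures in the second list. A minimal sketch, illustrative only and not part of the patch, with vertex values and messages assumed to be `Double`:

    // Old style: nothing in the single trailing list pins down A, so call
    // sites needed explicit type arguments:
    //   Pregel.iterate[Double, Double, Double](graph)(...)
    // New style: initialMsg = 1.0 fixes A = Double before the closures are
    // type-checked, so they need no annotations:
    val result = Pregel(graph, 1.0, 20)(
      (vid, attr, msgSum) => msgSum,         // vprog
      (vid, edge) => Some(edge.srcAttr),     // sendMsg
      (a, b) => a + b)                       // mergeMsg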
From 46b195253ecb54ff8a202a53773fc9388b2c753c Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez"
Date: Tue, 22 Oct 2013 15:01:49 -0700
Subject: [PATCH 2/6] Adding some additional graph generators to support unit testing of the analytics package.

---
 .../spark/graph/util/GraphGenerators.scala | 46 ++++++++++++++++++-
 1 file changed, 45 insertions(+), 1 deletion(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
index 895c65c14c..1bbcce5039 100644
--- a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
@@ -236,7 +236,51 @@ object GraphGenerators {
     }
   }
 
-}
+
+
+  /**
+   * Create a `rows` by `cols` grid graph with each vertex connected to its
+   * row+1 and col+1 neighbors. Vertex ids are assigned in row major
+   * order.
+   *
+   * @param sc the spark context in which to construct the graph
+   * @param rows the number of rows
+   * @param cols the number of columns
+   *
+   * @return A graph containing vertices with the row and column ids
+   * as their attributes and edge values of 1.0.
+   */
+  def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = {
+    // Convert row column address into vertex ids (row major order)
+    def sub2ind(r: Int, c: Int): Vid = r * cols + c
+
+    val vertices: RDD[(Vid, (Int,Int))] =
+      sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) )
+    val edges: RDD[Edge[Double]] =
+      vertices.flatMap{ case (vid, (r,c)) =>
+        (if (r+1 < rows) { Seq( (sub2ind(r, c), sub2ind(r+1, c))) } else { Seq.empty }) ++
+        (if (c+1 < cols) { Seq( (sub2ind(r, c), sub2ind(r, c+1))) } else { Seq.empty })
+      }.map{ case (src, dst) => Edge(src, dst, 1.0) }
+    Graph(vertices, edges)
+  } // end of gridGraph
+
+  /**
+   * Create a star graph with vertex 0 being the center.
+   *
+   * @param sc the spark context in which to construct the graph
+   * @param nverts the number of vertices in the star
+   *
+   * @return A star graph containing `nverts` vertices with vertex 0
+   * being the center vertex.
+   */
+  def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = {
+    val edges: RDD[(Vid, Vid)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
+    Graph(edges, false)
+  } // end of starGraph
+
+
+
+} // end of GraphGenerators
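As a quick sanity check on the two generators (an illustrative sketch, not part of the patch, assuming an existing SparkContext named `sc`): a `rows` by `cols` grid has `rows*(cols-1) + cols*(rows-1)` edges, and a star on `nverts` vertices has `nverts - 1` spokes, all pointing at vertex 0.

    val grid = GraphGenerators.gridGraph(sc, 4, 5)
    assert(grid.vertices.count == 4 * 5)                  // 20 vertices
    assert(grid.numEdges == 4 * (5 - 1) + 5 * (4 - 1))    // 16 + 15 = 31 edges
    val star = GraphGenerators.starGraph(sc, 100)
    assert(star.numEdges == 100 - 1)                      // every spoke points at vertex 0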
From ba5c75692a61ae86496d4285da5d2e453ce88c36 Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez"
Date: Tue, 22 Oct 2013 15:03:00 -0700
Subject: [PATCH 3/6] Updating analytics to reflect changes in the pregel interface and moving degree information into the edge attribute.

---
 .../org/apache/spark/graph/Analytics.scala | 110 +++++++++++++-----
 1 file changed, 83 insertions(+), 27 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index 92632db491..2093eec311 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -11,43 +11,99 @@ object Analytics extends Logging {
    */
   def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
                                            numIter: Int,
-                                           resetProb: Double = 0.15) = {
-    // Compute the out degree of each vertex
-    val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
-      (vid, vdata, deg) => (deg.getOrElse(0), 1.0)
-    }
+                                           resetProb: Double = 0.15): Graph[Double, Double] = {
+    /**
+     * Initialize the pagerankGraph with each edge attribute
+     * having weight 1/outDegree and each vertex with attribute 1.0.
+     */
+    val pagerankGraph: Graph[Double, Double] = graph
+      // Associate the degree with each vertex
+      .outerJoinVertices(graph.outDegrees){
+        (vid, vdata, deg) => deg.getOrElse(0)
+      }
+      // Set the weight on the edges based on the degree
+      .mapTriplets( e => 1.0 / e.srcAttr )
+      // Set the vertex attributes to the initial pagerank values
+      .mapVertices( (id, attr) => 1.0 )
+
+    // Display statistics about pagerank
     println(pagerankGraph.statistics)
-
-    Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)(
-      (vid, data, a: Double) => (data._1, (resetProb + (1.0 - resetProb) * a)), // apply
-      (me_id, edge) => Some(edge.srcAttr._2 / edge.srcAttr._1), // gather
-      (a: Double, b: Double) => a + b, // merge
-      1.0,
-      numIter).mapVertices{ case (id, (outDeg, r)) => r }
+
+    // Define the three functions needed to implement PageRank in the GraphX
+    // version of Pregel
+    def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
+      resetProb + (1.0 - resetProb) * msgSum
+    def sendMessage(id: Vid, edge: EdgeTriplet[Double, Double]): Option[Double] =
+      Some(edge.srcAttr / edge.attr)
+    def messageCombiner(a: Double, b: Double): Double = a + b
+    // The initial message received by all vertices in PageRank
+    val initialMessage = 1.0
+
+    // Execute pregel for a fixed number of iterations.
+    Pregel(pagerankGraph, initialMessage, numIter)(
+      vertexProgram, sendMessage, messageCombiner)
   }
 
   /**
    * Compute the PageRank of a graph returning the pagerank of each vertex as a vertex attribute
    */
-  def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
-                                                  tol: Float,
-                                                  maxIter: Int = Integer.MAX_VALUE,
-                                                  resetProb: Double = 0.15) = {
-    // Compute the out degree of each vertex
-    val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
-      (id, data, degIter) => (degIter.sum, 1.0, 1.0)
+  def dynamicPagerank[VD: Manifest, ED: Manifest](
+      graph: Graph[VD, ED], tol: Float, resetProb: Double = 0.15): Graph[Double, Double] = {
+
+    /**
+     * Initialize the pagerankGraph with each edge attribute
+     * having weight 1/outDegree and each vertex with attribute 1.0.
+     */
+    val pagerankGraph: Graph[(Double, Double), Double] = graph
+      // Associate the degree with each vertex
+      .outerJoinVertices(graph.outDegrees){
+        (vid, vdata, deg) => deg.getOrElse(0)
+      }
+      // Set the weight on the edges based on the degree
+      .mapTriplets( e => 1.0 / e.srcAttr )
+      // Set the vertex attributes to (initialPR, delta = 0)
+      .mapVertices( (id, attr) => (resetProb, 0.0) )
+
+    // Display statistics about pagerank
+    println(pagerankGraph.statistics)
+
+    // Define the three functions needed to implement PageRank in the GraphX
+    // version of Pregel
+    def vertexProgram(id: Vid, attr: (Double, Double), msgSum: Double): (Double, Double) = {
+      val (oldPR, lastDelta) = attr
+      val newPR = oldPR + (1.0 - resetProb) * msgSum
+      (newPR, newPR - oldPR)
     }
+    def sendMessage(id: Vid, edge: EdgeTriplet[(Double, Double), Double]): Option[Double] = {
+      if (edge.srcAttr._2 > tol) {
+        Some(edge.srcAttr._2 / edge.attr)
+      } else { None }
+    }
+    def messageCombiner(a: Double, b: Double): Double = a + b
+    // The initial message received by all vertices in PageRank
+    val initialMessage = 1.0 / (1.0 - resetProb)
+
+    // Execute a dynamic version of Pregel.
+    Pregel(pagerankGraph, initialMessage)(
+      vertexProgram, sendMessage, messageCombiner)
+      .mapVertices( (vid, attr) => attr._1 )
+
+
+    // // Compute the out degree of each vertex
+    // val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
+    //   (id, data, degIter) => (degIter.sum, 1.0, 1.0)
+    // }
 
-    // Run PageRank
-    GraphLab.iterate(pagerankGraph)(
-      (me_id, edge) => edge.srcAttr._2 / edge.srcAttr._1, // gather
-      (a: Double, b: Double) => a + b,
-      (id, data, a: Option[Double]) =>
-        (data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply
-      (me_id, edge) => math.abs(edge.srcAttr._3 - edge.srcAttr._2) > tol, // scatter
-      maxIter).mapVertices { case (vid, data) => data._2 }
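For reference, the computation both versions aim at is the standard PageRank recurrence, PR(v) = resetProb + (1 - resetProb) * sum over in-neighbors u of PR(u) / outDeg(u), with the 1/outDegree factor pre-stored on the edges by the `mapTriplets` call above. A plain-Scala sketch of one sweep of that recurrence (a hypothetical helper, for illustration only):

    // inNbrs(v) lists the sources u of edges u -> v; outDeg(u) is u's out degree.
    def pagerankSweep(pr: Array[Double], inNbrs: Array[Seq[Int]],
                      outDeg: Array[Int], resetProb: Double): Array[Double] =
      Array.tabulate(pr.length) { v =>
        resetProb + (1.0 - resetProb) * inNbrs(v).map(u => pr(u) / outDeg(u)).sum
      }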
From e3eb03d5b54db894670376b9c01a0c6c61aae083 Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez"
Date: Tue, 22 Oct 2013 15:03:16 -0700
Subject: [PATCH 4/6] Starting analytics test suite.

---
 .../apache/spark/graph/AnalyticsSuite.scala | 30 +++++++++++++++++++
 1 file changed, 30 insertions(+)
 create mode 100644 graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala

diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
new file mode 100644
index 0000000000..864d51e3f6
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
@@ -0,0 +1,30 @@
+package org.apache.spark.graph
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.graph.LocalSparkContext._
+import org.apache.spark.graph.util.GraphGenerators
+import org.apache.spark.graph.Analytics
+
+
+class AnalyticsSuite extends FunSuite with LocalSparkContext {
+
+  System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+  System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
+
+  val sc = new SparkContext("local", "test")
+
+  test("Fixed Iterations PageRank") {
+    val starGraph = GraphGenerators.starGraph(sc, 1000)
+    val resetProb = 0.15
+    val prGraph1 = Analytics.pagerank(starGraph, 1, resetProb)
+    val prGraph2 = Analytics.pagerank(starGraph, 2, resetProb)
+    val errors = prGraph1.vertices.zipJoin(prGraph2.vertices)
+      .map{ case (vid, (pr1, pr2)) => if (pr1 != pr2) { 1 } else { 0 } }.sum
+
+
+  }
+
+
+} // end of AnalyticsSuite
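The star graph makes a good first fixture because its PageRank has a closed form: leaves have no in-edges, so after any full iteration a leaf's rank is just resetProb, while after a second iteration the center has absorbed one round of leaf ranks. Illustrative arithmetic only; these are the values the assertions added in patch 6/6 check:

    val resetProb = 0.15
    val nVertices = 100
    val leafRank = resetProb                    // no incoming messages
    val centerRank = resetProb +
      (1.0 - resetProb) * (resetProb * (nVertices - 1))
    // centerRank = 0.15 + 0.85 * 14.85 = 12.7725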
From 0bd92ed8d07712b7d8bb06378d877eb9643ba05a Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez"
Date: Tue, 22 Oct 2013 19:10:51 -0700
Subject: [PATCH 5/6] Fixing a bug in pregel where the initial vertex-program results are lost.

---
 graph/src/main/scala/org/apache/spark/graph/Pregel.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index d1f5513f6a..2e3f86a3f0 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -50,7 +50,7 @@ object Pregel {
     def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertexId(vid), edge)
 
     // Receive the first set of messages
-    g.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
+    g = g.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
 
     while (i < numIter) {
       // compute the messages
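The one-line fix above is worth dwelling on: graphs here are immutable, so `mapVertices` returns a new graph and a bare call silently discards the updated vertex values, which is exactly how the initial vertex-program results were lost. The same pitfall with an ordinary immutable collection (illustrative only):

    var xs = List(1, 2, 3)
    xs.map(_ + 1)      // result thrown away; xs is unchanged
    xs = xs.map(_ + 1) // keep the result, as the patch now does with
                       // g = g.mapVertices(...)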
From c30624dcbb6f6999f4e4f592ac4379a18f169927 Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez"
Date: Wed, 23 Oct 2013 00:25:45 -0700
Subject: [PATCH 6/6] Adding dynamic pregel, fixing bugs in PageRank, and adding basic analytics unit tests.

---
 .../org/apache/spark/graph/Analytics.scala    | 14 +--
 .../scala/org/apache/spark/graph/Pregel.scala | 55 +++++++----
 .../apache/spark/graph/AnalyticsSuite.scala   | 97 +++++++++++++++++--
 3 files changed, 129 insertions(+), 37 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index 2093eec311..dc1955a835 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -35,10 +35,10 @@ object Analytics extends Logging {
     def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
       resetProb + (1.0 - resetProb) * msgSum
     def sendMessage(id: Vid, edge: EdgeTriplet[Double, Double]): Option[Double] =
-      Some(edge.srcAttr / edge.attr)
+      Some(edge.srcAttr * edge.attr)
     def messageCombiner(a: Double, b: Double): Double = a + b
     // The initial message received by all vertices in PageRank
-    val initialMessage = 1.0
+    val initialMessage = 0.0
 
     // Execute pregel for a fixed number of iterations.
     Pregel(pagerankGraph, initialMessage, numIter)(
       vertexProgram, sendMessage, messageCombiner)
@@ -49,8 +49,8 @@ object Analytics extends Logging {
   /**
    * Compute the PageRank of a graph returning the pagerank of each vertex as a vertex attribute
    */
-  def dynamicPagerank[VD: Manifest, ED: Manifest](
-      graph: Graph[VD, ED], tol: Float, resetProb: Double = 0.15): Graph[Double, Double] = {
+  def deltaPagerank[VD: Manifest, ED: Manifest](
+      graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = {
 
     /**
      * Initialize the pagerankGraph with each edge attribute
      * having weight 1/outDegree and each vertex with attribute 1.0.
      */
     val pagerankGraph: Graph[(Double, Double), Double] = graph
       // Associate the degree with each vertex
       .outerJoinVertices(graph.outDegrees){
         (vid, vdata, deg) => deg.getOrElse(0)
       }
       // Set the weight on the edges based on the degree
       .mapTriplets( e => 1.0 / e.srcAttr )
       // Set the vertex attributes to (initialPR, delta = 0)
-      .mapVertices( (id, attr) => (resetProb, 0.0) )
+      .mapVertices( (id, attr) => (0.0, 0.0) )
 
     // Display statistics about pagerank
     println(pagerankGraph.statistics)
 
     // Define the three functions needed to implement PageRank in the GraphX
     // version of Pregel
     def vertexProgram(id: Vid, attr: (Double, Double), msgSum: Double): (Double, Double) = {
       val (oldPR, lastDelta) = attr
       val newPR = oldPR + (1.0 - resetProb) * msgSum
       (newPR, newPR - oldPR)
     }
     def sendMessage(id: Vid, edge: EdgeTriplet[(Double, Double), Double]): Option[Double] = {
       if (edge.srcAttr._2 > tol) {
-        Some(edge.srcAttr._2 / edge.attr)
+        Some(edge.srcAttr._2 * edge.attr)
       } else { None }
     }
     def messageCombiner(a: Double, b: Double): Double = a + b
     // The initial message received by all vertices in PageRank
-    val initialMessage = 1.0 / (1.0 - resetProb)
+    val initialMessage = resetProb / (1.0 - resetProb)
 
     // Execute a dynamic version of Pregel.
     Pregel(pagerankGraph, initialMessage)(
       vertexProgram, sendMessage, messageCombiner)

diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index 2e3f86a3f0..94dc806fc2 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -43,15 +43,12 @@ object Pregel {
       mergeMsg: (A, A) => A)
     : Graph[VD, ED] = {
 
-    var g = graph
-    //var g = graph.cache()
-    var i = 0
-
     def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertexId(vid), edge)
 
     // Receive the first set of messages
-    g = g.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
-
+    var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
+
+    var i = 0
     while (i < numIter) {
       // compute the messages
       val messages = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In)
@@ -96,27 +93,45 @@ object Pregel {
       mergeMsg: (A, A) => A)
     : Graph[VD, ED] = {
 
-    var g = graph
-    //var g = graph.cache()
+    def vprogFun(id: Vid, attr: (VD, Boolean), msgOpt: Option[A]): (VD, Boolean) = {
+      msgOpt match {
+        case Some(msg) => (vprog(id, attr._1, msg), true)
+        case None => (attr._1, false)
+      }
+    }
+
+    def sendMsgFun(vid: Vid, edge: EdgeTriplet[(VD,Boolean), ED]): Option[A] = {
+      if(edge.srcAttr._2) {
+        val et = new EdgeTriplet[VD, ED]
+        et.srcId = edge.srcId
+        et.srcAttr = edge.srcAttr._1
+        et.dstId = edge.dstId
+        et.dstAttr = edge.dstAttr._1
+        et.attr = edge.attr
+        sendMsg(edge.otherVertexId(vid), et)
+      } else { None }
+    }
+
+    var g = graph.mapVertices( (vid, vdata) => (vprog(vid, vdata, initialMsg), true) )
+
+    // compute the messages
+    var messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, EdgeDirection.In).cache
+    var activeMessages = messages.count
+    // Loop
     var i = 0
-
-    def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertexId(vid), edge)
-
-    // Receive the first set of messages
-    g.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
-
-    var activeMessages = g.numEdges
     while (activeMessages > 0) {
-      // compute the messages
-      val messages = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In).cache
-      activeMessages = messages.count
       // receive the messages
-      g = g.joinVertices(messages)(vprog)
+      g = g.outerJoinVertices(messages)(vprogFun)
+      val oldMessages = messages
+      // compute the messages
+      messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, EdgeDirection.In).cache
+      activeMessages = messages.count
+      // after counting we can unpersist the old messages
+      oldMessages.unpersist(blocking=false)
       // count the iteration
       i += 1
     }
     // Return the final graph
-    g
+    g.mapVertices((id, attr) => attr._1)
   } // end of apply
 
 } // end of object Pregel

diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
index 864d51e3f6..f4a8c6b4c9 100644
--- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
@@ -3,9 +3,45 @@ package org.apache.spark.graph
 
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+
 import org.apache.spark.graph.LocalSparkContext._
+
 import org.apache.spark.graph.util.GraphGenerators
+
+
+object GridPageRank {
+  def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
+    val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
+    val outDegree = Array.fill(nRows * nCols)(0)
+    // Convert row column address into vertex ids (row major order)
+    def sub2ind(r: Int, c: Int): Int = r * nCols + c
+    // Make the grid graph
+    for(r <- 0 until nRows; c <- 0 until nCols){
+      val ind = sub2ind(r,c)
+      if(r+1 < nRows) {
+        outDegree(ind) += 1
+        inNbrs(sub2ind(r+1,c)) += ind
+      }
+      if(c+1 < nCols) {
+        outDegree(ind) += 1
+        inNbrs(sub2ind(r,c+1)) += ind
+      }
+    }
+    // compute the pagerank
+    var pr = Array.fill(nRows * nCols)(resetProb)
+    for(iter <- 0 until nIter) {
+      val oldPr = pr
+      pr = new Array[Double](nRows * nCols)
+      for(ind <- 0 until (nRows * nCols)) {
+        pr(ind) = resetProb + (1.0 - resetProb) *
+          inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum
+      }
+    }
+    (0L until (nRows * nCols)).zip(pr)
+  }
+
+}
 
 
 class AnalyticsSuite extends FunSuite with LocalSparkContext {
@@ -13,18 +49,59 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
 
   System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
 
-  val sc = new SparkContext("local", "test")
-
-  test("Fixed Iterations PageRank") {
-    val starGraph = GraphGenerators.starGraph(sc, 1000)
-    val resetProb = 0.15
-    val prGraph1 = Analytics.pagerank(starGraph, 1, resetProb)
-    val prGraph2 = Analytics.pagerank(starGraph, 2, resetProb)
-    val errors = prGraph1.vertices.zipJoin(prGraph2.vertices)
-      .map{ case (vid, (pr1, pr2)) => if (pr1 != pr2) { 1 } else { 0 } }.sum
-
-
-  }
 
+  test("Star PageRank") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val nVertices = 100
+      val starGraph = GraphGenerators.starGraph(sc, nVertices)
+      val resetProb = 0.15
+      val prGraph1 = Analytics.pagerank(starGraph, 1, resetProb)
+      val prGraph2 = Analytics.pagerank(starGraph, 2, resetProb)
+
+      val notMatching = prGraph1.vertices.zipJoin(prGraph2.vertices)
+        .map{ case (vid, (pr1, pr2)) => if (pr1 != pr2) { 1 } else { 0 } }.sum
+      assert(notMatching === 0)
+      prGraph2.vertices.foreach(println(_))
+      val errors = prGraph2.vertices.map{ case (vid, pr) =>
+        val correct = (vid > 0 && pr == resetProb) ||
+          (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
+        if ( !correct ) { 1 } else { 0 }
+      }
+      assert(errors.sum === 0)
+
+      val prGraph3 = Analytics.deltaPagerank(starGraph, 0, resetProb)
+      val errors2 = prGraph2.vertices.leftJoin(prGraph3.vertices).map{
+        case (_, (pr1, Some(pr2))) if(pr1 == pr2) => 0
+        case _ => 1
+      }.sum
+      assert(errors2 === 0)
+    }
+  } // end of test Star PageRank
+
+
+  test("Grid PageRank") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val gridGraph = GraphGenerators.gridGraph(sc, 10, 10)
+      val resetProb = 0.15
+      val prGraph1 = Analytics.pagerank(gridGraph, 50, resetProb).cache()
+      val prGraph2 = Analytics.deltaPagerank(gridGraph, 0.0001, resetProb).cache()
+      val error = prGraph1.vertices.zipJoin(prGraph2.vertices).map {
+        case (id, (a, b)) => (a - b) * (a - b)
+      }.sum
+      prGraph1.vertices.zipJoin(prGraph2.vertices)
+        .map{ case (id, (a,b)) => (id, (a,b, a-b))}.foreach(println(_))
+      println(error)
+      assert(error < 1.0e-5)
+      val pr3 = sc.parallelize(GridPageRank(10,10, 50, resetProb))
+      val error2 = prGraph1.vertices.leftJoin(pr3).map {
+        case (id, (a, Some(b))) => (a - b) * (a - b)
+        case _ => 0
+      }.sum
+      prGraph1.vertices.leftJoin(pr3).foreach(println( _ ))
+      println(error2)
+      assert(error2 < 1.0e-5)
+    }
+  } // end of Grid PageRank
 
 } // end of AnalyticsSuite
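Taken together, the series leaves Analytics with two PageRank entry points. A hedged usage sketch against the API as it stands after patch 6/6 (illustrative only; assumes a local SparkContext and the in-progress graph package above):

    val sc = new SparkContext("local", "pagerank-example")
    val graph = GraphGenerators.gridGraph(sc, 10, 10)
    // Fixed number of Pregel iterations:
    val ranks = Analytics.pagerank(graph, numIter = 50, resetProb = 0.15)
    // Run until every vertex's delta falls below the tolerance:
    val ranks2 = Analytics.deltaPagerank(graph, tol = 0.0001, resetProb = 0.15)
    ranks.vertices.take(5).foreach(println)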