From e8ba51d6440574957a7e19ccecd139ac236cf091 Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Thu, 12 Dec 2013 15:06:03 -0800
Subject: [PATCH 1/3] Add standalone PageRank using only GraphX operators

---
 .../spark/graph/algorithms/PageRank.scala | 52 +++++++++++++++++--
 1 file changed, 49 insertions(+), 3 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala
index d190910c55..bb92e7c767 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala
@@ -1,9 +1,10 @@
 package org.apache.spark.graph.algorithms
 
+import org.apache.spark.Logging
 import org.apache.spark.graph._
 
 
-object PageRank {
+object PageRank extends Logging {
 
   /**
    * Run PageRank for a fixed number of iterations returning a graph
@@ -60,7 +61,7 @@ object PageRank {
       .mapVertices( (id, attr) => 1.0 )
 
     // Display statistics about pagerank
-    println(pagerankGraph.statistics)
+    logInfo(pagerankGraph.statistics.toString)
 
     // Define the three functions needed to implement PageRank in the GraphX
     // version of Pregel
@@ -124,7 +125,7 @@ object PageRank {
       .mapVertices( (id, attr) => (0.0, 0.0) )
 
     // Display statistics about pagerank
-    println(pagerankGraph.statistics)
+    logInfo(pagerankGraph.statistics.toString)
 
     // Define the three functions needed to implement PageRank in the GraphX
     // version of Pregel
@@ -151,4 +152,49 @@ object PageRank {
     Pregel(pagerankGraph, initialMessage)(vertexProgram, sendMessage, messageCombiner)
       .mapVertices((vid, attr) => attr._1)
   } // end of deltaPageRank
+
+  def runStandalone[VD: Manifest, ED: Manifest](
+    graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): VertexRDD[Double] = {
+
+    // Initialize the ranks
+    var ranks: VertexRDD[Double] = graph.vertices.mapValues((vid, attr) => resetProb).cache()
+
+    // Initialize the delta graph where each vertex stores its delta and each edge knows its weight
+    var deltaGraph: Graph[Double, Double] =
+      graph.outerJoinVertices(graph.outDegrees)((vid, vdata, deg) => deg.getOrElse(0))
+        .mapTriplets(e => 1.0 / e.srcAttr)
+        .mapVertices((vid, degree) => resetProb).cache()
+    var numDeltas: Long = ranks.count()
+
+    var i = 0
+    val weight = (1.0 - resetProb)
+    while (numDeltas > 0) {
+      // Compute new deltas
+      val deltas = deltaGraph
+        .mapReduceTriplets[Double](
+          et => {
+            if (et.srcMask) Iterator((et.dstId, et.srcAttr * et.attr * weight))
+            else Iterator.empty
+          },
+          _ + _)
+        .filter { case (vid, delta) => delta > tol }
+        .cache()
+      numDeltas = deltas.count()
+      logInfo("Standalone PageRank: iter %d has %d deltas".format(i, numDeltas))
+
+      // Apply deltas. Sets the mask for each vertex to false if it does not appear in deltas.
+      deltaGraph = deltaGraph.deltaJoinVertices(deltas).cache()
+
+      // Update ranks
+      ranks = ranks.leftZipJoin(deltas) { (vid, oldRank, deltaOpt) =>
+        oldRank + deltaOpt.getOrElse(0.0)
+      }
+      ranks.foreach(x => {}) // force the iteration for ease of debugging
+
+      i += 1
+    }
+
+    ranks
+  }
+
 }

From a0fb477726f20c2c7eed0eed19008c3642a76da6 Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Thu, 12 Dec 2013 15:42:55 -0800
Subject: [PATCH 2/3] Test standalone PageRank

---
 .../apache/spark/graph/AnalyticsSuite.scala | 93 ++++++++++++-------
 1 file changed, 57 insertions(+), 36 deletions(-)

diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
index a3ac7470a5..2e6b57a8ec 100644
--- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
@@ -51,35 +51,38 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
   System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
 
+  def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
+    a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) }
+      .map { case (id, error) => error }.sum
+  }
+
   test("Star PageRank") {
     withSpark(new SparkContext("local", "test")) { sc =>
       val nVertices = 100
       val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
      val resetProb = 0.15
-      val prGraph1 = PageRank.run(starGraph, 1, resetProb)
-      val prGraph2 = PageRank.run(starGraph, 2, resetProb)
+      val errorTol = 1.0e-5
 
-      val notMatching = prGraph1.vertices.zipJoin(prGraph2.vertices) { (vid, pr1, pr2) =>
-        if (pr1 != pr2) { 1 } else { 0 }
+      val staticRanks1 = PageRank.run(starGraph, numIter = 1, resetProb).vertices.cache()
+      val staticRanks2 = PageRank.run(starGraph, numIter = 2, resetProb).vertices.cache()
+
+      // Static PageRank should only take 2 iterations to converge
+      val notMatching = staticRanks1.zipJoin(staticRanks2) { (vid, pr1, pr2) =>
+        if (pr1 != pr2) 1 else 0
       }.map { case (vid, test) => test }.sum
       assert(notMatching === 0)
-      //prGraph2.vertices.foreach(println(_))
-      val errors = prGraph2.vertices.map { case (vid, pr) =>
-        val correct = (vid > 0 && pr == resetProb) ||
-          (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
-        if ( !correct ) { 1 } else { 0 }
-      }
-      assert(errors.sum === 0)
-      val prGraph3 = PageRank.runUntillConvergence(starGraph, 0, resetProb)
-      val errors2 = prGraph2.vertices.leftJoin(prGraph3.vertices){ (vid, pr1, pr2Opt) =>
-        pr2Opt match {
-          case Some(pr2) if(pr1 == pr2) => 0
-          case _ => 1
-        }
-      }.map { case (vid, test) => test }.sum
-      assert(errors2 === 0)
+
+      val staticErrors = staticRanks2.map { case (vid, pr) =>
+        val correct = (vid > 0 && pr == resetProb) ||
+          (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
+        if (!correct) 1 else 0
+      }
+      assert(staticErrors.sum === 0)
+
+      val dynamicRanks = PageRank.runUntillConvergence(starGraph, 0, resetProb).vertices.cache()
+      val standaloneRanks = PageRank.runStandalone(starGraph, 0, resetProb).cache()
+      assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
+      assert(compareRanks(staticRanks2, standaloneRanks) < errorTol)
     }
   } // end of test Star PageRank
 
@@ -87,27 +90,46 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
 
   test("Grid PageRank") {
     withSpark(new SparkContext("local", "test")) { sc =>
-      val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).cache()
+      val rows = 10
+      val cols = 10
       val resetProb = 0.15
-      val prGraph1 = PageRank.run(gridGraph, 50, resetProb).cache()
-      val prGraph2 = PageRank.runUntillConvergence(gridGraph, 0.0001, resetProb).cache()
-      val error = prGraph1.vertices.zipJoin(prGraph2.vertices) { case (id, a, b) => (a - b) * (a - b) }
-        .map { case (id, error) => error }.sum
-      //prGraph1.vertices.zipJoin(prGraph2.vertices) { (id, a, b) => (a, b, a-b) }.foreach(println(_))
-      println(error)
-      assert(error < 1.0e-5)
-      val pr3: RDD[(Vid, Double)] = sc.parallelize(GridPageRank(10,10, 50, resetProb))
-      val error2 = prGraph1.vertices.leftJoin(pr3) { (id, a, bOpt) =>
-        val b: Double = bOpt.get
-        (a - b) * (a - b)
-      }.map { case (id, error) => error }.sum
-      //prGraph1.vertices.leftJoin(pr3) { (id, a, b) => (a, b) }.foreach( println(_) )
-      println(error2)
-      assert(error2 < 1.0e-5)
+      val tol = 0.0001
+      val numIter = 50
+      val errorTol = 1.0e-5
+      val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()
+
+      val staticRanks = PageRank.run(gridGraph, numIter, resetProb).vertices.cache()
+      val dynamicRanks = PageRank.runUntillConvergence(gridGraph, tol, resetProb).vertices.cache()
+      val standaloneRanks = PageRank.runStandalone(gridGraph, tol, resetProb).cache()
+      val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb)))
+
+      assert(compareRanks(staticRanks, referenceRanks) < errorTol)
+      assert(compareRanks(dynamicRanks, referenceRanks) < errorTol)
+      assert(compareRanks(standaloneRanks, referenceRanks) < errorTol)
     }
   } // end of Grid PageRank
 
+  test("Chain PageRank") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val chain1 = (0 until 9).map(x => (x, x+1) )
+      val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) }
+      val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
+      val resetProb = 0.15
+      val tol = 0.0001
+      val numIter = 10
+      val errorTol = 1.0e-5
+
+      val staticRanks = PageRank.run(chain, numIter, resetProb).vertices.cache()
+      val dynamicRanks = PageRank.runUntillConvergence(chain, tol, resetProb).vertices.cache()
+      val standaloneRanks = PageRank.runStandalone(chain, tol, resetProb).cache()
+
+      assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+      assert(compareRanks(dynamicRanks, standaloneRanks) < errorTol)
+    }
+  }
+
+
   test("Grid Connected Components") {
     withSpark(new SparkContext("local", "test")) { sc =>
       val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).cache()
@@ -167,7 +189,6 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
       }
     }
     val ccMap = vertices.toMap
-    println(ccMap)
     for ( id <- 0 until 20 ) {
       if (id < 10) {
         assert(ccMap(id) === 0)

From 3f69cdc81b21f59024477b74571cfb683a0d3ca6 Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Thu, 12 Dec 2013 15:43:12 -0800
Subject: [PATCH 3/3] Use standalone PageRank in Analytics

---
 .../org/apache/spark/graph/Analytics.scala | 33 +++++--------------
 1 file changed, 8 insertions(+), 25 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index 755809b4b9..ac50e9a388 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -54,8 +54,6 @@ object Analytics extends Logging {
 
     taskType match {
       case "pagerank" => {
-        var numIter = Int.MaxValue
-        var isDynamic = false
         var tol:Float = 0.001F
         var outFname = ""
         var numVPart = 4
@@ -63,8 +61,6 @@ object Analytics extends Logging {
         var numEPart = 4
         var partitionStrategy: PartitionStrategy = RandomVertexCut
         options.foreach{
-          case ("numIter", v) => numIter = v.toInt
-          case ("dynamic", v) => isDynamic = v.toBoolean
          case ("tol", v) => tol = v.toFloat
          case ("output", v) => outFname = v
          case ("numVPart", v) => numVPart = v.toInt
@@ -73,40 +69,27 @@ object Analytics extends Logging {
          case ("numEPart", v) => numEPart = v.toInt
          case ("partStrategy", v) => partitionStrategy = PartitionStrategy.fromString(v)
          case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
        }
-        if(!isDynamic && numIter == Int.MaxValue) {
-          println("Set number of iterations!")
-          sys.exit(1)
-        }
         println("======================================")
         println("|             PageRank               |")
-        println("--------------------------------------")
-        println(" Using parameters:")
-        println(" \tDynamic:  " + isDynamic)
-        if(isDynamic) println(" \t  |-> Tolerance: " + tol)
-        println(" \tNumIter:  " + numIter)
         println("======================================")
 
         val sc = new SparkContext(host, "PageRank(" + fname + ")")
 
         val graph = GraphLoader.edgeListFile(sc, fname,
-          minEdgePartitions = numEPart, partitionStrategy=partitionStrategy).cache()
+          minEdgePartitions = numEPart, partitionStrategy = partitionStrategy).cache()
 
-        val startTime = System.currentTimeMillis
-        println("GRAPHX: starting tasks")
         println("GRAPHX: Number of vertices " + graph.vertices.count)
         println("GRAPHX: Number of edges " + graph.edges.count)
 
         //val pr = Analytics.pagerank(graph, numIter)
-        val pr = if(isDynamic) PageRank.runUntillConvergence(graph, tol, numIter)
-          else PageRank.run(graph, numIter)
-        println("GRAPHX: Total rank: " + pr.vertices.map{ case (id,r) => r }.reduce(_+_) )
-        if (!outFname.isEmpty) {
-          println("Saving pageranks of pages to " + outFname)
-          pr.vertices.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
-        }
-        println("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
+        val pr = PageRank.runStandalone(graph, tol)
 
-        Thread.sleep(100000)
+        println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_+_))
+
+        if (!outFname.isEmpty) {
+          logWarning("Saving pageranks of pages to " + outFname)
+          pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
+        }
 
         sc.stop()
      }
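
For reference, the iteration that runStandalone assembles from GraphX primitives (mapReduceTriplets to send weighted deltas from still-active vertices, deltaJoinVertices to clear the mask of vertices that received no delta above tol, and leftZipJoin to fold the surviving deltas into the running ranks) can be sketched on plain Scala collections. Below is a minimal single-machine sketch, not the patched API: the object name DeltaPageRankSketch and the Map-based graph representation are illustrative stand-ins, the deltas map plays the role of the vertex mask (et.srcMask), and the loop condition mirrors the patch's numDeltas > 0 test.

object DeltaPageRankSketch {
  def main(args: Array[String]): Unit = {
    // A small chain graph 0 -> 1 -> 2 -> 3 -> 4, in the spirit of the "Chain PageRank" test
    val edges: Seq[(Long, Long)] = (0L until 4L).map(v => (v, v + 1))
    val vertices: Seq[Long] = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    val outDeg: Map[Long, Int] = edges.groupBy(_._1).map { case (v, es) => v -> es.size }

    val resetProb = 0.15
    val tol = 0.0001
    val weight = 1.0 - resetProb

    // Every vertex starts with rank resetProb, and its initial delta is also resetProb,
    // so every vertex is "live" in the first iteration (the patch encodes liveness in the mask)
    var ranks: Map[Long, Double] = vertices.map(v => v -> resetProb).toMap
    var deltas: Map[Long, Double] = ranks

    var i = 0
    while (deltas.nonEmpty) {
      // Each live vertex sends delta * weight / outDegree along its out-edges,
      // matching et.srcAttr * et.attr * weight in the patch (edge attr = 1.0 / outDegree)
      val msgs: Seq[(Long, Double)] = edges.flatMap { case (src, dst) =>
        deltas.get(src).map(d => dst -> d * weight / outDeg(src))
      }
      // Sum the messages per target vertex and keep only deltas still above the tolerance
      deltas = msgs.groupBy(_._1)
        .map { case (v, ms) => v -> ms.map(_._2).sum }
        .filter { case (_, d) => d > tol }
      // Fold the surviving deltas into the running ranks
      ranks = ranks.map { case (v, r) => v -> (r + deltas.getOrElse(v, 0.0)) }
      i += 1
    }

    println(s"Converged after $i iterations")
    ranks.toSeq.sortBy(_._1).foreach { case (v, r) => println(s"$v\t$r") }
  }
}

On the chain above the live set shrinks from the head of the chain each iteration, so the loop stops after a handful of rounds; on cyclic graphs it stops once every per-vertex delta decays below tol, since each propagation step scales deltas by weight = 1 - resetProb < 1.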