diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 102dc2d2dd..5dd6f13235 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -140,8 +140,32 @@ object PageRank extends Logging {
    */
   def runWithOptions[VD: ClassTag, ED: ClassTag](
       graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
-      srcId: Option[VertexId] = None): Graph[Double, Double] =
-  {
+      srcId: Option[VertexId] = None): Graph[Double, Double] = {
+    runWithOptions(graph, numIter, resetProb, srcId, normalized = true)
+  }
+
+  /**
+   * Run PageRank for a fixed number of iterations, returning a graph
+   * with vertex attributes containing the PageRank and edge
+   * attributes containing the normalized edge weight.
+   *
+   * @tparam VD the original vertex attribute (not used)
+   * @tparam ED the original edge attribute (not used)
+   *
+   * @param graph the graph on which to compute PageRank
+   * @param numIter the number of iterations of PageRank to run
+   * @param resetProb the random reset probability (alpha)
+   * @param srcId the source vertex for a Personalized Page Rank (optional)
+   * @param normalized whether or not to normalize the rank sum
+   *
+   * @return the graph with each vertex attribute containing the PageRank and each edge
+   *         attribute containing the normalized weight.
+   *
+   * @since 3.2.0
+   */
+  def runWithOptions[VD: ClassTag, ED: ClassTag](
+      graph: Graph[VD, ED], numIter: Int, resetProb: Double,
+      srcId: Option[VertexId], normalized: Boolean): Graph[Double, Double] = {
     require(numIter > 0, s"Number of iterations must be greater than 0," +
       s" but got ${numIter}")
     require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" +
@@ -179,8 +203,13 @@ object PageRank extends Logging {
       iteration += 1
     }
 
-    // SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks
-    normalizeRankSum(rankGraph, personalized)
+    if (normalized) {
+      // SPARK-18847 If the graph has sinks (vertices with no outgoing edges),
+      // correct the sum of ranks
+      normalizeRankSum(rankGraph, personalized)
+    } else {
+      rankGraph
+    }
   }
 
   /**
@@ -204,6 +233,34 @@ object PageRank extends Logging {
   def runWithOptionsWithPreviousPageRank[VD: ClassTag, ED: ClassTag](
       graph: Graph[VD, ED], numIter: Int, resetProb: Double,
       srcId: Option[VertexId], preRankGraph: Graph[Double, Double]): Graph[Double, Double] = {
+    runWithOptionsWithPreviousPageRank(
+      graph, numIter, resetProb, srcId, normalized = true, preRankGraph
+    )
+  }
+
+  /**
+   * Run PageRank for a fixed number of iterations, returning a graph
+   * with vertex attributes containing the PageRank and edge
+   * attributes containing the normalized edge weight.
+   *
+   * @tparam VD the original vertex attribute (not used)
+   * @tparam ED the original edge attribute (not used)
+   *
+   * @param graph the graph on which to compute PageRank
+   * @param numIter the number of iterations of PageRank to run
+   * @param resetProb the random reset probability (alpha)
+   * @param srcId the source vertex for a Personalized Page Rank (optional)
+   * @param normalized whether or not to normalize the rank sum
+   * @param preRankGraph PageRank graph from which to keep iterating
+   *
+   * @return the graph with each vertex attribute containing the PageRank and each edge
+   *         attribute containing the normalized weight.
+   *
+   * @since 3.2.0
+   */
+  def runWithOptionsWithPreviousPageRank[VD: ClassTag, ED: ClassTag](
+      graph: Graph[VD, ED], numIter: Int, resetProb: Double, srcId: Option[VertexId],
+      normalized: Boolean, preRankGraph: Graph[Double, Double]): Graph[Double, Double] = {
     require(numIter > 0, s"Number of iterations must be greater than 0," +
       s" but got ${numIter}")
     require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" +
@@ -238,8 +295,13 @@ object PageRank extends Logging {
       iteration += 1
     }
 
-    // SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks
-    normalizeRankSum(rankGraph, personalized)
+    if (normalized) {
+      // SPARK-18847 If the graph has sinks (vertices with no outgoing edges),
+      // correct the sum of ranks
+      normalizeRankSum(rankGraph, personalized)
+    } else {
+      rankGraph
+    }
   }
 
   /**
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index 8008a89c6c..caa2fdcdf5 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -233,7 +233,37 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
       assert(totalIters == 19)
       assert(iterAfterHalfCheckPoint == 18)
     }
-  } // end of Grid PageRank
+  } // end of Grid PageRank with checkpoint
+
+  test("Grid PageRank with checkpoint without intermediate normalization") {
+    withSpark { sc =>
+      // Check that 6 iterations in a row are equivalent
+      // to 3 times 2 iterations without intermediate normalization
+      val rows = 10
+      val cols = 10
+      val resetProb = 0.15
+      val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()
+
+      val ranksA: Array[(VertexId, Double)] = PageRank.runWithOptions(
+        gridGraph, numIter = 6, resetProb, srcId = None, normalized = true
+      ).vertices.collect()
+
+      val preRankGraph1 = PageRank.runWithOptions(
+        gridGraph, numIter = 2, resetProb, srcId = None, normalized = false
+      )
+
+      val preRankGraph2 = PageRank.runWithOptionsWithPreviousPageRank(
+        gridGraph, numIter = 2, resetProb, srcId = None, normalized = false, preRankGraph1
+      )
+
+      val ranksB: Array[(VertexId, Double)] = PageRank.runWithOptionsWithPreviousPageRank(
+        gridGraph, numIter = 2, resetProb, srcId = None, normalized = true, preRankGraph2
+      ).vertices.collect()
+
+      // assert that all scores are equal
+      assert(ranksA.zip(ranksB).forall { case (rankA, rankB) => rankA == rankB })
+    }
+  } // end of Grid PageRank with checkpoint without intermediate normalization
 
   test("Chain PageRank") {
     withSpark { sc =>
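
For reference, a minimal sketch of how the two new overloads compose. This is illustrative only and not part of the patch; it assumes an existing SparkContext named sc. Normalization is skipped on the intermediate run and applied only by the final call, which the new test verifies is equivalent to a single longer normalized run:

import org.apache.spark.graphx.lib.PageRank
import org.apache.spark.graphx.util.GraphGenerators

// Illustrative sketch (not from the patch): `sc` is assumed to be an
// existing SparkContext. Build a small grid graph to rank.
val graph = GraphGenerators.gridGraph(sc, 10, 10).cache()

// First 2 iterations: leave the rank sum un-normalized so iteration
// can resume later from the raw ranks.
val intermediate = PageRank.runWithOptions(
  graph, numIter = 2, resetProb = 0.15, srcId = None, normalized = false)

// Final 2 iterations: resume from `intermediate` and normalize once.
// Equivalent to PageRank.runWithOptions(graph, 4, 0.15, None, normalized = true).
val ranks = PageRank.runWithOptionsWithPreviousPageRank(
  graph, numIter = 2, resetProb = 0.15, srcId = None, normalized = true,
  preRankGraph = intermediate)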