[SPARK-35357][GRAPHX] Allow to turn off the normalization applied by static PageRank utilities

### What changes were proposed in this pull request?

Overload the methods `PageRank.runWithOptions` and `PageRank.runWithOptionsWithPreviousPageRank` (rather than changing any existing user-facing signature) with a `normalized` parameter that controls "whether or not to normalize the rank sum".

### Why are the changes needed?

https://issues.apache.org/jira/browse/SPARK-35357

When dealing with a non-negligible proportion of sinks in a graph, algorithms based on incremental updates of ranks can get a **precision gain for free** if they are allowed to manipulate non-normalized ranks.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

By adding a unit test that verifies that (even when dealing with a graph containing a sink) we end up with the same result for both these scenarios:
a)
  - Run **6 iterations** of pagerank in a row using `PageRank.runWithOptions` with **normalization enabled**

b)
  - Run **2 iterations** using `PageRank.runWithOptions` with **normalization disabled**
  - Resume from the `preRankGraph1` and run **2 more iterations** using `PageRank.runWithOptionsWithPreviousPageRank` with **normalization disabled**
  - Finally resume from the `preRankGraph2` and run **2 more iterations** using `PageRank.runWithOptionsWithPreviousPageRank` with **normalization enabled**

Closes #32485 from bonnal-enzo/make-pagerank-normalization-optional.

Authored-by: Enzo Bonnal <enzobonnal@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
This commit is contained in:
Enzo Bonnal 2021-05-12 08:56:22 -05:00 committed by Sean Owen
parent ed059541eb
commit 402375b59e
2 changed files with 99 additions and 7 deletions

View file

@ -140,8 +140,32 @@ object PageRank extends Logging {
*/
def runWithOptions[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
    srcId: Option[VertexId] = None): Graph[Double, Double] = {
  // Backwards-compatible entry point: delegate to the extended overload with
  // rank-sum normalization enabled, preserving the pre-SPARK-35357 behavior
  // for existing callers.
  runWithOptions(graph, numIter, resetProb, srcId, normalized = true)
}
/**
* Run PageRank for a fixed number of iterations returning a graph
* with vertex attributes containing the PageRank and edge
* attributes the normalized edge weight.
*
* @tparam VD the original vertex attribute (not used)
* @tparam ED the original edge attribute (not used)
*
* @param graph the graph on which to compute PageRank
* @param numIter the number of iterations of PageRank to run
* @param resetProb the random reset probability (alpha)
* @param srcId the source vertex for a Personalized Page Rank (optional)
* @param normalized whether or not to normalize rank sum
*
* @return the graph containing with each vertex containing the PageRank and each edge
* containing the normalized weight.
*
* @since 3.2.0
*/
def runWithOptions[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], numIter: Int, resetProb: Double,
srcId: Option[VertexId], normalized: Boolean): Graph[Double, Double] = {
require(numIter > 0, s"Number of iterations must be greater than 0," +
s" but got ${numIter}")
require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" +
@ -179,8 +203,13 @@ object PageRank extends Logging {
iteration += 1
}
// SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks
normalizeRankSum(rankGraph, personalized)
if (normalized) {
// SPARK-18847 If the graph has sinks (vertices with no outgoing edges),
// correct the sum of ranks
normalizeRankSum(rankGraph, personalized)
} else {
rankGraph
}
}
/**
@ -204,6 +233,34 @@ object PageRank extends Logging {
def runWithOptionsWithPreviousPageRank[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED], numIter: Int, resetProb: Double, srcId: Option[VertexId],
    preRankGraph: Graph[Double, Double]): Graph[Double, Double] =
  // Delegate to the extended overload; normalization stays enabled so the
  // historical contract of this signature is unchanged.
  runWithOptionsWithPreviousPageRank(
    graph, numIter, resetProb, srcId, normalized = true, preRankGraph)
/**
* Run PageRank for a fixed number of iterations returning a graph
* with vertex attributes containing the PageRank and edge
* attributes the normalized edge weight.
*
* @tparam VD the original vertex attribute (not used)
* @tparam ED the original edge attribute (not used)
*
* @param graph the graph on which to compute PageRank
* @param numIter the number of iterations of PageRank to run
* @param resetProb the random reset probability (alpha)
* @param srcId the source vertex for a Personalized Page Rank (optional)
* @param normalized whether or not to normalize rank sum
* @param preRankGraph PageRank graph from which to keep iterating
*
* @return the graph containing with each vertex containing the PageRank and each edge
* containing the normalized weight.
*
* @since 3.2.0
*/
def runWithOptionsWithPreviousPageRank[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], numIter: Int, resetProb: Double, srcId: Option[VertexId],
normalized: Boolean, preRankGraph: Graph[Double, Double]): Graph[Double, Double] = {
require(numIter > 0, s"Number of iterations must be greater than 0," +
s" but got ${numIter}")
require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" +
@ -238,8 +295,13 @@ object PageRank extends Logging {
iteration += 1
}
// SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks
normalizeRankSum(rankGraph, personalized)
if (normalized) {
// SPARK-18847 If the graph has sinks (vertices with no outgoing edges),
// correct the sum of ranks
normalizeRankSum(rankGraph, personalized)
} else {
rankGraph
}
}
/**

View file

@ -233,7 +233,37 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
assert(totalIters == 19)
assert(iterAfterHalfCheckPoint == 18)
}
} // end of Grid PageRank
} // end of Grid PageRank with checkpoint
test("Grid PageRank with checkpoint without intermediate normalization") {
  withSpark { sc =>
    // Equivalence check: 6 normalized iterations in a single run must match
    // three chained runs of 2 iterations each, where only the final leg
    // normalizes the rank sum.
    val numRows = 10
    val numCols = 10
    val alpha = 0.15
    val grid = GraphGenerators.gridGraph(sc, numRows, numCols).cache()

    // Reference scores: one run with normalization applied at the end.
    val expectedRanks: Array[(VertexId, Double)] = PageRank.runWithOptions(
      grid, numIter = 6, alpha, srcId = None, normalized = true
    ).vertices.collect()

    // Chained runs: skip normalization until the very last leg.
    val firstLeg = PageRank.runWithOptions(
      grid, numIter = 2, alpha, srcId = None, normalized = false
    )
    val secondLeg = PageRank.runWithOptionsWithPreviousPageRank(
      grid, numIter = 2, alpha, srcId = None, normalized = false, firstLeg
    )
    val actualRanks: Array[(VertexId, Double)] = PageRank.runWithOptionsWithPreviousPageRank(
      grid, numIter = 2, alpha, srcId = None, normalized = true, secondLeg
    ).vertices.collect()

    // Every vertex must end up with exactly the same score in both scenarios.
    assert(expectedRanks.zip(actualRanks).forall { case (a, b) => a == b })
  }
} // end of Grid PageRank with checkpoint without intermediate normalization
test("Chain PageRank") {
withSpark { sc =>