[SPARK-5854] personalized page rank

This change extends PageRank to support personalized PageRank. The approach is similar to the one outlined by Bahmani et al. in 2010 (http://arxiv.org/pdf/1006.2880.pdf).

I'm sure this needs tuning up or other considerations, so let me know how I can improve this.

Author: Dan McClary <dan.mcclary@gmail.com>
Author: dwmclary <dan.mcclary@gmail.com>

Closes #4774 from dwmclary/SPARK-5854-Personalized-PageRank and squashes the following commits:

8b907db [dwmclary] fixed scalastyle errors in PageRankSuite
2c20e5d [dwmclary] merged with upstream master
d6cebac [dwmclary] updated as per style requests
7d00c23 [Dan McClary] fixed line overrun in personalizedVertexPageRank
d711677 [Dan McClary] updated vertexProgram to restore binary compatibility for inner method
bb8d507 [Dan McClary] Merge branch 'master' of https://github.com/apache/spark into SPARK-5854-Personalized-PageRank
fba0edd [Dan McClary] fixed silly mistakes
de51be2 [Dan McClary] cleaned up whitespace between comments and methods
0c30d0c [Dan McClary] updated to maintain binary compatibility
aaf0b4b [Dan McClary] Merge branch 'master' of https://github.com/apache/spark into SPARK-5854-Personalized-PageRank
76773f6 [Dan McClary] Merge branch 'master' of https://github.com/apache/spark into SPARK-5854-Personalized-PageRank
44ada8e [Dan McClary] updated tolerance on chain PPR
1ffed95 [Dan McClary] updated tolerance on chain PPR
b67ac69 [Dan McClary] updated tolerance on chain PPR
a560942 [Dan McClary] rolled PPR into pregel code for PageRank
6dc2c29 [Dan McClary] initial implementation of personalized page rank
This commit is contained in:
Dan McClary 2015-05-01 11:55:43 -07:00 committed by Joseph Gonzalez
parent 27de6fef6a
commit 7d427222dc
3 changed files with 159 additions and 6 deletions

View file

@ -372,6 +372,31 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
PageRank.runUntilConvergence(graph, tol, resetProb)
}
/**
 * Run personalized PageRank for a given vertex, such that all random walks
 * are started relative to the source node. This variant delegates to the
 * convergence-based implementation.
 *
 * @param src the vertex handed to PageRank as the personalization source
 * @param tol the tolerance forwarded to the convergence-based implementation
 * @param resetProb the reset probability forwarded to PageRank (defaults to 0.15)
 *
 * @return the graph produced by `PageRank.runUntilConvergenceWithOptions`
 *
 * @see [[org.apache.spark.graphx.lib.PageRank$#runUntilConvergenceWithOptions]]
 */
def personalizedPageRank(
    src: VertexId,
    tol: Double,
    resetProb: Double = 0.15): Graph[Double, Double] = {
  // Wrapping the id in Some(...) is what switches PageRank into personalized mode.
  val source: Option[VertexId] = Some(src)
  PageRank.runUntilConvergenceWithOptions(graph, tol, resetProb, source)
}
/**
 * Run personalized PageRank for a fixed number of iterations, with all
 * iterations originating at the source node, returning a graph with vertex
 * attributes containing the PageRank and edge attributes the normalized
 * edge weight.
 *
 * @param src the vertex handed to PageRank as the personalization source
 * @param numIter the number of iterations forwarded to PageRank
 * @param resetProb the reset probability forwarded to PageRank (defaults to 0.15)
 *
 * @see [[org.apache.spark.graphx.lib.PageRank$#runWithOptions]]
 */
def staticPersonalizedPageRank(
    src: VertexId,
    numIter: Int,
    resetProb: Double = 0.15): Graph[Double, Double] = {
  // Wrapping the id in Some(...) is what switches PageRank into personalized mode.
  val source: Option[VertexId] = Some(src)
  PageRank.runWithOptions(graph, numIter, resetProb, source)
}
/**
* Run PageRank for a fixed number of iterations returning a graph with vertex attributes
* containing the PageRank and edge attributes the normalized edge weight.

View file

@ -18,6 +18,7 @@
package org.apache.spark.graphx.lib
import scala.reflect.ClassTag
import scala.language.postfixOps
import org.apache.spark.Logging
import org.apache.spark.graphx._
@ -60,6 +61,7 @@ import org.apache.spark.graphx._
*/
object PageRank extends Logging {
/**
* Run PageRank for a fixed number of iterations returning a graph
* with vertex attributes containing the PageRank and edge
@ -74,10 +76,33 @@ object PageRank extends Logging {
*
* @return the graph with each vertex containing the PageRank and each edge
* containing the normalized weight.
*/
def run[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    numIter: Int,
    resetProb: Double = 0.15): Graph[Double, Double] = {
  // Plain (non-personalized) PageRank: delegate to the general entry point with no
  // source vertex selected.
  runWithOptions(graph, numIter, resetProb, srcId = None)
}
/**
* Run PageRank for a fixed number of iterations returning a graph
* with vertex attributes containing the PageRank and edge
* attributes the normalized edge weight.
*
* @tparam VD the original vertex attribute (not used)
* @tparam ED the original edge attribute (not used)
*
* @param graph the graph on which to compute PageRank
* @param numIter the number of iterations of PageRank to run
* @param resetProb the random reset probability (alpha)
* @param srcId the source vertex for a Personalized Page Rank (optional)
*
* @return the graph with each vertex containing the PageRank and each edge
* containing the normalized weight.
*
*/
def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] =
def runWithOptions[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
srcId: Option[VertexId] = None): Graph[Double, Double] =
{
// Initialize the PageRank graph with each edge attribute having
// weight 1/outDegree and each vertex with attribute 1.0.
@ -89,6 +114,10 @@ object PageRank extends Logging {
// Set the vertex attributes to the initial pagerank values
.mapVertices( (id, attr) => resetProb )
val personalized = srcId isDefined
val src: VertexId = srcId.getOrElse(-1L)
def delta(u: VertexId, v: VertexId):Double = { if (u == v) 1.0 else 0.0 }
var iteration = 0
var prevRankGraph: Graph[Double, Double] = null
while (iteration < numIter) {
@ -103,8 +132,14 @@ object PageRank extends Logging {
// that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
// edge partitions.
prevRankGraph = rankGraph
val rPrb = if (personalized) {
(src: VertexId ,id: VertexId) => resetProb * delta(src,id)
} else {
(src: VertexId, id: VertexId) => resetProb
}
rankGraph = rankGraph.joinVertices(rankUpdates) {
(id, oldRank, msgSum) => resetProb + (1.0 - resetProb) * msgSum
(id, oldRank, msgSum) => rPrb(src,id) + (1.0 - resetProb) * msgSum
}.cache()
rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
@ -133,7 +168,29 @@ object PageRank extends Logging {
* containing the normalized weight.
*/
def runUntilConvergence[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] =
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] =
{
runUntilConvergenceWithOptions(graph, tol, resetProb)
}
/**
* Run a dynamic version of PageRank returning a graph with vertex attributes containing the
* PageRank and edge attributes containing the normalized edge weight.
*
* @tparam VD the original vertex attribute (not used)
* @tparam ED the original edge attribute (not used)
*
* @param graph the graph on which to compute PageRank
* @param tol the tolerance allowed at convergence (smaller => more accurate).
* @param resetProb the random reset probability (alpha)
* @param srcId the source vertex for a Personalized Page Rank (optional)
*
* @return the graph with each vertex containing the PageRank and each edge
* containing the normalized weight.
*/
def runUntilConvergenceWithOptions[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15,
srcId: Option[VertexId] = None): Graph[Double, Double] =
{
// Initialize the pagerankGraph with each edge attribute
// having weight 1/outDegree and each vertex with attribute 1.0.
@ -148,6 +205,10 @@ object PageRank extends Logging {
.mapVertices( (id, attr) => (0.0, 0.0) )
.cache()
val personalized = srcId.isDefined
val src: VertexId = srcId.getOrElse(-1L)
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
@ -156,7 +217,18 @@ object PageRank extends Logging {
(newPR, newPR - oldPR)
}
def sendMessage(edge: EdgeTriplet[(Double, Double), Double]): Iterator[(VertexId, Double)] = {
// Vertex program used when a personalization source is set: only the source vertex
// carries its previous rank into the new value; every other vertex carries over
// oldPR * 0.0. The incoming message sum is scaled by (1.0 - resetProb) in both cases.
def personalizedVertexProgram(
    id: VertexId,
    attr: (Double, Double),
    msgSum: Double): (Double, Double) = {
  val oldPR = attr._1
  // 1.0 at the source vertex, 0.0 everywhere else.
  val sourceIndicator = if (src == id) 1.0 else 0.0
  val carriedOver = oldPR * sourceIndicator
  val newPR = carriedOver + (1.0 - resetProb) * msgSum
  // The second component is the change in rank produced by this update.
  (newPR, newPR - oldPR)
}
def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
if (edge.srcAttr._2 > tol) {
Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
} else {
@ -170,8 +242,17 @@ object PageRank extends Logging {
val initialMessage = resetProb / (1.0 - resetProb)
// Execute a dynamic version of Pregel.
val vp = if (personalized) {
(id: VertexId, attr: (Double, Double),msgSum: Double) =>
personalizedVertexProgram(id, attr, msgSum)
} else {
(id: VertexId, attr: (Double, Double), msgSum: Double) =>
vertexProgram(id, attr, msgSum)
}
Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)(
vertexProgram, sendMessage, messageCombiner)
vp, sendMessage, messageCombiner)
.mapVertices((vid, attr) => attr._1)
} // end of deltaPageRank
}

View file

@ -92,6 +92,36 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
}
} // end of test Star PageRank
// Personalized PageRank on a star graph with vertex 0 as the source: checks that the
// static variant gives identical ranks after 1 and 2 iterations, that those ranks match
// the expected closed-form values, and that the dynamic variant agrees with the static one.
test("Star PersonalPageRank") {
  withSpark { sc =>
    val nVertices = 100
    val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
    val resetProb = 0.15
    val errorTol = 1.0e-5

    val staticRanks1 = starGraph.staticPersonalizedPageRank(0, numIter = 1, resetProb).vertices
    val staticRanks2 = starGraph.staticPersonalizedPageRank(0, numIter = 2, resetProb)
      .vertices.cache()

    // Static PageRank should only take 2 iterations to converge
    val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
      if (pr1 != pr2) 1 else 0
    }.map { case (vid, test) => test }.sum
    assert(notMatching === 0)

    // Vertex 0 is expected to hold resetProb plus the damped contributions of the other
    // nVertices - 1 vertices; every other vertex is expected to hold exactly resetProb.
    val staticErrors = staticRanks2.map { case (vid, pr) =>
      val correct = (vid > 0 && pr == resetProb) ||
        (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb *
          (nVertices - 1)) )) < errorTol)
      if (!correct) 1 else 0
    }
    assert(staticErrors.sum === 0)

    // The dynamic variant is run with a tolerance of 0 and compared to the static ranks.
    val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache()
    assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
  }
} // end of test Star PersonalPageRank
test("Grid PageRank") {
withSpark { sc =>
val rows = 10
@ -128,4 +158,21 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
}
}
// Personalized PageRank on a directed chain 0 -> 1 -> ... -> 9 with vertex 4 as the
// source: the fixed-iteration and the tolerance-based variants must agree within errorTol.
test("Chain PersonalizedPageRank") {
  withSpark { sc =>
    val chainLinks = for (v <- 0 until 9) yield (v, v + 1)
    // Single partition; vertex ids are widened to Long before building the graph.
    val edgeTuples = sc.parallelize(chainLinks, 1).map { case (from, to) =>
      (from.toLong, to.toLong)
    }
    val chain = Graph.fromEdgeTuples(edgeTuples, 1.0).cache()

    val resetProb = 0.15
    val tol = 0.0001
    val numIter = 10
    val errorTol = 1.0e-1

    val fixedIterRanks = chain.staticPersonalizedPageRank(4, numIter, resetProb).vertices
    val convergedRanks = chain.personalizedPageRank(4, tol, resetProb).vertices

    assert(compareRanks(fixedIterRanks, convergedRanks) < errorTol)
  }
}
}