Updating analytics to reflect changes in the pregel interface and moving degree information into the edge attribute.

This commit is contained in:
Joseph E. Gonzalez 2013-10-22 15:03:00 -07:00
parent 46b195253e
commit ba5c75692a

View file

@ -11,43 +11,99 @@ object Analytics extends Logging {
*/ */
def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
numIter: Int, numIter: Int,
resetProb: Double = 0.15) = { resetProb: Double = 0.15): Graph[Double, Double] = {
// Compute the out degree of each vertex
val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
(vid, vdata, deg) => (deg.getOrElse(0), 1.0)
}
/**
* Initialize the pagerankGraph with each edge attribute
* having weight 1/outDegree and each vertex with attribute 1.0.
*/
val pagerankGraph: Graph[Double, Double] = graph
// Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees){
(vid, vdata, deg) => deg.getOrElse(0)
}
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to the initial pagerank values
.mapVertices( (id, attr) => 1.0 )
// Display statistics about pagerank
println(pagerankGraph.statistics) println(pagerankGraph.statistics)
Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)( // Define the three functions needed to implement PageRank in the GraphX
(vid, data, a: Double) => (data._1, (resetProb + (1.0 - resetProb) * a)), // apply // version of Pregel
(me_id, edge) => Some(edge.srcAttr._2 / edge.srcAttr._1), // gather def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
(a: Double, b: Double) => a + b, // merge resetProb + (1.0 - resetProb) * msgSum
1.0, def sendMessage(id: Vid, edge: EdgeTriplet[Double, Double]): Option[Double] =
numIter).mapVertices{ case (id, (outDeg, r)) => r } Some(edge.srcAttr / edge.attr)
def messageCombiner(a: Double, b: Double): Double = a + b
// The initial message received by all vertices in PageRank
val initialMessage = 1.0
// Execute pregel for a fixed number of iterations.
Pregel(pagerankGraph, initialMessage, numIter)(
vertexProgram, sendMessage, messageCombiner)
} }
/** /**
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
*/ */
def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], def dynamicPagerank[VD: Manifest, ED: Manifest](
tol: Float, graph: Graph[VD, ED], tol: Float, resetProb: Double = 0.15): Graph[Double, Double] = {
maxIter: Int = Integer.MAX_VALUE,
resetProb: Double = 0.15) = { /**
// Compute the out degree of each vertex * Initialize the pagerankGraph with each edge attribute
val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){ * having weight 1/outDegree and each vertex with attribute 1.0.
(id, data, degIter) => (degIter.sum, 1.0, 1.0) */
val pagerankGraph: Graph[(Double, Double), Double] = graph
// Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees){
(vid, vdata, deg) => deg.getOrElse(0)
}
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to (initalPR, delta = 0)
.mapVertices( (id, attr) => (resetProb, 0.0) )
// Display statistics about pagerank
println(pagerankGraph.statistics)
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
def vertexProgram(id: Vid, attr: (Double, Double), msgSum: Double): (Double, Double) = {
val (oldPR, lastDelta) = attr
val newPR = oldPR + (1.0 - resetProb) * msgSum
(newPR, newPR - oldPR)
} }
def sendMessage(id: Vid, edge: EdgeTriplet[(Double, Double), Double]): Option[Double] = {
if (edge.srcAttr._2 > tol) {
Some(edge.srcAttr._2 / edge.attr)
} else { None }
}
def messageCombiner(a: Double, b: Double): Double = a + b
// The initial message received by all vertices in PageRank
val initialMessage = 1.0 / (1.0 - resetProb)
// Execute a dynamic version of Pregel.
Pregel(pagerankGraph, initialMessage)(
vertexProgram, sendMessage, messageCombiner)
.mapVertices( (vid, attr) => attr._1 )
// // Compute the out degree of each vertex
// val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
// (id, data, degIter) => (degIter.sum, 1.0, 1.0)
// }
// Run PageRank // // Run PageRank
GraphLab.iterate(pagerankGraph)( // GraphLab.iterate(pagerankGraph)(
(me_id, edge) => edge.srcAttr._2 / edge.srcAttr._1, // gather // (me_id, edge) => edge.srcAttr._2 / edge.srcAttr._1, // gather
(a: Double, b: Double) => a + b, // (a: Double, b: Double) => a + b,
(id, data, a: Option[Double]) => // (id, data, a: Option[Double]) =>
(data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply // (data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply
(me_id, edge) => math.abs(edge.srcAttr._3 - edge.srcAttr._2) > tol, // scatter // (me_id, edge) => math.abs(edge.srcAttr._3 - edge.srcAttr._2) > tol, // scatter
maxIter).mapVertices { case (vid, data) => data._2 } // maxIter).mapVertices { case (vid, data) => data._2 }
} }