Merge pull request #34 from jegonzal/AnalyticsCleanup

Analytics Cleanup
This commit is contained in:
Reynold Xin 2013-10-24 11:09:46 -07:00
commit 6f82c42690
4 changed files with 319 additions and 40 deletions

View file

@ -11,43 +11,99 @@ object Analytics extends Logging {
*/
def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
numIter: Int,
resetProb: Double = 0.15) = {
// Compute the out degree of each vertex
val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
(vid, vdata, deg) => (deg.getOrElse(0), 1.0)
}
resetProb: Double = 0.15): Graph[Double, Double] = {
/**
* Initialize the pagerankGraph with each edge attribute
* having weight 1/outDegree and each vertex with attribute 1.0.
*/
val pagerankGraph: Graph[Double, Double] = graph
// Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees){
(vid, vdata, deg) => deg.getOrElse(0)
}
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to the initial pagerank values
.mapVertices( (id, attr) => 1.0 )
// Display statistics about pagerank
println(pagerankGraph.statistics)
Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)(
(vid, data, a: Double) => (data._1, (resetProb + (1.0 - resetProb) * a)), // apply
(me_id, edge) => Some(edge.srcAttr._2 / edge.srcAttr._1), // gather
(a: Double, b: Double) => a + b, // merge
1.0,
numIter).mapVertices{ case (id, (outDeg, r)) => r }
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
resetProb + (1.0 - resetProb) * msgSum
def sendMessage(id: Vid, edge: EdgeTriplet[Double, Double]): Option[Double] =
Some(edge.srcAttr * edge.attr)
def messageCombiner(a: Double, b: Double): Double = a + b
// The initial message received by all vertices in PageRank
val initialMessage = 0.0
// Execute pregel for a fixed number of iterations.
Pregel(pagerankGraph, initialMessage, numIter)(
vertexProgram, sendMessage, messageCombiner)
}
/**
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
*/
def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
tol: Float,
maxIter: Int = Integer.MAX_VALUE,
resetProb: Double = 0.15) = {
// Compute the out degree of each vertex
val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
(id, data, degIter) => (degIter.sum, 1.0, 1.0)
}
def deltaPagerank[VD: Manifest, ED: Manifest](
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = {
// Run PageRank
GraphLab.iterate(pagerankGraph)(
(me_id, edge) => edge.srcAttr._2 / edge.srcAttr._1, // gather
(a: Double, b: Double) => a + b,
(id, data, a: Option[Double]) =>
(data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply
(me_id, edge) => math.abs(edge.srcAttr._3 - edge.srcAttr._2) > tol, // scatter
maxIter).mapVertices { case (vid, data) => data._2 }
/**
* Initialize the pagerankGraph with each edge attribute
* having weight 1/outDegree and each vertex with attribute 1.0.
*/
val pagerankGraph: Graph[(Double, Double), Double] = graph
// Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees){
(vid, vdata, deg) => deg.getOrElse(0)
}
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to (initalPR, delta = 0)
.mapVertices( (id, attr) => (0.0, 0.0) )
// Display statistics about pagerank
println(pagerankGraph.statistics)
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
def vertexProgram(id: Vid, attr: (Double, Double), msgSum: Double): (Double, Double) = {
val (oldPR, lastDelta) = attr
val newPR = oldPR + (1.0 - resetProb) * msgSum
(newPR, newPR - oldPR)
}
def sendMessage(id: Vid, edge: EdgeTriplet[(Double, Double), Double]): Option[Double] = {
if (edge.srcAttr._2 > tol) {
Some(edge.srcAttr._2 * edge.attr)
} else { None }
}
def messageCombiner(a: Double, b: Double): Double = a + b
// The initial message received by all vertices in PageRank
val initialMessage = resetProb / (1.0 - resetProb)
// Execute a dynamic version of Pregel.
Pregel(pagerankGraph, initialMessage)(
vertexProgram, sendMessage, messageCombiner)
.mapVertices( (vid, attr) => attr._1 )
// // Compute the out degree of each vertex
// val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
// (id, data, degIter) => (degIter.sum, 1.0, 1.0)
// }
// // Run PageRank
// GraphLab.iterate(pagerankGraph)(
// (me_id, edge) => edge.srcAttr._2 / edge.srcAttr._1, // gather
// (a: Double, b: Double) => a + b,
// (id, data, a: Option[Double]) =>
// (data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply
// (me_id, edge) => math.abs(edge.srcAttr._3 - edge.srcAttr._2) > tol, // scatter
// maxIter).mapVertices { case (vid, data) => data._2 }
}

View file

@ -10,6 +10,8 @@ import org.apache.spark.rdd.RDD
object Pregel {
/**
* Execute the Pregel program.
*
@ -34,23 +36,19 @@ object Pregel {
* @return the resulting graph at the end of the computation
*
*/
def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
(graph: Graph[VD, ED], initialMsg: A, numIter: Int)(
vprog: (Vid, VD, A) => VD,
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
mergeMsg: (A, A) => A,
initialMsg: A,
numIter: Int)
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
var g = graph
//var g = graph.cache()
var i = 0
def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertexId(vid), edge)
// Receive the first set of messages
g.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
var i = 0
while (i < numIter) {
// compute the messages
val messages = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In)
@ -61,5 +59,79 @@ object Pregel {
}
// Return the final graph
g
} // end of apply
/**
* Execute the Pregel program.
*
* @tparam VD the vertex data type
* @tparam ED the edge data type
* @tparam A the Pregel message type
*
* @param vprog a user supplied function that acts as the vertex program for
* the Pregel computation. It takes the vertex ID of the vertex it is running on,
* the accompanying data for that vertex, and the incoming data and returns the
* new vertex value.
* @param sendMsg a user supplied function that takes the current vertex ID and an EdgeTriplet
* between the vertex and one of its neighbors and produces a message to send
* to that neighbor.
* @param mergeMsg a user supplied function that takes two incoming messages of type A and merges
* them into a single message of type A. ''This function must be commutative and
* associative.''
* @param initialMsg the message each vertex will receive at the beginning of the
* first iteration.
* @param numIter the number of iterations to run this computation for.
*
* @return the resulting graph at the end of the computation
*
*/
def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
(graph: Graph[VD, ED], initialMsg: A)(
vprog: (Vid, VD, A) => VD,
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
def vprogFun(id: Vid, attr: (VD, Boolean), msgOpt: Option[A]): (VD, Boolean) = {
msgOpt match {
case Some(msg) => (vprog(id, attr._1, msg), true)
case None => (attr._1, false)
}
}
}
def sendMsgFun(vid: Vid, edge: EdgeTriplet[(VD,Boolean), ED]): Option[A] = {
if(edge.srcAttr._2) {
val et = new EdgeTriplet[VD, ED]
et.srcId = edge.srcId
et.srcAttr = edge.srcAttr._1
et.dstId = edge.dstId
et.dstAttr = edge.dstAttr._1
et.attr = edge.attr
sendMsg(edge.otherVertexId(vid), et)
} else { None }
}
var g = graph.mapVertices( (vid, vdata) => (vprog(vid, vdata, initialMsg), true) )
// compute the messages
var messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, EdgeDirection.In).cache
var activeMessages = messages.count
// Loop
var i = 0
while (activeMessages > 0) {
// receive the messages
g = g.outerJoinVertices(messages)(vprogFun)
val oldMessages = messages
// compute the messages
messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, EdgeDirection.In).cache
activeMessages = messages.count
// after counting we can unpersist the old messages
oldMessages.unpersist(blocking=false)
// count the iteration
i += 1
}
// Return the final graph
g.mapVertices((id, attr) => attr._1)
} // end of apply
} // end of class Pregel

View file

@ -236,7 +236,51 @@ object GraphGenerators {
}
}
}
/**
* Create `rows` by `cols` grid graph with each vertex connected to its
* row+1 and col+1 neighbors. Vertex ids are assigned in row major
* order.
*
* @param sc the spark context in which to construct the graph
* @param rows the number of rows
* @param cols the number of columns
*
* @return A graph containing vertices with the row and column ids
* as their attributes and edge values as 1.0.
*/
def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = {
// Convert row column address into vertex ids (row major order)
def sub2ind(r: Int, c: Int): Vid = r * cols + c
val vertices: RDD[(Vid, (Int,Int))] =
sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) )
val edges: RDD[Edge[Double]] =
vertices.flatMap{ case (vid, (r,c)) =>
(if (r+1 < rows) { Seq( (sub2ind(r, c), sub2ind(r+1, c))) } else { Seq.empty }) ++
(if (c+1 < cols) { Seq( (sub2ind(r, c), sub2ind(r, c+1))) } else { Seq.empty })
}.map{ case (src, dst) => Edge(src, dst, 1.0) }
Graph(vertices, edges)
} // end of gridGraph
/**
* Create a star graph with vertex 0 being the center.
*
* @param sc the spark context in which to construct the graph
* @param the number of vertices in the star
*
* @return A star graph containing `nverts` vertices with vertex 0
* being the center vertex.
*/
def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = {
val edges: RDD[(Vid, Vid)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
Graph(edges, false)
} // end of starGraph
} // end of Graph Generators

View file

@ -0,0 +1,107 @@
package org.apache.spark.graph
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.graph.LocalSparkContext._
import org.apache.spark.graph.util.GraphGenerators
object GridPageRank {
def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
val outDegree = Array.fill(nRows * nCols)(0)
// Convert row column address into vertex ids (row major order)
def sub2ind(r: Int, c: Int): Int = r * nCols + c
// Make the grid graph
for(r <- 0 until nRows; c <- 0 until nCols){
val ind = sub2ind(r,c)
if(r+1 < nRows) {
outDegree(ind) += 1
inNbrs(sub2ind(r+1,c)) += ind
}
if(c+1 < nCols) {
outDegree(ind) += 1
inNbrs(sub2ind(r,c+1)) += ind
}
}
// compute the pagerank
var pr = Array.fill(nRows * nCols)(resetProb)
for(iter <- 0 until nIter) {
val oldPr = pr
pr = new Array[Double](nRows * nCols)
for(ind <- 0 until (nRows * nCols)) {
pr(ind) = resetProb + (1.0 - resetProb) *
inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum
}
}
(0L until (nRows * nCols)).zip(pr)
}
}
class AnalyticsSuite extends FunSuite with LocalSparkContext {
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
test("Star PageRank") {
withSpark(new SparkContext("local", "test")) { sc =>
val nVertices = 100
val starGraph = GraphGenerators.starGraph(sc, nVertices)
val resetProb = 0.15
val prGraph1 = Analytics.pagerank(starGraph, 1, resetProb)
val prGraph2 = Analytics.pagerank(starGraph, 2, resetProb)
val notMatching = prGraph1.vertices.zipJoin(prGraph2.vertices)
.map{ case (vid, (pr1, pr2)) => if (pr1 != pr2) { 1 } else { 0 } }.sum
assert(notMatching === 0)
prGraph2.vertices.foreach(println(_))
val errors = prGraph2.vertices.map{ case (vid, pr) =>
val correct = (vid > 0 && pr == resetProb) ||
(vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
if ( !correct ) { 1 } else { 0 }
}
assert(errors.sum === 0)
val prGraph3 = Analytics.deltaPagerank(starGraph, 0, resetProb)
val errors2 = prGraph2.vertices.leftJoin(prGraph3.vertices).map{
case (_, (pr1, Some(pr2))) if(pr1 == pr2) => 0
case _ => 1
}.sum
assert(errors2 === 0)
}
} // end of test Star PageRank
test("Grid PageRank") {
withSpark(new SparkContext("local", "test")) { sc =>
val gridGraph = GraphGenerators.gridGraph(sc, 10, 10)
val resetProb = 0.15
val prGraph1 = Analytics.pagerank(gridGraph, 50, resetProb).cache()
val prGraph2 = Analytics.deltaPagerank(gridGraph, 0.0001, resetProb).cache()
val error = prGraph1.vertices.zipJoin(prGraph2.vertices).map {
case (id, (a, b)) => (a - b) * (a - b)
}.sum
prGraph1.vertices.zipJoin(prGraph2.vertices)
.map{ case (id, (a,b)) => (id, (a,b, a-b))}.foreach(println(_))
println(error)
assert(error < 1.0e-5)
val pr3 = sc.parallelize(GridPageRank(10,10, 50, resetProb))
val error2 = prGraph1.vertices.leftJoin(pr3).map {
case (id, (a, Some(b))) => (a - b) * (a - b)
case _ => 0
}.sum
prGraph1.vertices.leftJoin(pr3).foreach(println( _ ))
println(error2)
assert(error2 < 1.0e-5)
}
} // end of Grid PageRank
} // end of AnalyticsSuite