added dynamic graphlab

This commit is contained in:
Joseph E. Gonzalez 2013-04-04 19:18:15 -07:00
parent 4a2b8aa557
commit b537613570
2 changed files with 57 additions and 66 deletions

View file

@ -23,6 +23,7 @@ import spark.SparkContext._
object Analytics { object Analytics {
/** /**
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
*/ */
@ -31,15 +32,36 @@ object Analytics {
val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees, val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees,
(vertex, degIter) => (degIter.sum, 1.0F) (vertex, degIter) => (degIter.sum, 1.0F)
) )
GraphLab.iterateGAS[(Int, Float), ED, Float](pagerankGraph)( GraphLab.iterateGA[(Int, Float), ED, Float](pagerankGraph)(
(me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
(a: Float, b: Float) => a + b, // merge (a: Float, b: Float) => a + b, // merge
0F, (vertex, a: Option[Float]) => (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F))), // apply
(vertex, a: Float) => (vertex.data._1, (0.15F + 0.85F * a)), // apply numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
numIter).vertices.map{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
} }
/**
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
*/
def dynamicPageRank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
tol: Float, maxIter: Int = 10) = {
// Compute the out degree of each vertex
val pagerankGraph = graph.updateVertices[Int, (Int, Float, Float)](graph.outDegrees,
(vertex, degIter) => (degIter.sum, 1.0F, 1.0F)
)
// Run PageRank
GraphLab.iterateGAS(pagerankGraph)(
(me_id, edge) => edge.src.data._2 / edge.dst.data._1, // gather
(a: Float, b: Float) => a + b,
(vertex, a: Option[Float]) =>
(vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F)), vertex.data._2), // apply
(me_id, edge) => math.abs(edge.src.data._2 - edge.dst.data._1) > tol, // scatter
maxIter).mapVertices { case Vertex(vid, data) => Vertex(vid, data._2) }
}
/** /**
* Compute the connected component membership of each vertex * Compute the connected component membership of each vertex
* and return an RDD with the vertex value containing the * and return an RDD with the vertex value containing the
@ -48,13 +70,12 @@ object Analytics {
*/ */
def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
val ccGraph = graph.mapVertices { case Vertex(vid, _) => Vertex(vid, vid) } val ccGraph = graph.mapVertices { case Vertex(vid, _) => Vertex(vid, vid) }
GraphLab.iterateGAS[Int, ED, Int](ccGraph)( GraphLab.iterateGA[Int, ED, Int](ccGraph)(
(me_id, edge) => edge.otherVertex(me_id).data, // gather (me_id, edge) => edge.otherVertex(me_id).data, // gather
(a: Int, b: Int) => math.min(a, b), // merge (a: Int, b: Int) => math.min(a, b), // merge
Integer.MAX_VALUE, (v, a: Option[Int]) => math.min(v.data, a.getOrElse(Integer.MAX_VALUE)), // apply
(v, a: Int) => math.min(v.data, a), // apply
numIter, numIter,
gatherDirection = EdgeDirection.Both).vertices gatherDirection = EdgeDirection.Both)
} }
@ -66,47 +87,17 @@ object Analytics {
val spGraph = graph.mapVertices { val spGraph = graph.mapVertices {
case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue)) case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue))
} }
GraphLab.iterateGAS[Float, Float, Float](spGraph)( GraphLab.iterateGA[Float, Float, Float](spGraph)(
(me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
(a: Float, b: Float) => math.min(a, b), // merge (a: Float, b: Float) => math.min(a, b), // merge
Float.MaxValue, (v, a: Option[Float]) => math.min(v.data, a.getOrElse(Float.MaxValue)), // apply
(v, a: Float) => math.min(v.data, a), // apply
numIter, numIter,
gatherDirection = EdgeDirection.In).vertices gatherDirection = EdgeDirection.In)
} }
// /**
// * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
// */
// def dynamicPageRank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
// tol: Float, maxIter: Int = 10) = {
// graph.edges.cache
// // Compute the out degree of each vertex
// val outDegree = graph.edges.map { case (src, target, data) => (src, 1)}.reduceByKey(_ + _)
// val vertices = graph.vertices.leftOuterJoin(outDegree).mapValues {
// case (_, Some(deg)) => (deg, 1.0F, 1.0F)
// case (_, None) => (0, 1.0F, 1.0F)
// }.cache
// val edges = graph.edges
// val pageRankGraph = new Graph(vertices, edges)
// pageRankGraph.numVPart = graph.numVPart
// pageRankGraph.numEPart = graph.numEPart
// // Run PageRank
// pageRankGraph.iterateDynamic(
// (me_id, edge) => edge.source.data._2 / edge.source.data._1, // gather
// (a: Float, b: Float) => a + b, // merge
// 0F,
// (vertex, a: Float) => (vertex.data._1, (0.15F + 0.85F * a), vertex.data._2), // apply
// (me_id, edge) => math.abs(edge.source.data._2 - edge.source.data._1) > tol, // scatter
// maxIter).vertices.mapValues { case (degree, rank, oldRank) => rank }
// // println("Computed graph: #edges: " + graph_ret.numEdges + " #vertices" + graph_ret.numVertices)
// // graph_ret.vertices.take(10).foreach(println)
// }
@ -288,10 +279,10 @@ object Analytics {
val pr = Analytics.pagerank(graph, numIter) val pr = Analytics.pagerank(graph, numIter)
// val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter) // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
// else Analytics.pagerank(graph, numIter) // else Analytics.pagerank(graph, numIter)
println("Total rank: " + pr.map{ case Vertex(id,r) => r }.reduce(_+_) ) println("Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) )
if(!outFname.isEmpty) { if(!outFname.isEmpty) {
println("Saving pageranks of pages to " + outFname) println("Saving pageranks of pages to " + outFname)
pr.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname) pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname)
} }
println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
sc.stop() sc.stop()
@ -325,7 +316,7 @@ object Analytics {
val cc = Analytics.connectedComponents(graph, numIter) val cc = Analytics.connectedComponents(graph, numIter)
// val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter) // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
// else Analytics.connectedComponents(graph, numIter) // else Analytics.connectedComponents(graph, numIter)
println("Components: " + cc.map(_.data).distinct()) println("Components: " + cc.vertices.map(_.data).distinct())
sc.stop() sc.stop()
} }
@ -368,7 +359,7 @@ object Analytics {
val sp = Analytics.shortestPath(graph, sources, numIter) val sp = Analytics.shortestPath(graph, sources, numIter)
// val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter) // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter)
// else Analytics.shortestPath(graph, sources, numIter) // else Analytics.shortestPath(graph, sources, numIter)
println("Longest Path: " + sp.map(_.data).reduce(math.max(_,_))) println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_)))
sc.stop() sc.stop()
} }

View file

@ -6,35 +6,35 @@ import spark.RDD
object GraphLab { object GraphLab {
def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( // def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
rawGraph: Graph[VD, ED])( // rawGraph: Graph[VD, ED])(
gather: (Vid, EdgeWithVertices[VD, ED]) => A, // gather: (Vid, EdgeWithVertices[VD, ED]) => A,
merge: (A, A) => A, // merge: (A, A) => A,
default: A, // default: A,
apply: (Vertex[VD], A) => VD, // apply: (Vertex[VD], A) => VD,
numIter: Int, // numIter: Int,
gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = { // gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = {
var graph = rawGraph.cache() // var graph = rawGraph.cache()
var i = 0 // var i = 0
while (i < numIter) { // while (i < numIter) {
val accUpdates: RDD[(Vid, A)] = // val accUpdates: RDD[(Vid, A)] =
graph.mapReduceNeighborhood(gather, merge, default, gatherDirection) // graph.mapReduceNeighborhood(gather, merge, default, gatherDirection)
def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) } // def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) }
graph = graph.updateVertices(accUpdates, applyFunc).cache() // graph = graph.updateVertices(accUpdates, applyFunc).cache()
i += 1 // i += 1
} // }
graph // graph
} // }
def iterateGASOption[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
rawGraph: Graph[VD, ED])( rawGraph: Graph[VD, ED])(
gather: (Vid, EdgeWithVertices[VD, ED]) => A, gather: (Vid, EdgeWithVertices[VD, ED]) => A,
merge: (A, A) => A, merge: (A, A) => A,
@ -60,7 +60,7 @@ object GraphLab {
def iterateDynamic[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
rawGraph: Graph[VD, ED])( rawGraph: Graph[VD, ED])(
rawGather: (Vid, EdgeWithVertices[VD, ED]) => A, rawGather: (Vid, EdgeWithVertices[VD, ED]) => A,
merge: (A, A) => A, merge: (A, A) => A,