From b53761357019b1c0ce3dd3094876dcee702aff37 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Thu, 4 Apr 2013 19:18:15 -0700 Subject: [PATCH] added dynamic graphlab --- .../main/scala/spark/graph/Analytics.scala | 81 +++++++++---------- .../src/main/scala/spark/graph/GraphLab.scala | 42 +++++----- 2 files changed, 57 insertions(+), 66 deletions(-) diff --git a/graph/src/main/scala/spark/graph/Analytics.scala b/graph/src/main/scala/spark/graph/Analytics.scala index 27b8e371d4..40f378fbc2 100644 --- a/graph/src/main/scala/spark/graph/Analytics.scala +++ b/graph/src/main/scala/spark/graph/Analytics.scala @@ -23,6 +23,7 @@ import spark.SparkContext._ object Analytics { + /** * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD */ @@ -31,15 +32,36 @@ object Analytics { val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees, (vertex, degIter) => (degIter.sum, 1.0F) ) - GraphLab.iterateGAS[(Int, Float), ED, Float](pagerankGraph)( + GraphLab.iterateGA[(Int, Float), ED, Float](pagerankGraph)( (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather (a: Float, b: Float) => a + b, // merge - 0F, - (vertex, a: Float) => (vertex.data._1, (0.15F + 0.85F * a)), // apply - numIter).vertices.map{ case Vertex(id, (outDeg, r)) => Vertex(id, r) } + (vertex, a: Option[Float]) => (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F))), // apply + numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) } } + /** + * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD + */ + def dynamicPageRank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], + tol: Float, maxIter: Int = 10) = { + // Compute the out degree of each vertex + val pagerankGraph = graph.updateVertices[Int, (Int, Float, Float)](graph.outDegrees, + (vertex, degIter) => (degIter.sum, 1.0F, 1.0F) + ) + + // Run PageRank + GraphLab.iterateGAS(pagerankGraph)( + (me_id, edge) => edge.src.data._2 / edge.dst.data._1, // gather + (a: Float, b: Float) => a + b, + (vertex, a: Option[Float]) => + (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F)), vertex.data._2), // apply + (me_id, edge) => math.abs(edge.src.data._2 - edge.dst.data._1) > tol, // scatter + maxIter).mapVertices { case Vertex(vid, data) => Vertex(vid, data._2) } + } + + + /** * Compute the connected component membership of each vertex * and return an RDD with the vertex value containing the @@ -48,13 +70,12 @@ object Analytics { */ def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { val ccGraph = graph.mapVertices { case Vertex(vid, _) => Vertex(vid, vid) } - GraphLab.iterateGAS[Int, ED, Int](ccGraph)( + GraphLab.iterateGA[Int, ED, Int](ccGraph)( (me_id, edge) => edge.otherVertex(me_id).data, // gather (a: Int, b: Int) => math.min(a, b), // merge - Integer.MAX_VALUE, - (v, a: Int) => math.min(v.data, a), // apply + (v, a: Option[Int]) => math.min(v.data, a.getOrElse(Integer.MAX_VALUE)), // apply numIter, - gatherDirection = EdgeDirection.Both).vertices + gatherDirection = EdgeDirection.Both) } @@ -66,47 +87,17 @@ object Analytics { val spGraph = graph.mapVertices { case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue)) } - GraphLab.iterateGAS[Float, Float, Float](spGraph)( + GraphLab.iterateGA[Float, Float, Float](spGraph)( (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather (a: Float, b: Float) => math.min(a, b), // merge - Float.MaxValue, - (v, a: Float) => math.min(v.data, a), // apply + (v, a: Option[Float]) => math.min(v.data, a.getOrElse(Float.MaxValue)), // apply numIter, - gatherDirection = EdgeDirection.In).vertices + gatherDirection = EdgeDirection.In) } - // /** - // * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD - // */ - // def dynamicPageRank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], - // tol: Float, maxIter: Int = 10) = { - // graph.edges.cache - // // Compute the out degree of each vertex - // val outDegree = graph.edges.map { case (src, target, data) => (src, 1)}.reduceByKey(_ + _) - // val vertices = graph.vertices.leftOuterJoin(outDegree).mapValues { - // case (_, Some(deg)) => (deg, 1.0F, 1.0F) - // case (_, None) => (0, 1.0F, 1.0F) - // }.cache - - // val edges = graph.edges - // val pageRankGraph = new Graph(vertices, edges) - // pageRankGraph.numVPart = graph.numVPart - // pageRankGraph.numEPart = graph.numEPart - - // // Run PageRank - // pageRankGraph.iterateDynamic( - // (me_id, edge) => edge.source.data._2 / edge.source.data._1, // gather - // (a: Float, b: Float) => a + b, // merge - // 0F, - // (vertex, a: Float) => (vertex.data._1, (0.15F + 0.85F * a), vertex.data._2), // apply - // (me_id, edge) => math.abs(edge.source.data._2 - edge.source.data._1) > tol, // scatter - // maxIter).vertices.mapValues { case (degree, rank, oldRank) => rank } - // // println("Computed graph: #edges: " + graph_ret.numEdges + " #vertices" + graph_ret.numVertices) - // // graph_ret.vertices.take(10).foreach(println) - // } @@ -288,10 +279,10 @@ object Analytics { val pr = Analytics.pagerank(graph, numIter) // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter) // else Analytics.pagerank(graph, numIter) - println("Total rank: " + pr.map{ case Vertex(id,r) => r }.reduce(_+_) ) + println("Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) ) if(!outFname.isEmpty) { println("Saving pageranks of pages to " + outFname) - pr.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname) + pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname) } println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") sc.stop() @@ -325,7 +316,7 @@ object Analytics { val cc = Analytics.connectedComponents(graph, numIter) // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter) // else Analytics.connectedComponents(graph, numIter) - println("Components: " + cc.map(_.data).distinct()) + println("Components: " + cc.vertices.map(_.data).distinct()) sc.stop() } @@ -368,7 +359,7 @@ object Analytics { val sp = Analytics.shortestPath(graph, sources, numIter) // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter) // else Analytics.shortestPath(graph, sources, numIter) - println("Longest Path: " + sp.map(_.data).reduce(math.max(_,_))) + println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_))) sc.stop() } diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala index d4fdb76040..fa69e1ad9b 100644 --- a/graph/src/main/scala/spark/graph/GraphLab.scala +++ b/graph/src/main/scala/spark/graph/GraphLab.scala @@ -6,35 +6,35 @@ import spark.RDD object GraphLab { - def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( - rawGraph: Graph[VD, ED])( - gather: (Vid, EdgeWithVertices[VD, ED]) => A, - merge: (A, A) => A, - default: A, - apply: (Vertex[VD], A) => VD, - numIter: Int, - gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = { + // def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( + // rawGraph: Graph[VD, ED])( + // gather: (Vid, EdgeWithVertices[VD, ED]) => A, + // merge: (A, A) => A, + // default: A, + // apply: (Vertex[VD], A) => VD, + // numIter: Int, + // gatherDirection: EdgeDirection.EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = { - var graph = rawGraph.cache() + // var graph = rawGraph.cache() - var i = 0 - while (i < numIter) { + // var i = 0 + // while (i < numIter) { - val accUpdates: RDD[(Vid, A)] = - graph.mapReduceNeighborhood(gather, merge, default, gatherDirection) + // val accUpdates: RDD[(Vid, A)] = + // graph.mapReduceNeighborhood(gather, merge, default, gatherDirection) - def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) } - graph = graph.updateVertices(accUpdates, applyFunc).cache() + // def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) } + // graph = graph.updateVertices(accUpdates, applyFunc).cache() - i += 1 - } - graph - } + // i += 1 + // } + // graph + // } - def iterateGASOption[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( + def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( rawGraph: Graph[VD, ED])( gather: (Vid, EdgeWithVertices[VD, ED]) => A, merge: (A, A) => A, @@ -60,7 +60,7 @@ object GraphLab { - def iterateDynamic[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( + def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( rawGraph: Graph[VD, ED])( rawGather: (Vid, EdgeWithVertices[VD, ED]) => A, merge: (A, A) => A,