From 91e1227edb42d35a6a811aa7768c301a1c022f48 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Wed, 3 Apr 2013 23:04:29 -0700 Subject: [PATCH] completed port of several analytics packages as well as analytics main --- .../main/scala/spark/graph/Analytics.scala | 134 ++++++++---------- graph/src/main/scala/spark/graph/Graph.scala | 35 +---- .../src/main/scala/spark/graph/GraphLab.scala | 6 +- 3 files changed, 61 insertions(+), 114 deletions(-) diff --git a/graph/src/main/scala/spark/graph/Analytics.scala b/graph/src/main/scala/spark/graph/Analytics.scala index 48f2087559..b5ffaf1b34 100644 --- a/graph/src/main/scala/spark/graph/Analytics.scala +++ b/graph/src/main/scala/spark/graph/Analytics.scala @@ -26,25 +26,19 @@ object Analytics { /** * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD */ - def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], maxIter: Int) = { + def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { // Compute the out degree of each vertex - val outDegree = graph.edges.map { - case Edge(src, target, data) => (src, 1) - }.reduceByKey(_ + _) - val pagerankGraph = graph.updateVertices[Int, (Int, Float)](outDegree, + val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees, (vertex, degIter) => (degIter.sum, 1.0F) ) - GraphLab.iterateGAS[Float, (Int, Float), ED](pagerankGraph, + GraphLab.iterateGAS[(Int, Float), ED, Float](pagerankGraph)( (me_id, edge) => edge.src.data._2 / edge.dst.data._1, // gather (a: Float, b: Float) => a + b, // merge 0F, (vertex, a: Float) => (vertex.data._1, (0.15F + 0.85F * a)), // apply - maxIter).vertices.map { case Vertex(id, (degree, rank)) => (id, rank) } - // println("Computed graph: #edges: " + graph_ret.numEdges + " #vertices" + graph_ret.numVertices) - // graph_ret.vertices.take(10).foreach(println) + numIter).vertices.map{ case Vertex(id, (outDeg, r)) => Vertex(id, r) } } -/* /** * Compute the connected component membership of each vertex @@ -53,41 +47,26 @@ object Analytics { * that vertex. */ def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { - - val vertices = graph.vertices.mapPartitions(iter => iter.map { case (vid, _) => (vid, vid) }) - val edges = graph.edges // .mapValues(v => None) - val ccGraph = new Graph(vertices, edges) - - - ccGraph.iterateStatic( + val ccGraph = graph.mapVertices { case Vertex(vid, _) => Vertex(vid, vid) } + GraphLab.iterateGAS[Int, ED, Int](ccGraph)( (me_id, edge) => edge.otherVertex(me_id).data, // gather (a: Int, b: Int) => math.min(a, b), // merge Integer.MAX_VALUE, (v, a: Int) => math.min(v.data, a), // apply numIter, gatherEdges = EdgeDirection.Both).vertices - // - // graph_ret.vertices.`.foreach(println) - // graph_ret.edges.take(10).foreach(println) } /** * Compute the shortest path to a set of markers */ - def shortestPath[VD: Manifest, ED: Manifest](graph: Graph[VD, Float], - sources: List[Int], numIter: Int) = { + def shortestPath[VD: Manifest](graph: Graph[VD, Float], sources: List[Int], numIter: Int) = { val sourceSet = sources.toSet - val vertices = graph.vertices.mapPartitions( - iter => iter.map { - case (vid, _) => (vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue) ) - }); - - val edges = graph.edges // .mapValues(v => None) - val spGraph = new Graph(vertices, edges) - - val niterations = Int.MaxValue - spGraph.iterateStatic( + val spGraph = graph.mapVertices { + case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue)) + } + GraphLab.iterateGAS[Float, Float, Float](spGraph)( (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather (a: Float, b: Float) => math.min(a, b), // merge Float.MaxValue, @@ -96,7 +75,7 @@ object Analytics { gatherEdges = EdgeDirection.In).vertices } -*/ + // /** @@ -266,7 +245,7 @@ object Analytics { -/* + taskType match { case "pagerank" => { @@ -289,7 +268,7 @@ object Analytics { if(!isDynamic && numIter == Int.MaxValue) { println("Set number of iterations!") - exit(1) + sys.exit(1) } println("======================================") println("| PageRank |") @@ -306,13 +285,13 @@ object Analytics { graph.numEdgePartitions = numEPart val startTime = System.currentTimeMillis - Analytics.pageRank(graph, numIter) - // val pr = if(isDynamic) Analytics.dynamicPageRank(graph, tol, numIter) - // else Analytics.pageRank(graph, numIter) - println("Total rank: " + pr.map(_._2).reduce(_+_)) + val pr = Analytics.pagerank(graph, numIter) + // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter) + // else Analytics.pagerank(graph, numIter) + println("Total rank: " + pr.map{ case Vertex(id,r) => r }.reduce(_+_) ) if(!outFname.isEmpty) { println("Saving pageranks of pages to " + outFname) - pr.map(p => p._1 + "\t" + p._2).saveAsTextFile(outFname) + pr.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname) } println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") sc.stop() @@ -331,7 +310,7 @@ object Analytics { if(!isDynamic && numIter == Int.MaxValue) { println("Set number of iterations!") - exit(1) + sys.exit(1) } println("======================================") println("| Connected Components |") @@ -343,55 +322,56 @@ object Analytics { val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")") val graph = Graph.textFile(sc, fname, a => 1.0F) - Analytics.connectedComponents(graph, numIter) + val cc = Analytics.connectedComponents(graph, numIter) // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter) // else Analytics.connectedComponents(graph, numIter) - println("Components: " + cc.map(_._2).distinct()) + println("Components: " + cc.map(_.data).distinct()) sc.stop() } - // case "shortestpath" => { + case "shortestpath" => { - // var numIter = Int.MaxValue - // var isDynamic = true - // var sources: List[Int] = List.empty + var numIter = Int.MaxValue + var isDynamic = true + var sources: List[Int] = List.empty - // options.foreach{ - // case ("numIter", v) => numIter = v.toInt - // case ("dynamic", v) => isDynamic = v.toBoolean - // case ("source", v) => sources ++= List(v.toInt) - // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) - // } + options.foreach{ + case ("numIter", v) => numIter = v.toInt + case ("dynamic", v) => isDynamic = v.toBoolean + case ("source", v) => sources ++= List(v.toInt) + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } - // if(!isDynamic && numIter == Int.MaxValue) { - // println("Set number of iterations!") - // exit(1) - // } + if(!isDynamic && numIter == Int.MaxValue) { + println("Set number of iterations!") + sys.exit(1) + } - // if(sources.isEmpty) { - // println("No sources provided!") - // exit(1) - // } + if(sources.isEmpty) { + println("No sources provided!") + sys.exit(1) + } - // println("======================================") - // println("| Shortest Path |") - // println("--------------------------------------") - // println(" Using parameters:") - // println(" \tDynamic: " + isDynamic) - // println(" \tNumIter: " + numIter) - // println(" \tSources: [" + sources.mkString(", ") + "]") - // println("======================================") + println("======================================") + println("| Shortest Path |") + println("--------------------------------------") + println(" Using parameters:") + println(" \tDynamic: " + isDynamic) + println(" \tNumIter: " + numIter) + println(" \tSources: [" + sources.mkString(", ") + "]") + println("======================================") - // val sc = new SparkContext(host, "ShortestPath(" + fname + ")") - // val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) ) - // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter) - // else Analytics.shortestPath(graph, sources, numIter) - // println("Longest Path: " + cc.map(_._2).reduce(math.max(_,_))) + val sc = new SparkContext(host, "ShortestPath(" + fname + ")") + val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) ) + val sp = Analytics.shortestPath(graph, sources, numIter) + // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter) + // else Analytics.shortestPath(graph, sources, numIter) + println("Longest Path: " + sp.map(_.data).reduce(math.max(_,_))) - // sc.stop() - // } + sc.stop() + } // case "als" => { @@ -449,7 +429,7 @@ object Analytics { println("Invalid task type.") } } - */ + } diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala index caf31e04ae..f8e16f24a3 100644 --- a/graph/src/main/scala/spark/graph/Graph.scala +++ b/graph/src/main/scala/spark/graph/Graph.scala @@ -159,7 +159,7 @@ object Graph { * Load an edge list from file initializing the Graph RDD */ def textFile[ED: ClassManifest](sc: SparkContext, - fname: String, edgeParser: Array[String] => ED) = { + fname: String, edgeParser: Array[String] => ED ) = { // Parse the edge data table val edges = sc.textFile(fname).map { line => @@ -287,39 +287,6 @@ object Graph { }, preservesPartitioning = true) } - /** - * Load a graph from a text file. - */ - def textFile[ED: Manifest](sc: SparkContext, - fname: String, edgeParser: Array[String] => ED) = { - - // Parse the edge data table - val edges = sc.textFile(fname).map( - line => { - val lineArray = line.split("\\s+") - if(lineArray.length < 2) { - println("Invalid line: " + line) - assert(false) - } - val source = lineArray(0) - val target = lineArray(1) - val tail = lineArray.drop(2) - val edata = edgeParser(tail) - Edge(source.trim.toInt, target.trim.toInt, edata) - }).cache - - // Parse the vertex data table - val vertices = edges.flatMap { - case Edge(source, target, _) => List((source, 1), (target, 1)) - }.reduceByKey(_ + _).map(pair => Vertex(pair._1, pair._2)) - - val graph = new Graph[Int, ED](vertices, edges) - - println("Loaded graph:" + - "\n\t#edges: " + graph.numEdges + - "\n\t#vertices: " + graph.numVertices) - graph - } } diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala index b3127c64d3..4bc7c4d2e5 100644 --- a/graph/src/main/scala/spark/graph/GraphLab.scala +++ b/graph/src/main/scala/spark/graph/GraphLab.scala @@ -6,14 +6,14 @@ import spark.RDD object GraphLab { - def iterateGAS[A: ClassManifest, VD: ClassManifest, ED: ClassManifest]( - graph: Graph[VD, ED], + def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( gather: (Vid, EdgeWithVertices[VD, ED]) => A, merge: (A, A) => A, default: A, apply: (Vertex[VD], A) => VD, numIter: Int, - gatherEdges: EdgeDirection.EdgeDirection = EdgeDirection.In) = { + gatherEdges: EdgeDirection.EdgeDirection = EdgeDirection.In) : + Graph[VD, ED] = { var g = graph.mapVertices(v => Vertex(v.id, VDataWithAcc(v.data, default))).cache()