From 70ba4d1740e4816aaf5a6134db613b34f03c2f75 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 5 May 2013 17:12:30 -0700 Subject: [PATCH] Refactored the Graph API for discussion. --- .../main/scala/spark/graph/Analytics.scala | 682 +++++++++--------- graph/src/main/scala/spark/graph/Edge.scala | 7 + .../scala/spark/graph/EdgeWithVertices.scala | 17 + graph/src/main/scala/spark/graph/Graph.scala | 488 +------------ .../src/main/scala/spark/graph/GraphLab.scala | 174 ++--- .../main/scala/spark/graph/GraphLoader.scala | 54 ++ .../src/main/scala/spark/graph/GraphOps.scala | 30 + graph/src/main/scala/spark/graph/Pregel.scala | 33 +- graph/src/main/scala/spark/graph/Timer.scala | 14 - graph/src/main/scala/spark/graph/Vertex.scala | 11 + .../spark/graph/impl/EdgePartition.scala | 49 ++ .../{ => impl}/EdgeWithVerticesRDD.scala | 4 +- .../scala/spark/graph/impl/GraphImpl.scala | 343 +++++++++ .../spark/graph/{ => perf}/BagelTest.scala | 5 +- .../scala/spark/graph/perf/SparkTest.scala | 72 ++ 15 files changed, 1053 insertions(+), 930 deletions(-) create mode 100644 graph/src/main/scala/spark/graph/Edge.scala create mode 100644 graph/src/main/scala/spark/graph/EdgeWithVertices.scala create mode 100644 graph/src/main/scala/spark/graph/GraphLoader.scala create mode 100644 graph/src/main/scala/spark/graph/GraphOps.scala delete mode 100644 graph/src/main/scala/spark/graph/Timer.scala create mode 100644 graph/src/main/scala/spark/graph/Vertex.scala create mode 100644 graph/src/main/scala/spark/graph/impl/EdgePartition.scala rename graph/src/main/scala/spark/graph/{ => impl}/EdgeWithVerticesRDD.scala (97%) create mode 100644 graph/src/main/scala/spark/graph/impl/GraphImpl.scala rename graph/src/main/scala/spark/graph/{ => perf}/BagelTest.scala (93%) create mode 100644 graph/src/main/scala/spark/graph/perf/SparkTest.scala diff --git a/graph/src/main/scala/spark/graph/Analytics.scala b/graph/src/main/scala/spark/graph/Analytics.scala index d18630cde0..e6ad2d05cf 100644 --- a/graph/src/main/scala/spark/graph/Analytics.scala +++ b/graph/src/main/scala/spark/graph/Analytics.scala @@ -7,32 +7,32 @@ import spark.SparkContext._ object Analytics extends Logging { - /** - * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD - */ - // def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { - // // Compute the out degree of each vertex - // val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees, - // (vertex, deg) => (deg.getOrElse(0), 1.0F) - // ) - // GraphLab.iterateGA[(Int, Float), ED, Float](pagerankGraph)( - // (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather - // (a: Float, b: Float) => a + b, // merge - // (vertex, a: Option[Float]) => (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F))), // apply - // numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) } - // } - def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { - // Compute the out degree of each vertex - val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees, - (vertex, deg) => (deg.getOrElse(0), 1.0F) - ) - GraphLab.iterateGA2[(Int, Float), ED, Float](pagerankGraph)( - (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather - (a: Float, b: Float) => a + b, // merge - 0.0F, // default - (vertex, a: Float) => (vertex.data._1, (0.15F + 0.85F * a)), // apply - numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) } - } +// /** +// * Compute the PageRank of a graph returning the 
pagerank of each vertex as an RDD +// */ +// // def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { +// // // Compute the out degree of each vertex +// // val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees, +// // (vertex, deg) => (deg.getOrElse(0), 1.0F) +// // ) +// // GraphLab.iterateGA[(Int, Float), ED, Float](pagerankGraph)( +// // (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather +// // (a: Float, b: Float) => a + b, // merge +// // (vertex, a: Option[Float]) => (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F))), // apply +// // numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) } +// // } +// def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { +// // Compute the out degree of each vertex +// val pagerankGraph = graph.updateVertices[Int, (Int, Float)](graph.outDegrees, +// (vertex, deg) => (deg.getOrElse(0), 1.0F) +// ) +// GraphLab.iterateGA2[(Int, Float), ED, Float](pagerankGraph)( +// (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather +// (a: Float, b: Float) => a + b, // merge +// 0.0F, // default +// (vertex, a: Float) => (vertex.data._1, (0.15F + 0.85F * a)), // apply +// numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) } +// } /** * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD @@ -50,372 +50,372 @@ object Analytics extends Logging { numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) } } - /** - * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD - */ - def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], - tol: Float, maxIter: Int = 10) = { - // Compute the out degree of each vertex - val pagerankGraph = graph.updateVertices[Int, (Int, Float, Float)](graph.outDegrees, - (vertex, degIter) => (degIter.sum, 1.0F, 1.0F) - ) +// /** +// * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD +// */ +// def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], +// tol: Float, maxIter: Int = 10) = { +// // Compute the out degree of each vertex +// val pagerankGraph = graph.updateVertices[Int, (Int, Float, Float)](graph.outDegrees, +// (vertex, degIter) => (degIter.sum, 1.0F, 1.0F) +// ) - // Run PageRank - GraphLab.iterateGAS(pagerankGraph)( - (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather - (a: Float, b: Float) => a + b, - (vertex, a: Option[Float]) => - (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F)), vertex.data._2), // apply - (me_id, edge) => math.abs(edge.src.data._2 - edge.dst.data._1) > tol, // scatter - maxIter).mapVertices { case Vertex(vid, data) => Vertex(vid, data._2) } - } +// // Run PageRank +// GraphLab.iterateGAS(pagerankGraph)( +// (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather +// (a: Float, b: Float) => a + b, +// (vertex, a: Option[Float]) => +// (vertex.data._1, (0.15F + 0.85F * a.getOrElse(0F)), vertex.data._2), // apply +// (me_id, edge) => math.abs(edge.src.data._2 - edge.dst.data._1) > tol, // scatter +// maxIter).mapVertices { case Vertex(vid, data) => Vertex(vid, data._2) } +// } - /** - * Compute the connected component membership of each vertex - * and return an RDD with the vertex value containing the - * lowest vertex id in the connected component containing - * that vertex. 
- */ - def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { - val ccGraph = graph.mapVertices { case Vertex(vid, _) => Vertex(vid, vid) } - GraphLab.iterateGA[Int, ED, Int](ccGraph)( - (me_id, edge) => edge.otherVertex(me_id).data, // gather - (a: Int, b: Int) => math.min(a, b), // merge - (v, a: Option[Int]) => math.min(v.data, a.getOrElse(Integer.MAX_VALUE)), // apply - numIter, - gatherDirection = EdgeDirection.Both) - } +// /** +// * Compute the connected component membership of each vertex +// * and return an RDD with the vertex value containing the +// * lowest vertex id in the connected component containing +// * that vertex. +// */ +// def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { +// val ccGraph = graph.mapVertices { case Vertex(vid, _) => Vertex(vid, vid) } +// GraphLab.iterateGA[Int, ED, Int](ccGraph)( +// (me_id, edge) => edge.otherVertex(me_id).data, // gather +// (a: Int, b: Int) => math.min(a, b), // merge +// (v, a: Option[Int]) => math.min(v.data, a.getOrElse(Integer.MAX_VALUE)), // apply +// numIter, +// gatherDirection = EdgeDirection.Both) +// } - /** - * Compute the shortest path to a set of markers - */ - def shortestPath[VD: Manifest](graph: Graph[VD, Float], sources: List[Int], numIter: Int) = { - val sourceSet = sources.toSet - val spGraph = graph.mapVertices { - case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue)) - } - GraphLab.iterateGA[Float, Float, Float](spGraph)( - (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather - (a: Float, b: Float) => math.min(a, b), // merge - (v, a: Option[Float]) => math.min(v.data, a.getOrElse(Float.MaxValue)), // apply - numIter, - gatherDirection = EdgeDirection.In) - } +// /** +// * Compute the shortest path to a set of markers +// */ +// def shortestPath[VD: Manifest](graph: Graph[VD, Float], sources: List[Int], numIter: Int) = { +// val sourceSet = sources.toSet +// val spGraph = graph.mapVertices { +// case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue)) +// } +// GraphLab.iterateGA[Float, Float, Float](spGraph)( +// (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather +// (a: Float, b: Float) => math.min(a, b), // merge +// (v, a: Option[Float]) => math.min(v.data, a.getOrElse(Float.MaxValue)), // apply +// numIter, +// gatherDirection = EdgeDirection.In) +// } - // /** - // * Compute the connected component membership of each vertex - // * and return an RDD with the vertex value containing the - // * lowest vertex id in the connected component containing - // * that vertex. - // */ - // def dynamicConnectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], - // numIter: Int = Int.MaxValue) = { +// // /** +// // * Compute the connected component membership of each vertex +// // * and return an RDD with the vertex value containing the +// // * lowest vertex id in the connected component containing +// // * that vertex. 
+// // */ +// // def dynamicConnectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], +// // numIter: Int = Int.MaxValue) = { - // val vertices = graph.vertices.mapPartitions(iter => iter.map { case (vid, _) => (vid, vid) }) - // val edges = graph.edges // .mapValues(v => None) - // val ccGraph = new Graph(vertices, edges) +// // val vertices = graph.vertices.mapPartitions(iter => iter.map { case (vid, _) => (vid, vid) }) +// // val edges = graph.edges // .mapValues(v => None) +// // val ccGraph = new Graph(vertices, edges) - // ccGraph.iterateDynamic( - // (me_id, edge) => edge.otherVertex(me_id).data, // gather - // (a: Int, b: Int) => math.min(a, b), // merge - // Integer.MAX_VALUE, - // (v, a: Int) => math.min(v.data, a), // apply - // (me_id, edge) => edge.otherVertex(me_id).data > edge.vertex(me_id).data, // scatter - // numIter, - // gatherEdges = EdgeDirection.Both, - // scatterEdges = EdgeDirection.Both).vertices - // // - // // graph_ret.vertices.collect.foreach(println) - // // graph_ret.edges.take(10).foreach(println) - // } +// // ccGraph.iterateDynamic( +// // (me_id, edge) => edge.otherVertex(me_id).data, // gather +// // (a: Int, b: Int) => math.min(a, b), // merge +// // Integer.MAX_VALUE, +// // (v, a: Int) => math.min(v.data, a), // apply +// // (me_id, edge) => edge.otherVertex(me_id).data > edge.vertex(me_id).data, // scatter +// // numIter, +// // gatherEdges = EdgeDirection.Both, +// // scatterEdges = EdgeDirection.Both).vertices +// // // +// // // graph_ret.vertices.collect.foreach(println) +// // // graph_ret.edges.take(10).foreach(println) +// // } - // /** - // * Compute the shortest path to a set of markers - // */ - // def dynamicShortestPath[VD: Manifest, ED: Manifest](graph: Graph[VD, Float], - // sources: List[Int], numIter: Int) = { - // val sourceSet = sources.toSet - // val vertices = graph.vertices.mapPartitions( - // iter => iter.map { - // case (vid, _) => (vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue) ) - // }); +// // /** +// // * Compute the shortest path to a set of markers +// // */ +// // def dynamicShortestPath[VD: Manifest, ED: Manifest](graph: Graph[VD, Float], +// // sources: List[Int], numIter: Int) = { +// // val sourceSet = sources.toSet +// // val vertices = graph.vertices.mapPartitions( +// // iter => iter.map { +// // case (vid, _) => (vid, (if(sourceSet.contains(vid)) 0.0F else Float.MaxValue) ) +// // }); - // val edges = graph.edges // .mapValues(v => None) - // val spGraph = new Graph(vertices, edges) +// // val edges = graph.edges // .mapValues(v => None) +// // val spGraph = new Graph(vertices, edges) - // val niterations = Int.MaxValue - // spGraph.iterateDynamic( - // (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather - // (a: Float, b: Float) => math.min(a, b), // merge - // Float.MaxValue, - // (v, a: Float) => math.min(v.data, a), // apply - // (me_id, edge) => edge.vertex(me_id).data + edge.data < edge.otherVertex(me_id).data, // scatter - // numIter, - // gatherEdges = EdgeDirection.In, - // scatterEdges = EdgeDirection.Out).vertices - // } +// // val niterations = Int.MaxValue +// // spGraph.iterateDynamic( +// // (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather +// // (a: Float, b: Float) => math.min(a, b), // merge +// // Float.MaxValue, +// // (v, a: Float) => math.min(v.data, a), // apply +// // (me_id, edge) => edge.vertex(me_id).data + edge.data < edge.otherVertex(me_id).data, // scatter +// // numIter, +// // gatherEdges = EdgeDirection.In, +// // 
scatterEdges = EdgeDirection.Out).vertices +// // } - // /** - // * - // */ - // def alternatingLeastSquares[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, Double], - // latentK: Int, lambda: Double, numIter: Int) = { - // val vertices = graph.vertices.mapPartitions( _.map { - // case (vid, _) => (vid, Array.fill(latentK){ scala.util.Random.nextDouble() } ) - // }).cache - // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_)) - // val edges = graph.edges // .mapValues(v => None) - // val alsGraph = new Graph(vertices, edges) - // alsGraph.numVPart = graph.numVPart - // alsGraph.numEPart = graph.numEPart +// // /** +// // * +// // */ +// // def alternatingLeastSquares[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, Double], +// // latentK: Int, lambda: Double, numIter: Int) = { +// // val vertices = graph.vertices.mapPartitions( _.map { +// // case (vid, _) => (vid, Array.fill(latentK){ scala.util.Random.nextDouble() } ) +// // }).cache +// // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_)) +// // val edges = graph.edges // .mapValues(v => None) +// // val alsGraph = new Graph(vertices, edges) +// // alsGraph.numVPart = graph.numVPart +// // alsGraph.numEPart = graph.numEPart - // val niterations = Int.MaxValue - // alsGraph.iterateDynamic[(Array[Double], Array[Double])]( - // (me_id, edge) => { // gather - // val X = edge.otherVertex(me_id).data - // val y = edge.data - // val Xy = X.map(_ * y) - // val XtX = (for(i <- 0 until latentK; j <- i until latentK) yield(X(i) * X(j))).toArray - // (Xy, XtX) - // }, - // (a, b) => { - // // The difference between the while loop and the zip is a FACTOR OF TWO in overall - // // runtime - // var i = 0 - // while(i < a._1.length) { a._1(i) += b._1(i); i += 1 } - // i = 0 - // while(i < a._2.length) { a._2(i) += b._2(i); i += 1 } - // a - // // (a._1.zip(b._1).map{ case (q,r) => q+r }, a._2.zip(b._2).map{ case (q,r) => q+r }) - // }, - // (Array.empty[Double], Array.empty[Double]), // default value is empty - // (vertex, accum) => { // apply - // val XyArray = accum._1 - // val XtXArray = accum._2 - // if(XyArray.isEmpty) vertex.data // no neighbors - // else { - // val XtX = DenseMatrix.tabulate(latentK,latentK){ (i,j) => - // (if(i < j) XtXArray(i + (j+1)*j/2) else XtXArray(i + (j+1)*j/2)) + - // (if(i == j) lambda else 1.0F) //regularization - // } - // val Xy = DenseMatrix.create(latentK,1,XyArray) - // val w = XtX \ Xy - // w.data - // } - // }, - // (me_id, edge) => true, - // numIter, - // gatherEdges = EdgeDirection.Both, - // scatterEdges = EdgeDirection.Both, - // vertex => vertex.id < maxUser).vertices - // } +// // val niterations = Int.MaxValue +// // alsGraph.iterateDynamic[(Array[Double], Array[Double])]( +// // (me_id, edge) => { // gather +// // val X = edge.otherVertex(me_id).data +// // val y = edge.data +// // val Xy = X.map(_ * y) +// // val XtX = (for(i <- 0 until latentK; j <- i until latentK) yield(X(i) * X(j))).toArray +// // (Xy, XtX) +// // }, +// // (a, b) => { +// // // The difference between the while loop and the zip is a FACTOR OF TWO in overall +// // // runtime +// // var i = 0 +// // while(i < a._1.length) { a._1(i) += b._1(i); i += 1 } +// // i = 0 +// // while(i < a._2.length) { a._2(i) += b._2(i); i += 1 } +// // a +// // // (a._1.zip(b._1).map{ case (q,r) => q+r }, a._2.zip(b._2).map{ case (q,r) => q+r }) +// // }, +// // (Array.empty[Double], Array.empty[Double]), // default value is empty +// // (vertex, accum) => { // apply +// // val XyArray = accum._1 +// // val XtXArray = 
accum._2 +// // if(XyArray.isEmpty) vertex.data // no neighbors +// // else { +// // val XtX = DenseMatrix.tabulate(latentK,latentK){ (i,j) => +// // (if(i < j) XtXArray(i + (j+1)*j/2) else XtXArray(i + (j+1)*j/2)) + +// // (if(i == j) lambda else 1.0F) //regularization +// // } +// // val Xy = DenseMatrix.create(latentK,1,XyArray) +// // val w = XtX \ Xy +// // w.data +// // } +// // }, +// // (me_id, edge) => true, +// // numIter, +// // gatherEdges = EdgeDirection.Both, +// // scatterEdges = EdgeDirection.Both, +// // vertex => vertex.id < maxUser).vertices +// // } - def main(args: Array[String]) = { - val host = args(0) - val taskType = args(1) - val fname = args(2) - val options = args.drop(3).map { arg => - arg.dropWhile(_ == '-').split('=') match { - case Array(opt, v) => (opt -> v) - case _ => throw new IllegalArgumentException("Invalid argument: " + arg) - } - } +// def main(args: Array[String]) = { +// val host = args(0) +// val taskType = args(1) +// val fname = args(2) +// val options = args.drop(3).map { arg => +// arg.dropWhile(_ == '-').split('=') match { +// case Array(opt, v) => (opt -> v) +// case _ => throw new IllegalArgumentException("Invalid argument: " + arg) +// } +// } - System.setProperty("spark.serializer", "spark.KryoSerializer") - //System.setProperty("spark.shuffle.compress", "false") - System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator") +// System.setProperty("spark.serializer", "spark.KryoSerializer") +// //System.setProperty("spark.shuffle.compress", "false") +// System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator") - taskType match { - case "pagerank" => { +// taskType match { +// case "pagerank" => { - var numIter = Int.MaxValue - var isDynamic = false - var tol:Float = 0.001F - var outFname = "" - var numVPart = 4 - var numEPart = 4 +// var numIter = Int.MaxValue +// var isDynamic = false +// var tol:Float = 0.001F +// var outFname = "" +// var numVPart = 4 +// var numEPart = 4 - options.foreach{ - case ("numIter", v) => numIter = v.toInt - case ("dynamic", v) => isDynamic = v.toBoolean - case ("tol", v) => tol = v.toFloat - case ("output", v) => outFname = v - case ("numVPart", v) => numVPart = v.toInt - case ("numEPart", v) => numEPart = v.toInt - case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) - } +// options.foreach{ +// case ("numIter", v) => numIter = v.toInt +// case ("dynamic", v) => isDynamic = v.toBoolean +// case ("tol", v) => tol = v.toFloat +// case ("output", v) => outFname = v +// case ("numVPart", v) => numVPart = v.toInt +// case ("numEPart", v) => numEPart = v.toInt +// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) +// } - if(!isDynamic && numIter == Int.MaxValue) { - println("Set number of iterations!") - sys.exit(1) - } - println("======================================") - println("| PageRank |") - println("--------------------------------------") - println(" Using parameters:") - println(" \tDynamic: " + isDynamic) - if(isDynamic) println(" \t |-> Tolerance: " + tol) - println(" \tNumIter: " + numIter) - println("======================================") +// if(!isDynamic && numIter == Int.MaxValue) { +// println("Set number of iterations!") +// sys.exit(1) +// } +// println("======================================") +// println("| PageRank |") +// println("--------------------------------------") +// println(" Using parameters:") +// println(" \tDynamic: " + isDynamic) +// if(isDynamic) println(" \t |-> Tolerance: " + 
tol) +// println(" \tNumIter: " + numIter) +// println("======================================") - val sc = new SparkContext(host, "PageRank(" + fname + ")") +// val sc = new SparkContext(host, "PageRank(" + fname + ")") - val graph = Graph.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache() +// val graph = Graph.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache() - val startTime = System.currentTimeMillis - logInfo("GRAPHX: starting tasks") - logInfo("GRAPHX: Number of vertices " + graph.vertices.count) - logInfo("GRAPHX: Number of edges " + graph.edges.count) +// val startTime = System.currentTimeMillis +// logInfo("GRAPHX: starting tasks") +// logInfo("GRAPHX: Number of vertices " + graph.vertices.count) +// logInfo("GRAPHX: Number of edges " + graph.edges.count) - val pr = Analytics.pagerank(graph, numIter) - // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter) - // else Analytics.pagerank(graph, numIter) - logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) ) - if (!outFname.isEmpty) { - println("Saving pageranks of pages to " + outFname) - pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname) - } - logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") - sc.stop() - } +// val pr = Analytics.pagerank(graph, numIter) +// // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter) +// // else Analytics.pagerank(graph, numIter) +// logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) ) +// if (!outFname.isEmpty) { +// println("Saving pageranks of pages to " + outFname) +// pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname) +// } +// logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") +// sc.stop() +// } - case "cc" => { +// case "cc" => { - var numIter = Int.MaxValue - var isDynamic = false +// var numIter = Int.MaxValue +// var isDynamic = false - options.foreach{ - case ("numIter", v) => numIter = v.toInt - case ("dynamic", v) => isDynamic = v.toBoolean - case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) - } +// options.foreach{ +// case ("numIter", v) => numIter = v.toInt +// case ("dynamic", v) => isDynamic = v.toBoolean +// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) +// } - if(!isDynamic && numIter == Int.MaxValue) { - println("Set number of iterations!") - sys.exit(1) - } - println("======================================") - println("| Connected Components |") - println("--------------------------------------") - println(" Using parameters:") - println(" \tDynamic: " + isDynamic) - println(" \tNumIter: " + numIter) - println("======================================") +// if(!isDynamic && numIter == Int.MaxValue) { +// println("Set number of iterations!") +// sys.exit(1) +// } +// println("======================================") +// println("| Connected Components |") +// println("--------------------------------------") +// println(" Using parameters:") +// println(" \tDynamic: " + isDynamic) +// println(" \tNumIter: " + numIter) +// println("======================================") - val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")") - val graph = Graph.textFile(sc, fname, a => 1.0F) - val cc = Analytics.connectedComponents(graph, numIter) - // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter) - // else 
Analytics.connectedComponents(graph, numIter) - println("Components: " + cc.vertices.map(_.data).distinct()) +// val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")") +// val graph = Graph.textFile(sc, fname, a => 1.0F) +// val cc = Analytics.connectedComponents(graph, numIter) +// // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter) +// // else Analytics.connectedComponents(graph, numIter) +// println("Components: " + cc.vertices.map(_.data).distinct()) - sc.stop() - } +// sc.stop() +// } - case "shortestpath" => { +// case "shortestpath" => { - var numIter = Int.MaxValue - var isDynamic = true - var sources: List[Int] = List.empty +// var numIter = Int.MaxValue +// var isDynamic = true +// var sources: List[Int] = List.empty - options.foreach{ - case ("numIter", v) => numIter = v.toInt - case ("dynamic", v) => isDynamic = v.toBoolean - case ("source", v) => sources ++= List(v.toInt) - case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) - } +// options.foreach{ +// case ("numIter", v) => numIter = v.toInt +// case ("dynamic", v) => isDynamic = v.toBoolean +// case ("source", v) => sources ++= List(v.toInt) +// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) +// } - if(!isDynamic && numIter == Int.MaxValue) { - println("Set number of iterations!") - sys.exit(1) - } +// if(!isDynamic && numIter == Int.MaxValue) { +// println("Set number of iterations!") +// sys.exit(1) +// } - if(sources.isEmpty) { - println("No sources provided!") - sys.exit(1) - } +// if(sources.isEmpty) { +// println("No sources provided!") +// sys.exit(1) +// } - println("======================================") - println("| Shortest Path |") - println("--------------------------------------") - println(" Using parameters:") - println(" \tDynamic: " + isDynamic) - println(" \tNumIter: " + numIter) - println(" \tSources: [" + sources.mkString(", ") + "]") - println("======================================") +// println("======================================") +// println("| Shortest Path |") +// println("--------------------------------------") +// println(" Using parameters:") +// println(" \tDynamic: " + isDynamic) +// println(" \tNumIter: " + numIter) +// println(" \tSources: [" + sources.mkString(", ") + "]") +// println("======================================") - val sc = new SparkContext(host, "ShortestPath(" + fname + ")") - val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) ) - val sp = Analytics.shortestPath(graph, sources, numIter) - // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter) - // else Analytics.shortestPath(graph, sources, numIter) - println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_))) +// val sc = new SparkContext(host, "ShortestPath(" + fname + ")") +// val graph = Graph.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) ) +// val sp = Analytics.shortestPath(graph, sources, numIter) +// // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter) +// // else Analytics.shortestPath(graph, sources, numIter) +// println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_))) - sc.stop() - } +// sc.stop() +// } - // case "als" => { +// // case "als" => { - // var numIter = 5 - // var lambda = 0.01 - // var latentK = 10 - // var usersFname = "usersFactors.tsv" - // var moviesFname = "moviesFname.tsv" - // var numVPart = 4 - // var numEPart = 4 +// // var numIter = 5 +// // var lambda 
= 0.01 +// // var latentK = 10 +// // var usersFname = "usersFactors.tsv" +// // var moviesFname = "moviesFname.tsv" +// // var numVPart = 4 +// // var numEPart = 4 - // options.foreach{ - // case ("numIter", v) => numIter = v.toInt - // case ("lambda", v) => lambda = v.toDouble - // case ("latentK", v) => latentK = v.toInt - // case ("usersFname", v) => usersFname = v - // case ("moviesFname", v) => moviesFname = v - // case ("numVPart", v) => numVPart = v.toInt - // case ("numEPart", v) => numEPart = v.toInt - // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) - // } +// // options.foreach{ +// // case ("numIter", v) => numIter = v.toInt +// // case ("lambda", v) => lambda = v.toDouble +// // case ("latentK", v) => latentK = v.toInt +// // case ("usersFname", v) => usersFname = v +// // case ("moviesFname", v) => moviesFname = v +// // case ("numVPart", v) => numVPart = v.toInt +// // case ("numEPart", v) => numEPart = v.toInt +// // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) +// // } - // println("======================================") - // println("| Alternating Least Squares |") - // println("--------------------------------------") - // println(" Using parameters:") - // println(" \tNumIter: " + numIter) - // println(" \tLambda: " + lambda) - // println(" \tLatentK: " + latentK) - // println(" \tusersFname: " + usersFname) - // println(" \tmoviesFname: " + moviesFname) - // println("======================================") +// // println("======================================") +// // println("| Alternating Least Squares |") +// // println("--------------------------------------") +// // println(" Using parameters:") +// // println(" \tNumIter: " + numIter) +// // println(" \tLambda: " + lambda) +// // println(" \tLatentK: " + latentK) +// // println(" \tusersFname: " + usersFname) +// // println(" \tmoviesFname: " + moviesFname) +// // println("======================================") - // val sc = new SparkContext(host, "ALS(" + fname + ")") - // val graph = Graph.textFile(sc, fname, a => a(0).toDouble ) - // graph.numVPart = numVPart - // graph.numEPart = numEPart +// // val sc = new SparkContext(host, "ALS(" + fname + ")") +// // val graph = Graph.textFile(sc, fname, a => a(0).toDouble ) +// // graph.numVPart = numVPart +// // graph.numEPart = numEPart - // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_)) - // val minMovie = graph.edges.map(_._2).reduce(math.min(_,_)) - // assert(maxUser < minMovie) +// // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_)) +// // val minMovie = graph.edges.map(_._2).reduce(math.min(_,_)) +// // assert(maxUser < minMovie) - // val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache - // factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t")) - // .saveAsTextFile(usersFname) - // factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t")) - // .saveAsTextFile(moviesFname) +// // val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache +// // factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t")) +// // .saveAsTextFile(usersFname) +// // factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t")) +// // .saveAsTextFile(moviesFname) - // sc.stop() - // } +// // sc.stop() +// // } - case _ => { - println("Invalid task type.") - } - } - } +// case _ => { +// println("Invalid task type.") +// } +// } +// } } diff --git 
a/graph/src/main/scala/spark/graph/Edge.scala b/graph/src/main/scala/spark/graph/Edge.scala new file mode 100644 index 0000000000..8f022e812a --- /dev/null +++ b/graph/src/main/scala/spark/graph/Edge.scala @@ -0,0 +1,7 @@ +package spark.graph + + +case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] ( + var src: Vid = 0, + var dst: Vid = 0, + var data: ED = nullValue[ED]) diff --git a/graph/src/main/scala/spark/graph/EdgeWithVertices.scala b/graph/src/main/scala/spark/graph/EdgeWithVertices.scala new file mode 100644 index 0000000000..a731f73709 --- /dev/null +++ b/graph/src/main/scala/spark/graph/EdgeWithVertices.scala @@ -0,0 +1,17 @@ +package spark.graph + + +class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD, + @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] { + var src: Vertex[VD] = _ + var dst: Vertex[VD] = _ + var data: ED = _ + + def otherVertex(vid: Vid): Vertex[VD] = if (src.id == vid) dst else src + + def vertex(vid: Vid): Vertex[VD] = if (src.id == vid) src else dst + + def relativeDirection(vid: Vid): EdgeDirection = { + if (vid == src.id) EdgeDirection.Out else EdgeDirection.In + } +} diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala index 1db92217de..04b8c840fd 100644 --- a/graph/src/main/scala/spark/graph/Graph.scala +++ b/graph/src/main/scala/spark/graph/Graph.scala @@ -1,483 +1,57 @@ package spark.graph -import scala.collection.JavaConversions._ -import scala.collection.mutable.ArrayBuffer - -import it.unimi.dsi.fastutil.ints.IntArrayList - -import spark.{ClosureCleaner, HashPartitioner, Partitioner, RDD} -import spark.SparkContext -import spark.SparkContext._ -import spark.graph.Graph.EdgePartition -import spark.storage.StorageLevel +import spark.RDD -case class Vertex[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD] ( - var id: Vid = 0, - var data: VD = nullValue[VD]) { +abstract class Graph[VD: ClassManifest, ED: ClassManifest] { - def this(tuple: Tuple2[Vid, VD]) = this(tuple._1, tuple._2) + def vertices(): RDD[Vertex[VD]] - def tuple = (id, data) -} + def edges(): RDD[Edge[ED]] + def edgesWithVertices(): RDD[EdgeWithVertices[VD, ED]] -case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] ( - var src: Vid = 0, - var dst: Vid = 0, - var data: ED = nullValue[ED]) + def cache(): Graph[VD, ED] + def mapVertices[VD2: ClassManifest](f: Vertex[VD] => VD2): Graph[VD2, ED] -class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD, - @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] { - var src: Vertex[VD] = _ - var dst: Vertex[VD] = _ - var data: ED = _ - - def otherVertex(vid: Vid): Vertex[VD] = if (src.id == vid) dst else src - - def vertex(vid: Vid): Vertex[VD] = if (src.id == vid) src else dst - - def relativeDirection(vid: Vid): EdgeDirection = { - if (vid == src.id) EdgeDirection.Out else EdgeDirection.In - } -} - - -/** - * A Graph RDD that supports computation on graphs. 
- */ -class Graph[VD: ClassManifest, ED: ClassManifest] protected ( - val numVertexPartitions: Int, - val numEdgePartitions: Int, - _rawVertices: RDD[Vertex[VD]], - _rawEdges: RDD[Edge[ED]], - _rawVTable: RDD[(Vid, (VD, Array[Pid]))], - _rawETable: RDD[(Pid, EdgePartition[ED])]) { - - def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = { - this(vertices.partitions.size, edges.partitions.size, vertices, edges, null, null) - } - - def withPartitioner(numVertexPartitions: Int, numEdgePartitions: Int): Graph[VD, ED] = { - if (_cached) { - (new Graph(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable)) - .cache() - } else { - new Graph(numVertexPartitions, numEdgePartitions, _rawVertices, _rawEdges, null, null) - } - } - - def withVertexPartitioner(numVertexPartitions: Int) = { - withPartitioner(numVertexPartitions, numEdgePartitions) - } - - def withEdgePartitioner(numEdgePartitions: Int) = { - withPartitioner(numVertexPartitions, numEdgePartitions) - } - - protected var _cached = false - - def cache(): Graph[VD, ED] = { - eTable.cache() - vTable.cache() - _cached = true - this - } - - /** Return a RDD of vertices. */ - def vertices: RDD[Vertex[VD]] = { - if (!_cached && _rawVertices != null) { - _rawVertices - } else { - vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) } - } - } - - /** Return a RDD of edges. */ - def edges: RDD[Edge[ED]] = { - if (!_cached && _rawEdges != null) { - _rawEdges - } else { - eTable.mapPartitions { iter => iter.next._2.iterator } - } - } - - /** Return a RDD that brings edges with its source and destination vertices together. */ - def edgesWithVertices: RDD[EdgeWithVertices[VD, ED]] = { - (new EdgeWithVerticesRDD(vTableReplicated, eTable)).mapPartitions { part => part.next._2 } - } - - lazy val numEdges: Long = edges.count() - - lazy val numVertices: Long = vertices.count() - - lazy val inDegrees = mapReduceNeighborhood[Vid]((vid, edge) => 1, _+_, 0, EdgeDirection.In) - - lazy val outDegrees = mapReduceNeighborhood[Vid]((vid,edge) => 1, _+_, 0, EdgeDirection.Out) - - lazy val degrees = mapReduceNeighborhood[Vid]((vid,edge) => 1, _+_, 0, EdgeDirection.Both) + def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] /** Return a new graph with its edge directions reversed. 
*/ - lazy val reverse: Graph[VD,ED] = { - newGraph(vertices, edges.map{ case Edge(s, t, e) => Edge(t, s, e) }) - } + def reverse: Graph[VD, ED] - def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = { - mapReduceNeighborhood[Array[Vid]]( - (vid, edge) => Array(edge.otherVertex(vid).id), - (a, b) => a ++ b, - Array.empty[Vid], - edgeDirection) - } + def aggregateNeighbors[VD2: ClassManifest]( + mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2], + reduceFunc: (VD2, VD2) => VD2, + gatherDirection: EdgeDirection) + : RDD[(Vid, VD2)] - def mapVertices[VD2: ClassManifest](f: Vertex[VD] => Vertex[VD2]): Graph[VD2, ED] = { - newGraph(vertices.map(f), edges) - } - - def mapEdges[ED2: ClassManifest](f: Edge[ED] => Edge[ED2]): Graph[VD, ED2] = { - newGraph(vertices, edges.map(f)) - } - - ////////////////////////////////////////////////////////////////////////////////////////////////// - // Lower level transformation methods - ////////////////////////////////////////////////////////////////////////////////////////////////// - - def mapReduceNeighborhood[VD2: ClassManifest]( - mapFunc: (Vid, EdgeWithVertices[VD, ED]) => VD2, - reduceFunc: (VD2, VD2) => VD2, - default: VD2, - gatherDirection: EdgeDirection): RDD[(Vid, VD2)] = { - - ClosureCleaner.clean(mapFunc) - ClosureCleaner.clean(reduceFunc) - - val newVTable = vTableReplicated.mapPartitions({ part => - part.map { v => (v._1, MutableTuple2(v._2, default)) } - }, preservesPartitioning = true) - - (new EdgeWithVerticesRDD[MutableTuple2[VD, VD2], ED](newVTable, eTable)) - .mapPartitions { part => - val (vmap, edges) = part.next() - val edgeSansAcc = new EdgeWithVertices[VD, ED]() - edgeSansAcc.src = new Vertex[VD] - edgeSansAcc.dst = new Vertex[VD] - edges.foreach { e: EdgeWithVertices[MutableTuple2[VD, VD2], ED] => - edgeSansAcc.data = e.data - edgeSansAcc.src.data = e.src.data._1 - edgeSansAcc.dst.data = e.dst.data._1 - edgeSansAcc.src.id = e.src.id - edgeSansAcc.dst.id = e.dst.id - if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) { - e.dst.data._2 = reduceFunc(e.dst.data._2, mapFunc(edgeSansAcc.dst.id, edgeSansAcc)) - } - if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) { - e.src.data._2 = reduceFunc(e.src.data._2, mapFunc(edgeSansAcc.src.id, edgeSansAcc)) - } - } - vmap.int2ObjectEntrySet().fastIterator().map{ entry => - (entry.getIntKey(), entry.getValue()._2) - } - } - .combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false) - } - - /** - * Same as mapReduceNeighborhood but map function can return none and there is no default value. - * As a consequence, the resulting table may be much smaller than the set of vertices. 
- */ - def flatMapReduceNeighborhood[VD2: ClassManifest]( - mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2], - reduceFunc: (VD2, VD2) => VD2, - gatherDirection: EdgeDirection): RDD[(Vid, VD2)] = { - - ClosureCleaner.clean(mapFunc) - ClosureCleaner.clean(reduceFunc) - - val newVTable = vTableReplicated.mapPartitions({ part => - part.map { v => (v._1, MutableTuple2(v._2, Option.empty[VD2])) } - }, preservesPartitioning = true) - - (new EdgeWithVerticesRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable)) - .mapPartitions { part => - val (vmap, edges) = part.next() - val edgeSansAcc = new EdgeWithVertices[VD, ED]() - edgeSansAcc.src = new Vertex[VD] - edgeSansAcc.dst = new Vertex[VD] - edges.foreach { e: EdgeWithVertices[MutableTuple2[VD, Option[VD2]], ED] => - edgeSansAcc.data = e.data - edgeSansAcc.src.data = e.src.data._1 - edgeSansAcc.dst.data = e.dst.data._1 - edgeSansAcc.src.id = e.src.id - edgeSansAcc.dst.id = e.dst.id - if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) { - e.dst.data._2 = - if (e.dst.data._2.isEmpty) { - mapFunc(edgeSansAcc.dst.id, edgeSansAcc) - } else { - val tmp = mapFunc(edgeSansAcc.dst.id, edgeSansAcc) - if (!tmp.isEmpty) Some(reduceFunc(e.dst.data._2.get, tmp.get)) else e.dst.data._2 - } - } - if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) { - e.dst.data._2 = - if (e.dst.data._2.isEmpty) { - mapFunc(edgeSansAcc.src.id, edgeSansAcc) - } else { - val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc) - if (!tmp.isEmpty) Some(reduceFunc(e.src.data._2.get, tmp.get)) else e.src.data._2 - } - } - } - vmap.int2ObjectEntrySet().fastIterator().filter(!_.getValue()._2.isEmpty).map{ entry => - (entry.getIntKey(), entry.getValue()._2) - } - } - .map{ case (vid, aOpt) => (vid, aOpt.get) } - .combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false) - } + def aggregateNeighbors[VD2: ClassManifest]( + mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2], + reduceFunc: (VD2, VD2) => VD2, + default: VD2, // Should this be a function or a value? + gatherDirection: EdgeDirection) + : RDD[(Vid, VD2)] def updateVertices[U: ClassManifest, VD2: ClassManifest]( updates: RDD[(Vid, U)], updateFunc: (Vertex[VD], Option[U]) => VD2) - : Graph[VD2, ED] = { + : Graph[VD2, ED] - ClosureCleaner.clean(updateFunc) + // This one can be used to skip records when we can do in-place update. + // Annoying that we can't rename it ... 
+ def updateVertices2[U: ClassManifest]( + updates: RDD[(Vid, U)], + updateFunc: (Vertex[VD], U) => VD) + : Graph[VD, ED] - val newVTable = vTable.leftOuterJoin(updates).mapPartitions({ iter => - iter.map { case (vid, ((vdata, pids), update)) => - val newVdata = updateFunc(Vertex(vid, vdata), update) - (vid, (newVdata, pids)) - } - }, preservesPartitioning = true).cache() - - new Graph(newVTable.partitions.size, eTable.partitions.size, null, null, newVTable, eTable) - } - - // def mapPartitions[U: ClassManifest]( - // f: (VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]]) => Iterator[U], - // preservesPartitioning: Boolean = false): RDD[U] = { - // (new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions({ part => - // val (vmap, iter) = part.next() - // f(vmap, iter) - // }, preservesPartitioning) - // } - - ////////////////////////////////////////////////////////////////////////////////////////////////// - // Internals hidden from callers - ////////////////////////////////////////////////////////////////////////////////////////////////// - - // TODO: Support non-hash partitioning schemes. - protected val vertexPartitioner = new HashPartitioner(numVertexPartitions) - protected val edgePartitioner = new HashPartitioner(numEdgePartitions) - - /** Create a new graph but keep the current partitioning scheme. */ - protected def newGraph[VD2: ClassManifest, ED2: ClassManifest]( - vertices: RDD[Vertex[VD2]], edges: RDD[Edge[ED2]]): Graph[VD2, ED2] = { - (new Graph[VD2, ED2](vertices, edges)).withPartitioner(numVertexPartitions, numEdgePartitions) - } - - protected lazy val eTable: RDD[(Pid, EdgePartition[ED])] = { - if (_rawETable == null) { - Graph.createETable(_rawEdges, numEdgePartitions) - } else { - _rawETable - } - } - - protected lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = { - if (_rawVTable == null) { - Graph.createVTable(_rawVertices, eTable, numVertexPartitions) - } else { - _rawVTable - } - } - - protected lazy val vTableReplicated: RDD[(Vid, VD)] = { - // Join vid2pid and vTable, generate a shuffle dependency on the joined result, and get - // the shuffle id so we can use it on the slave. - vTable - .flatMap { case (vid, (vdata, pids)) => pids.iterator.map { pid => (pid, (vid, vdata)) } } - .partitionBy(edgePartitioner) - .mapPartitions( - { part => part.map { case(pid, (vid, vdata)) => (vid, vdata) } }, - preservesPartitioning = true) - } + // Save a copy of the GraphOps object so there is always one unique GraphOps object + // for a given Graph object, and thus the lazy vals in GraphOps would work as intended. 
+ val ops = new GraphOps(this) } object Graph { - /** - * Load an edge list from file initializing the Graph RDD - */ - def textFile[ED: ClassManifest]( - sc: SparkContext, - path: String, - edgeParser: Array[String] => ED, - minEdgePartitions: Int = 1, - minVertexPartitions: Int = 1) - : Graph[Int, ED] = { - - // Parse the edge data table - val edges = sc.textFile(path).flatMap { line => - if (!line.isEmpty && line(0) != '#') { - val lineArray = line.split("\\s+") - if(lineArray.length < 2) { - println("Invalid line: " + line) - assert(false) - } - val source = lineArray(0) - val target = lineArray(1) - val tail = lineArray.drop(2) - val edata = edgeParser(tail) - Array(Edge(source.trim.toInt, target.trim.toInt, edata)) - } else { - Array.empty[Edge[ED]] - } - }.cache() - - val graph = fromEdges(edges) - // println("Loaded graph:" + - // "\n\t#edges: " + graph.numEdges + - // "\n\t#vertices: " + graph.numVertices) - - graph - } - - def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): Graph[Int, ED] = { - val vertices = edges.flatMap { edge => List((edge.src, 1), (edge.dst, 1)) } - .reduceByKey(_ + _) - .map{ case (vid, degree) => Vertex(vid, degree) } - (new Graph[Int, ED](vertices, edges)) - } - - /** - * Make k-cycles - */ - def kCycles(sc: SparkContext, numCycles: Int = 3, size: Int = 3) = { - // Construct the edges - val edges = sc.parallelize(for (i <- 0 until numCycles; j <- 0 until size) yield { - val offset = i * numCycles - val source = offset + j - val target = offset + ((j + 1) % size) - Edge(source, target, i * numCycles + j) - }) - // Change vertex data to be the lowest vertex id of the vertex in that cycle - val graph = fromEdges(edges).mapVertices{ - case Vertex(id, degree) => Vertex(id, (id/numCycles) * numCycles) - } - graph - } - - /** - * Make a regular grid graph - **/ - def grid(sc: SparkContext, numRows: Int = 5, numCols: Int = 5) = { - def coord(vid: Int) = (vid % numRows, vid / numRows) - val vertices = sc.parallelize( 0 until (numRows * numCols) ).map( - vid => Vertex(vid, coord(vid))) - def index(r: Int, c:Int) = (r + c * numRows) - val edges = vertices.flatMap{ case Vertex(vid, (r,c)) => - (if(r+1 < numRows) List(Edge(vid, index(r+1,c), 1.0F)) else List.empty) ++ - (if(c+1 < numCols) List(Edge(vid, index(r,c+1), 1.0F)) else List.empty) - } - new Graph(vertices, edges) - } - - - /** - * A partition of edges in 3 large columnar arrays. - */ - private[graph] - class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] - { - val srcIds: IntArrayList = new IntArrayList - val dstIds: IntArrayList = new IntArrayList - // TODO: Specialize data. - val data: ArrayBuffer[ED] = new ArrayBuffer[ED] - - /** Add a new edge to the partition. */ - def add(src: Vid, dst: Vid, d: ED) { - srcIds.add(src) - dstIds.add(dst) - data += d - } - - def trim() { - srcIds.trim() - dstIds.trim() - } - - def size: Int = srcIds.size - - def iterator = new Iterator[Edge[ED]] { - private var edge = new Edge[ED] - private var pos = 0 - - override def hasNext: Boolean = pos < EdgePartition.this.size - - override def next(): Edge[ED] = { - edge.src = srcIds.get(pos) - edge.dst = dstIds.get(pos) - edge.data = data(pos) - pos += 1 - edge - } - } - } - - /** - * Create the edge table RDD, which is much more efficient for Java heap storage than the - * normal edges data structure (RDD[(Vid, Vid, ED)]). 
- * - * The edge table contains multiple partitions, and each partition contains only one RDD - * key-value pair: the key is the partition id, and the value is an EdgePartition object - * containing all the edges in a partition. - */ - protected def createETable[ED: ClassManifest](edges: RDD[Edge[ED]], numPartitions: Int) - : RDD[(Pid, EdgePartition[ED])] = { - edges - .map { e => - // Random partitioning based on the source vertex id. - (math.abs(e.src) % numPartitions, (e.src, e.dst, e.data)) - } - .partitionBy(new HashPartitioner(numPartitions)) - .mapPartitionsWithIndex({ (pid, iter) => - val edgePartition = new Graph.EdgePartition[ED] - iter.foreach { case (_, (src, dst, data)) => edgePartition.add(src, dst, data) } - Iterator((pid, edgePartition)) - }, preservesPartitioning = true) - } - - protected def createVTable[VD: ClassManifest, ED: ClassManifest]( - vertices: RDD[Vertex[VD]], - eTable: RDD[(Pid, EdgePartition[ED])], - numPartitions: Int) - : RDD[(Vid, (VD, Array[Pid]))] = { - val partitioner = new HashPartitioner(numPartitions) - - // A key-value RDD. The key is a vertex id, and the value is a list of - // partitions that contains edges referencing the vertex. - val vid2pid : RDD[(Int, Seq[Pid])] = eTable.mapPartitions { iter => - val (pid, edgePartition) = iter.next() - val vSet = new it.unimi.dsi.fastutil.ints.IntOpenHashSet - var i = 0 - while (i < edgePartition.srcIds.size) { - vSet.add(edgePartition.srcIds.getInt(i)) - vSet.add(edgePartition.dstIds.getInt(i)) - i += 1 - } - vSet.iterator.map { vid => (vid.intValue, pid) } - }.groupByKey(partitioner) - - vertices - .map { v => (v.id, v.data) } - .partitionBy(partitioner) - .leftOuterJoin(vid2pid) - .mapValues { - case (vdata, None) => (vdata, Array.empty[Pid]) - case (vdata, Some(pids)) => (vdata, pids.toArray) - } - } + implicit def graphToGraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) = g.ops } diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala index 4de453663d..9c157b9361 100644 --- a/graph/src/main/scala/spark/graph/GraphLab.scala +++ b/graph/src/main/scala/spark/graph/GraphLab.scala @@ -6,116 +6,116 @@ import spark.RDD object GraphLab { - def iterateGA2[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( - gather: (Vid, EdgeWithVertices[VD, ED]) => A, - merge: (A, A) => A, - default: A, - apply: (Vertex[VD], A) => VD, - numIter: Int, - gatherDirection: EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = { + // def iterateGA2[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( + // gather: (Vid, EdgeWithVertices[VD, ED]) => A, + // merge: (A, A) => A, + // default: A, + // apply: (Vertex[VD], A) => VD, + // numIter: Int, + // gatherDirection: EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = { - var g = graph.cache() + // var g = graph.cache() - var i = 0 - while (i < numIter) { + // var i = 0 + // while (i < numIter) { - val accUpdates: RDD[(Vid, A)] = - g.mapReduceNeighborhood(gather, merge, default, gatherDirection) + // val accUpdates: RDD[(Vid, A)] = + // g.aggregateNeighbors(gather, merge, default, gatherDirection) - def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) } - g = g.updateVertices(accUpdates, applyFunc).cache() + // def applyFunc(v: Vertex[VD], update: Option[A]): VD = { apply(v, update.get) } + // g = g.updateVertices(accUpdates, applyFunc).cache() - i += 1 - } - g - } + // i += 1 + // } + // g + // } - def iterateGA[VD: ClassManifest, ED: 
ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( - gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A, - mergeFunc: (A, A) => A, - applyFunc: (Vertex[VD], Option[A]) => VD, - numIter: Int, - gatherDirection: EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = { + // def iterateGA[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( + // gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A, + // mergeFunc: (A, A) => A, + // applyFunc: (Vertex[VD], Option[A]) => VD, + // numIter: Int, + // gatherDirection: EdgeDirection = EdgeDirection.In) : Graph[VD, ED] = { - var g = graph.cache() + // var g = graph.cache() - def someGather(vid: Vid, edge: EdgeWithVertices[VD, ED]) = Some(gatherFunc(vid, edge)) + // def someGather(vid: Vid, edge: EdgeWithVertices[VD, ED]) = Some(gatherFunc(vid, edge)) - var i = 0 - while (i < numIter) { + // var i = 0 + // while (i < numIter) { - val accUpdates: RDD[(Vid, A)] = - g.flatMapReduceNeighborhood(someGather, mergeFunc, gatherDirection) + // val accUpdates: RDD[(Vid, A)] = + // g.flatMapReduceNeighborhood(someGather, mergeFunc, gatherDirection) - g = g.updateVertices(accUpdates, applyFunc).cache() + // g = g.updateVertices(accUpdates, applyFunc).cache() - i += 1 - } - g - } + // i += 1 + // } + // g + // } - def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( - gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A, - mergeFunc: (A, A) => A, - applyFunc: (Vertex[VD], Option[A]) => VD, - scatterFunc: (Vid, EdgeWithVertices[VD, ED]) => Boolean, - numIter: Int, - gatherDirection: EdgeDirection = EdgeDirection.In, - scatterDirection: EdgeDirection = EdgeDirection.Out) : Graph[VD, ED] = { + // def iterateGAS[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( + // gatherFunc: (Vid, EdgeWithVertices[VD, ED]) => A, + // mergeFunc: (A, A) => A, + // applyFunc: (Vertex[VD], Option[A]) => VD, + // scatterFunc: (Vid, EdgeWithVertices[VD, ED]) => Boolean, + // numIter: Int, + // gatherDirection: EdgeDirection = EdgeDirection.In, + // scatterDirection: EdgeDirection = EdgeDirection.Out) : Graph[VD, ED] = { - var g = graph.mapVertices{ case Vertex(id,data) => Vertex(id, (true, data)) }.cache() + // var g = graph.mapVertices{ case Vertex(id,data) => Vertex(id, (true, data)) }.cache() - def gather(vid: Vid, e: EdgeWithVertices[(Boolean, VD), ED]) = { - if(e.vertex(vid).data._1) { - val edge = new EdgeWithVertices[VD,ED] - edge.src = Vertex(e.src.id, e.src.data._2) - edge.dst = Vertex(e.dst.id, e.dst.data._2) - Some(gatherFunc(vid, edge)) - } else { - None - } - } + // def gather(vid: Vid, e: EdgeWithVertices[(Boolean, VD), ED]) = { + // if(e.vertex(vid).data._1) { + // val edge = new EdgeWithVertices[VD,ED] + // edge.src = Vertex(e.src.id, e.src.data._2) + // edge.dst = Vertex(e.dst.id, e.dst.data._2) + // Some(gatherFunc(vid, edge)) + // } else { + // None + // } + // } - def apply(v: Vertex[(Boolean, VD)], accum: Option[A]) = { - if(v.data._1) (true, applyFunc(Vertex(v.id, v.data._2), accum)) - else (false, v.data._2) - } + // def apply(v: Vertex[(Boolean, VD)], accum: Option[A]) = { + // if(v.data._1) (true, applyFunc(Vertex(v.id, v.data._2), accum)) + // else (false, v.data._2) + // } - def scatter(rawVid: Vid, e: EdgeWithVertices[(Boolean, VD),ED]) = { - val vid = e.otherVertex(rawVid).id - if(e.vertex(vid).data._1) { - val edge = new EdgeWithVertices[VD,ED] - edge.src = Vertex(e.src.id, e.src.data._2) - edge.dst = Vertex(e.dst.id, e.dst.data._2) - Some(scatterFunc(vid, edge)) - } else 
{ - None - } - } + // def scatter(rawVid: Vid, e: EdgeWithVertices[(Boolean, VD),ED]) = { + // val vid = e.otherVertex(rawVid).id + // if(e.vertex(vid).data._1) { + // val edge = new EdgeWithVertices[VD,ED] + // edge.src = Vertex(e.src.id, e.src.data._2) + // edge.dst = Vertex(e.dst.id, e.dst.data._2) + // Some(scatterFunc(vid, edge)) + // } else { + // None + // } + // } - def applyActive(v: Vertex[(Boolean, VD)], accum: Option[Boolean]) = - (accum.getOrElse(false), v.data._2) + // def applyActive(v: Vertex[(Boolean, VD)], accum: Option[Boolean]) = + // (accum.getOrElse(false), v.data._2) - var i = 0 - var numActive = g.numVertices - while (i < numIter && numActive > 0) { + // var i = 0 + // var numActive = g.numVertices + // while (i < numIter && numActive > 0) { - val accUpdates: RDD[(Vid, A)] = - g.flatMapReduceNeighborhood(gather, mergeFunc, gatherDirection) + // val accUpdates: RDD[(Vid, A)] = + // g.flatMapReduceNeighborhood(gather, mergeFunc, gatherDirection) - g = g.updateVertices(accUpdates, apply).cache() + // g = g.updateVertices(accUpdates, apply).cache() - // Scatter is basically a gather in the opposite direction so we reverse the edge direction - val activeVertices: RDD[(Vid, Boolean)] = - g.flatMapReduceNeighborhood(scatter, _ || _, scatterDirection.reverse) + // // Scatter is basically a gather in the opposite direction so we reverse the edge direction + // val activeVertices: RDD[(Vid, Boolean)] = + // g.flatMapReduceNeighborhood(scatter, _ || _, scatterDirection.reverse) - g = g.updateVertices(activeVertices, applyActive).cache() + // g = g.updateVertices(activeVertices, applyActive).cache() - numActive = g.vertices.map(v => if (v.data._1) 1 else 0).reduce( _ + _ ) - println("Number active vertices: " + numActive) - i += 1 - } + // numActive = g.vertices.map(v => if (v.data._1) 1 else 0).reduce( _ + _ ) + // println("Number active vertices: " + numActive) + // i += 1 + // } - g.mapVertices(v => Vertex(v.id, v.data._2)) - } + // g.mapVertices(v => Vertex(v.id, v.data._2)) + // } } diff --git a/graph/src/main/scala/spark/graph/GraphLoader.scala b/graph/src/main/scala/spark/graph/GraphLoader.scala new file mode 100644 index 0000000000..7e1a054413 --- /dev/null +++ b/graph/src/main/scala/spark/graph/GraphLoader.scala @@ -0,0 +1,54 @@ +package spark.graph + +import spark.RDD +import spark.SparkContext +import spark.SparkContext._ +import spark.graph.impl.GraphImpl + + +object GraphLoader { + + /** + * Load an edge list from file initializing the Graph RDD + */ + def textFile[ED: ClassManifest]( + sc: SparkContext, + path: String, + edgeParser: Array[String] => ED, + minEdgePartitions: Int = 1, + minVertexPartitions: Int = 1) + : GraphImpl[Int, ED] = { + + // Parse the edge data table + val edges = sc.textFile(path).flatMap { line => + if (!line.isEmpty && line(0) != '#') { + val lineArray = line.split("\\s+") + if(lineArray.length < 2) { + println("Invalid line: " + line) + assert(false) + } + val source = lineArray(0) + val target = lineArray(1) + val tail = lineArray.drop(2) + val edata = edgeParser(tail) + Array(Edge(source.trim.toInt, target.trim.toInt, edata)) + } else { + Array.empty[Edge[ED]] + } + }.cache() + + val graph = fromEdges(edges) + // println("Loaded graph:" + + // "\n\t#edges: " + graph.numEdges + + // "\n\t#vertices: " + graph.numVertices) + + graph + } + + def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = { + val vertices = edges.flatMap { edge => List((edge.src, 1), (edge.dst, 1)) } + .reduceByKey(_ + _) + .map{ case (vid, 
degree) => Vertex(vid, degree) } + new GraphImpl[Int, ED](vertices, edges) + } +} diff --git a/graph/src/main/scala/spark/graph/GraphOps.scala b/graph/src/main/scala/spark/graph/GraphOps.scala new file mode 100644 index 0000000000..4fba8d1976 --- /dev/null +++ b/graph/src/main/scala/spark/graph/GraphOps.scala @@ -0,0 +1,30 @@ +package spark.graph + +import spark.RDD + + +class GraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) { + + lazy val numEdges: Long = g.edges.count() + + lazy val numVertices: Long = g.vertices.count() + + lazy val inDegrees: RDD[(Vid, Int)] = { + g.aggregateNeighbors((vid, edge) => Some(1), _+_, EdgeDirection.In) + } + + lazy val outDegrees: RDD[(Vid, Int)] = { + g.aggregateNeighbors((vid,edge) => Some(1), _+_, EdgeDirection.Out) + } + + lazy val degrees: RDD[(Vid, Int)] = { + g.aggregateNeighbors((vid,edge) => Some(1), _+_, EdgeDirection.Both) + } + + def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = { + g.aggregateNeighbors( + (vid, edge) => Some(Array(edge.otherVertex(vid).id)), + (a, b) => a ++ b, + edgeDirection) + } +} diff --git a/graph/src/main/scala/spark/graph/Pregel.scala b/graph/src/main/scala/spark/graph/Pregel.scala index 4bd8810634..7f8849e442 100644 --- a/graph/src/main/scala/spark/graph/Pregel.scala +++ b/graph/src/main/scala/spark/graph/Pregel.scala @@ -16,41 +16,20 @@ object Pregel { var g = graph.cache var i = 0 - def reverseGather(vid: Vid, edge: EdgeWithVertices[VD,ED]) = - sendMsg(edge.otherVertex(vid).id, edge) + def mapF(vid: Vid, edge: EdgeWithVertices[VD,ED]) = sendMsg(edge.otherVertex(vid).id, edge) + + def runProg(v: Vertex[VD], msg: Option[A]): VD = { + if (msg.isEmpty) v.data else vprog(v, msg.get) + } var msgs: RDD[(Vid, A)] = g.vertices.map{ v => (v.id, initialMsg) } while (i < numIter) { - - def runProg(v: Vertex[VD], msg: Option[A]): VD = if(msg.isEmpty) v.data else vprog(v, msg.get) - g = g.updateVertices(msgs, runProg).cache() - - msgs = g.flatMapReduceNeighborhood(reverseGather, mergeMsg, EdgeDirection.In) - + msgs = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In) i += 1 } g } - - def iterateOriginal[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( - rawGraph: Graph[VD, ED])( - vprog: ( Vertex[VD], A, Seq[Vid]) => Seq[(Vid, A)], - mergeMsg: (A, A) => A, - numIter: Int) : Graph[VD, ED] = { - - var graph = rawGraph.cache - var i = 0 - - val outNbrIds : RDD[(Vid, Array[Vid])] = graph.collectNeighborIds(EdgeDirection.Out) - - /// Todo implement - /// vprog takes the vertex, the message (A), and list of out neighbor ids - - graph - - } - } diff --git a/graph/src/main/scala/spark/graph/Timer.scala b/graph/src/main/scala/spark/graph/Timer.scala deleted file mode 100644 index 5ed4d70e1b..0000000000 --- a/graph/src/main/scala/spark/graph/Timer.scala +++ /dev/null @@ -1,14 +0,0 @@ -package spark.graph - - -class Timer { - - var lastTime = System.currentTimeMillis - - def tic = { - val currentTime = System.currentTimeMillis - val elapsedTime = (currentTime - lastTime)/1000.0 - lastTime = currentTime - elapsedTime - } -} diff --git a/graph/src/main/scala/spark/graph/Vertex.scala b/graph/src/main/scala/spark/graph/Vertex.scala new file mode 100644 index 0000000000..543cc8e942 --- /dev/null +++ b/graph/src/main/scala/spark/graph/Vertex.scala @@ -0,0 +1,11 @@ +package spark.graph + + +case class Vertex[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD] ( + var id: Vid = 0, + var data: VD = nullValue[VD]) { + + def this(tuple: Tuple2[Vid, VD]) = this(tuple._1, tuple._2) + + 
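+  // Illustrative sketch, not part of this patch: Vertex and Edge are meant to be
+  // combined with GraphLoader.fromEdges (defined earlier in this patch) to build a
+  // graph from an in-memory edge list. The SparkContext `sc` and the Float edge
+  // weights below are assumptions made only for this example.
+  //
+  //   val edges: RDD[Edge[Float]] = sc.parallelize(Seq(
+  //     Edge(1, 2, 1.0F), Edge(2, 3, 0.5F), Edge(3, 1, 2.0F)))
+  //   val graph = GraphLoader.fromEdges(edges)      // vertex data = vertex degree
+  //   val outDeg = new GraphOps(graph).outDegrees   // RDD[(Vid, Int)]
+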
def tuple = (id, data) +} diff --git a/graph/src/main/scala/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/spark/graph/impl/EdgePartition.scala new file mode 100644 index 0000000000..e5ed2db0f2 --- /dev/null +++ b/graph/src/main/scala/spark/graph/impl/EdgePartition.scala @@ -0,0 +1,49 @@ +package spark.graph.impl + +import scala.collection.mutable.ArrayBuffer + +import it.unimi.dsi.fastutil.ints.IntArrayList + +import spark.graph._ + + +/** + * A partition of edges in 3 large columnar arrays. + */ +private[graph] +class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] +{ + val srcIds: IntArrayList = new IntArrayList + val dstIds: IntArrayList = new IntArrayList + // TODO: Specialize data. + val data: ArrayBuffer[ED] = new ArrayBuffer[ED] + + /** Add a new edge to the partition. */ + def add(src: Vid, dst: Vid, d: ED) { + srcIds.add(src) + dstIds.add(dst) + data += d + } + + def trim() { + srcIds.trim() + dstIds.trim() + } + + def size: Int = srcIds.size + + def iterator = new Iterator[Edge[ED]] { + private var edge = new Edge[ED] + private var pos = 0 + + override def hasNext: Boolean = pos < EdgePartition.this.size + + override def next(): Edge[ED] = { + edge.src = srcIds.get(pos) + edge.dst = dstIds.get(pos) + edge.data = data(pos) + pos += 1 + edge + } + } +} \ No newline at end of file diff --git a/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala b/graph/src/main/scala/spark/graph/impl/EdgeWithVerticesRDD.scala similarity index 97% rename from graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala rename to graph/src/main/scala/spark/graph/impl/EdgeWithVerticesRDD.scala index 91e4d0e017..5370b4e160 100644 --- a/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala +++ b/graph/src/main/scala/spark/graph/impl/EdgeWithVerticesRDD.scala @@ -1,9 +1,9 @@ -package spark.graph +package spark.graph.impl import spark.{Aggregator, HashPartitioner, Partition, RDD, SparkEnv, TaskContext} import spark.{Dependency, OneToOneDependency, ShuffleDependency} import spark.SparkContext._ -import spark.graph.Graph.EdgePartition +import spark.graph._ private[graph] diff --git a/graph/src/main/scala/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/spark/graph/impl/GraphImpl.scala new file mode 100644 index 0000000000..c362746d22 --- /dev/null +++ b/graph/src/main/scala/spark/graph/impl/GraphImpl.scala @@ -0,0 +1,343 @@ +package spark.graph.impl + +import scala.collection.JavaConversions._ + +import spark.ClosureCleaner +import spark.HashPartitioner +import spark.Partitioner +import spark.RDD +import spark.SparkContext +import spark.SparkContext._ + +import spark.graph._ +import spark.graph.impl.GraphImpl._ + + +/** + * A Graph RDD that supports computation on graphs. 
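+ *
+ * Internally the graph is stored as two tables: vTable, which maps each vertex id to its data
+ * and to the ids of the edge partitions that reference it, and eTable, which stores the edges
+ * in columnar EdgePartition blocks keyed by partition id.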
+ */ +class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( + val numVertexPartitions: Int, + val numEdgePartitions: Int, + _rawVertices: RDD[Vertex[VD]], + _rawEdges: RDD[Edge[ED]], + _rawVTable: RDD[(Vid, (VD, Array[Pid]))], + _rawETable: RDD[(Pid, EdgePartition[ED])]) + extends Graph[VD, ED] { + + def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = { + this(vertices.partitions.size, edges.partitions.size, vertices, edges, null, null) + } + + def withPartitioner(numVertexPartitions: Int, numEdgePartitions: Int): Graph[VD, ED] = { + if (_cached) { + (new GraphImpl(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable)) + .cache() + } else { + new GraphImpl(numVertexPartitions, numEdgePartitions, _rawVertices, _rawEdges, null, null) + } + } + + def withVertexPartitioner(numVertexPartitions: Int) = { + withPartitioner(numVertexPartitions, numEdgePartitions) + } + + def withEdgePartitioner(numEdgePartitions: Int) = { + withPartitioner(numVertexPartitions, numEdgePartitions) + } + + protected var _cached = false + + override def cache(): Graph[VD, ED] = { + eTable.cache() + vTable.cache() + _cached = true + this + } + + override def reverse: Graph[VD, ED] = { + newGraph(vertices, edges.map{ case Edge(s, t, e) => Edge(t, s, e) }) + } + + /** Return a RDD of vertices. */ + override def vertices: RDD[Vertex[VD]] = { + if (!_cached && _rawVertices != null) { + _rawVertices + } else { + vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) } + } + } + + /** Return a RDD of edges. */ + override def edges: RDD[Edge[ED]] = { + if (!_cached && _rawEdges != null) { + _rawEdges + } else { + eTable.mapPartitions { iter => iter.next._2.iterator } + } + } + + /** Return a RDD that brings edges with its source and destination vertices together. 
*/ + override def edgesWithVertices: RDD[EdgeWithVertices[VD, ED]] = { + (new EdgeWithVerticesRDD(vTableReplicated, eTable)).mapPartitions { part => part.next._2 } + } + + override def mapVertices[VD2: ClassManifest](f: Vertex[VD] => VD2): Graph[VD2, ED] = { + newGraph(vertices.map(v => Vertex(v.id, f(v))), edges) + } + + override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = { + newGraph(vertices, edges.map(e => Edge(e.src, e.dst, f(e)))) + } + + ////////////////////////////////////////////////////////////////////////////////////////////////// + // Lower level transformation methods + ////////////////////////////////////////////////////////////////////////////////////////////////// + + override def aggregateNeighbors[VD2: ClassManifest]( + mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2], + reduceFunc: (VD2, VD2) => VD2, + default: VD2, + gatherDirection: EdgeDirection) + : RDD[(Vid, VD2)] = { + + ClosureCleaner.clean(mapFunc) + ClosureCleaner.clean(reduceFunc) + + val newVTable = vTableReplicated.mapPartitions({ part => + part.map { v => (v._1, MutableTuple2(v._2, Option.empty[VD2])) } + }, preservesPartitioning = true) + + (new EdgeWithVerticesRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable)) + .mapPartitions { part => + val (vmap, edges) = part.next() + val edgeSansAcc = new EdgeWithVertices[VD, ED]() + edgeSansAcc.src = new Vertex[VD] + edgeSansAcc.dst = new Vertex[VD] + edges.foreach { e: EdgeWithVertices[MutableTuple2[VD, Option[VD2]], ED] => + edgeSansAcc.data = e.data + edgeSansAcc.src.data = e.src.data._1 + edgeSansAcc.dst.data = e.dst.data._1 + edgeSansAcc.src.id = e.src.id + edgeSansAcc.dst.id = e.dst.id + if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) { + e.dst.data._2 = + if (e.dst.data._2.isEmpty) { + mapFunc(edgeSansAcc.dst.id, edgeSansAcc) + } else { + val tmp = mapFunc(edgeSansAcc.dst.id, edgeSansAcc) + if (!tmp.isEmpty) Some(reduceFunc(e.dst.data._2.get, tmp.get)) else e.dst.data._2 + } + } + if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) { + e.dst.data._2 = + if (e.dst.data._2.isEmpty) { + mapFunc(edgeSansAcc.src.id, edgeSansAcc) + } else { + val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc) + if (!tmp.isEmpty) Some(reduceFunc(e.src.data._2.get, tmp.get)) else e.src.data._2 + } + } + } + vmap.int2ObjectEntrySet().fastIterator().filter(!_.getValue()._2.isEmpty).map{ entry => + (entry.getIntKey(), entry.getValue()._2) + } + } + .map{ case (vid, aOpt) => (vid, aOpt.get) } + .combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false) + } + + /** + * Same as mapReduceNeighborhood but map function can return none and there is no default value. + * As a consequence, the resulting table may be much smaller than the set of vertices. 
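+   * For example, when gathering with EdgeDirection.In, a vertex that has no in-edges
+   * produces no entry at all in the returned RDD.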
+ */ + override def aggregateNeighbors[VD2: ClassManifest]( + mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2], + reduceFunc: (VD2, VD2) => VD2, + gatherDirection: EdgeDirection): RDD[(Vid, VD2)] = { + + ClosureCleaner.clean(mapFunc) + ClosureCleaner.clean(reduceFunc) + + val newVTable = vTableReplicated.mapPartitions({ part => + part.map { v => (v._1, MutableTuple2(v._2, Option.empty[VD2])) } + }, preservesPartitioning = true) + + (new EdgeWithVerticesRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable)) + .mapPartitions { part => + val (vmap, edges) = part.next() + val edgeSansAcc = new EdgeWithVertices[VD, ED]() + edgeSansAcc.src = new Vertex[VD] + edgeSansAcc.dst = new Vertex[VD] + edges.foreach { e: EdgeWithVertices[MutableTuple2[VD, Option[VD2]], ED] => + edgeSansAcc.data = e.data + edgeSansAcc.src.data = e.src.data._1 + edgeSansAcc.dst.data = e.dst.data._1 + edgeSansAcc.src.id = e.src.id + edgeSansAcc.dst.id = e.dst.id + if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) { + e.dst.data._2 = + if (e.dst.data._2.isEmpty) { + mapFunc(edgeSansAcc.dst.id, edgeSansAcc) + } else { + val tmp = mapFunc(edgeSansAcc.dst.id, edgeSansAcc) + if (!tmp.isEmpty) Some(reduceFunc(e.dst.data._2.get, tmp.get)) else e.dst.data._2 + } + } + if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) { + e.dst.data._2 = + if (e.dst.data._2.isEmpty) { + mapFunc(edgeSansAcc.src.id, edgeSansAcc) + } else { + val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc) + if (!tmp.isEmpty) Some(reduceFunc(e.src.data._2.get, tmp.get)) else e.src.data._2 + } + } + } + vmap.int2ObjectEntrySet().fastIterator().filter(!_.getValue()._2.isEmpty).map{ entry => + (entry.getIntKey(), entry.getValue()._2) + } + } + .map{ case (vid, aOpt) => (vid, aOpt.get) } + .combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false) + } + + override def updateVertices[U: ClassManifest, VD2: ClassManifest]( + updates: RDD[(Vid, U)], + updateF: (Vertex[VD], Option[U]) => VD2) + : Graph[VD2, ED] = { + + ClosureCleaner.clean(updateF) + + val newVTable = vTable.leftOuterJoin(updates).mapPartitions({ iter => + iter.map { case (vid, ((vdata, pids), update)) => + val newVdata = updateF(Vertex(vid, vdata), update) + (vid, (newVdata, pids)) + } + }, preservesPartitioning = true).cache() + + new GraphImpl(newVTable.partitions.size, eTable.partitions.size, null, null, newVTable, eTable) + } + + override def updateVertices2[U: ClassManifest]( + updates: RDD[(Vid, U)], + updateF: (Vertex[VD], U) => VD) + : Graph[VD, ED] = { + + ClosureCleaner.clean(updateF) + + val newVTable = vTable.leftOuterJoin(updates).mapPartitions({ iter => + iter.map { case (vid, ((vdata, pids), update)) => + if (update.isDefined) { + val newVdata = updateF(Vertex(vid, vdata), update.get) + (vid, (newVdata, pids)) + } else { + (vid, (vdata, pids)) + } + } + }, preservesPartitioning = true).cache() + + new GraphImpl(newVTable.partitions.size, eTable.partitions.size, null, null, newVTable, eTable) + } + + + ////////////////////////////////////////////////////////////////////////////////////////////////// + // Internals hidden from callers + ////////////////////////////////////////////////////////////////////////////////////////////////// + + // TODO: Support non-hash partitioning schemes. + protected val vertexPartitioner = new HashPartitioner(numVertexPartitions) + protected val edgePartitioner = new HashPartitioner(numEdgePartitions) + + /** Create a new graph but keep the current partitioning scheme. 
*/ + protected def newGraph[VD2: ClassManifest, ED2: ClassManifest]( + vertices: RDD[Vertex[VD2]], edges: RDD[Edge[ED2]]): Graph[VD2, ED2] = { + (new GraphImpl[VD2, ED2](vertices, edges)).withPartitioner(numVertexPartitions, numEdgePartitions) + } + + protected lazy val eTable: RDD[(Pid, EdgePartition[ED])] = { + if (_rawETable == null) { + createETable(_rawEdges, numEdgePartitions) + } else { + _rawETable + } + } + + protected lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = { + if (_rawVTable == null) { + createVTable(_rawVertices, eTable, numVertexPartitions) + } else { + _rawVTable + } + } + + protected lazy val vTableReplicated: RDD[(Vid, VD)] = { + // Join vid2pid and vTable, generate a shuffle dependency on the joined result, and get + // the shuffle id so we can use it on the slave. + vTable + .flatMap { case (vid, (vdata, pids)) => pids.iterator.map { pid => (pid, (vid, vdata)) } } + .partitionBy(edgePartitioner) + .mapPartitions( + { part => part.map { case(pid, (vid, vdata)) => (vid, vdata) } }, + preservesPartitioning = true) + } +} + + +object GraphImpl { + + /** + * Create the edge table RDD, which is much more efficient for Java heap storage than the + * normal edges data structure (RDD[(Vid, Vid, ED)]). + * + * The edge table contains multiple partitions, and each partition contains only one RDD + * key-value pair: the key is the partition id, and the value is an EdgePartition object + * containing all the edges in a partition. + */ + protected def createETable[ED: ClassManifest](edges: RDD[Edge[ED]], numPartitions: Int) + : RDD[(Pid, EdgePartition[ED])] = { + edges + .map { e => + // Random partitioning based on the source vertex id. + (math.abs(e.src) % numPartitions, (e.src, e.dst, e.data)) + } + .partitionBy(new HashPartitioner(numPartitions)) + .mapPartitionsWithIndex({ (pid, iter) => + val edgePartition = new EdgePartition[ED] + iter.foreach { case (_, (src, dst, data)) => edgePartition.add(src, dst, data) } + Iterator((pid, edgePartition)) + }, preservesPartitioning = true) + } + + protected def createVTable[VD: ClassManifest, ED: ClassManifest]( + vertices: RDD[Vertex[VD]], + eTable: RDD[(Pid, EdgePartition[ED])], + numPartitions: Int) + : RDD[(Vid, (VD, Array[Pid]))] = { + val partitioner = new HashPartitioner(numPartitions) + + // A key-value RDD. The key is a vertex id, and the value is a list of + // partitions that contains edges referencing the vertex. 
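+    // For example, if vertex 3 appears as a source or destination only in edge
+    // partitions 0 and 2, then vid2pid contains the pair (3, Seq(0, 2)).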
+ val vid2pid : RDD[(Int, Seq[Pid])] = eTable.mapPartitions { iter => + val (pid, edgePartition) = iter.next() + val vSet = new it.unimi.dsi.fastutil.ints.IntOpenHashSet + var i = 0 + while (i < edgePartition.srcIds.size) { + vSet.add(edgePartition.srcIds.getInt(i)) + vSet.add(edgePartition.dstIds.getInt(i)) + i += 1 + } + vSet.iterator.map { vid => (vid.intValue, pid) } + }.groupByKey(partitioner) + + vertices + .map { v => (v.id, v.data) } + .partitionBy(partitioner) + .leftOuterJoin(vid2pid) + .mapValues { + case (vdata, None) => (vdata, Array.empty[Pid]) + case (vdata, Some(pids)) => (vdata, pids.toArray) + } + } +} + diff --git a/graph/src/main/scala/spark/graph/BagelTest.scala b/graph/src/main/scala/spark/graph/perf/BagelTest.scala similarity index 93% rename from graph/src/main/scala/spark/graph/BagelTest.scala rename to graph/src/main/scala/spark/graph/perf/BagelTest.scala index eee53bd6f6..7547292500 100644 --- a/graph/src/main/scala/spark/graph/BagelTest.scala +++ b/graph/src/main/scala/spark/graph/perf/BagelTest.scala @@ -1,9 +1,10 @@ -package spark.graph +package spark.graph.perf import spark._ import spark.SparkContext._ import spark.bagel.Bagel import spark.bagel.examples._ +import spark.graph._ object BagelTest { @@ -41,7 +42,7 @@ object BagelTest { } val sc = new SparkContext(host, "PageRank(" + fname + ")") - val g = Graph.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache() + val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache() val startTime = System.currentTimeMillis val numVertices = g.vertices.count() diff --git a/graph/src/main/scala/spark/graph/perf/SparkTest.scala b/graph/src/main/scala/spark/graph/perf/SparkTest.scala new file mode 100644 index 0000000000..85ebd14bcb --- /dev/null +++ b/graph/src/main/scala/spark/graph/perf/SparkTest.scala @@ -0,0 +1,72 @@ +package spark.graph.perf + +import spark._ +import spark.SparkContext._ +import spark.bagel.Bagel +import spark.bagel.examples._ +import spark.graph._ + + +object SparkTest { + + def main(args: Array[String]) { + val host = args(0) + val taskType = args(1) + val fname = args(2) + val options = args.drop(3).map { arg => + arg.dropWhile(_ == '-').split('=') match { + case Array(opt, v) => (opt -> v) + case _ => throw new IllegalArgumentException("Invalid argument: " + arg) + } + } + + System.setProperty("spark.serializer", "spark.KryoSerializer") + //System.setProperty("spark.shuffle.compress", "false") + System.setProperty("spark.kryo.registrator", "spark.bagel.examples.PRKryoRegistrator") + + var numIter = Int.MaxValue + var isDynamic = false + var tol:Float = 0.001F + var outFname = "" + var numVPart = 4 + var numEPart = 4 + + options.foreach{ + case ("numIter", v) => numIter = v.toInt + case ("dynamic", v) => isDynamic = v.toBoolean + case ("tol", v) => tol = v.toFloat + case ("output", v) => outFname = v + case ("numVPart", v) => numVPart = v.toInt + case ("numEPart", v) => numEPart = v.toInt + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + val sc = new SparkContext(host, "PageRank(" + fname + ")") + val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache() + val startTime = System.currentTimeMillis + + val numVertices = g.vertices.count() + + val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) => + (vid.toString, new PRVertex(1.0, neighbors.map(_.toString))) + } + + // Do the computation + val epsilon = 0.01 / numVertices + val messages = 
sc.parallelize(Array[(String, PRMessage)]()) + val utils = new PageRankUtils + val result = + Bagel.run( + sc, vertices, messages, combiner = new PRCombiner(), + numPartitions = numVPart)( + utils.computeWithCombiner(numVertices, epsilon, numIter)) + + println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) ) + if (!outFname.isEmpty) { + println("Saving pageranks of pages to " + outFname) + result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname) + } + println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") + sc.stop() + } +}
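
A minimal sketch, appended for discussion and not part of the patch, of how the aggregateNeighbors and updateVertices primitives introduced in GraphImpl compose. The graph `g`, its String/Float type parameters, and the object and method names are assumptions made for the example.

import spark.RDD
import spark.graph._

object AggregateNeighborsSketch {
  // Count each vertex's in-neighbors and store the count next to the existing vertex data.
  def withInDegrees(g: Graph[String, Float]): Graph[(String, Int), Float] = {
    // One unit per in-edge, summed per destination vertex (same pattern as GraphOps.inDegrees).
    val inDeg: RDD[(Vid, Int)] =
      g.aggregateNeighbors((vid, edge) => Some(1), _ + _, EdgeDirection.In)
    // Vertices with no in-edges receive None from the join and keep a count of 0.
    g.updateVertices(inDeg, (v: Vertex[String], opt: Option[Int]) => (v.data, opt.getOrElse(0)))
  }
}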
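
For comparison with the Bagel-based SparkTest above, a rough end-to-end sketch against the new native API; the command-line handling, partition counts, object name, and edge-weight parsing are placeholder assumptions rather than part of the patch.

import spark.SparkContext
import spark.graph._

object NativeLoaderSketch {
  def main(args: Array[String]) {
    // args(0) = master URL, args(1) = edge list file.
    val sc = new SparkContext(args(0), "GraphLoaderSketch(" + args(1) + ")")
    // Parse the first trailing field as a Float weight, defaulting to 1.0F when absent.
    val g = GraphLoader.textFile(sc, args(1), a => if (a.isEmpty) 1.0F else a(0).toFloat)
      .withPartitioner(4, 4)   // 4 vertex partitions, 4 edge partitions
      .cache()
    println("#vertices: " + g.vertices.count() + "  #edges: " + g.edges.count())
    println("max out-degree: " + new GraphOps(g).outDegrees.map(_._2).reduce(_ max _))
    sc.stop()
  }
}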