From de3d6ee5a7ee5ed2e0369f05ed045a0cdc6b1668 Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez"
Date: Tue, 19 Nov 2013 22:03:49 -0800
Subject: [PATCH] Fixing build after merging upstream changes.

---
 .../org/apache/spark/graph/Analytics.scala    | 20 ++---
 .../scala/org/apache/spark/graph/Graph.scala  | 75 ++++++++++++-------
 .../org/apache/spark/graph/GraphLoader.scala  |  3 +-
 .../apache/spark/graph/impl/GraphImpl.scala   | 42 +++++------
 .../spark/graph/util/GraphGenerators.scala    | 34 ++++-----
 5 files changed, 96 insertions(+), 78 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index d3d65942d0..b1feb5e887 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -351,7 +351,7 @@ object Analytics extends Logging {
         val sc = new SparkContext(host, "PageRank(" + fname + ")")
 
         val graph = GraphLoader.edgeListFile(sc, fname,
-          minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
+          minEdgePartitions = numEPart).cache()
 
         val startTime = System.currentTimeMillis
         logInfo("GRAPHX: starting tasks")
@@ -401,14 +401,10 @@
         println("======================================")
 
         val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
-        //val graph = GraphLoader.textFile(sc, fname, a => 1.0F)
         val graph = GraphLoader.edgeListFile(sc, fname,
-          minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
+          minEdgePartitions = numEPart).cache()
         val cc = Analytics.connectedComponents(graph)
-        //val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
-        //         else Analytics.connectedComponents(graph, numIter)
         println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
-
         sc.stop()
       }
@@ -425,14 +421,12 @@
         println("| Triangle Count |")
         println("--------------------------------------")
         val sc = new SparkContext(host, "TriangleCount(" + fname + ")")
-        //val graph = GraphLoader.textFile(sc, fname, a => 1.0F)
-        val graph = GraphLoader.edgeListFileUndirected(sc, fname,
-          minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
+        val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
+          minEdgePartitions = numEPart).cache()
         val triangles = Analytics.triangleCount(graph)
-        //val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
-        //         else Analytics.connectedComponents(graph, numIter)
-        println("Triangles: " + triangles.vertices.map{ case (vid,data) => data.toLong }.reduce(_+_) /3)
-
+        println("Triangles: " + triangles.vertices.map {
+          case (vid,data) => data.toLong
+        }.reduce(_+_) / 3)
         sc.stop()
       }

diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index 9c01d60b59..42ede2d420 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -295,20 +295,6 @@ object Graph {
   import org.apache.spark.graph.impl._
   import org.apache.spark.SparkContext._
 
-  /**
-   * Construct a graph from a collection of edges.
-   *
-   * @param edges the RDD containing the set of edges in the graph
-   * @param defaultValue the default vertex attribute to use for each vertex
-   *
-   * @return a graph with edge attributes described by `edges` and vertices
-   * given by all vertices in `edges` with value `defaultValue`
-   */
-  def apply[VD: ClassManifest, ED: ClassManifest](
-    edges: RDD[Edge[ED]], defaultValue: VD): Graph[VD, ED] = {
-    GraphImpl(edges, defaultValue)
-  }
-
   /**
    * Construct a graph from a collection of edges encoded as vertex id pairs.
    *
@@ -317,7 +303,7 @@ object Graph {
    * @param rawEdges the RDD containing the set of edges in the graph
    *
    * @return a graph with edge attributes containing the count of duplicate edges.
    */
   def apply[VD: ClassManifest](rawEdges: RDD[(Vid, Vid)], defaultValue: VD): Graph[VD, Int] = {
-    Graph(rawEdges, defaultValue, false)
+    Graph(rawEdges, defaultValue, false, RandomVertexCut())
   }
@@ -334,9 +320,14 @@ object Graph {
    * attributes containing the total degree of each vertex.
    *
    */
-  def apply[VD: ClassManifest](rawEdges: RDD[(Vid, Vid)], defaultValue: VD, uniqueEdges: Boolean):
+  def apply[VD: ClassManifest](
+      rawEdges: RDD[(Vid, Vid)],
+      defaultValue: VD,
+      uniqueEdges: Boolean,
+      partitionStrategy: PartitionStrategy):
     Graph[VD, Int] = {
-    val graph = GraphImpl(rawEdges.map(p => Edge(p._1, p._2, 1)), defaultValue)
+    val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
+    val graph = GraphImpl(edges, defaultValue, partitionStrategy)
     if (uniqueEdges) {
       graph.groupEdges((a,b) => a+b)
     } else {
@@ -344,6 +335,37 @@ object Graph {
     }
   }
 
+  /**
+   * Construct a graph from a collection of edges.
+   *
+   * @param edges the RDD containing the set of edges in the graph
+   * @param defaultValue the default vertex attribute to use for each vertex
+   *
+   * @return a graph with edge attributes described by `edges` and vertices
+   * given by all vertices in `edges` with value `defaultValue`
+   */
+  def apply[VD: ClassManifest, ED: ClassManifest](
+      edges: RDD[Edge[ED]],
+      defaultValue: VD): Graph[VD, ED] = {
+    Graph(edges, defaultValue, RandomVertexCut())
+  }
+
+  /**
+   * Construct a graph from a collection of edges.
+   *
+   * @param edges the RDD containing the set of edges in the graph
+   * @param defaultValue the default vertex attribute to use for each vertex
+   *
+   * @return a graph with edge attributes described by `edges` and vertices
+   * given by all vertices in `edges` with value `defaultValue`
+   */
+  def apply[VD: ClassManifest, ED: ClassManifest](
+      edges: RDD[Edge[ED]],
+      defaultValue: VD,
+      partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
+    GraphImpl(edges, defaultValue, partitionStrategy)
+  }
+
   /**
    * Construct a graph from a collection attributed vertices and
    * edges.
@@ -362,32 +384,30 @@ object Graph {
       vertices: RDD[(Vid,VD)],
       edges: RDD[Edge[ED]]): Graph[VD, ED] = {
     val defaultAttr: VD = null.asInstanceOf[VD]
-    Graph(vertices, edges, defaultAttr, (a:VD,b:VD) => a)
+    Graph(vertices, edges, defaultAttr, (a:VD,b:VD) => a, RandomVertexCut())
   }
-
-
   /**
    * Construct a graph from a collection attributed vertices and
    * edges.  Duplicate vertices are combined using the `mergeFunc` and
    * vertices found in the edge collection but not in the input
    * vertices are the default attribute `defautVertexAttr`.
    *
+   * @note Duplicate vertices are removed arbitrarily.
+   *
    * @tparam VD the vertex attribute type
    * @tparam ED the edge attribute type
    * @param vertices the "set" of vertices and their attributes
    * @param edges the collection of edges in the graph
    * @param defaultVertexAttr the default vertex attribute to use for
-   * vertices that are mentioned in `edges` but not in `vertices
-   * @param mergeFunc the function used to merge duplicate vertices
-   * in the `vertices` collection.
+   * vertices that are mentioned in `edges` but not in `vertices`
    *
    */
   def apply[VD: ClassManifest, ED: ClassManifest](
       vertices: RDD[(Vid,VD)],
       edges: RDD[Edge[ED]],
       defaultVertexAttr: VD): Graph[VD, ED] = {
-    GraphImpl(vertices, edges, defaultVertexAttr, (a,b) => a)
+    Graph(vertices, edges, defaultVertexAttr, (a,b) => a, RandomVertexCut())
   }
 
   /**
@@ -404,14 +424,17 @@ object Graph {
    * vertices that are mentioned in `edges` but not in `vertices
    * @param mergeFunc the function used to merge duplicate vertices
    * in the `vertices` collection.
+   * @param partitionStrategy the partition strategy to use when
+   * partitioning the edges.
    *
    */
   def apply[VD: ClassManifest, ED: ClassManifest](
       vertices: RDD[(Vid,VD)],
       edges: RDD[Edge[ED]],
       defaultVertexAttr: VD,
-      mergeFunc: (VD, VD) => VD): Graph[VD, ED] = {
-    GraphImpl(vertices, edges, defaultVertexAttr, mergeFunc)
+      mergeFunc: (VD, VD) => VD,
+      partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
+    GraphImpl(vertices, edges, defaultVertexAttr, mergeFunc, partitionStrategy)
   }
 
   /**
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
index fadc58284c..d97c028faa 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
@@ -26,7 +26,8 @@ object GraphLoader {
       path: String,
       edgeParser: Array[String] => ED,
       minEdgePartitions: Int = 1,
-      partitionStrategy: PartitionStrategy = RandomVertexCut()): GraphImpl[Int, ED] = {
+      partitionStrategy: PartitionStrategy = RandomVertexCut()):
+    Graph[Int, ED] = {
     // Parse the edge data table
     val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter =>
       iter.filter(line => !line.isEmpty && line(0) != '#').map { line =>
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index f995878bff..650bb515b5 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -255,7 +255,7 @@ object GraphImpl {
       defaultValue: VD,
       partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = {
 
-    val etable = createETable(edges).cache
+    val etable = createETable(edges, partitionStrategy).cache
     // Get the set of all vids
     val vids = etable.mapPartitions( iter => {
       val (pid, epart) = iter.next()
@@ -271,28 +271,28 @@ object GraphImpl {
     new GraphImpl(vtable, vid2pid, localVidMap, etable, partitionStrategy)
   }
 
-  def apply[VD: ClassManifest, ED: ClassManifest](
-      vertices: RDD[(Vid, VD)],
-      edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD): GraphImpl[VD,ED] = {
-    apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, RandomVertexCut())
-  }
+  // def apply[VD: ClassManifest, ED: ClassManifest](
+  //     vertices: RDD[(Vid, VD)],
+  //     edges: RDD[Edge[ED]],
+  //     defaultVertexAttr: VD): GraphImpl[VD,ED] = {
+  //   apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, RandomVertexCut())
+  // }
 
-  def apply[VD: ClassManifest, ED: ClassManifest](
-      vertices: RDD[(Vid, VD)],
-      edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD,
-      partitionStrategy: PartitionStrategy): GraphImpl[VD,ED] = {
-    apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, partitionStrategy)
-  }
+  // def apply[VD: ClassManifest, ED: ClassManifest](
+  //     vertices: RDD[(Vid, VD)],
+  //     edges: RDD[Edge[ED]],
+  //     defaultVertexAttr: VD,
+  //     partitionStrategy: PartitionStrategy): GraphImpl[VD,ED] = {
+  //   apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, partitionStrategy)
+  // }
 
-  def apply[VD: ClassManifest, ED: ClassManifest](
-      vertices: RDD[(Vid, VD)],
-      edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD,
-      mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = {
-    apply(vertices, edges, defaultVertexAttr, mergeFunc, RandomVertexCut())
-  }
+  // def apply[VD: ClassManifest, ED: ClassManifest](
+  //     vertices: RDD[(Vid, VD)],
+  //     edges: RDD[Edge[ED]],
+  //     defaultVertexAttr: VD,
+  //     mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = {
+  //   apply(vertices, edges, defaultVertexAttr, mergeFunc, RandomVertexCut())
+  // }
 
   def apply[VD: ClassManifest, ED: ClassManifest](
       vertices: RDD[(Vid, VD)],
diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
index f0e423a57b..4c17bab0c4 100644
--- a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
@@ -65,7 +65,7 @@ object GraphGenerators {
 
   // Right now it just generates a bunch of edges where
   // the edge data is the weight (default 1)
-  def logNormalGraph(sc: SparkContext, numVertices: Int): GraphImpl[Int, Int] = {
+  def logNormalGraph(sc: SparkContext, numVertices: Int): Graph[Int, Int] = {
     // based on Pregel settings
     val mu = 4
     val sigma = 1.3
@@ -75,11 +75,11 @@ object GraphGenerators {
       src => (src, sampleLogNormal(mu, sigma, numVertices))
     }
 
-    val edges = vertices.flatMap{ 
-      v => generateRandomEdges(v._1.toInt, v._2, numVertices) 
-    }
-    
-    GraphImpl(vertices, edges, 0)
+    val edges = vertices.flatMap{
+      v => generateRandomEdges(v._1.toInt, v._2, numVertices)
+    }
+
+    Graph(vertices, edges, 0)
     //println("Vertices:")
     //for (v <- vertices) {
     //  println(v.id)
@@ -137,7 +137,7 @@ object GraphGenerators {
 
 
 
-  def rmatGraph(sc: SparkContext, requestedNumVertices: Int, numEdges: Int): GraphImpl[Int, Int] = {
+  def rmatGraph(sc: SparkContext, requestedNumVertices: Int, numEdges: Int): Graph[Int, Int] = {
     // let N = requestedNumVertices
     // the number of vertices is 2^n where n=ceil(log2[N])
     // This ensures that the 4 quadrants are the same size at all recursion levels
@@ -155,12 +155,12 @@ object GraphGenerators {
   }
 
-  def outDegreeFromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
-    
+  def outDegreeFromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): Graph[Int, ED] = {
+
     val vertices = edges.flatMap { edge => List((edge.srcId, 1)) }
       .reduceByKey(_ + _)
       .map{ case (vid, degree) => (vid, degree) }
-    GraphImpl(vertices, edges, 0)
+    Graph(vertices, edges, 0)
   }
 
   /**
@@ -192,7 +192,7 @@ object GraphGenerators {
    * |      c     |     d      |     |
    * |            |            |     |
    * ***************  -----
-   * 
+   *
    * where this represents the subquadrant of the adj matrix currently being
   * subdivided. (x,y) represent the upper left hand corner of the subquadrant,
   * and T represents the side length (guaranteed to be a power of 2).
@@ -204,7 +204,7 @@
    *  quad = c, x'=x, y'=y+T/2, T'=T/2
    *  quad = d, x'=x+T/2, y'=y+T/2, T'=T/2
    *
-   * @param src is the 
+   * @param src is the
    */
   @tailrec
   def chooseCell(x: Int, y: Int, t: Int): (Int, Int) = {
@@ -242,7 +242,7 @@
    * Create `rows` by `cols` grid graph with each vertex connected to its
    * row+1 and col+1 neighbors.  Vertex ids are assigned in row major
    * order.
-   * 
+   *
    * @param sc the spark context in which to construct the graph
    * @param rows the number of rows
    * @param cols the number of columns
@@ -252,12 +252,12 @@
    */
   def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = {
     // Convert row column address into vertex ids (row major order)
-    def sub2ind(r: Int, c: Int): Vid = r * cols + c 
+    def sub2ind(r: Int, c: Int): Vid = r * cols + c
 
-    val vertices: RDD[(Vid, (Int,Int))] = 
+    val vertices: RDD[(Vid, (Int,Int))] =
       sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) )
-    val edges: RDD[Edge[Double]] = 
-      vertices.flatMap{ case (vid, (r,c)) => 
+    val edges: RDD[Edge[Double]] =
+      vertices.flatMap{ case (vid, (r,c)) =>
         (if (r+1 < rows) { Seq( (sub2ind(r, c), sub2ind(r+1, c))) } else { Seq.empty }) ++
         (if (c+1 < cols) { Seq( (sub2ind(r, c), sub2ind(r, c+1))) } else { Seq.empty })
       }.map{ case (src, dst) => Edge(src, dst, 1.0) }
@@ -266,7 +266,7 @@
 
   /**
    * Create a star graph with vertex 0 being the center.
-   * 
+   *
    * @param sc the spark context in which to construct the graph
    * @param the number of vertices in the star
    *
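
Usage note (not part of the patch): the sketch below illustrates how the constructors touched by this commit might be invoked once it is applied. It assumes Vid is an alias for Long and that RandomVertexCut() remains the default PartitionStrategy, as the diff suggests; the input path and partition count are placeholders.

    import org.apache.spark.SparkContext
    import org.apache.spark.graph._

    object PartitionStrategyExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "PartitionStrategyExample")

        // Graph.apply now takes an explicit PartitionStrategy and threads it
        // through to GraphImpl instead of choosing one internally.
        val rawEdges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L)))  // assumes Vid = Long
        val counts = Graph(rawEdges, 1, true, RandomVertexCut())  // uniqueEdges = true sums duplicate edges

        // edgeListFile with canonicalOrientation = true (srcId < dstId) replaces
        // the old edgeListFileUndirected call in the triangle-count driver.
        val canonical = GraphLoader.edgeListFile(sc, "edges.tsv",  // placeholder path
          canonicalOrientation = true, minEdgePartitions = 4).cache()

        println("vertices: " + counts.vertices.count)
        sc.stop()
      }
    }

Passing the strategy at construction keeps GraphLoader's default (RandomVertexCut()) and the analytics drivers in agreement about how edges are partitioned.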