From 494472a6ccba6aa60305f0ba7e59657f0f980709 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Sun, 13 Oct 2013 19:42:32 -0700 Subject: [PATCH] Integrated IndexedRDD into graph design. --- .../apache/spark/IndexedRDDFunctions.scala | 26 + .../org/apache/spark/rdd/IndexedRDD.scala | 5 + .../apache/spark/rdd/PairRDDFunctions.scala | 9 + .../org/apache/spark/graph/Analytics.scala | 27 +- .../scala/org/apache/spark/graph/Graph.scala | 146 ++-- .../org/apache/spark/graph/GraphLab.scala | 37 +- .../org/apache/spark/graph/GraphLoader.scala | 4 +- .../org/apache/spark/graph/GraphOps.scala | 20 +- .../scala/org/apache/spark/graph/Pregel.scala | 27 +- .../spark/graph/impl/EdgePartition.scala | 55 +- .../graph/impl/EdgePartitionBuilder.scala | 31 + .../spark/graph/impl/EdgeTripletRDD.scala | 178 ++--- .../apache/spark/graph/impl/GraphImpl.scala | 700 ++++++++++-------- .../spark/graph/util/GraphGenerators.scala | 24 +- 14 files changed, 749 insertions(+), 540 deletions(-) create mode 100644 graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala diff --git a/core/src/main/scala/org/apache/spark/IndexedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/IndexedRDDFunctions.scala index 65c6963b71..358ab57b0c 100644 --- a/core/src/main/scala/org/apache/spark/IndexedRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/IndexedRDDFunctions.scala @@ -49,6 +49,32 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K } + /** + * Pass each value in the key-value pair RDD through a map function without changing the keys; + * this also retains the original RDD's partitioning. + */ + override def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): RDD[(K, U)] = { + val cleanF = self.index.rdd.context.clean(f) + val newValues = self.index.rdd.zipPartitions(self.valuesRDD){ (keysIter, valuesIter) => + val index = keysIter.next() + assert(keysIter.hasNext() == false) + val oldValues = valuesIter.next() + assert(valuesIter.hasNext() == false) + // Allocate the array to store the results into + val newValues: Array[Seq[U]] = new Array[Seq[U]](oldValues.size) + // Populate the new Values + for( (k,i) <- index ) { + if(oldValues(i) != null) { + newValues(i) = oldValues(i).map( v => f(k,v) ) + } + } + Array(newValues.toSeq).iterator + } + new IndexedRDD[K,U](self.index, newValues) + } + + + /** * Pass each value in the key-value pair RDD through a flatMap function without changing the * keys; this also retains the original RDD's partitioning. diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala index 79a007a939..8d2e9782c2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala @@ -53,6 +53,8 @@ class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockI rdd.persist(newLevel) return this } + + def partitioner: Partitioner = rdd.partitioner.get } @@ -85,6 +87,9 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( override val partitioner = index.rdd.partitioner + + + /** * The actual partitions are defined by the tuples. 
*/ diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 435ddb9e94..569d74ae7a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -393,6 +393,15 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) new MappedValuesRDD(self, cleanF) } + + /** + * Pass each value in the key-value pair RDD through a map function without changing the keys; + * this also retains the original RDD's partitioning. + */ + def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): RDD[(K, U)] = { + self.map{ case (k,v) => (k, f(k,v)) } + } + /** * Pass each value in the key-value pair RDD through a flatMap function without changing the * keys; this also retains the original RDD's partitioning. diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala index b411c60cee..49498fbcd4 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala @@ -44,9 +44,9 @@ object Analytics extends Logging { numIter: Int, resetProb: Double = 0.15) = { // Compute the out degree of each vertex - val pagerankGraph = graph.leftJoinVertices[Int, (Int, Double)](graph.outDegrees, - (vertex, deg) => (deg.getOrElse(0), 1.0) - ) + val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){ + (vid, vdata, deg) => (deg.getOrElse(0), 1.0) + } println("Vertex Replication: " + pagerankGraph.replication) @@ -59,11 +59,11 @@ object Analytics extends Logging { Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)( - (vertex, a: Double) => (vertex.data._1, (resetProb + (1.0 - resetProb) * a)), // apply + (vid, data, a: Double) => (data._1, (resetProb + (1.0 - resetProb) * a)), // apply (me_id, edge) => Some(edge.src.data._2 / edge.src.data._1), // gather (a: Double, b: Double) => a + b, // merge 1.0, - numIter).mapVertices{ case Vertex(id, (outDeg, r)) => r } + numIter).mapVertices{ case (id, (outDeg, r)) => r } } /** @@ -74,18 +74,19 @@ object Analytics extends Logging { maxIter: Int = Integer.MAX_VALUE, resetProb: Double = 0.15) = { // Compute the out degree of each vertex - val pagerankGraph = graph.leftJoinVertices[Int, (Int, Double, Double)](graph.outDegrees, - (vertex, degIter) => (degIter.sum, 1.0, 1.0) - ) + val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){ + (id, data, degIter) => (degIter.sum, 1.0, 1.0) + } + // Run PageRank GraphLab.iterate(pagerankGraph)( (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather (a: Double, b: Double) => a + b, - (vertex, a: Option[Double]) => - (vertex.data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), vertex.data._2), // apply + (id, data, a: Option[Double]) => + (data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply (me_id, edge) => math.abs(edge.src.data._3 - edge.src.data._2) > tol, // scatter - maxIter).mapVertices { case Vertex(vid, data) => data._2 } + maxIter).mapVertices { case (vid, data) => data._2 } } @@ -96,12 +97,12 @@ object Analytics extends Logging { * that vertex. 
*/ def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]) = { - val ccGraph = graph.mapVertices { case Vertex(vid, _) => vid } + val ccGraph = graph.mapVertices { case (vid, _) => vid } GraphLab.iterate(ccGraph)( (me_id, edge) => edge.otherVertex(me_id).data, // gather (a: Vid, b: Vid) => math.min(a, b), // merge - (v, a: Option[Vid]) => math.min(v.data, a.getOrElse(Long.MaxValue)), // apply + (id, data, a: Option[Vid]) => math.min(data, a.getOrElse(Long.MaxValue)), // apply (me_id, edge) => (edge.vertex(me_id).data < edge.otherVertex(me_id).data), // scatter gatherDirection = EdgeDirection.Both, scatterDirection = EdgeDirection.Both ) diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala index 61032bf0be..39c699ce8b 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala @@ -2,6 +2,7 @@ package org.apache.spark.graph import org.apache.spark.rdd.RDD +import org.apache.spark.util.ClosureCleaner @@ -33,7 +34,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * * @todo should vertices return tuples instead of vertex objects? */ - def vertices: RDD[Vertex[VD]] + def vertices: RDD[(Vid,VD)] /** * Get the Edges and their data as an RDD. The entries in the RDD contain @@ -101,7 +102,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * }}} * */ - def mapVertices[VD2: ClassManifest](map: Vertex[VD] => VD2): Graph[VD2, ED] + def mapVertices[VD2: ClassManifest](map: (Vid, VD) => VD2): Graph[VD2, ED] /** * Construct a new graph where each the value of each edge is transformed by @@ -149,13 +150,13 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] - /** - * Remove edges conntecting vertices that are not in the graph. - * - * @todo remove this function and ensure that for a graph G=(V,E): - * if (u,v) in E then u in V and v in V - */ - def correctEdges(): Graph[VD, ED] + // /** + // * Remove edges conntecting vertices that are not in the graph. + // * + // * @todo remove this function and ensure that for a graph G=(V,E): + // * if (u,v) in E then u in V and v in V + // */ + // def correctEdges(): Graph[VD, ED] /** * Construct a new graph with all the edges reversed. If this graph contains @@ -183,8 +184,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * @return the subgraph containing only the vertices and edges that satisfy the * predicates. */ - def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (_ => true), - vpred: Vertex[VD] => Boolean = (_ => true) ): Graph[VD, ED] + def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true), + vpred: (Vid, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED] // /** @@ -200,51 +201,55 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { + def mapReduceTriplets[A: ClassManifest]( + mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)], + reduceFunc: (A, A) => A) + : RDD[(Vid, A)] - /** - * This function is used to compute a statistic for the neighborhood of each - * vertex. - * - * This is one of the core functions in the Graph API in that enables - * neighborhood level computation. For example this function can be used to - * count neighbors satisfying a predicate or implement PageRank. - * - * @note The returned RDD may contain fewer entries than their are vertices - * in the graph. 
This is because some vertices may not have neighbors or the - * map function may return None for all neighbors. - * - * @param mapFunc the function applied to each edge adjacent to each vertex. - * The mapFunc can optionally return None in which case it does not - * contribute to the final sum. - * @param mergeFunc the function used to merge the results of each map - * operation. - * @param direction the direction of edges to consider (e.g., In, Out, Both). - * @tparam VD2 The returned type of the aggregation operation. - * - * @return A Spark.RDD containing tuples of vertex identifiers and thee - * resulting value. Note that the returned RDD may contain fewer vertices - * than in the original graph since some vertices may not have neighbors or - * the map function could return None for all neighbors. - * - * @example We can use this function to compute the average follower age for - * each user - * {{{ - * val graph: Graph[Int,Int] = loadGraph() - * val averageFollowerAge: RDD[(Int, Int)] = - * graph.aggregateNeighbors[(Int,Double)]( - * (vid, edge) => (edge.otherVertex(vid).data, 1), - * (a, b) => (a._1 + b._1, a._2 + b._2), - * EdgeDirection.In) - * .mapValues{ case (sum,followers) => sum.toDouble / followers} - * }}} - * - */ - def aggregateNeighbors[A: ClassManifest]( - mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A], - mergeFunc: (A, A) => A, - direction: EdgeDirection) - : Graph[(VD, Option[A]), ED] + // /** + // * This function is used to compute a statistic for the neighborhood of each + // * vertex. + // * + // * This is one of the core functions in the Graph API in that enables + // * neighborhood level computation. For example this function can be used to + // * count neighbors satisfying a predicate or implement PageRank. + // * + // * @note The returned RDD may contain fewer entries than their are vertices + // * in the graph. This is because some vertices may not have neighbors or the + // * map function may return None for all neighbors. + // * + // * @param mapFunc the function applied to each edge adjacent to each vertex. + // * The mapFunc can optionally return None in which case it does not + // * contribute to the final sum. + // * @param mergeFunc the function used to merge the results of each map + // * operation. + // * @param direction the direction of edges to consider (e.g., In, Out, Both). + // * @tparam VD2 The returned type of the aggregation operation. + // * + // * @return A Spark.RDD containing tuples of vertex identifiers and thee + // * resulting value. Note that the returned RDD may contain fewer vertices + // * than in the original graph since some vertices may not have neighbors or + // * the map function could return None for all neighbors. 
+ // * + // * @example We can use this function to compute the average follower age for + // * each user + // * {{{ + // * val graph: Graph[Int,Int] = loadGraph() + // * val averageFollowerAge: RDD[(Int, Int)] = + // * graph.aggregateNeighbors[(Int,Double)]( + // * (vid, edge) => (edge.otherVertex(vid).data, 1), + // * (a, b) => (a._1 + b._1, a._2 + b._2), + // * EdgeDirection.In) + // * .mapValues{ case (sum,followers) => sum.toDouble / followers} + // * }}} + // * + // */ + // def aggregateNeighbors[A: ClassManifest]( + // mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A], + // mergeFunc: (A, A) => A, + // direction: EdgeDirection) + // : Graph[(VD, Option[A]), ED] /** * This function is used to compute a statistic for the neighborhood of each @@ -291,9 +296,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { def aggregateNeighbors[A: ClassManifest]( mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A], reduceFunc: (A, A) => A, - default: A, // Should this be a function or a value? direction: EdgeDirection) - : Graph[(VD, Option[A]), ED] + : RDD[(Vid, A)] /** @@ -328,9 +332,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * }}} * @todo Is leftJoinVertices the right name? */ - def leftJoinVertices[U: ClassManifest, VD2: ClassManifest]( - table: RDD[(Vid, U)], - mapFunc: (Vertex[VD], Option[U]) => VD2) + def outerJoinVertices[U: ClassManifest, VD2: ClassManifest](table: RDD[(Vid, U)]) + (mapFunc: (Vid, VD, Option[U]) => VD2) : Graph[VD2, ED] /** @@ -366,10 +369,15 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * graph.joinVertices(tbl)( (v, row) => row ) * }}} */ - def joinVertices[U: ClassManifest]( - table: RDD[(Vid, U)], - mapFunc: (Vertex[VD], U) => VD) - : Graph[VD, ED] + def joinVertices[U: ClassManifest](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD) + : Graph[VD, ED] = { + ClosureCleaner.clean(mapFunc) + def uf(id: Vid, data: VD, o: Option[U]): VD = o match { + case Some(u) => mapFunc(id, data, u) + case None => data + } + outerJoinVertices(table)(uf) + } // Save a copy of the GraphOps object so there is always one unique GraphOps object // for a given Graph object, and thus the lazy vals in GraphOps would work as intended. 
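For reference, a minimal usage sketch of the reworked join API above, in which vertices are plain (Vid, VD) pairs and both joins are curried. This assumes an already-constructed graph: Graph[Double, Int], the implicit graphToGraphOps conversion in scope for outDegrees, and import org.apache.spark.SparkContext._ for the pair-RDD operations:

  // Attach each vertex's out-degree, defaulting to 0 for vertices with no out-edges.
  val withDeg: Graph[(Int, Double), Int] =
    graph.outerJoinVertices(graph.outDegrees) {
      (vid, attr, degOpt) => (degOpt.getOrElse(0), attr)
    }

  // joinVertices only touches vertices present in the message table and leaves the
  // rest unchanged; it is implemented in terms of outerJoinVertices as shown above.
  val msgs: RDD[(Vid, Double)] =
    withDeg.vertices.mapValues { case (deg, attr) => attr * 0.5 }
  val updated: Graph[(Int, Double), Int] =
    withDeg.joinVertices(msgs) { (vid, attr, msg) => (attr._1, msg) }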
@@ -391,16 +399,16 @@ object Graph { rawEdges.map { case (s, t) => Edge(s, t, 1) } } // Determine unique vertices - val vertices: RDD[Vertex[Int]] = edges.flatMap{ case Edge(s, t, cnt) => Array((s, 1), (t, 1)) } - .reduceByKey(_ + _) - .map{ case (id, deg) => Vertex(id, deg) } + val vertices: RDD[(Vid, Int)] = + edges.flatMap{ case Edge(s, t, cnt) => Array((s, 1), (t, 1)) }.reduceByKey(_ + _) + // Return graph - new GraphImpl(vertices, edges) + GraphImpl(vertices, edges) } def apply[VD: ClassManifest, ED: ClassManifest]( - vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]): Graph[VD, ED] = { - new GraphImpl(vertices, edges) + vertices: RDD[(Vid,VD)], edges: RDD[Edge[ED]]): Graph[VD, ED] = { + GraphImpl(vertices, edges) } implicit def graphToGraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) = g.ops diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala index 01f24a1302..ccb1bd8e5d 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala @@ -36,7 +36,7 @@ object GraphLab { def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A, mergeFunc: (A, A) => A, - applyFunc: (Vertex[VD], Option[A]) => VD, + applyFunc: (Vid, VD, Option[A]) => VD, scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean, numIter: Int = Integer.MAX_VALUE, gatherDirection: EdgeDirection = EdgeDirection.In, @@ -45,7 +45,7 @@ object GraphLab { // Add an active attribute to all vertices to track convergence. var activeGraph: Graph[(Boolean, VD), ED] = graph.mapVertices { - case Vertex(id, data) => (true, data) + case (id, data) => (true, data) }.cache() // The gather function wrapper strips the active attribute and @@ -64,9 +64,9 @@ object GraphLab { // The apply function wrapper strips the vertex of the active attribute // and only invokes the apply function on active vertices - def apply(v: Vertex[((Boolean, VD), Option[A])]): (Boolean, VD) = { - val ((active, vData), accum) = v.data - if (active) (true, applyFunc(Vertex(v.id, vData), accum)) + def apply(vid: Vid, data: (Boolean, VD), accum: Option[A]): (Boolean, VD) = { + val (active, vData) = data + if (active) (true, applyFunc(vid, vData, accum)) else (false, vData) } @@ -89,9 +89,9 @@ object GraphLab { } // Used to set the active status of vertices for the next round - def applyActive(v: Vertex[((Boolean, VD), Option[Boolean])]): (Boolean, VD) = { - val ((prevActive, vData), newActive) = v.data - (newActive.getOrElse(false), vData) + def applyActive(vid: Vid, data: (Boolean, VD), newActive: Boolean): (Boolean, VD) = { + val (prevActive, vData) = data + (newActive, vData) } // Main Loop --------------------------------------------------------------------- @@ -99,29 +99,32 @@ object GraphLab { var numActive = activeGraph.numVertices while (i < numIter && numActive > 0) { - val gathered: Graph[((Boolean, VD), Option[A]), ED] = + // Gather + val gathered: RDD[(Vid, A)] = activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection) - val applied: Graph[(Boolean, VD), ED] = gathered.mapVertices(apply).cache() + // Apply + activeGraph = activeGraph.outerJoinVertices(gathered)(apply).cache() - activeGraph = applied.cache() + // Scatter is basically a gather in the opposite direction so we reverse the edge direction // activeGraph: Graph[(Boolean, VD), ED] - val scattered: Graph[((Boolean, VD), Option[Boolean]), ED] = + val 
scattered: RDD[(Vid, Boolean)] = activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse) - val newActiveGraph: Graph[(Boolean, VD), ED] = - scattered.mapVertices(applyActive) - activeGraph = newActiveGraph.cache() + activeGraph = activeGraph.joinVertices(scattered)(applyActive).cache() - numActive = activeGraph.vertices.map(v => if (v.data._1) 1 else 0).reduce(_ + _) + // Calculate the number of active vertices + numActive = activeGraph.vertices.map{ + case (vid, data) => if (data._1) 1 else 0 + }.reduce(_ + _) println("Number active vertices: " + numActive) i += 1 } // Remove the active attribute from the vertex data before returning the graph - activeGraph.mapVertices(v => v.data._2) + activeGraph.mapVertices{case (vid, data) => data._2 } } } diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala index 4d7ca1268d..903e407b2d 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala @@ -48,7 +48,7 @@ object GraphLoader { def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = { val vertices = edges.flatMap { edge => List((edge.src, 1), (edge.dst, 1)) } .reduceByKey(_ + _) - .map{ case (vid, degree) => Vertex(vid, degree) } - new GraphImpl[Int, ED](vertices, edges) + .map{ case (vid, degree) => (vid, degree) } + GraphImpl(vertices, edges) } } diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala index 9e8cc0a6d5..23c783ba3a 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala @@ -1,7 +1,7 @@ package org.apache.spark.graph import org.apache.spark.rdd.RDD - +import org.apache.spark.SparkContext._ class GraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) { @@ -16,22 +16,18 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) { lazy val degrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.Both) def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = { - val graph: Graph[(VD, Option[Array[Vid]]), ED] = g.aggregateNeighbors( + val nbrs = g.aggregateNeighbors[Array[Vid]]( (vid, edge) => Some(Array(edge.otherVertex(vid).id)), (a, b) => a ++ b, edgeDirection) - graph.vertices.map(v => { - val (_, neighborIds) = v.data - (v.id, neighborIds.getOrElse(Array())) - }) + + g.vertices.leftOuterJoin(nbrs).mapValues{ + case (_, Some(nbrs)) => nbrs + case (_, None) => Array.empty[Vid] + } } private def degreesRDD(edgeDirection: EdgeDirection): RDD[(Vid, Int)] = { - val degreeGraph: Graph[(VD, Option[Int]), ED] = - g.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection) - degreeGraph.vertices.map(v => { - val (_, degree) = v.data - (v.id, degree.getOrElse(0)) - }) + g.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection) } } diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala index 09bcc67c8c..93c9c09ee3 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala @@ -6,7 +6,7 @@ import org.apache.spark.rdd.RDD object Pregel { def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( - vprog: (Vertex[VD], A) => VD, + vprog: (Vid, VD, A) => VD, sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A], 
mergeMsg: (A, A) => A, initialMsg: A, @@ -19,25 +19,26 @@ object Pregel { def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertex(vid).id, edge) - def runProg(vertexWithMsgs: Vertex[(VD, Option[A])]): VD = { - val (vData, msg) = vertexWithMsgs.data - val v = Vertex(vertexWithMsgs.id, vData) + def runProg(id: Vid, data: (VD, Option[A]) ): VD = { + val (vData, msg) = data msg match { - case Some(m) => vprog(v, m) - case None => v.data + case Some(m) => vprog(id, vData, m) + case None => vData } } - var graphWithMsgs: Graph[(VD, Option[A]), ED] = - g.mapVertices(v => (v.data, Some(initialMsg))) + // Receive the first set of messages + g.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg)) while (i < numIter) { - val newGraph: Graph[VD, ED] = graphWithMsgs.mapVertices(runProg).cache() - graphWithMsgs = newGraph.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In) + // compute the messages + val messages = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In) + // receive the messages + g = g.joinVertices(messages)(vprog) + // count the iteration i += 1 } - graphWithMsgs.mapVertices(vertexWithMsgs => vertexWithMsgs.data match { - case (vData, _) => vData - }) + // Return the final graph + g } } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala index 3d218f27b1..f0d9080d97 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala @@ -1,9 +1,6 @@ package org.apache.spark.graph.impl import scala.collection.mutable.ArrayBuilder - -import it.unimi.dsi.fastutil.ints.IntArrayList - import org.apache.spark.graph._ @@ -11,29 +8,43 @@ import org.apache.spark.graph._ * A partition of edges in 3 large columnar arrays. */ private[graph] -class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] { +class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest]( + val srcIds: Array[Vid], + val dstIds: Array[Vid], + val data: Array[ED] + ){ - private var _data: Array[ED] = _ - private var _dataBuilder = ArrayBuilder.make[ED] + // private var _data: Array[ED] = _ + // private var _dataBuilder = ArrayBuilder.make[ED] - val srcIds = new VertexArrayList - val dstIds = new VertexArrayList + // var srcIds = new VertexArrayList + // var dstIds = new VertexArrayList - def data: Array[ED] = _data + def reverse: EdgePartition[ED] = new EdgePartition(dstIds, srcIds, data) - /** Add a new edge to the partition. 
*/ - def add(src: Vid, dst: Vid, d: ED) { - srcIds.add(src) - dstIds.add(dst) - _dataBuilder += d + def map[ED2: ClassManifest](f: Edge[ED] => ED2): EdgePartition[ED2] = { + val newData = new Array[ED2](data.size) + val edge = new Edge[ED]() + for(i <- 0 until data.size){ + edge.src = srcIds(i) + edge.dst = dstIds(i) + edge.data = data(i) + newData(i) = f(edge) + } + new EdgePartition(srcIds, dstIds, newData) } - def trim() { - srcIds.trim() - dstIds.trim() - _data = _dataBuilder.result() + def foreach(f: Edge[ED] => Unit) { + val edge = new Edge[ED] + for(i <- 0 until data.size){ + edge.src = srcIds(i) + edge.dst = dstIds(i) + edge.data = data(i) + f(edge) + } } + def size: Int = srcIds.size def iterator = new Iterator[Edge[ED]] { @@ -43,11 +54,13 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) override def hasNext: Boolean = pos < EdgePartition.this.size override def next(): Edge[ED] = { - edge.src = srcIds.get(pos) - edge.dst = dstIds.get(pos) - edge.data = _data(pos) + edge.src = srcIds(pos) + edge.dst = dstIds(pos) + edge.data = data(pos) pos += 1 edge } } } + + diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala new file mode 100644 index 0000000000..f2d07d55c6 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala @@ -0,0 +1,31 @@ +package org.apache.spark.graph.impl + +import scala.collection.mutable.ArrayBuilder +import org.apache.spark.graph._ + + +private[graph] +class EdgePartitionBuilder[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) +ED: ClassManifest]{ + val srcIds = new VertexArrayList + val dstIds = new VertexArrayList + var dataBuilder = ArrayBuilder.make[ED] + + + /** Add a new edge to the partition. 
*/ + def add(src: Vid, dst: Vid, d: ED) { + srcIds.add(src) + dstIds.add(dst) + dataBuilder += d + } + + def toEdgePartition: EdgePartition[ED] = { + new EdgePartition(srcIds.toLongArray(), dstIds.toLongArray(), dataBuilder.result()) + } + + +} + + + + diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletRDD.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletRDD.scala index 1cd48120a1..6779f4aa09 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletRDD.scala @@ -1,112 +1,112 @@ -package org.apache.spark.graph.impl +// package org.apache.spark.graph.impl -import scala.collection.mutable +// import scala.collection.mutable -import org.apache.spark.Aggregator -import org.apache.spark.Partition -import org.apache.spark.SparkEnv -import org.apache.spark.TaskContext -import org.apache.spark.rdd.RDD -import org.apache.spark.Dependency -import org.apache.spark.OneToOneDependency -import org.apache.spark.ShuffleDependency -import org.apache.spark.SparkContext._ -import org.apache.spark.graph._ +// import org.apache.spark.Aggregator +// import org.apache.spark.Partition +// import org.apache.spark.SparkEnv +// import org.apache.spark.TaskContext +// import org.apache.spark.rdd.RDD +// import org.apache.spark.Dependency +// import org.apache.spark.OneToOneDependency +// import org.apache.spark.ShuffleDependency +// import org.apache.spark.SparkContext._ +// import org.apache.spark.graph._ -private[graph] -class EdgeTripletPartition(idx: Int, val vPart: Partition, val ePart: Partition) - extends Partition { - override val index: Int = idx - override def hashCode(): Int = idx -} +// private[graph] +// class EdgeTripletPartition(idx: Int, val vPart: Partition, val ePart: Partition) +// extends Partition { +// override val index: Int = idx +// override def hashCode(): Int = idx +// } -/** - * A RDD that brings together edge data with its associated vertex data. - */ -private[graph] -class EdgeTripletRDD[VD: ClassManifest, ED: ClassManifest]( - vTableReplicated: RDD[(Vid, VD)], - eTable: RDD[(Pid, EdgePartition[ED])]) - extends RDD[(VertexHashMap[VD], Iterator[EdgeTriplet[VD, ED]])](eTable.context, Nil) { +// /** +// * A RDD that brings together edge data with its associated vertex data. 
+// */ +// private[graph] +// class EdgeTripletRDD[VD: ClassManifest, ED: ClassManifest]( +// vTableReplicated: IndexedRDD[Pid, VertexHashMap[VD]], +// eTable: IndexedRDD[Pid, EdgePartition[ED]]) +// extends RDD[(VertexHashMap[VD], Iterator[EdgeTriplet[VD, ED]])](eTable.context, Nil) { - //println("ddshfkdfhds" + vTableReplicated.partitioner.get.numPartitions) - //println("9757984589347598734549" + eTable.partitioner.get.numPartitions) +// //println("ddshfkdfhds" + vTableReplicated.partitioner.get.numPartitions) +// //println("9757984589347598734549" + eTable.partitioner.get.numPartitions) - assert(vTableReplicated.partitioner == eTable.partitioner) +// assert(vTableReplicated.partitioner == eTable.partitioner) - override def getDependencies: List[Dependency[_]] = { - List(new OneToOneDependency(eTable), new OneToOneDependency(vTableReplicated)) - } +// override def getDependencies: List[Dependency[_]] = { +// List(new OneToOneDependency(eTable), new OneToOneDependency(vTableReplicated)) +// } - override def getPartitions = Array.tabulate[Partition](eTable.partitions.size) { - i => new EdgeTripletPartition(i, eTable.partitions(i), vTableReplicated.partitions(i)) - } +// override def getPartitions = Array.tabulate[Partition](eTable.partitions.size) { +// i => new EdgeTripletPartition(i, eTable.partitions(i), vTableReplicated.partitions(i)) +// } - override val partitioner = eTable.partitioner +// override val partitioner = eTable.partitioner - override def getPreferredLocations(s: Partition) = - eTable.preferredLocations(s.asInstanceOf[EdgeTripletPartition].ePart) +// override def getPreferredLocations(s: Partition) = +// eTable.preferredLocations(s.asInstanceOf[EdgeTripletPartition].ePart) - override def compute(s: Partition, context: TaskContext) - : Iterator[(VertexHashMap[VD], Iterator[EdgeTriplet[VD, ED]])] = { +// override def compute(s: Partition, context: TaskContext) +// : Iterator[(VertexHashMap[VD], Iterator[EdgeTriplet[VD, ED]])] = { - val split = s.asInstanceOf[EdgeTripletPartition] +// val split = s.asInstanceOf[EdgeTripletPartition] - // Fetch the vertices and put them in a hashmap. - // TODO: use primitive hashmaps for primitive VD types. - val vmap = new VertexHashMap[VD]//(1000000) - vTableReplicated.iterator(split.vPart, context).foreach { v => vmap.put(v._1, v._2) } +// // Fetch the vertices and put them in a hashmap. +// // TODO: use primitive hashmaps for primitive VD types. +// val vmap = new VertexHashMap[VD]//(1000000) +// vTableReplicated.iterator(split.vPart, context).foreach { v => vmap.put(v._1, v._2) } - val (pid, edgePartition) = eTable.iterator(split.ePart, context).next() - .asInstanceOf[(Pid, EdgePartition[ED])] +// val (pid, edgePartition) = eTable.iterator(split.ePart, context).next() +// .asInstanceOf[(Pid, EdgePartition[ED])] - // Return an iterator that looks up the hash map to find matching vertices for each edge. - val iter = new Iterator[EdgeTriplet[VD, ED]] { - private var pos = 0 - private val e = new EdgeTriplet[VD, ED] - e.src = new Vertex[VD] - e.dst = new Vertex[VD] +// // Return an iterator that looks up the hash map to find matching vertices for each edge. 
+// val iter = new Iterator[EdgeTriplet[VD, ED]] { +// private var pos = 0 +// private val e = new EdgeTriplet[VD, ED] +// e.src = new Vertex[VD] +// e.dst = new Vertex[VD] - override def hasNext: Boolean = pos < edgePartition.size - override def next() = { - e.src.id = edgePartition.srcIds.getLong(pos) - // assert(vmap.containsKey(e.src.id)) - e.src.data = vmap.get(e.src.id) +// override def hasNext: Boolean = pos < edgePartition.size +// override def next() = { +// e.src.id = edgePartition.srcIds.getLong(pos) +// // assert(vmap.containsKey(e.src.id)) +// e.src.data = vmap.get(e.src.id) - e.dst.id = edgePartition.dstIds.getLong(pos) - // assert(vmap.containsKey(e.dst.id)) - e.dst.data = vmap.get(e.dst.id) +// e.dst.id = edgePartition.dstIds.getLong(pos) +// // assert(vmap.containsKey(e.dst.id)) +// e.dst.data = vmap.get(e.dst.id) - //println("Iter called: " + pos) - e.data = edgePartition.data(pos) - pos += 1 - e - } +// //println("Iter called: " + pos) +// e.data = edgePartition.data(pos) +// pos += 1 +// e +// } - override def toList: List[EdgeTriplet[VD, ED]] = { - val lb = new mutable.ListBuffer[EdgeTriplet[VD,ED]] - for (i <- (0 until edgePartition.size)) { - val currentEdge = new EdgeTriplet[VD, ED] - currentEdge.src = new Vertex[VD] - currentEdge.dst = new Vertex[VD] - currentEdge.src.id = edgePartition.srcIds.getLong(i) - // assert(vmap.containsKey(e.src.id)) - currentEdge.src.data = vmap.get(currentEdge.src.id) +// override def toList: List[EdgeTriplet[VD, ED]] = { +// val lb = new mutable.ListBuffer[EdgeTriplet[VD,ED]] +// for (i <- (0 until edgePartition.size)) { +// val currentEdge = new EdgeTriplet[VD, ED] +// currentEdge.src = new Vertex[VD] +// currentEdge.dst = new Vertex[VD] +// currentEdge.src.id = edgePartition.srcIds.getLong(i) +// // assert(vmap.containsKey(e.src.id)) +// currentEdge.src.data = vmap.get(currentEdge.src.id) - currentEdge.dst.id = edgePartition.dstIds.getLong(i) - // assert(vmap.containsKey(e.dst.id)) - currentEdge.dst.data = vmap.get(currentEdge.dst.id) +// currentEdge.dst.id = edgePartition.dstIds.getLong(i) +// // assert(vmap.containsKey(e.dst.id)) +// currentEdge.dst.data = vmap.get(currentEdge.dst.id) - currentEdge.data = edgePartition.data(i) - //println("Iter: " + pos + " " + e.src.id + " " + e.dst.id + " " + e.data) - //println("List: " + i + " " + currentEdge.src.id + " " + currentEdge.dst.id + " " + currentEdge.data) - lb += currentEdge - } - lb.toList - } - } - Iterator((vmap, iter)) - } -} +// currentEdge.data = edgePartition.data(i) +// //println("Iter: " + pos + " " + e.src.id + " " + e.dst.id + " " + e.data) +// //println("List: " + i + " " + currentEdge.src.id + " " + currentEdge.dst.id + " " + currentEdge.data) +// lb += currentEdge +// } +// lb.toList +// } +// } +// Iterator((vmap, iter)) +// } +// } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index e178df3841..45dc863a6b 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -2,12 +2,18 @@ package org.apache.spark.graph.impl import scala.collection.JavaConversions._ +import scala.collection.mutable + import org.apache.spark.SparkContext._ import org.apache.spark.Partitioner import org.apache.spark.HashPartitioner import org.apache.spark.util.ClosureCleaner +import org.apache.spark.rdd import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.IndexedRDD +import 
org.apache.spark.rdd.RDDIndex + import org.apache.spark.graph._ import org.apache.spark.graph.impl.GraphImpl._ @@ -18,112 +24,224 @@ import org.apache.spark.graph.impl.MessageToPartitionRDDFunctions._ * A Graph RDD that supports computation on graphs. */ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( - val numVertexPartitions: Int, - val numEdgePartitions: Int, - _rawVertices: RDD[Vertex[VD]], - _rawEdges: RDD[Edge[ED]], - _rawVTable: RDD[(Vid, (VD, Array[Pid]))], - _rawETable: RDD[(Pid, EdgePartition[ED])]) + val vTable: IndexedRDD[Vid, VD], + val vid2pid: IndexedRDD[Vid, Pid], + val eTable: IndexedRDD[Pid, EdgePartition[ED]]) extends Graph[VD, ED] { - def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = { - this(vertices.partitions.size, edges.partitions.size, vertices, edges, null, null) + + /** + * The vTableReplicated is a version of the vertex data after it is + * replicated. + */ + val vTableReplicated: IndexedRDD[Pid, VertexHashMap[VD]] = { + // Join vid2pid and vTable, generate a shuffle dependency on the joined + // result, and get the shuffle id so we can use it on the slave. + vTable.cogroup(vid2pid) + .flatMap { case (vid, (vdatas, pids)) => + pids.iterator.map { + pid => MessageToPartition(pid, (vid, vdatas.head)) + } + } + .partitionBy(eTable.partitioner.get) //@todo assert edge table has partitioner + .mapPartitionsWithIndex( (pid, iter) => { + // Build the hashmap for each partition + val vmap = new VertexHashMap[VD] + for( msg <- iter ) { vmap.put(msg.data._1, msg.data._2) } + Array((pid, vmap)).iterator + }, preservesPartitioning = true) + .indexed(eTable.index) } - def withPartitioner(numVertexPartitions: Int, numEdgePartitions: Int): Graph[VD, ED] = { - if (_cached) { - new GraphImpl(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable) - .cache() - } else { - new GraphImpl(numVertexPartitions, numEdgePartitions, _rawVertices, _rawEdges, null, null) - } - } - def withVertexPartitioner(numVertexPartitions: Int) = { - withPartitioner(numVertexPartitions, numEdgePartitions) - } - def withEdgePartitioner(numEdgePartitions: Int) = { - withPartitioner(numVertexPartitions, numEdgePartitions) - } - protected var _cached = false + + + // def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = { + // this(vertices.partitions.size, edges.partitions.size, vertices, edges, null, null) + // } + + // def withPartitioner(numVertexPartitions: Int, numEdgePartitions: Int): Graph[VD, ED] = { + // if (_cached) { + // new GraphImpl(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable) + // .cache() + // } else { + // new GraphImpl(numVertexPartitions, numEdgePartitions, _rawVertices, _rawEdges, null, null) + // } + // } + + // def withVertexPartitioner(numVertexPartitions: Int) = { + // withPartitioner(numVertexPartitions, numEdgePartitions) + // } + + // def withEdgePartitioner(numEdgePartitions: Int) = { + // withPartitioner(numVertexPartitions, numEdgePartitions) + // } + + override def cache(): Graph[VD, ED] = { eTable.cache() + vid2pid.cache() vTable.cache() - _cached = true + // @todo: should we cache the replicated data? 
+ vTableReplicated.cache() this } override def replication(): Double = { - val rep = vTable.map{ case (_, (_, a)) => a.size }.sum + val rep = vid2pid.groupByKey().map(kv => kv._2.size).sum rep / vTable.count } override def balance(): Array[Int] = { - eTable.map{ case (_, epart) => epart.data.size }.collect + eTable.map{ case (pid, epart) => epart.data.size }.collect } override def reverse: Graph[VD, ED] = { - newGraph(vertices, edges.map{ case Edge(s, t, e) => Edge(t, s, e) }) + val etable = eTable.mapValues( _.reverse ).asInstanceOf[IndexedRDD[Pid, EdgePartition[ED]]] + new GraphImpl(vTable, vid2pid, etable) } /** Return a RDD of vertices. */ - override def vertices: RDD[Vertex[VD]] = { - if (!_cached && _rawVertices != null) { - _rawVertices - } else { - vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) } - } - } + override def vertices: RDD[(Vid, VD)] = vTable + /** Return a RDD of edges. */ override def edges: RDD[Edge[ED]] = { - if (!_cached && _rawEdges != null) { - _rawEdges - } else { - eTable.mapPartitions { iter => iter.next()._2.iterator } - } + eTable.mapPartitions { iter => iter.next()._2.iterator } } /** Return a RDD that brings edges with its source and destination vertices together. */ override def triplets: RDD[EdgeTriplet[VD, ED]] = { - new EdgeTripletRDD(vTableReplicated, eTable).mapPartitions { part => part.next()._2 } + vTableReplicated.join(eTable) + .mapPartitions{ iter => + val (pid, (vmap, edgePartition)) = iter.next() + assert(iter.hasNext == false) + // Return an iterator that looks up the hash map to find matching + // vertices for each edge. + new Iterator[EdgeTriplet[VD, ED]] { + private var pos = 0 + private val e = new EdgeTriplet[VD, ED] + e.src = new Vertex[VD] + e.dst = new Vertex[VD] + + override def hasNext: Boolean = pos < edgePartition.size + override def next() = { + e.src.id = edgePartition.srcIds(pos) + // assert(vmap.containsKey(e.src.id)) + e.src.data = vmap.get(e.src.id) + e.dst.id = edgePartition.dstIds(pos) + // assert(vmap.containsKey(e.dst.id)) + e.dst.data = vmap.get(e.dst.id) + //println("Iter called: " + pos) + e.data = edgePartition.data(pos) + pos += 1 + e + } + + override def toList: List[EdgeTriplet[VD, ED]] = { + val lb = new mutable.ListBuffer[EdgeTriplet[VD,ED]] + for (i <- (0 until edgePartition.size)) { + val currentEdge = new EdgeTriplet[VD, ED] + currentEdge.src = new Vertex[VD] + currentEdge.dst = new Vertex[VD] + currentEdge.src.id = edgePartition.srcIds(i) + // assert(vmap.containsKey(e.src.id)) + currentEdge.src.data = vmap.get(currentEdge.src.id) + + currentEdge.dst.id = edgePartition.dstIds(i) + // assert(vmap.containsKey(e.dst.id)) + currentEdge.dst.data = vmap.get(currentEdge.dst.id) + + currentEdge.data = edgePartition.data(i) + lb += currentEdge + } + lb.toList + } + } // end of iterator + } // end of map partition } - override def mapVertices[VD2: ClassManifest](f: Vertex[VD] => VD2): Graph[VD2, ED] = { - newGraph(vertices.map(v => Vertex(v.id, f(v))), edges) + override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = { + val newVTable = vTable.mapValuesWithKeys((vid, data) => f(vid, data)) + .asInstanceOf[IndexedRDD[Vid, VD2]] + new GraphImpl(newVTable, vid2pid, eTable) } override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = { - newGraph(vertices, edges.map(e => Edge(e.src, e.dst, f(e)))) + val newETable = eTable.mapValues(eBlock => eBlock.map(f)) + .asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]] + new GraphImpl(vTable, vid2pid, newETable) } + 
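mapReduceTriplets, declared in Graph.scala and implemented further down in this file, is the primitive that aggregateNeighbors is rebuilt on. A minimal sketch of using it directly to compute in-degrees, assuming a concrete graph: Graph[Double, Int]:

  val inDegrees: RDD[(Vid, Int)] =
    graph.mapReduceTriplets[Int](
      // map: emit one message to the destination vertex of every edge
      et => Array((et.dst.id, 1)),
      // reduce: sum the messages arriving at the same vertex
      (a, b) => a + b)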
override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = { - newGraph(vertices, triplets.map(e => Edge(e.src.id, e.dst.id, f(e)))) + val newETable = eTable.join(vTableReplicated).mapValues{ + case (edgePartition, vmap) => + val et = new EdgeTriplet[VD, ED] + et.src = new Vertex[VD] + et.dst = new Vertex[VD] + + edgePartition.map{e => + et.data = e.data + et.src.id = e.src + et.src.data = vmap(e.src) + et.dst.id = e.dst + et.dst.data = vmap(e.dst) + f(et) + } + }.asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]] + new GraphImpl(vTable, vid2pid, newETable) } - override def correctEdges(): Graph[VD, ED] = { - val sc = vertices.context - val vset = sc.broadcast(vertices.map(_.id).collect().toSet) - val newEdges = edges.filter(e => vset.value.contains(e.src) && vset.value.contains(e.dst)) - Graph(vertices, newEdges) - } + // override def correctEdges(): Graph[VD, ED] = { + // val sc = vertices.context + // val vset = sc.broadcast(vertices.map(_.id).collect().toSet) + // val newEdges = edges.filter(e => vset.value.contains(e.src) && vset.value.contains(e.dst)) + // Graph(vertices, newEdges) + // } - override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (_ => true), - vpred: Vertex[VD] => Boolean = (_ => true) ): Graph[VD, ED] = { + override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true), + vpred: (Vid, VD) => Boolean = ((a,b) => true) ): Graph[VD, ED] = { + + /// @todo: The following code behaves deterministically on each + /// vertex predicate but uses additional space. Should we swithc to + /// this version + // val predGraph = mapVertices(v => (v.data, vpred(v))) + // val newETable = predGraph.triplets.filter(t => + // if(v.src.data._2 && v.dst.data._2) { + // val src = Vertex(t.src.id, t.src.data._1) + // val dst = Vertex(t.dst.id, t.dst.data._1) + // epred(new EdgeTriplet[VD, ED](src, dst, t.data)) + // } else { false }) + + // val newVTable = predGraph.vertices.filter(v => v.data._1) + // .map(v => (v.id, v.data._1)).indexed() + + // Reuse the partitioner (but not the index) from this graph + val newVTable = vertices.filter(v => vpred(v._1, v._2)).indexed(vTable.index.partitioner) + - // Restrict the set of vertices to those that satisfy the vertex predicate - val newVertices = vertices.filter(vpred) // Restrict the set of edges to those that satisfy the vertex and the edge predicate. - val newEdges = triplets.filter(t => vpred(t.src) && vpred(t.dst) && epred(t)) - .map( t => Edge(t.src.id, t.dst.id, t.data) ) + val newETable = createETable( + triplets.filter( + t => vpred( t.src.id, t.src.data ) && vpred( t.dst.id, t.dst.data ) && epred(t) + ) + .map( t => Edge(t.src.id, t.dst.id, t.data) ), + eTable.index.partitioner.numPartitions + ) - new GraphImpl(newVertices, newEdges) + // Construct the Vid2Pid map. Here we assume that the filter operation + // behaves deterministically. + // @todo reindex the vertex and edge tables + val newVid2Pid = createVid2Pid(newETable, newVTable.index) + + new GraphImpl(newVTable, newVid2Pid, newETable) } @@ -135,10 +253,10 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( // TODO(crankshaw) is there a better way to do this using RDD.groupBy() // functions? 
- override def groupEdgeTriplets[ED2: ClassManifest](f: Iterator[EdgeTriplet[VD,ED]] => ED2 ): + override def groupEdgeTriplets[ED2: ClassManifest]( + f: Iterator[EdgeTriplet[VD,ED]] => ED2 ): Graph[VD,ED2] = { //override def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ): - Graph[VD,ED2] = { - + // I think that // myRDD.mapPartitions { part => // val (vmap, edges) = part.next() @@ -169,7 +287,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( .mapValues { ts: List[EdgeTriplet[VD, ED]] => f(ts.toIterator) } // convert the resulting map back to a list of tuples .toList - // TODO(crankshaw) needs an iterator over the tuples? Why can't I map over the list? + // TODO(crankshaw) needs an iterator over the tuples? + // Why can't I map over the list? .toIterator // map over those tuples that contain src and dst info plus the // new edge data to make my new edges @@ -185,7 +304,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( // and http://stackoverflow.com/questions/6998676/converting-a-scala-map-to-a-list } - newGraph(vertices, newEdges) + + // @todo eliminate the need to call createETable + val newETable = createETable(newEdges, + eTable.index.partitioner.numPartitions) + + new GraphImpl(vTable, vid2pid, newETable) } @@ -202,11 +326,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( .toList .toIterator .map { case ((src, dst), data) => Edge(src, dst, data) } - - } - newGraph(vertices, newEdges) + // @todo eliminate the need to call createETable + val newETable = createETable(newEdges, + eTable.index.partitioner.numPartitions) + new GraphImpl(vTable, vid2pid, newETable) } @@ -215,156 +340,90 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( // Lower level transformation methods ////////////////////////////////////////////////////////////////////////////////////////////////// - override def aggregateNeighbors[A: ClassManifest]( + override def mapReduceTriplets[A: ClassManifest]( + mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)], + reduceFunc: (A, A) => A) + : RDD[(Vid, A)] = { + + ClosureCleaner.clean(mapFunc) + ClosureCleaner.clean(reduceFunc) + + val newVTable: RDD[(Vid, A)] = + vTableReplicated.join(eTable).flatMap{ + case (pid, (vmap, edgePartition)) => + val aggMap = new VertexHashMap[A] + val et = new EdgeTriplet[VD, ED] + et.src = new Vertex[VD] + et.dst = new Vertex[VD] + edgePartition.foreach{e => + et.data = e.data + et.src.id = e.src + et.src.data = vmap(e.src) + et.dst.id = e.dst + et.dst.data = vmap(e.dst) + mapFunc(et).foreach{case (vid, a) => + if(aggMap.containsKey(vid)) { + aggMap.put(vid, reduceFunc(aggMap.get(vid), a)) + } else { aggMap.put(vid, a) } + } + } + // Return the aggregate map + aggMap.long2ObjectEntrySet().fastIterator().map{ + entry => (entry.getLongKey(), entry.getValue()) + } + } + .indexed(vTable.index).reduceByKey(reduceFunc) + + newVTable + } + + def aggregateNeighbors[A: ClassManifest]( mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A], reduceFunc: (A, A) => A, - default: A, - gatherDirection: EdgeDirection) - : Graph[(VD, Option[A]), ED] = { + dir: EdgeDirection) + : RDD[(Vid, A)] = { ClosureCleaner.clean(mapFunc) ClosureCleaner.clean(reduceFunc) - val newVTable = vTableReplicated.mapPartitions({ part => - part.map { v => (v._1, MutableTuple2(v._2, Option.empty[A])) } - }, preservesPartitioning = true) + // Define a new map function over edge triplets + def mf(et: EdgeTriplet[VD,ED]): Array[(Vid, A)] = { + // Compute the message to the dst vertex + val dstA = + 
if (dir == EdgeDirection.In || dir == EdgeDirection.Both) { + mapFunc(et.dst.id, et) + } else { Option.empty[A] } + // Compute the message to the source vertex + val srcA = + if (dir == EdgeDirection.Out || dir == EdgeDirection.Both) { + mapFunc(et.src.id, et) + } else { Option.empty[A] } + // construct the return array + (srcA, dstA) match { + case (None, None) => Array.empty[(Vid, A)] + case (Some(src),None) => Array((et.src.id, src)) + case (None, Some(dst)) => Array((et.dst.id, dst)) + case (Some(src), Some(dst)) => + Array((et.src.id, src), (et.dst.id, dst)) + } + } - val newVertices: RDD[(Vid, A)] = - new EdgeTripletRDD[MutableTuple2[VD, Option[A]], ED](newVTable, eTable) - .mapPartitions { part => - val (vmap, edges) = part.next() - val edgeSansAcc = new EdgeTriplet[VD, ED]() - edgeSansAcc.src = new Vertex[VD] - edgeSansAcc.dst = new Vertex[VD] - edges.foreach { e: EdgeTriplet[MutableTuple2[VD, Option[A]], ED] => - edgeSansAcc.data = e.data - edgeSansAcc.src.data = e.src.data._1 - edgeSansAcc.dst.data = e.dst.data._1 - edgeSansAcc.src.id = e.src.id - edgeSansAcc.dst.id = e.dst.id - if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) { - e.dst.data._2 = - if (e.dst.data._2.isEmpty) { - mapFunc(edgeSansAcc.dst.id, edgeSansAcc) - } else { - val tmp = mapFunc(edgeSansAcc.dst.id, edgeSansAcc) - if (!tmp.isEmpty) Some(reduceFunc(e.dst.data._2.get, tmp.get)) else e.dst.data._2 - } - } - if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) { - e.dst.data._2 = - if (e.dst.data._2.isEmpty) { - mapFunc(edgeSansAcc.src.id, edgeSansAcc) - } else { - val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc) - if (!tmp.isEmpty) Some(reduceFunc(e.src.data._2.get, tmp.get)) else e.src.data._2 - } - } - } - vmap.long2ObjectEntrySet().fastIterator().filter(!_.getValue()._2.isEmpty).map{ entry => - (entry.getLongKey(), entry.getValue()._2) - } - } - .map{ case (vid, aOpt) => (vid, aOpt.get) } - .combineByKey((v: A) => v, reduceFunc, null, vertexPartitioner, false) - - this.leftJoinVertices(newVertices, (v: Vertex[VD], a: Option[A]) => (v.data, a)) + mapReduceTriplets(mf, reduceFunc) } - /** - * Same as aggregateNeighbors but map function can return none and there is no default value. - * As a consequence, the resulting table may be much smaller than the set of vertices. 
- */ - override def aggregateNeighbors[A: ClassManifest]( - mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A], - reduceFunc: (A, A) => A, - gatherDirection: EdgeDirection): Graph[(VD, Option[A]), ED] = { - ClosureCleaner.clean(mapFunc) - ClosureCleaner.clean(reduceFunc) - val newVTable = vTableReplicated.mapPartitions({ part => - part.map { v => (v._1, MutableTuple2(v._2, Option.empty[A])) } - }, preservesPartitioning = true) - val newVertices: RDD[(Vid, A)] = - new EdgeTripletRDD[MutableTuple2[VD, Option[A]], ED](newVTable, eTable) - .mapPartitions { part => - val (vmap, edges) = part.next() - val edgeSansAcc = new EdgeTriplet[VD, ED]() - edgeSansAcc.src = new Vertex[VD] - edgeSansAcc.dst = new Vertex[VD] - edges.foreach { e: EdgeTriplet[MutableTuple2[VD, Option[A]], ED] => - edgeSansAcc.data = e.data - edgeSansAcc.src.data = e.src.data._1 - edgeSansAcc.dst.data = e.dst.data._1 - edgeSansAcc.src.id = e.src.id - edgeSansAcc.dst.id = e.dst.id - if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) { - e.dst.data._2 = - if (e.dst.data._2.isEmpty) { - mapFunc(edgeSansAcc.dst.id, edgeSansAcc) - } else { - val tmp = mapFunc(edgeSansAcc.dst.id, edgeSansAcc) - if (!tmp.isEmpty) Some(reduceFunc(e.dst.data._2.get, tmp.get)) else e.dst.data._2 - } - } - if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) { - e.src.data._2 = - if (e.src.data._2.isEmpty) { - mapFunc(edgeSansAcc.src.id, edgeSansAcc) - } else { - val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc) - if (!tmp.isEmpty) Some(reduceFunc(e.src.data._2.get, tmp.get)) else e.src.data._2 - } - } - } - vmap.long2ObjectEntrySet().fastIterator().filter(!_.getValue()._2.isEmpty).map{ entry => - (entry.getLongKey(), entry.getValue()._2) - } - } - .map{ case (vid, aOpt) => (vid, aOpt.get) } - .combineByKey((v: A) => v, reduceFunc, null, vertexPartitioner, false) - - this.leftJoinVertices(newVertices, (v: Vertex[VD], a: Option[A]) => (v.data, a)) - } - - override def leftJoinVertices[U: ClassManifest, VD2: ClassManifest]( - updates: RDD[(Vid, U)], - updateF: (Vertex[VD], Option[U]) => VD2) + override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest] + (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2) : Graph[VD2, ED] = { ClosureCleaner.clean(updateF) - val newVTable = vTable.leftOuterJoin(updates).mapPartitions({ iter => - iter.map { case (vid, ((vdata, pids), update)) => - val newVdata = updateF(Vertex(vid, vdata), update) - (vid, (newVdata, pids)) - } - }, preservesPartitioning = true).cache() - - new GraphImpl(newVTable.partitions.length, eTable.partitions.length, null, null, newVTable, eTable) - } - - override def joinVertices[U: ClassManifest]( - updates: RDD[(Vid, U)], - updateF: (Vertex[VD], U) => VD) - : Graph[VD, ED] = { - - ClosureCleaner.clean(updateF) - - val newVTable = vTable.leftOuterJoin(updates).mapPartitions({ iter => - iter.map { case (vid, ((vdata, pids), update)) => - if (update.isDefined) { - val newVdata = updateF(Vertex(vid, vdata), update.get) - (vid, (newVdata, pids)) - } else { - (vid, (vdata, pids)) - } - } - }, preservesPartitioning = true).cache() - - new GraphImpl(newVTable.partitions.length, eTable.partitions.length, null, null, newVTable, eTable) + val newVTable = vTable.leftOuterJoin(updates).mapValuesWithKeys{ + case (vid, (data, other)) => updateF(vid, data, other) + }.asInstanceOf[IndexedRDD[Vid,VD2]] + new GraphImpl(newVTable, vid2pid, eTable) } @@ -372,49 +431,130 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( // 
Internals hidden from callers ////////////////////////////////////////////////////////////////////////////////////////////////// - // TODO: Support non-hash partitioning schemes. - protected val vertexPartitioner = new HashPartitioner(numVertexPartitions) - protected val edgePartitioner = new HashPartitioner(numEdgePartitions) - /** Create a new graph but keep the current partitioning scheme. */ - protected def newGraph[VD2: ClassManifest, ED2: ClassManifest]( - vertices: RDD[Vertex[VD2]], edges: RDD[Edge[ED2]]): Graph[VD2, ED2] = { - (new GraphImpl[VD2, ED2](vertices, edges)).withPartitioner(numVertexPartitions, numEdgePartitions) - } + - protected lazy val eTable: RDD[(Pid, EdgePartition[ED])] = { - if (_rawETable == null) { - createETable(_rawEdges, numEdgePartitions) - } else { - _rawETable - } - } - protected lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = { - if (_rawVTable == null) { - createVTable(_rawVertices, eTable, numVertexPartitions) - } else { - _rawVTable - } - } + // /** Create a new graph but keep the current partitioning scheme. */ + // protected def newGraph[VD2: ClassManifest, ED2: ClassManifest]( + // vertices: RDD[Vertex[VD2]], edges: RDD[Edge[ED2]]): Graph[VD2, ED2] = { + // (new GraphImpl[VD2, ED2](vertices, edges)).withPartitioner(numVertexPartitions, numEdgePartitions) + // } - protected lazy val vTableReplicated: RDD[(Vid, VD)] = { - // Join vid2pid and vTable, generate a shuffle dependency on the joined result, and get - // the shuffle id so we can use it on the slave. - vTable - .flatMap { case (vid, (vdata, pids)) => - pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) } - } - .partitionBy(edgePartitioner) - .mapPartitions({ part => - part.map { message => (message.data._1, message.data._2) } - }, preservesPartitioning = true) - } + // protected lazy val eTable: RDD[(Pid, EdgePartition[ED])] = { + // if (_rawETable == null) { + // createETable(_rawEdges, numEdgePartitions) + // } else { + // _rawETable + // } + // } + + // protected lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = { + // if (_rawVTable == null) { + // createVTable(_rawVertices, eTable, numVertexPartitions) + // } else { + // _rawVTable + // } + // } + + // protected lazy val vTableReplicated: RDD[(Vid, VD)] = { + // // Join vid2pid and vTable, generate a shuffle dependency on the joined result, and get + // // the shuffle id so we can use it on the slave. + // vTable + // .flatMap { case (vid, (vdata, pids)) => + // pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) } + // } + // .partitionBy(edgePartitioner) + // .mapPartitions({ part => + // part.map { message => (message.data._1, message.data._2) } + // }, preservesPartitioning = true) + // } } + + + + + + + + + + + + + + object GraphImpl { +def apply[VD: ClassManifest, ED: ClassManifest]( + vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]]): + GraphImpl[VD,ED] = { + + apply(vertices, edges, + vertices.context.defaultParallelism, edges.context.defaultParallelism) + } + + + def apply[VD: ClassManifest, ED: ClassManifest]( + vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]], + numVPart: Int, numEPart: Int): GraphImpl[VD,ED] = { + + val vtable = vertices.indexed(numVPart) + val etable = createETable(edges, numEPart) + val vid2pid = createVid2Pid(etable, vtable.index) + + new GraphImpl(vtable, vid2pid, etable) + } + + + + /** + * Create the edge table RDD, which is much more efficient for Java heap storage than the + * normal edges data structure (RDD[(Vid, Vid, ED)]). 
+ * + * The edge table contains multiple partitions, and each partition contains only one RDD + * key-value pair: the key is the partition id, and the value is an EdgePartition object + * containing all the edges in a partition. + */ + protected def createETable[ED: ClassManifest]( + edges: RDD[Edge[ED]], numPartitions: Int) + : IndexedRDD[Pid, EdgePartition[ED]] = { + val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt + edges + .map { e => + // Random partitioning based on the source vertex id. + // val part: Pid = edgePartitionFunction1D(e.src, e.dst, numPartitions) + val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt) + //val part: Pid = canonicalEdgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt) + + // Should we be using 3-tuple or an optimized class + MessageToPartition(part, (e.src, e.dst, e.data)) + } + .partitionBy(new HashPartitioner(numPartitions)) + .mapPartitionsWithIndex({ (pid, iter) => + val builder = new EdgePartitionBuilder[ED] + iter.foreach { message => + val data = message.data + builder.add(data._1, data._2, data._3) + } + Iterator((pid, builder.toEdgePartition)) + }, preservesPartitioning = true).indexed() + } + + + protected def createVid2Pid[ED: ClassManifest]( + eTable: IndexedRDD[Pid, EdgePartition[ED]], + vTableIndex: RDDIndex[Vid]): IndexedRDD[Vid, Pid] = { + eTable.mapPartitions { iter => + val (pid, edgePartition) = iter.next() + val vSet = new VertexSet + edgePartition.foreach(e => {vSet.add(e.src); vSet.add(e.dst)}) + vSet.iterator.map { vid => (vid.toLong, pid) } + }.indexed(vTableIndex) + } + protected def edgePartitionFunction1D(src: Vid, dst: Vid, numParts: Pid): Pid = { val mixingPrime: Vid = 1125899906842597L @@ -500,70 +640,44 @@ object GraphImpl { } - /** - * Create the edge table RDD, which is much more efficient for Java heap storage than the - * normal edges data structure (RDD[(Vid, Vid, ED)]). - * - * The edge table contains multiple partitions, and each partition contains only one RDD - * key-value pair: the key is the partition id, and the value is an EdgePartition object - * containing all the edges in a partition. - */ - protected def createETable[ED: ClassManifest](edges: RDD[Edge[ED]], numPartitions: Int) - : RDD[(Pid, EdgePartition[ED])] = { - val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt - edges - .map { e => - // Random partitioning based on the source vertex id. 
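      // Sketch (not from this patch): the new createETable above defaults to
      // edgePartitionFunction2D, whose body sits outside this hunk. A typical
      // 2D scheme of that shape views the adjacency matrix as a
      // ceilSqrt x ceilSqrt grid of blocks and assigns each edge to the block
      // addressed by its scrambled (src, dst) pair; the exact constants and
      // layout below are assumptions.
      def edgePartition2DSketch(src: Vid, dst: Vid, numParts: Pid, ceilSqrt: Pid): Pid = {
        val mixingPrime: Vid = 1125899906842597L
        val col = (math.abs(src * mixingPrime) % ceilSqrt).toInt
        val row = (math.abs(dst * mixingPrime) % ceilSqrt).toInt
        (col * ceilSqrt + row) % numParts
      }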
- // val part: Pid = edgePartitionFunction1D(e.src, e.dst, numPartitions) - val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt) - //val part: Pid = canonicalEdgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt) - // Should we be using 3-tuple or an optimized class - MessageToPartition(part, (e.src, e.dst, e.data)) - // (math.abs(e.src) % numPartitions, (e.src, e.dst, e.data)) - } - .partitionBy(new HashPartitioner(numPartitions)) - .mapPartitionsWithIndex({ (pid, iter) => - val edgePartition = new EdgePartition[ED] - iter.foreach { message => - val data = message.data - edgePartition.add(data._1, data._2, data._3) - } - edgePartition.trim() - Iterator((pid, edgePartition)) - }, preservesPartitioning = true) - } - protected def createVTable[VD: ClassManifest, ED: ClassManifest]( - vertices: RDD[Vertex[VD]], - eTable: RDD[(Pid, EdgePartition[ED])], - numPartitions: Int) - : RDD[(Vid, (VD, Array[Pid]))] = { - val partitioner = new HashPartitioner(numPartitions) + // protected def createVTable[VD: ClassManifest, ED: ClassManifest]( + // eTable: IndexedRDD[Pid, EdgePartition[ED]], + // vid2pid: Index + // vertices: RDD[Vertex[VD]], - // A key-value RDD. The key is a vertex id, and the value is a list of - // partitions that contains edges referencing the vertex. - val vid2pid : RDD[(Vid, Seq[Pid])] = eTable.mapPartitions { iter => - val (pid, edgePartition) = iter.next() - val vSet = new VertexSet - var i = 0 - while (i < edgePartition.srcIds.size) { - vSet.add(edgePartition.srcIds.getLong(i)) - vSet.add(edgePartition.dstIds.getLong(i)) - i += 1 - } - vSet.iterator.map { vid => (vid.toLong, pid) } - }.groupByKey(partitioner) + // default: VD) : IndexedRDD[Vid, VD] = { - vertices - .map { v => (v.id, v.data) } - .partitionBy(partitioner) - .leftOuterJoin(vid2pid) - .mapValues { - case (vdata, None) => (vdata, Array.empty[Pid]) - case (vdata, Some(pids)) => (vdata, pids.toArray) - } - } + // // Compute all the vertices in the edge table. + // val vid2pid = createVid2Pid(eTable) + + // // Compute all the + // vertices.map(v => (v.id, v.data)).cogroup(vids) + + // // A key-value RDD. The key is a vertex id, and the value is a list of + // // partitions that contains edges referencing the vertex. 
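  // Sketch (not part of this patch): the removed createETable above filled an
  // EdgePartition in place via add() and trim(), while the new version earlier
  // in this file goes through an EdgePartitionBuilder. A minimal builder of
  // that shape, assuming ArrayBuffer backing (the real EdgePartitionBuilder
  // added by this patch may be organized differently):
  class EdgePartitionBuilderSketch[ED: ClassManifest] {
    private val srcIds = new scala.collection.mutable.ArrayBuffer[Vid]
    private val dstIds = new scala.collection.mutable.ArrayBuffer[Vid]
    private val data = new scala.collection.mutable.ArrayBuffer[ED]

    def add(src: Vid, dst: Vid, d: ED): Unit = {
      srcIds += src; dstIds += dst; data += d
    }

    // The real builder's toEdgePartition returns an EdgePartition[ED]; this
    // sketch just exposes the frozen per-field arrays it would be built from.
    def result(): (Array[Vid], Array[Vid], Array[ED]) =
      (srcIds.toArray, dstIds.toArray, data.toArray)
  }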
+ // val vid2pid : RDD[(Vid, Seq[Pid])] = eTable.mapPartitions { iter => + // val (pid, edgePartition) = iter.next() + // val vSet = new VertexSet + // var i = 0 + // while (i < edgePartition.srcIds.size) { + // vSet.add(edgePartition.srcIds.getLong(i)) + // vSet.add(edgePartition.dstIds.getLong(i)) + // i += 1 + // } + // vSet.iterator.map { vid => (vid.toLong, pid) } + // }.groupByKey(partitioner) + + // vertices + // .map { v => (v.id, v.data) } + // .partitionBy(partitioner) + // .leftOuterJoin(vid2pid) + // .mapValues { + // case (vdata, None) => (vdata, Array.empty[Pid]) + // case (vdata, Some(pids)) => (vdata, pids.toArray) + // } + // } } diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala index d0583c48a8..01a04e9c39 100644 --- a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala +++ b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala @@ -37,7 +37,7 @@ object GraphGenerators { val host = "local[4]" val sc = new SparkContext(host, "Lognormal graph generator") - val lnGraph = lognormalGraph(sc, 10000) + val lnGraph = logNormalGraph(sc, 10000) val rmat = rmatGraph(sc, 1000, 3000) @@ -69,19 +69,21 @@ object GraphGenerators { // Right now it just generates a bunch of edges where // the edge data is the weight (default 1) - def lognormalGraph(sc: SparkContext, numVertices: Int): GraphImpl[Int, Int] = { + def logNormalGraph(sc: SparkContext, numVertices: Int): GraphImpl[Int, Int] = { // based on Pregel settings val mu = 4 val sigma = 1.3 //val vertsAndEdges = (0 until numVertices).flatMap { src => { - val vertices = (0 until numVertices).flatMap { src => - Array(Vertex(src, sampleLogNormal(mu, sigma, numVertices))) } - val edges = vertices.flatMap( { v => - generateRandomEdges(v.id.toInt, v.data, numVertices) }) + + val vertices: RDD[(Vid, Int)] = sc.parallelize(0 until numVertices).map{ + src => (src, sampleLogNormal(mu, sigma, numVertices)) + } + + val edges = vertices.flatMap{ + v => generateRandomEdges(v._1.toInt, v._2, numVertices) + } - - - new GraphImpl[Int, Int](sc.parallelize(vertices), sc.parallelize(edges)) + GraphImpl(vertices, edges) //println("Vertices:") //for (v <- vertices) { // println(v.id) @@ -161,8 +163,8 @@ object GraphGenerators { val vertices = edges.flatMap { edge => List((edge.src, 1)) } .reduceByKey(_ + _) - .map{ case (vid, degree) => Vertex(vid, degree) } - new GraphImpl[Int, ED](vertices, edges) + .map{ case (vid, degree) => (vid, degree) } + GraphImpl(vertices, edges) } /**