From 0c24305b8dbda5278c2f03d02bbee49e1df5ee44 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Thu, 9 May 2013 20:14:27 -0700 Subject: [PATCH] added documentation to graph and did some minor renaming --- .../main/scala/spark/graph/Analytics.scala | 2 +- graph/src/main/scala/spark/graph/Graph.scala | 296 +++++++++++++++++- graph/src/main/scala/spark/graph/Pregel.scala | 2 +- .../scala/spark/graph/impl/GraphImpl.scala | 10 +- 4 files changed, 292 insertions(+), 18 deletions(-) diff --git a/graph/src/main/scala/spark/graph/Analytics.scala b/graph/src/main/scala/spark/graph/Analytics.scala index 8b3271729b..05275bec68 100644 --- a/graph/src/main/scala/spark/graph/Analytics.scala +++ b/graph/src/main/scala/spark/graph/Analytics.scala @@ -39,7 +39,7 @@ object Analytics extends Logging { */ def pregelPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { // Compute the out degree of each vertex - val pagerankGraph = graph.updateVertices[Int, (Int, Double)](graph.outDegrees, + val pagerankGraph = graph.leftJoinVertices[Int, (Int, Double)](graph.outDegrees, (vertex, deg) => (deg.getOrElse(0), 1.0) ) Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)( diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala index 04b8c840fd..421055d319 100644 --- a/graph/src/main/scala/spark/graph/Graph.scala +++ b/graph/src/main/scala/spark/graph/Graph.scala @@ -3,46 +3,314 @@ package spark.graph import spark.RDD + +/** + * The Graph abstractly represents a graph with arbitrary objects associated + * with vertices and edges. The graph provides basic operations to access and + * manipulate the data associated with vertices and edges as well as the + * underlying structure. Like Spark RDDs, the graph is a functional + * data-structure in which mutating operations return new graphs. + * + * @tparam VD The type of object associated with each vertex. 
+ * + * @tparam ED The type of object associated with each edge + */ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { + /** + * Get the vertices and their data. + * + * @see Vertex for the vertex type. + * + * @todo should vertices return tuples instead of vertex objects? + */ def vertices(): RDD[Vertex[VD]] + /** + * Get the edges and their data as an RDD. The entries in the RDD contain + * just the source id and target id along with the edge data. + * + * + * @see Edge for the edge type. + * @see edgesWithVertices to get an RDD which contains all the edges along + * with their vertex data. + * + * @todo Should edges return 3-tuples instead of Edge objects? In this case + * we could rename EdgeWithVertices to Edge? + */ def edges(): RDD[Edge[ED]] + /** + * Get the edges with the vertex data associated with the adjacent pair of + * vertices. + * + * @example This operation might be used to evaluate a graph coloring where + * we would like to check that the two endpoints of each edge differ in color. + * {{{ + * type Color = Int + * val graph: Graph[Color, Int] = Graph.textFile("hdfs://file.tsv") + * val numInvalid = graph.edgesWithVertices() + * .map(e => if(e.src.data == e.dst.data) 1 else 0).sum + * }}} + * + * @see edges() If only the edge data and adjacent vertex ids are required. + * + */ def edgesWithVertices(): RDD[EdgeWithVertices[VD, ED]] + /** + * Return a graph that is cached when first created. This is used to pin a + * graph in memory enabling multiple queries to reuse the same construction + * process. + * + * @see RDD.cache() for a more detailed explanation of caching. + */ def cache(): Graph[VD, ED] - def mapVertices[VD2: ClassManifest](f: Vertex[VD] => VD2): Graph[VD2, ED] + /** + * Construct a new graph where each vertex value has been transformed by the + * map function. + * + * @note This graph is not changed, and the new graph has the same + * structure. As a consequence the underlying index structures can be + * reused. 
+ * + * @param map the function from a vertex object to a new vertex value. + * + * @tparam VD2 the new vertex data type + * + * @example We might use this operation to change the vertex values from one + * type to another to initialize an algorithm. + * {{{ + * val rawGraph: Graph[Unit, Unit] = Graph.textFile("hdfs://file") + * val root = 42 + * var bfsGraph = rawGraph + * .mapVertices[Int](v => if(v.id == root) 0 else Int.MaxValue) + * }}} + * + */ + def mapVertices[VD2: ClassManifest](map: Vertex[VD] => VD2): Graph[VD2, ED] - def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] + /** + * Construct a new graph where the value of each edge is transformed by + * the map operation. This function is not passed the vertex value for the + * vertices adjacent to the edge. If vertex values are desired use the + * mapEdgesWithVertices function. + * + * @note This graph is not changed, and the new graph has the same + * structure. As a consequence the underlying index structures can be + * reused. + * + * @param map the function from an edge object to a new edge value. + * + * @tparam ED2 the new edge data type + * + * @example This function might be used to initialize edge attributes. + * + */ + def mapEdges[ED2: ClassManifest](map: Edge[ED] => ED2): Graph[VD, ED2] - /** Return a new graph with its edge directions reversed. */ + /** + * Construct a new graph where the value of each edge is transformed by + * the map operation. This function passes vertex values for the adjacent + * vertices to the map function. If adjacent vertex values are not required, + * consider using the mapEdges function instead. + * + * @note This graph is not changed, and the new graph has the same + * structure. As a consequence the underlying index structures can be + * reused. + * + * @param map the function from an edge object to a new edge value. 
+ * + * @tparam ED2 the new edge data type + * + * @example This function might be used to initialize edge attributes based + * on the attributes associated with each vertex. + * {{{ + * val rawGraph: Graph[Int, Int] = someLoadFunction() + * val graph = rawGraph.mapEdgesWithVertices[Int]( edge => + * edge.src.data - edge.dst.data) + * }}} + * + */ + def mapEdgesWithVertices[ED2: ClassManifest]( + map: EdgeWithVertices[VD, ED] => ED2): Graph[VD, ED2] + + + /** + * Construct a new graph with all the edges reversed. If this graph contains + * an edge from a to b then the returned graph contains an edge from b to a. + * + */ def reverse: Graph[VD, ED] + /** + * This function is used to compute a statistic for the neighborhood of each + * vertex. + * + * This is one of the core functions in the Graph API in that it enables + * neighborhood level computation. For example this function can be used to + * count neighbors satisfying a predicate or implement PageRank. + * + * @note The returned RDD may contain fewer entries than there are vertices + * in the graph. This is because some vertices may not have neighbors or the + * map function may return None for all neighbors. + * + * @param mapFunc the function applied to each edge adjacent to each vertex. + * The mapFunc can optionally return None in which case it does not + * contribute to the final sum. + * @param mergeFunc the function used to merge the results of each map + * operation. + * @param direction the direction of edges to consider (e.g., In, Out, Both). + * @tparam VD2 The returned type of the aggregation operation. + * + * @return A Spark.RDD containing tuples of vertex identifiers and their + * resulting value. Note that the returned RDD may contain fewer vertices + * than in the original graph since some vertices may not have neighbors or + * the map function could return None for all neighbors. 
+ * + * @example We can use this function to compute the average follower age for + * each user + * {{{ + * val graph: Graph[Int,Int] = loadGraph() + * val averageFollowerAge: RDD[(Vid, Double)] = + * graph.aggregateNeighbors[(Int, Int)]( + * (vid, edge) => Some((edge.otherVertex(vid).data, 1)), + * (a, b) => (a._1 + b._1, a._2 + b._2), + * EdgeDirection.In) + * .mapValues{ case (sum,followers) => sum.toDouble / followers} + * }}} + * + */ def aggregateNeighbors[VD2: ClassManifest]( mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2], - reduceFunc: (VD2, VD2) => VD2, - gatherDirection: EdgeDirection) + mergeFunc: (VD2, VD2) => VD2, + direction: EdgeDirection) : RDD[(Vid, VD2)] + + /** + * This function is used to compute a statistic for the neighborhood of each + * vertex and returns a value for all vertices (including those without + * neighbors). + * + * This is one of the core functions in the Graph API in that it enables + * neighborhood level computation. For example this function can be used to + * count neighbors satisfying a predicate or implement PageRank. + * + * @note Because a default value is provided all vertices will have a + * corresponding entry in the returned RDD. + * + * @param mapFunc the function applied to each edge adjacent to each vertex. + * The mapFunc can optionally return None in which case it does not + * contribute to the final sum. + * @param reduceFunc the function used to merge the results of each map + * operation. + * @param default the default value to use for each vertex if it has no + * neighbors or the map function evaluates to None for every neighbor + * @param direction the direction of edges to consider (e.g., In, Out, Both). + * @tparam VD2 The returned type of the aggregation operation. + * + * @return A Spark.RDD containing tuples of vertex identifiers and + * their resulting value. There will be exactly one entry for every vertex in + * the original graph. 
+ * + * @example We can use this function to compute the average follower age + * for each user + * {{{ + * val graph: Graph[Int,Int] = loadGraph() + * val averageFollowerAge: RDD[(Vid, Double)] = + * graph.aggregateNeighbors[(Int, Int)]( + * (vid, edge) => Some((edge.otherVertex(vid).data, 1)), + * (a, b) => (a._1 + b._1, a._2 + b._2), + * (-1, 1), // yields -1.0 for users without followers + * EdgeDirection.In) + * .mapValues{ case (sum,followers) => sum.toDouble / followers} + * }}} + * + * @todo Should this return a graph with the new vertex values? + * + */ def aggregateNeighbors[VD2: ClassManifest]( mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2], reduceFunc: (VD2, VD2) => VD2, default: VD2, // Should this be a function or a value? - gatherDirection: EdgeDirection) + direction: EdgeDirection) : RDD[(Vid, VD2)] - def updateVertices[U: ClassManifest, VD2: ClassManifest]( - updates: RDD[(Vid, U)], - updateFunc: (Vertex[VD], Option[U]) => VD2) + + /** + * Join the vertices with an RDD and then apply a function from the + * vertex and RDD entry to a new vertex value and type. The input table should + * contain at most one entry for each vertex. If no entry is provided the + * map function is invoked passing None. + * + * @tparam U the type of entry in the table of updates + * @tparam VD2 the new vertex value type + * @param table the table to join with the vertices in the graph. The table + * should contain at most one entry for each vertex. + * @param mapFunc the function used to compute the new vertex values. The + * map function is invoked for all vertices, even those that do not have a + * corresponding entry in the table. + * + * @example This function is used to update the vertices with new values + * based on external data. 
For example we could add the out degree to each + * vertex record + * {{{ + * val rawGraph: Graph[Unit, Unit] = Graph.textFile("webgraph") + * val outDeg: RDD[(Int, Int)] = rawGraph.outDegrees() + * val graph = rawGraph.leftJoinVertices[Int,Int](outDeg, + * (v, deg) => deg.getOrElse(0) ) + * }}} + * + * @todo Should this function be curried to enable type inference? For + * example + * {{{ + * graph.leftJoinVertices(tbl)( (v, row) => row.getOrElse(0) ) + * }}} + * @todo Is leftJoinVertices the right name? + */ + def leftJoinVertices[U: ClassManifest, VD2: ClassManifest]( + table: RDD[(Vid, U)], + mapFunc: (Vertex[VD], Option[U]) => VD2) : Graph[VD2, ED] - // This one can be used to skip records when we can do in-place update. - // Annoying that we can't rename it ... - def updateVertices2[U: ClassManifest]( - updates: RDD[(Vid, U)], - updateFunc: (Vertex[VD], U) => VD) + /** + * Join the vertices with an RDD and then apply a function from the + * vertex and RDD entry to a new vertex value. The input table should + * contain at most one entry for each vertex. If no entry is provided the + * map function is skipped and the old value is used. + * + * @tparam U the type of entry in the table of updates + * @param table the table to join with the vertices in the graph. The table + * should contain at most one entry for each vertex. + * @param mapFunc the function used to compute the new vertex values. The + * map function is invoked only for vertices with a corresponding entry in + * the table, otherwise the old vertex value is used. + * + * @note for small tables this function can be much more efficient than + * leftJoinVertices + * + * @example This function is used to update the vertices with new values + * based on external data. 
For example we could add the out degree to each + * vertex record + * {{{ + * val rawGraph: Graph[Int, Unit] = Graph.textFile("webgraph") + * .mapVertices(v => 0) + * val outDeg: RDD[(Int, Int)] = rawGraph.outDegrees() + * val graph = rawGraph.joinVertices[Int](outDeg, + * (v, deg) => deg ) + * }}} + * + * @todo Should this function be curried to enable type inference? For + * example + * {{{ + * graph.joinVertices(tbl)( (v, row) => row ) + * }}} + */ + def joinVertices[U: ClassManifest]( + table: RDD[(Vid, U)], + mapFunc: (Vertex[VD], U) => VD) : Graph[VD, ED] // Save a copy of the GraphOps object so there is always one unique GraphOps object diff --git a/graph/src/main/scala/spark/graph/Pregel.scala b/graph/src/main/scala/spark/graph/Pregel.scala index 7f8849e442..cd2400e5fa 100644 --- a/graph/src/main/scala/spark/graph/Pregel.scala +++ b/graph/src/main/scala/spark/graph/Pregel.scala @@ -25,7 +25,7 @@ object Pregel { var msgs: RDD[(Vid, A)] = g.vertices.map{ v => (v.id, initialMsg) } while (i < numIter) { - g = g.updateVertices(msgs, runProg).cache() + g = g.leftJoinVertices(msgs, runProg).cache() msgs = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In) i += 1 } diff --git a/graph/src/main/scala/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/spark/graph/impl/GraphImpl.scala index c362746d22..4565f94e83 100644 --- a/graph/src/main/scala/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/spark/graph/impl/GraphImpl.scala @@ -90,6 +90,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( newGraph(vertices, edges.map(e => Edge(e.src, e.dst, f(e)))) } + override def mapEdgesWithVertices[ED2: ClassManifest](f: EdgeWithVertices[VD, ED] => ED2): + Graph[VD, ED2] = { + newGraph(vertices, edgesWithVertices.map(e => Edge(e.src.id, e.dst.id, f(e)))) + } + + ////////////////////////////////////////////////////////////////////////////////////////////////// // Lower level transformation methods 
////////////////////////////////////////////////////////////////////////////////////////////////// @@ -202,7 +208,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( .combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false) } - override def updateVertices[U: ClassManifest, VD2: ClassManifest]( + override def leftJoinVertices[U: ClassManifest, VD2: ClassManifest]( updates: RDD[(Vid, U)], updateF: (Vertex[VD], Option[U]) => VD2) : Graph[VD2, ED] = { @@ -219,7 +225,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( new GraphImpl(newVTable.partitions.size, eTable.partitions.size, null, null, newVTable, eTable) } - override def updateVertices2[U: ClassManifest]( + override def joinVertices[U: ClassManifest]( updates: RDD[(Vid, U)], updateF: (Vertex[VD], U) => VD) : Graph[VD, ED] = {