From 77626d15071a19e5173cb682a07871382078873f Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Tue, 29 Oct 2013 11:05:42 -0700 Subject: [PATCH] Adding collect neighbors and documenting GraphOps. --- .../org/apache/spark/graph/GraphOps.scala | 94 ++++++++++++++++--- 1 file changed, 81 insertions(+), 13 deletions(-) diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala index cecd3ff291..d05711c9eb 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala @@ -5,21 +5,62 @@ import org.apache.spark.SparkContext._ import org.apache.spark.util.ClosureCleaner + +/** + * `GraphOps` contains additional functionality (syntatic sugar) for the graph + * type and is implicitly constructed for each Graph object. All operations in + * `GraphOps` are expressed in terms of the efficient GraphX API. + * + * @tparam VD the vertex attribute type + * @tparam ED the edge attribute type + * + */ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { - - + /** + * Compute the number of edges in the graph. + */ lazy val numEdges: Long = graph.edges.count() + + /** + * Compute the number of vertices in the graph. + */ lazy val numVertices: Long = graph.vertices.count() + + /** + * Compute the in-degree of each vertex in the Graph returning an RDD. + * @note Vertices with no in edges are not returned in the resulting RDD. + */ lazy val inDegrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.In) + + /** + * Compute the out-degree of each vertex in the Graph returning an RDD. + * @note Vertices with no out edges are not returned in the resulting RDD. + */ lazy val outDegrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.Out) + + /** + * Compute the degrees of each vertex in the Graph returning an RDD. + * @note Vertices with no edges are not returned in the resulting RDD. + */ lazy val degrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.Both) + /** + * Compute the neighboring vertex degrees. + * + * @param edgeDirection the direction along which to collect neighboring + * vertex attributes. + */ + private def degreesRDD(edgeDirection: EdgeDirection): VertexSetRDD[Int] = { + graph.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection) + } + + /** * This function is used to compute a statistic for the neighborhood of each * vertex and returns a value for all vertices (including those without @@ -94,7 +135,16 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { } // end of aggregateNeighbors - def collectNeighborIds(edgeDirection: EdgeDirection) : VertexSetRDD[Array[Vid]] = { + /** + * Return the Ids of the neighboring vertices. + * + * @param edgeDirection the direction along which to collect + * neighboring vertices + * + * @return the vertex set of neighboring ids for each vertex. + */ + def collectNeighborIds(edgeDirection: EdgeDirection) : + VertexSetRDD[Array[Vid]] = { val nbrs = graph.aggregateNeighbors[Array[Vid]]( (vid, edge) => Some(Array(edge.otherVertexId(vid))), (a, b) => a ++ b, @@ -104,12 +154,35 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { case (_, Some(nbrs)) => nbrs case (_, None) => Array.empty[Vid] } - } + } // end of collectNeighborIds - private def degreesRDD(edgeDirection: EdgeDirection): VertexSetRDD[Int] = { - graph.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection) - } + /** + * Collect the neighbor vertex attributes for each vertex. + * + * @note This function could be highly inefficient on power-law + * graphs where high degree vertices may force a large ammount of + * information to be collected to a single location. + * + * @param edgeDirection the direction along which to collect + * neighboring vertices + * + * @return the vertex set of neighboring vertex attributes + * for each vertex. + */ + def collectNeighbors(edgeDirection: EdgeDirection) : + VertexSetRDD[ Array[(Vid, VD)] ] = { + val nbrs = graph.aggregateNeighbors[Array[(Vid,VD)]]( + (vid, edge) => + Some(Array( (edge.otherVertexId(vid), edge.otherVertexAttr(vid)) )), + (a, b) => a ++ b, + edgeDirection) + + graph.vertices.leftZipJoin(nbrs).mapValues{ + case (_, Some(nbrs)) => nbrs + case (_, None) => Array.empty[(Vid, VD)] + } + } // end of collectNeighbor /** @@ -139,11 +212,6 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { * (v, deg) => deg ) * }}} * - * @todo Should this function be curried to enable type inference? For - * example - * {{{ - * graph.joinVertices(tbl)( (v, row) => row ) - * }}} */ def joinVertices[U: ClassManifest](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD) : Graph[VD, ED] = { @@ -158,4 +226,4 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { graph.outerJoinVertices(table)(uf) } -} +} // end of GraphOps