added documentation to graph and did some minor renaming

This commit is contained in:
Joseph E. Gonzalez 2013-05-09 20:14:27 -07:00
parent 0711fab137
commit 0c24305b8d
4 changed files with 292 additions and 18 deletions

View file

@ -39,7 +39,7 @@ object Analytics extends Logging {
*/ */
def pregelPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { def pregelPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
// Compute the out degree of each vertex // Compute the out degree of each vertex
val pagerankGraph = graph.updateVertices[Int, (Int, Double)](graph.outDegrees, val pagerankGraph = graph.leftJoinVertices[Int, (Int, Double)](graph.outDegrees,
(vertex, deg) => (deg.getOrElse(0), 1.0) (vertex, deg) => (deg.getOrElse(0), 1.0)
) )
Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)( Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)(

View file

@ -3,46 +3,314 @@ package spark.graph
import spark.RDD import spark.RDD
/**
* The Graph abstractly represents a graph with arbitrary objects associated
* with vertices and edges. The graph provides basic operations to access and
* manipulate the data associated with vertices and edges as well as the
* underlying structure. Like Spark RDDs, the graph is a functional
* data-structure in which mutating operations return new graphs.
*
* @tparam VD The type of object associated with each vertex.
*
* @tparam ED The type of object associated with each edge
*/
abstract class Graph[VD: ClassManifest, ED: ClassManifest] { abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
/**
* Get the vertices and their data.
*
* @see Vertex for the vertex type.
*
* @todo should vertices return tuples instead of vertex objects?
*/
def vertices(): RDD[Vertex[VD]] def vertices(): RDD[Vertex[VD]]
/**
* Get the Edges and their data as an RDD. The entries in the RDD contain
* just the source id and target id along with the edge data.
*
*
* @see Edge for the edge type.
* @see edgesWithVertices to get an RDD which contains all the edges along
* with their vertex data.
*
* @todo Should edges return 3 tuples instead of Edge objects? In this case
* we could rename EdgeWithVertices to Edge?
*/
def edges(): RDD[Edge[ED]] def edges(): RDD[Edge[ED]]
/**
* Get the edges with the vertex data associated with the adjacent pair of
* vertices.
*
* @example This operation might be used to evaluate a graph coloring where
* we would like to check that both vertices are a different color.
* {{{
* type Color = Int
* val graph: Graph[Color, Int] = Graph.textFile("hdfs://file.tsv")
* val numInvalid = graph.edgesWithVertices()
* .map(e => if(e.src.data == e.dst.data) 1 else 0).sum
* }}}
*
* @see edges() If only the edge data and adjacent vertex ids are required.
*
*/
def edgesWithVertices(): RDD[EdgeWithVertices[VD, ED]] def edgesWithVertices(): RDD[EdgeWithVertices[VD, ED]]
/**
* Return a graph that is cached when first created. This is used to pin a
* graph in memory enabling multiple queries to reuse the same construction
* process.
*
* @see RDD.cache() for a more detailed explanation of caching.
*/
def cache(): Graph[VD, ED] def cache(): Graph[VD, ED]
def mapVertices[VD2: ClassManifest](f: Vertex[VD] => VD2): Graph[VD2, ED] /**
* Construct a new graph where each vertex value has been transformed by the
* map function.
*
* @note This graph is not changed and that the new graph has the same
* structure. As a consequence the underlying index structures can be
* reused.
*
* @param map the function from a vertex object to a new vertex value.
*
* @tparam VD2 the new vertex data type
*
* @example We might use this operation to change the vertex values from one
* type to another to initialize an algorithm.
* {{{
* val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file")
* val root = 42
* var bfsGraph = rawGraph
* .mapVertices[Int](v => if(v.id == 0) 0 else Math.MaxValue)
* }}}
*
*/
def mapVertices[VD2: ClassManifest](map: Vertex[VD] => VD2): Graph[VD2, ED]
def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] /**
* Construct a new graph where each the value of each edge is transformed by
* the map operation. This function is not passed the vertex value for the
* vertices adjacent to the edge. If vertex values are desired use the
* mapEdgesWithVertices function.
*
* @note This graph is not changed and that the new graph has the same
* structure. As a consequence the underlying index structures can be
* reused.
*
* @param map the function from an edge object to a new edge value.
*
* @tparam ED2 the new edge data type
*
* @example This function might be used to initialize edge attributes.
*
*/
def mapEdges[ED2: ClassManifest](map: Edge[ED] => ED2): Graph[VD, ED2]
/** Return a new graph with its edge directions reversed. */ /**
* Construct a new graph where each the value of each edge is transformed by
* the map operation. This function passes vertex values for the adjacent
* vertices to the map function. If adjacent vertex values are not required,
* consider using the mapEdges function instead.
*
* @note This graph is not changed and that the new graph has the same
* structure. As a consequence the underlying index structures can be
* reused.
*
* @param map the function from an edge object to a new edge value.
*
* @tparam ED2 the new edge data type
*
* @example This function might be used to initialize edge attributes based
* on the attributes associated with each vertex.
* {{{
* val rawGraph: Graph[Int, Int] = someLoadFunction()
* val graph = rawGraph.mapEdgesWithVertices[Int]( edge =>
* edge.src.data - edge.dst.data)
* }}}
*
*/
def mapEdgesWithVertices[ED2: ClassManifest](
map: EdgeWithVertices[VD, ED] => ED2): Graph[VD, ED2]
/**
* Construct a new graph with all the edges reversed. If this graph contains
* an edge from a to b then the returned graph contains an edge from b to a.
*
*/
def reverse: Graph[VD, ED] def reverse: Graph[VD, ED]
/**
* This function is used to compute a statistic for the neighborhood of each
* vertex.
*
* This is one of the core functions in the Graph API in that enables
* neighborhood level computation. For example this function can be used to
* count neighbors satisfying a predicate or implement PageRank.
*
* @note The returned RDD may contain fewer entries than their are vertices
* in the graph. This is because some vertices may not have neighbors or the
* map function may return None for all neighbors.
*
* @param mapFunc the function applied to each edge adjacent to each vertex.
* The mapFunc can optionally return None in which case it does not
* contribute to the final sum.
* @param mergeFunc the function used to merge the results of each map
* operation.
* @param direction the direction of edges to consider (e.g., In, Out, Both).
* @tparam VD2 The returned type of the aggregation operation.
*
* @return A Spark.RDD containing tuples of vertex identifiers and thee
* resulting value. Note that the returned RDD may contain fewer vertices
* than in the original graph since some vertices may not have neighbors or
* the map function could return None for all neighbors.
*
* @example We can use this function to compute the average follower age for
* each user
* {{{
* val graph: Graph[Int,Int] = loadGraph()
* val averageFollowerAge: RDD[(Int, Int)] =
* graph.aggregateNeigbhros[(Int,Double)](
* (vid, edge) => (edge.otherVertex(vid).data, 1),
* (a, b) => (a._1 + b._1, a._2 + b._2),
* EdgeDirection.In)
* .mapValues{ case (sum,followers) => sum.toDouble / followers}
* }}}
*
*/
def aggregateNeighbors[VD2: ClassManifest]( def aggregateNeighbors[VD2: ClassManifest](
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2], mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2],
reduceFunc: (VD2, VD2) => VD2, mergeFunc: (VD2, VD2) => VD2,
gatherDirection: EdgeDirection) direction: EdgeDirection)
: RDD[(Vid, VD2)] : RDD[(Vid, VD2)]
/**
* This function is used to compute a statistic for the neighborhood of each
* vertex and returns a value for all vertices (including those without
* neighbors).
*
* This is one of the core functions in the Graph API in that enables
* neighborhood level computation. For example this function can be used to
* count neighbors satisfying a predicate or implement PageRank.
*
* @note Because the a default value is provided all vertices will have a
* corresponding entry in the returned RDD.
*
* @param mapFunc the function applied to each edge adjacent to each vertex.
* The mapFunc can optionally return None in which case it does not
* contribute to the final sum.
* @param mergeFunc the function used to merge the results of each map
* operation.
* @param default the default value to use for each vertex if it has no
* neighbors or the map function repeatedly evaluates to none
* @param direction the direction of edges to consider (e.g., In, Out, Both).
* @tparam VD2 The returned type of the aggregation operation.
*
* @return A Spark.RDD containing tuples of vertex identifiers and
* their resulting value. There will be exactly one entry for ever vertex in
* the original graph.
*
* @example We can use this function to compute the average follower age
* for each user
* {{{
* val graph: Graph[Int,Int] = loadGraph()
* val averageFollowerAge: RDD[(Int, Int)] =
* graph.aggregateNeigbhros[(Int,Double)](
* (vid, edge) => (edge.otherVertex(vid).data, 1),
* (a, b) => (a._1 + b._1, a._2 + b._2),
* -1,
* EdgeDirection.In)
* .mapValues{ case (sum,followers) => sum.toDouble / followers}
* }}}
*
* @todo Should this return a graph with the new vertex values?
*
*/
def aggregateNeighbors[VD2: ClassManifest]( def aggregateNeighbors[VD2: ClassManifest](
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2], mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2],
reduceFunc: (VD2, VD2) => VD2, reduceFunc: (VD2, VD2) => VD2,
default: VD2, // Should this be a function or a value? default: VD2, // Should this be a function or a value?
gatherDirection: EdgeDirection) direction: EdgeDirection)
: RDD[(Vid, VD2)] : RDD[(Vid, VD2)]
def updateVertices[U: ClassManifest, VD2: ClassManifest](
updates: RDD[(Vid, U)], /**
updateFunc: (Vertex[VD], Option[U]) => VD2) * Join the vertices with an RDD and then apply a function from the the
* vertex and RDD entry to a new vertex value and type. The input table should
* contain at most one entry for each vertex. If no entry is provided the
* map function is invoked passing none.
*
* @tparam U the type of entry in the table of updates
* @tparam VD2 the new vertex value type
* @param tlb the table to join with the vertices in the graph. The table
* should contain at most one entry for each vertex.
* @param mapFunc the function used to compute the new vertex values. The
* map function is invoked for all vertices, even those that do not have a
* corresponding entry in the table.
*
* @example This function is used to update the vertices with new values
* based on external data. For example we could add the out degree to each
* vertex record
* {{{
* val rawGraph: Graph[(),()] = Graph.textFile("webgraph")
* val outDeg: RDD[(Int, Int)] = rawGraph.outDegrees()
* val graph = rawGraph.leftJoinVertices[Int,Int](outDeg,
* (v, deg) => deg.getOrElse(0) )
* }}}
*
* @todo Should this function be curried to enable type inference? For
* example
* {{{
* graph.leftJoinVertices(tbl)( (v, row) => row.getOrElse(0) )
* }}}
* @todo Is leftJoinVertices the right name?
*/
def leftJoinVertices[U: ClassManifest, VD2: ClassManifest](
table: RDD[(Vid, U)],
mapFunc: (Vertex[VD], Option[U]) => VD2)
: Graph[VD2, ED] : Graph[VD2, ED]
// This one can be used to skip records when we can do in-place update. /**
// Annoying that we can't rename it ... * Join the vertices with an RDD and then apply a function from the the
def updateVertices2[U: ClassManifest]( * vertex and RDD entry to a new vertex value. The input table should
updates: RDD[(Vid, U)], * contain at most one entry for each vertex. If no entry is provided the
updateFunc: (Vertex[VD], U) => VD) * map function is skipped and the old value is used.
*
* @tparam U the type of entry in the table of updates
* @param tlb the table to join with the vertices in the graph. The table
* should contain at most one entry for each vertex.
* @param mapFunc the function used to compute the new vertex values. The
* map function is invoked only for vertices with a corresponding entry in
* the table otherwise the old vertex value is used.
*
* @note for small tables this function can be much more efficient than
* leftJoinVertices
*
* @example This function is used to update the vertices with new values
* based on external data. For example we could add the out degree to each
* vertex record
* {{{
* val rawGraph: Graph[Int,()] = Graph.textFile("webgraph")
* .mapVertices(v => 0)
* val outDeg: RDD[(Int, Int)] = rawGraph.outDegrees()
* val graph = rawGraph.leftJoinVertices[Int,Int](outDeg,
* (v, deg) => deg )
* }}}
*
* @todo Should this function be curried to enable type inference? For
* example
* {{{
* graph.joinVertices(tbl)( (v, row) => row )
* }}}
*/
def joinVertices[U: ClassManifest](
table: RDD[(Vid, U)],
mapFunc: (Vertex[VD], U) => VD)
: Graph[VD, ED] : Graph[VD, ED]
// Save a copy of the GraphOps object so there is always one unique GraphOps object // Save a copy of the GraphOps object so there is always one unique GraphOps object

View file

@ -25,7 +25,7 @@ object Pregel {
var msgs: RDD[(Vid, A)] = g.vertices.map{ v => (v.id, initialMsg) } var msgs: RDD[(Vid, A)] = g.vertices.map{ v => (v.id, initialMsg) }
while (i < numIter) { while (i < numIter) {
g = g.updateVertices(msgs, runProg).cache() g = g.leftJoinVertices(msgs, runProg).cache()
msgs = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In) msgs = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In)
i += 1 i += 1
} }

View file

@ -90,6 +90,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
newGraph(vertices, edges.map(e => Edge(e.src, e.dst, f(e)))) newGraph(vertices, edges.map(e => Edge(e.src, e.dst, f(e))))
} }
override def mapEdgesWithVertices[ED2: ClassManifest](f: EdgeWithVertices[VD, ED] => ED2):
Graph[VD, ED2] = {
newGraph(vertices, edgesWithVertices.map(e => Edge(e.src.id, e.dst.id, f(e))))
}
////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////
// Lower level transformation methods // Lower level transformation methods
////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////
@ -202,7 +208,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
.combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false) .combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false)
} }
override def updateVertices[U: ClassManifest, VD2: ClassManifest]( override def leftJoinVertices[U: ClassManifest, VD2: ClassManifest](
updates: RDD[(Vid, U)], updates: RDD[(Vid, U)],
updateF: (Vertex[VD], Option[U]) => VD2) updateF: (Vertex[VD], Option[U]) => VD2)
: Graph[VD2, ED] = { : Graph[VD2, ED] = {
@ -219,7 +225,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
new GraphImpl(newVTable.partitions.size, eTable.partitions.size, null, null, newVTable, eTable) new GraphImpl(newVTable.partitions.size, eTable.partitions.size, null, null, newVTable, eTable)
} }
override def updateVertices2[U: ClassManifest]( override def joinVertices[U: ClassManifest](
updates: RDD[(Vid, U)], updates: RDD[(Vid, U)],
updateF: (Vertex[VD], U) => VD) updateF: (Vertex[VD], U) => VD)
: Graph[VD, ED] = { : Graph[VD, ED] = {