Added some documentation.
This commit is contained in:
parent
3f3d28c73f
commit
3a40a5eb30
|
@ -184,13 +184,42 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
|
|||
|
||||
|
||||
/**
|
||||
* @todo document function
|
||||
* groupEdgeTriplets is used to merge multiple edges that have the
|
||||
* same source and destination vertex into a single edge. The user
|
||||
* supplied function is applied to each directed pair of vertices (u, v) and
|
||||
* has access to all EdgeTriplets
|
||||
*
|
||||
* {e: for all e in E where e.src = u and e.dst = v}
|
||||
*
|
||||
* This function is identical to [[org.apache.spark.graph.Graph.groupEdges]]
|
||||
* except that this function
|
||||
* provides the user-supplied function with an iterator over EdgeTriplets,
|
||||
* which contain the vertex data, whereas groupEdges provides the user-supplied
|
||||
* function with an iterator over Edges, which only contain the vertex IDs.
|
||||
*
|
||||
* @tparam ED2 the type of the resulting edge data after grouping
|
||||
*
|
||||
* @param f the user supplied function to merge multiple EdgeTriplets
|
||||
* into a single ED2 object
|
||||
*
|
||||
* @return Graph[VD,ED2] The resulting graph with a single Edge for each
|
||||
* source, dest vertex pair.
|
||||
*
|
||||
*/
|
||||
def groupEdgeTriplets[ED2: ClassManifest](f: Iterator[EdgeTriplet[VD,ED]] => ED2 ): Graph[VD,ED2]
|
||||
|
||||
|
||||
/**
|
||||
* @todo document function
|
||||
* This function merges multiple edges between two vertices into a single
|
||||
* Edge. See [[org.apache.spark.graph.Graph.groupEdgeTriplets]] for more detail.
|
||||
*
|
||||
* @tparam ED2 the type of the resulting edge data after grouping.
|
||||
*
|
||||
* @param f the user supplied function to merge multiple Edges
|
||||
* into a single ED2 object.
|
||||
*
|
||||
* @return Graph[VD,ED2] The resulting graph with a single Edge for each
|
||||
* source, dest vertex pair.
|
||||
*/
|
||||
def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ): Graph[VD,ED2]
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ import scala.collection.JavaConversions._
|
|||
import org.apache.spark.rdd.RDD
|
||||
|
||||
/**
|
||||
* This object implement the graphlab gather-apply-scatter api.
|
||||
* This object implements the GraphLab gather-apply-scatter API.
|
||||
*/
|
||||
object GraphLab {
|
||||
|
||||
|
|
|
@ -9,7 +9,18 @@ import org.apache.spark.graph.impl.GraphImpl
|
|||
object GraphLoader {
|
||||
|
||||
/**
|
||||
* Load an edge list from file initializing the Graph RDD
|
||||
* Load an edge list from a file and use it to initialize the Graph
|
||||
*
|
||||
* @tparam ED the type of the edge data of the resulting Graph
|
||||
*
|
||||
* @param sc the SparkContext used to construct RDDs
|
||||
* @param path the path to the text file containing the edge list
|
||||
* @param edgeParser a function that takes an array of strings and
|
||||
* returns an ED object
|
||||
* @param minEdgePartitions the number of partitions for the
|
||||
* Edge RDD
|
||||
*
|
||||
* @todo remove minVertexPartitions arg
|
||||
*/
|
||||
def textFile[ED: ClassManifest](
|
||||
sc: SparkContext,
|
||||
|
@ -38,14 +49,10 @@ object GraphLoader {
|
|||
}.cache()
|
||||
|
||||
val graph = fromEdges(edges)
|
||||
// println("Loaded graph:" +
|
||||
// "\n\t#edges: " + graph.numEdges +
|
||||
// "\n\t#vertices: " + graph.numVertices)
|
||||
|
||||
graph
|
||||
}
|
||||
|
||||
def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
|
||||
private def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
|
||||
val vertices = edges.flatMap { edge => List((edge.srcId, 1), (edge.dstId, 1)) }
|
||||
.reduceByKey(_ + _)
|
||||
.map{ case (vid, degree) => (vid, degree) }
|
||||
|
|
|
@ -94,11 +94,6 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
|
|||
} // end of aggregateNeighbors
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = {
|
||||
val nbrs = graph.aggregateNeighbors[Array[Vid]](
|
||||
(vid, edge) => Some(Array(edge.otherVertexId(vid))),
|
||||
|
|
|
@ -3,8 +3,37 @@ package org.apache.spark.graph
|
|||
import org.apache.spark.rdd.RDD
|
||||
|
||||
|
||||
/**
|
||||
* This object implements the Pregel bulk-synchronous
|
||||
* message-passing API.
|
||||
*/
|
||||
object Pregel {
|
||||
|
||||
|
||||
/**
|
||||
* Execute the Pregel program.
|
||||
*
|
||||
* @tparam VD the vertex data type
|
||||
* @tparam ED the edge data type
|
||||
* @tparam A the Pregel message type
|
||||
*
|
||||
* @param vprog a user supplied function that acts as the vertex program for
|
||||
* the Pregel computation. It takes the vertex ID of the vertex it is running on,
|
||||
* the accompanying data for that vertex, and the incoming data and returns the
|
||||
* new vertex value.
|
||||
* @param sendMsg a user supplied function that takes the current vertex ID and an EdgeTriplet
|
||||
* between the vertex and one of its neighbors and produces a message to send
|
||||
* to that neighbor.
|
||||
* @param mergeMsg a user supplied function that takes two incoming messages of type A and merges
|
||||
* them into a single message of type A. ''This function must be commutative and
|
||||
* associative.''
|
||||
* @param initialMsg the message each vertex will receive at the beginning of the
|
||||
* first iteration.
|
||||
* @param numIter the number of iterations to run this computation for.
|
||||
*
|
||||
* @return the resulting graph at the end of the computation
|
||||
*
|
||||
*/
|
||||
def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
|
||||
vprog: (Vid, VD, A) => VD,
|
||||
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
|
||||
|
|
|
@ -261,71 +261,27 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
}
|
||||
|
||||
|
||||
// Because of the edgepartitioner, we know that all edges with the same src and dst
|
||||
// will be in the same partition
|
||||
|
||||
// We will want to keep the same partitioning scheme. Use newGraph() rather than
|
||||
// new GraphImpl()
|
||||
// TODO(crankshaw) is there a better way to do this using RDD.groupBy()
|
||||
// functions?
|
||||
|
||||
override def groupEdgeTriplets[ED2: ClassManifest](
|
||||
f: Iterator[EdgeTriplet[VD,ED]] => ED2 ): Graph[VD,ED2] = {
|
||||
//override def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ):
|
||||
|
||||
// I think that
|
||||
// myRDD.mapPartitions { part =>
|
||||
// val (vmap, edges) = part.next()
|
||||
// gives me access to the vertex map and the set of
|
||||
// edges within that partition
|
||||
|
||||
// This is what happens during mapPartitions
|
||||
// The iterator iterates over all partitions
|
||||
// val result: RDD[U] = new RDD[T]().mapPartitions(f: Iterator[T] => Iterator[U])
|
||||
|
||||
// TODO(crankshaw) figure out how to actually get the new Edge RDD and what
|
||||
// type that should have
|
||||
val newEdges: RDD[Edge[ED2]] = triplets.mapPartitions { partIter =>
|
||||
// toList lets us operate on all EdgeTriplets in a single partition at once
|
||||
partIter
|
||||
// TODO(crankshaw) toList requires that the entire edge partition
|
||||
// can fit in memory right now.
|
||||
.toList
|
||||
// groups all ETs in this partition that have the same src and dst
|
||||
// Because all ETs with the same src and dst will live on the same
|
||||
// partition due to the EdgePartitioner, this guarantees that these
|
||||
// ET groups will be complete.
|
||||
.groupBy { t: EdgeTriplet[VD, ED] => (t.srcId, t.dstId) }
|
||||
//.groupBy { e => (e.src, e.dst) }
|
||||
// Apply the user supplied edge group function to
|
||||
// each group of edges
|
||||
// The result of this line is Map[(Long, Long), ED2]
|
||||
.mapValues { ts: List[EdgeTriplet[VD, ED]] => f(ts.toIterator) }
|
||||
// convert the resulting map back to a list of tuples
|
||||
.toList
|
||||
// TODO(crankshaw) needs an iterator over the tuples?
|
||||
// Why can't I map over the list?
|
||||
.toIterator
|
||||
// map over those tuples that contain src and dst info plus the
|
||||
// new edge data to make my new edges
|
||||
.map { case ((src, dst), data) => Edge(src, dst, data) }
|
||||
|
||||
// How do I convert from a scala map to a list?
|
||||
// I want to be able to apply a function like:
|
||||
// f: (key, value): (K, V) => result: [R]
|
||||
// so that I can transform a Map[K, V] to List[R]
|
||||
|
||||
// Maybe look at collections.breakOut
|
||||
// see http://stackoverflow.com/questions/1715681/scala-2-8-breakout
|
||||
// and http://stackoverflow.com/questions/6998676/converting-a-scala-map-to-a-list
|
||||
|
||||
}
|
||||
|
||||
// @todo eliminate the need to call createETable
|
||||
//TODO(crankshaw) eliminate the need to call createETable
|
||||
val newETable = createETable(newEdges,
|
||||
eTable.index.partitioner.numPartitions)
|
||||
|
||||
|
||||
new GraphImpl(vTable, vid2pid, localVidMap, newETable)
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -340,7 +296,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
.toIterator
|
||||
.map { case ((src, dst), data) => Edge(src, dst, data) }
|
||||
}
|
||||
// @todo eliminate the need to call createETable
|
||||
// TODO(crankshaw) eliminate the need to call createETable
|
||||
val newETable = createETable(newEdges,
|
||||
eTable.index.partitioner.numPartitions)
|
||||
|
||||
|
@ -654,7 +610,8 @@ object GraphImpl {
|
|||
|
||||
|
||||
/**
|
||||
* @todo(crankshaw) how does this affect load balancing?
|
||||
* @todo This will only partition edges to the upper diagonal
|
||||
* of the 2D processor space.
|
||||
*/
|
||||
protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid,
|
||||
numParts: Pid, ceilSqrtNumParts: Pid): Pid = {
|
||||
|
|
|
@ -16,9 +16,9 @@ import org.apache.spark.graph.Graph
|
|||
import org.apache.spark.graph.Edge
|
||||
import org.apache.spark.graph.impl.GraphImpl
|
||||
|
||||
|
||||
// TODO(crankshaw) I might want to pull at least RMAT out into a separate class.
|
||||
// Might simplify the code to have classwide variables and such.
|
||||
/**
|
||||
* @todo(crankshaw) cleanup and modularize code
|
||||
*/
|
||||
object GraphGenerators {
|
||||
|
||||
val RMATa = 0.45
|
||||
|
@ -236,8 +236,6 @@ object GraphGenerators {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in a new issue