Implementing connected components on top of pregel like abstraction.

This commit is contained in:
Joseph E. Gonzalez 2013-10-27 10:42:11 -07:00
parent 6a0fbc0374
commit a2287ae138

View file

@ -4,14 +4,54 @@ import org.apache.spark._
/**
* The Analytics object contains a collection of basic graph analytics
* algorithms that operate largely on the graph structure.
*
* In addition the Analytics object contains a driver `main` which can
* be used to apply the various functions to graphs in standard formats.
*/
object Analytics extends Logging { object Analytics extends Logging {
/** /**
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD * Run PageRank for a fixed number of iterations returning a graph
* with vertex attributes containing the PageRank and edge attributes
* the normalized edge weight.
*
* The following PageRank fixed point is computed for each vertex.
*
* {{{
* var PR = Array.fill(n)( 1.0 )
* val oldPR = Array.fill(n)( 1.0 )
* for( iter <- 0 until numIter ) {
* swap(oldPR, PR)
* for( i <- 0 until n ) {
* PR[i] = alpha + (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
* }
* }
* }}}
*
* where `alpha` is the random reset probability (typically 0.15),
* `inNbrs[i]` is the set of neighbors which link to `i` and `outDeg[j]`
* is the out degree of vertex `j`.
*
* Note that this is not the "normalized" PageRank and as a consequence
* pages that have no inlinks will have a PageRank of alpha.
*
* @tparam VD the original vertex attribute (not used)
* @tparam ED the original edge attribute (not used)
*
* @param graph the graph on which to compute PageRank
* @param numIter the number of iterations of PageRank to run
* @param resetProb the random reset probability (alpha)
*
* @return the graph with each vertex containing the PageRank and
* each edge containing the normalized weight.
*
*/ */
def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], def pagerank[VD: Manifest, ED: Manifest](
numIter: Int, graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15):
resetProb: Double = 0.15): Graph[Double, Double] = { Graph[Double, Double] = {
/** /**
* Initialize the pagerankGraph with each edge attribute * Initialize the pagerankGraph with each edge attribute
@ -45,12 +85,42 @@ object Analytics extends Logging {
vertexProgram, sendMessage, messageCombiner) vertexProgram, sendMessage, messageCombiner)
} }
/** /**
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD * Run a dynamic version of PageRank returning a graph with vertex attributes
* containing the PageRank and edge attributes containing the normalized
* edge weight.
*
* {{{
* var PR = Array.fill(n)( 1.0 )
* val oldPR = Array.fill(n)( 0.0 )
* while( max(abs(PR - oldPR)) > tol ) {
* swap(oldPR, PR)
* for( i <- 0 until n if abs(PR[i] - oldPR[i]) > tol ) {
* PR[i] = alpha + (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
* }
* }
* }}}
*
* where `alpha` is the random reset probability (typically 0.15),
* `inNbrs[i]` is the set of neighbors which link to `i` and `outDeg[j]`
* is the out degree of vertex `j`.
*
* Note that this is not the "normalized" PageRank and as a consequence
* pages that have no inlinks will have a PageRank of alpha.
*
* @tparam VD the original vertex attribute (not used)
* @tparam ED the original edge attribute (not used)
*
* @param graph the graph on which to compute PageRank
* @param tol the tolerance allowed at convergence (smaller => more accurate).
* @param resetProb the random reset probability (alpha)
*
* @return the graph with each vertex containing the PageRank and
* each edge containing the normalized weight.
*/ */
def deltaPagerank[VD: Manifest, ED: Manifest]( def deltaPagerank[VD: Manifest, ED: Manifest](
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = { graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15):
Graph[Double, Double] = {
/** /**
* Initialize the pagerankGraph with each edge attribute * Initialize the pagerankGraph with each edge attribute
@ -89,22 +159,7 @@ object Analytics extends Logging {
Pregel(pagerankGraph, initialMessage)( Pregel(pagerankGraph, initialMessage)(
vertexProgram, sendMessage, messageCombiner) vertexProgram, sendMessage, messageCombiner)
.mapVertices( (vid, attr) => attr._1 ) .mapVertices( (vid, attr) => attr._1 )
} // end of deltaPageRank
// // Compute the out degree of each vertex
// val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
// (id, data, degIter) => (degIter.sum, 1.0, 1.0)
// }
// // Run PageRank
// GraphLab.iterate(pagerankGraph)(
// (me_id, edge) => edge.srcAttr._2 / edge.srcAttr._1, // gather
// (a: Double, b: Double) => a + b,
// (id, data, a: Option[Double]) =>
// (data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply
// (me_id, edge) => math.abs(edge.srcAttr._3 - edge.srcAttr._2) > tol, // scatter
// maxIter).mapVertices { case (vid, data) => data._2 }
}
/** /**
@ -113,16 +168,30 @@ object Analytics extends Logging {
* lowest vertex id in the connected component containing * lowest vertex id in the connected component containing
* that vertex. * that vertex.
*/ */
def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]) = {
val ccGraph = graph.mapVertices { case (vid, _) => vid } val ccGraph = graph.mapVertices { case (vid, _) => vid }
GraphLab.iterate(ccGraph)(
(me_id, edge) => edge.otherVertexAttr(me_id), // gather def sendMessage(id: Vid, edge: EdgeTriplet[Vid, ED]): Option[Vid] = {
(a: Vid, b: Vid) => math.min(a, b), // merge val thisAttr = edge.vertexAttr(id)
(id, data, a: Option[Vid]) => math.min(data, a.getOrElse(Long.MaxValue)), // apply val otherAttr = edge.otherVertexAttr(id)
(me_id, edge) => (edge.vertexAttr(me_id) < edge.otherVertexAttr(me_id)), // scatter if(thisAttr < otherAttr) { Some(thisAttr) }
numIter, else { None }
gatherDirection = EdgeDirection.Both, scatterDirection = EdgeDirection.Both }
)
val initialMessage = Long.MaxValue
Pregel(ccGraph, initialMessage)(
(id, attr, msg) => math.min(attr, msg),
sendMessage,
(a,b) => math.min(a,b)
)
// GraphLab(ccGraph, gatherDirection = EdgeDirection.Both, scatterDirection = EdgeDirection.Both)(
// (me_id, edge) => edge.otherVertexAttr(me_id), // gather
// (a: Vid, b: Vid) => math.min(a, b), // merge
// (id, data, a: Option[Vid]) => math.min(data, a.getOrElse(Long.MaxValue)), // apply
// (me_id, edge) => (edge.vertexAttr(me_id) < edge.otherVertexAttr(me_id))
// )
} }
def main(args: Array[String]) = { def main(args: Array[String]) = {
@ -238,7 +307,7 @@ object Analytics extends Logging {
//val graph = GraphLoader.textFile(sc, fname, a => 1.0F) //val graph = GraphLoader.textFile(sc, fname, a => 1.0F)
val graph = GraphLoader.textFile(sc, fname, a => 1.0F, val graph = GraphLoader.textFile(sc, fname, a => 1.0F,
minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache() minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
val cc = Analytics.connectedComponents(graph, numIter) val cc = Analytics.connectedComponents(graph)
//val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter) //val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
// else Analytics.connectedComponents(graph, numIter) // else Analytics.connectedComponents(graph, numIter)
println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct()) println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())