diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index dc1955a835..8feb42490d 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -4,14 +4,54 @@
 import org.apache.spark._
 
+/**
+ * The Analytics object contains a collection of basic graph analytics
+ * algorithms that operate largely on the graph structure.
+ *
+ * In addition, the Analytics object contains a driver `main` which can
+ * be used to apply the various functions to graphs in standard formats.
+ */
 object Analytics extends Logging {
 
   /**
-   * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
+   * Run PageRank for a fixed number of iterations, returning a graph
+   * with vertex attributes containing the PageRank and edge attributes
+   * containing the normalized edge weight.
+   *
+   * The following PageRank fixed point is computed for each vertex.
+   *
+   * {{{
+   * var PR = Array.fill(n)( 1.0 )
+   * val oldPR = Array.fill(n)( 1.0 )
+   * for( iter <- 0 until numIter ) {
+   *   swap(oldPR, PR)
+   *   for( i <- 0 until n ) {
+   *     PR[i] = alpha + (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
+   *   }
+   * }
+   * }}}
+   *
+   * where `alpha` is the random reset probability (typically 0.15),
+   * `inNbrs[i]` is the set of neighbors which link to `i`, and `outDeg[j]`
+   * is the out-degree of vertex `j`.
+   *
+   * Note that this is not the "normalized" PageRank; as a consequence,
+   * pages that have no inlinks will have a PageRank of alpha.
+   *
+   * @tparam VD the original vertex attribute (not used)
+   * @tparam ED the original edge attribute (not used)
+   *
+   * @param graph the graph on which to compute PageRank
+   * @param numIter the number of iterations of PageRank to run
+   * @param resetProb the random reset probability (alpha)
+   *
+   * @return the graph with each vertex attribute containing the PageRank and
+   * each edge attribute containing the normalized weight.
+   *
    */
-  def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
-                                           numIter: Int,
-                                           resetProb: Double = 0.15): Graph[Double, Double] = {
+  def pagerank[VD: Manifest, ED: Manifest](
+      graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15):
+    Graph[Double, Double] = {
 
     /**
      * Initialize the pagerankGraph with each edge attribute
@@ -45,12 +85,42 @@ object Analytics extends Logging {
       vertexProgram, sendMessage, messageCombiner)
   }
 
-
   /**
-   * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
+   * Run a dynamic version of PageRank returning a graph with vertex attributes
+   * containing the PageRank and edge attributes containing the normalized
+   * edge weight.
+   *
+   * {{{
+   * var PR = Array.fill(n)( 1.0 )
+   * val oldPR = Array.fill(n)( 0.0 )
+   * while( max(abs(PR - oldPR)) > tol ) {
+   *   swap(oldPR, PR)
+   *   for( i <- 0 until n if abs(PR[i] - oldPR[i]) > tol ) {
+   *     PR[i] = alpha + (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
+   *   }
+   * }
+   * }}}
+   *
+   * where `alpha` is the random reset probability (typically 0.15),
+   * `inNbrs[i]` is the set of neighbors which link to `i`, and `outDeg[j]`
+   * is the out-degree of vertex `j`.
+   *
+   * Note that this is not the "normalized" PageRank; as a consequence,
+   * pages that have no inlinks will have a PageRank of alpha.
+   *
+   * @tparam VD the original vertex attribute (not used)
+   * @tparam ED the original edge attribute (not used)
+   *
+   * @param graph the graph on which to compute PageRank
+   * @param tol the tolerance allowed at convergence (smaller => more accurate)
+   * @param resetProb the random reset probability (alpha)
+   *
+   * @return the graph with each vertex attribute containing the PageRank and
+   * each edge attribute containing the normalized weight.
    */
   def deltaPagerank[VD: Manifest, ED: Manifest](
-      graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = {
+      graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15):
+    Graph[Double, Double] = {
 
     /**
      * Initialize the pagerankGraph with each edge attribute
@@ -89,22 +159,7 @@ object Analytics extends Logging {
     Pregel(pagerankGraph, initialMessage)(
       vertexProgram, sendMessage, messageCombiner)
       .mapVertices( (vid, attr) => attr._1 )
-
-
-    // // Compute the out degree of each vertex
-    // val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
-    //   (id, data, degIter) => (degIter.sum, 1.0, 1.0)
-    // }
-
-    // // Run PageRank
-    // GraphLab.iterate(pagerankGraph)(
-    //   (me_id, edge) => edge.srcAttr._2 / edge.srcAttr._1, // gather
-    //   (a: Double, b: Double) => a + b,
-    //   (id, data, a: Option[Double]) =>
-    //     (data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply
-    //   (me_id, edge) => math.abs(edge.srcAttr._3 - edge.srcAttr._2) > tol, // scatter
-    //   maxIter).mapVertices { case (vid, data) => data._2 }
-  }
+  } // end of deltaPageRank
 
 
   /**
@@ -113,16 +168,30 @@ object Analytics extends Logging {
    * lowest vertex id in the connected component containing
    * that vertex.
    */
-  def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
+  def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]) = {
     val ccGraph = graph.mapVertices { case (vid, _) => vid }
-    GraphLab.iterate(ccGraph)(
-      (me_id, edge) => edge.otherVertexAttr(me_id), // gather
-      (a: Vid, b: Vid) => math.min(a, b), // merge
-      (id, data, a: Option[Vid]) => math.min(data, a.getOrElse(Long.MaxValue)), // apply
-      (me_id, edge) => (edge.vertexAttr(me_id) < edge.otherVertexAttr(me_id)), // scatter
-      numIter,
-      gatherDirection = EdgeDirection.Both, scatterDirection = EdgeDirection.Both
-    )
+
+    def sendMessage(id: Vid, edge: EdgeTriplet[Vid, ED]): Option[Vid] = {
+      val thisAttr = edge.vertexAttr(id)
+      val otherAttr = edge.otherVertexAttr(id)
+      if (thisAttr < otherAttr) { Some(thisAttr) }
+      else { None }
+    }
+
+    val initialMessage = Long.MaxValue
+    Pregel(ccGraph, initialMessage)(
+      (id, attr, msg) => math.min(attr, msg),
+      sendMessage,
+      (a, b) => math.min(a, b)
+    )
+
+    // GraphLab(ccGraph, gatherDirection = EdgeDirection.Both, scatterDirection = EdgeDirection.Both)(
+    //   (me_id, edge) => edge.otherVertexAttr(me_id), // gather
+    //   (a: Vid, b: Vid) => math.min(a, b), // merge
+    //   (id, data, a: Option[Vid]) => math.min(data, a.getOrElse(Long.MaxValue)), // apply
+    //   (me_id, edge) => (edge.vertexAttr(me_id) < edge.otherVertexAttr(me_id))
+    // )
+
   }
 
   def main(args: Array[String]) = {
@@ -238,7 +307,7 @@ object Analytics extends Logging {
         //val graph = GraphLoader.textFile(sc, fname, a => 1.0F)
         val graph = GraphLoader.textFile(sc, fname, a => 1.0F,
           minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
-        val cc = Analytics.connectedComponents(graph, numIter)
+        val cc = Analytics.connectedComponents(graph)
         //val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
         //         else Analytics.connectedComponents(graph, numIter)
         println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
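
For reference, the fixed point documented in the new `pagerank` doc comment transcribes directly into plain Scala. The following is a minimal sketch, not part of the patch: the object name `PageRankFixedPoint` is invented here, and `inNbrs`/`outDeg` are taken from the doc comment's pseudocode and modeled as plain arrays.

object PageRankFixedPoint {
  /** Compute the un-normalized PageRank fixed point from the doc comment above. */
  def run(inNbrs: Array[Seq[Int]], outDeg: Array[Int],
          numIter: Int, alpha: Double = 0.15): Array[Double] = {
    val n = inNbrs.length
    var pr = Array.fill(n)(1.0)
    var oldPR = Array.fill(n)(1.0)
    for (_ <- 0 until numIter) {
      val tmp = oldPR; oldPR = pr; pr = tmp  // swap(oldPR, PR)
      for (i <- 0 until n) {
        // alpha + (1 - alpha) * sum of incoming rank contributions
        pr(i) = alpha + (1 - alpha) * inNbrs(i).map(j => oldPR(j) / outDeg(j)).sum
      }
    }
    pr  // vertices with no in-links end up at exactly alpha, as documented
  }
}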
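And a sketch of how the updated entry points might be invoked from user code, mirroring the driver `main` in this patch. It assumes the pre-1.0 `SparkContext(master, appName)` constructor and the `GraphLoader.textFile` loader used above; the master URL, file path, and object name are placeholders, not part of the patch.

import org.apache.spark.SparkContext
import org.apache.spark.graph._

object AnalyticsExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]", "AnalyticsExample")  // placeholder master
    val fname = "/path/to/edge-list.txt"                       // placeholder path

    // Load an edge-list file with unit edge weights, as the driver does.
    val graph = GraphLoader.textFile(sc, fname, a => 1.0F).cache()

    // Fixed-iteration PageRank: vertices hold ranks, edges hold the
    // normalized weight (1 / out-degree of the source vertex).
    val prGraph = Analytics.pagerank(graph, numIter = 20, resetProb = 0.15)
    println("Max PageRank: " +
      prGraph.vertices.map { case (_, pr) => pr }.reduce((a, b) => math.max(a, b)))

    // Connected components: each vertex is labeled with the smallest
    // vertex id in its component; Pregel iterates until no messages remain,
    // so the numIter argument is no longer needed.
    val ccGraph = Analytics.connectedComponents(graph)
    println("Components: " +
      ccGraph.vertices.map { case (_, cc) => cc }.distinct().count())

    sc.stop()
  }
}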