diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index d7e8282a2d..a5e3b050e5 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -205,7 +205,17 @@ object Analytics extends Logging {
   /**
    * Compute the number of triangles passing through each vertex.
    *
-   * @param graph
+   * The algorithm is relatively straightforward and can be computed in
+   * three steps:
+   *
+   * 1) Compute the set of neighbors for each vertex
+   * 2) For each edge compute the intersection of the sets and send the
+   *    count to both vertices.
+   * 3) Compute the sum at each vertex and divide by two since each
+   *    triangle is counted twice.
+   *
+   *
+   * @param graph a graph with `sourceId` less than `destId`
    * @tparam VD
    * @tparam ED
    * @return
@@ -218,11 +228,13 @@ object Analytics extends Logging {
     // Construct set representations of the neighborhoods
     val nbrSets: VertexSetRDD[VertexSet] =
       graph.collectNeighborIds(EdgeDirection.Both).mapValuesWithKeys { (vid, nbrs) =>
-        val set = new VertexSet//(math.ceil(nbrs.size/0.7).toInt)
+        val set = new VertexSet
         var i = 0
         while (i < nbrs.size) {
           // prevent self cycle
-          if(nbrs(i) != vid) set.add(nbrs(i))
+          if(nbrs(i) != vid) {
+            set.add(nbrs(i))
+          }
           i += 1
         }
         set
@@ -235,9 +247,11 @@ object Analytics extends Logging {
     def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Array[(Vid, Int)] = {
       assert(et.srcAttr != null)
       assert(et.dstAttr != null)
-      val (smallSet, largeSet) =
-        if (et.srcAttr.size < et.dstAttr.size) { (et.srcAttr, et.dstAttr) }
-        else { (et.dstAttr, et.srcAttr) }
+      val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) {
+        (et.srcAttr, et.dstAttr)
+      } else {
+        (et.dstAttr, et.srcAttr)
+      }
       val iter = smallSet.iterator()
       var counter: Int = 0
       while (iter.hasNext) {
@@ -247,14 +261,14 @@ object Analytics extends Logging {
       Array((et.srcId, counter), (et.dstId, counter))
     }

     // compute the intersection along edges
-    val counters: VertexSetRDD[Int] = setGraph.mapReduceTriplets(edgeFunc, _+_)
+    val counters: VertexSetRDD[Int] = setGraph.mapReduceTriplets(edgeFunc, _ + _)

     // Merge counters with the graph and divide by two since each triangle is counted twice
     graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
       val dblCount = optCounter.getOrElse(0)
       // double count should be even (divisible by two)
-      assert((dblCount & 1) == 0 )
-      dblCount/2
+      assert((dblCount & 1) == 0)
+      dblCount / 2
     }
   } // end of TriangleCount
diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index 848e580ebd..a953f2cb97 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -61,7 +61,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
    * type Color = Int
    * val graph: Graph[Color, Int] = Graph.textFile("hdfs://file.tsv")
    * val numInvalid = graph.edgesWithVertices()
-   *   .map(e => if(e.src.data == e.dst.data) 1 else 0).sum
+   *   .map(e => if (e.src.data == e.dst.data) 1 else 0).sum
    * }}}
    *
    * @see edges() If only the edge data and adjacent vertex ids are
@@ -110,7 +110,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
    * val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file")
    * val root = 42
    * var bfsGraph = rawGraph
-   *   .mapVertices[Int]((vid, data) => if(vid == root) 0 else Math.MaxValue)
+   *   .mapVertices[Int]((vid, data) => if (vid == root) 0 else Math.MaxValue)
    * }}}
    *
    */
@@ -349,16 +349,15 @@
   }

   /**
-   * Construct a graph from a collection of edges encoded as vertex id
-   * pairs.
+   * Construct a graph from a collection of edges encoded as vertex id pairs.
    *
    * @param rawEdges the RDD containing the set of edges in the graph
    *
-   * @return a graph with edge attributes containing the count of
-   * duplicate edges.
+   * @return a graph with edge attributes containing the count of duplicate edges.
    */
-  def apply[VD: ClassManifest](rawEdges: RDD[(Vid, Vid)], defaultValue: VD):
-    Graph[VD, Int] = { Graph(rawEdges, defaultValue, false) }
+  def apply[VD: ClassManifest](rawEdges: RDD[(Vid, Vid)], defaultValue: VD): Graph[VD, Int] = {
+    Graph(rawEdges, defaultValue, false)
+  }

   /**
    * Construct a graph from a collection of edges encoded as vertex id
@@ -377,7 +376,7 @@ object Graph {
   def apply[VD: ClassManifest](rawEdges: RDD[(Vid, Vid)], defaultValue: VD, uniqueEdges: Boolean):
     Graph[VD, Int] = {
     val graph = GraphImpl(rawEdges.map(p => Edge(p._1, p._2, 1)), defaultValue)
-    if(uniqueEdges) {
+    if (uniqueEdges) {
       graph.groupEdges((a,b) => a+b)
     } else {
       graph
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
index 11659ce66d..b5047cad11 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
@@ -158,13 +158,18 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
     VertexSetRDD[Array[Vid]] = {
     val nbrs =
       if (edgeDirection == EdgeDirection.Both) {
-        graph.mapReduceTriplets[Array[Vid]] (
-          et => Array( (et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))), _ ++ _
+        graph.mapReduceTriplets[Array[Vid]](
+          mapFunc = et => Array((et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))),
+          reduceFunc = _ ++ _
         )
       } else if (edgeDirection == EdgeDirection.Out) {
-        graph.mapReduceTriplets[Array[Vid]](et => Array((et.srcId, Array(et.dstId))), _ ++ _)
+        graph.mapReduceTriplets[Array[Vid]](
+          mapFunc = et => Array((et.srcId, Array(et.dstId))),
+          reduceFunc = _ ++ _)
       } else if (edgeDirection == EdgeDirection.In) {
-        graph.mapReduceTriplets[Array[Vid]](et => Array((et.dstId, Array(et.srcId))), _ ++ _)
+        graph.mapReduceTriplets[Array[Vid]](
+          mapFunc = et => Array((et.dstId, Array(et.srcId))),
+          reduceFunc = _ ++ _)
       } else {
         throw new SparkException("It doesn't make sense to collect neighbor ids without a direction.")
       }
diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
index 4a056911fd..7e7382b7b5 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
@@ -171,7 +171,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
         var ind = bs.nextSetBit(0)
         while(ind >= 0) {
           val k = index.getValueSafe(ind)
-          if( cleanPred( (k, oldValues(ind)) ) ) {
+          if (cleanPred((k, oldValues(ind)))) {
             newBS.set(ind)
           }
           ind = bs.nextSetBit(ind+1)
@@ -278,7 +278,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
   def zipJoin[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V, W) => Z): VertexSetRDD[Z] = {
     val cleanF = index.rdd.context.clean(f)
-    if(index != other.index) {
+    if (index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
     val newValuesRDD: RDD[ (Array[Z], BitSet) ] =
@@ -315,7 +315,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
   def zipJoinFlatMap[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V,W) => Iterator[Z]): RDD[Z] = {
     val cleanF = index.rdd.context.clean(f)
-    if(index != other.index) {
+    if (index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
     index.rdd.zipPartitions(valuesRDD, other.valuesRDD) { (indexIter, thisIter, otherIter) =>
@@ -351,7 +351,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
    */
  def leftZipJoin[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V, Option[W]) => Z): VertexSetRDD[Z] = {
-    if(index != other.index) {
+    if (index != other.index) {
      throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
    }
    val cleanF = index.rdd.context.clean(f)
diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
index ac8415a000..e1ff8df4ea 100644
--- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
@@ -12,30 +12,30 @@
 import org.apache.spark.graph.util.GraphGenerators

 object GridPageRank {
-  def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
+  def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
     val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
     val outDegree = Array.fill(nRows * nCols)(0)
     // Convert row column address into vertex ids (row major order)
-    def sub2ind(r: Int, c: Int): Int = r * nCols + c
+    def sub2ind(r: Int, c: Int): Int = r * nCols + c
     // Make the grid graph
-    for(r <- 0 until nRows; c <- 0 until nCols){
+    for (r <- 0 until nRows; c <- 0 until nCols) {
       val ind = sub2ind(r,c)
-      if(r+1 < nRows) {
+      if (r+1 < nRows) {
         outDegree(ind) += 1
-        inNbrs(sub2ind(r+1,c)) += ind
+        inNbrs(sub2ind(r+1,c)) += ind
       }
-      if(c+1 < nCols) {
+      if (c+1 < nCols) {
         outDegree(ind) += 1
-        inNbrs(sub2ind(r,c+1)) += ind
+        inNbrs(sub2ind(r,c+1)) += ind
       }
     }
     // compute the pagerank
     var pr = Array.fill(nRows * nCols)(resetProb)
-    for(iter <- 0 until nIter) {
+    for (iter <- 0 until nIter) {
       val oldPr = pr
       pr = new Array[Double](nRows * nCols)
-      for(ind <- 0 until (nRows * nCols)) {
-        pr(ind) = resetProb + (1.0 - resetProb) *
+      for (ind <- 0 until (nRows * nCols)) {
+        pr(ind) = resetProb + (1.0 - resetProb) *
           inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum
       }
     }
@@ -58,13 +58,13 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
       val resetProb = 0.15
       val prGraph1 = Analytics.pagerank(starGraph, 1, resetProb)
       val prGraph2 = Analytics.pagerank(starGraph, 2, resetProb)
-
+
       val notMatching = prGraph1.vertices.zipJoin(prGraph2.vertices) { (vid, pr1, pr2) =>
         if (pr1 != pr2) { 1 } else { 0 }
       }.map { case (vid, test) => test }.sum
       assert(notMatching === 0)
       prGraph2.vertices.foreach(println(_))
-      val errors = prGraph2.vertices.map{ case (vid, pr) =>
+      val errors = prGraph2.vertices.map { case (vid, pr) =>
         val correct = (vid > 0 && pr == resetProb) ||
           (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
         if ( !correct ) { 1 } else { 0 }
@@ -141,9 +141,12 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
       }
       val ccMap = vertices.toMap
       println(ccMap)
-      for( id <- 0 until 20 ) {
-        if(id < 10) { assert(ccMap(id) === 0) }
-        else { assert(ccMap(id) === 10) }
+      for (id <- 0 until 20) {
+        if (id < 10) {
+          assert(ccMap(id) === 0)
+        } else {
+          assert(ccMap(id) === 10)
+        }
       }
     }
   } // end of chain connected components
@@ -157,14 +160,20 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
       val ccGraph = Analytics.connectedComponents(twoChains).cache()
       val vertices = ccGraph.vertices.collect
       for ( (id, cc) <- vertices ) {
-        if(id < 10) { assert(cc === 0) }
-        else { assert(cc === 10) }
+        if (id < 10) {
+          assert(cc === 0)
+        } else {
+          assert(cc === 10)
+        }
       }

       val ccMap = vertices.toMap
       println(ccMap)
-      for( id <- 0 until 20 ) {
-        if(id < 10) { assert(ccMap(id) === 0) }
-        else { assert(ccMap(id) === 10) }
+      for ( id <- 0 until 20 ) {
+        if (id < 10) {
+          assert(ccMap(id) === 0)
+        } else {
+          assert(ccMap(id) === 10)
+        }
       }
     }
   } // end of reverse chain connected components
@@ -181,15 +190,18 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {

   test("Count two triangles") {
     withSpark(new SparkContext("local", "test")) { sc =>
-      val triangles = Array( 0L -> 1L, 1L -> 2L, 2L -> 0L ) ++
-        Array( 0L -> -1L, -1L -> -2L, -2L -> 0L )
+      val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
       val rawEdges = sc.parallelize(triangles, 2)
       val graph = Graph(rawEdges, true).cache
       val triangleCount = Analytics.triangleCount(graph)
       val verts = triangleCount.vertices
       verts.collect.foreach { case (vid, count) =>
-        if(vid == 0) { assert(count === 2) }
-        else { assert(count === 1) }
+        if (vid == 0) {
+          assert(count === 2)
+        } else {
+          assert(count === 1)
+        }
       }
     }
   }
@@ -197,24 +209,27 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
   test("Count two triangles with bi-directed edges") {
     withSpark(new SparkContext("local", "test")) { sc =>
       val triangles =
-        Array( 0L -> 1L, 1L -> 2L, 2L -> 0L ) ++
-        Array( 0L -> -1L, -1L -> -2L, -2L -> 0L )
+        Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
       val revTriangles = triangles.map { case (a,b) => (b,a) }
-
       val rawEdges = sc.parallelize(triangles ++ revTriangles, 2)
       val graph = Graph(rawEdges, true).cache
       val triangleCount = Analytics.triangleCount(graph)
       val verts = triangleCount.vertices
       verts.collect.foreach { case (vid, count) =>
-        if(vid == 0) { assert(count === 4) }
-        else { assert(count === 2) }
+        if (vid == 0) {
+          assert(count === 4)
+        } else {
+          assert(count === 2)
+        }
       }
     }
   }

   test("Count a single triangle with duplicate edges") {
     withSpark(new SparkContext("local", "test")) { sc =>
-      val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->0L ) ++ Array( 0L->1L, 1L->2L, 2L->0L ), 2)
+      val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
       val graph = Graph(rawEdges, true).cache
       val triangleCount = Analytics.triangleCount(graph)
       val verts = triangleCount.vertices
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index b0d248b651..ff1cd56599 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -16,7 +16,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val rawEdges = (0L to 100L).zip((1L to 99L) :+ 0L)
       val edges = sc.parallelize(rawEdges)
       val graph = Graph(edges, 1.0F)
-      assert( graph.edges.count() === rawEdges.size )
+      assert(graph.edges.count() === rawEdges.size)
     }
   }

@@ -29,8 +29,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       assert( graph.edges.count() === rawEdges.size )
       assert( graph.vertices.count() === 100)
       graph.triplets.map { et =>
-        assert( (et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr) )
-        assert( (et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && !et.dstAttr) )
+        assert((et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr))
+        assert((et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && !et.dstAttr))
       }
     }
   }
@@ -111,7 +111,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       nbrs.collect.foreach { case (vid, nbrs) =>
         val s = nbrs.toSet
         assert(s.contains((vid + 1) % 100))
-        assert(s.contains(if (vid > 0) { vid - 1 } else { 99 }))
+        assert(s.contains(if (vid > 0) vid - 1 else 99 ))
      }
    }
  }
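
Reviewer note (not part of the patch): a minimal usage sketch of the triangle-count path documented above, mirroring the "Count two triangles" test. It assumes a local SparkContext and the `org.apache.spark.graph` package import; names outside the diff (the app name, the wildcard import) are illustrative only.

// Sketch only: two triangles sharing vertex 0, as in AnalyticsSuite.
import org.apache.spark.SparkContext
import org.apache.spark.graph._

val sc = new SparkContext("local", "triangle-count-sketch")
val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
  Array(0L -> -1L, -1L -> -2L, -2L -> 0L), 2)
val graph = Graph(rawEdges, true).cache
// Step 1-3 of the documented algorithm run inside triangleCount;
// the result attaches a per-vertex triangle count.
val counts = Analytics.triangleCount(graph).vertices
counts.collect.foreach { case (vid, count) =>
  // Expect 2 for the shared vertex 0 and 1 for every other vertex.
  println(vid + " -> " + count)
}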