From c3ec91a462c1e9582b9cb08a231f2aad10e4e52e Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Fri, 20 Dec 2013 00:30:21 -0800
Subject: [PATCH 1/3] Write mapTriplets test

---
 .../scala/org/apache/spark/graph/GraphSuite.scala | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index 487d949e1f..6494ef8900 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -114,7 +114,12 @@ class GraphSuite extends FunSuite with LocalSparkContext {
   }
 
   test("mapTriplets") {
-    // TODO(ankurdave): Write the test
+    withSpark { sc =>
+      val n = 5
+      val star = starGraph(sc, n)
+      assert(star.mapTriplets(et => et.srcAttr + et.dstAttr).edges.collect.toSet ===
+        (1L to n).map(x => Edge(0, x, "vv")).toSet)
+    }
   }
 
   test("reverse") {
@@ -223,12 +228,19 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     withSpark { sc =>
       val n = 5
       val reverseStar = starGraph(sc, n).reverse
+      // outerJoinVertices changing type
       val reverseStarDegrees =
         reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
       val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
         et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
         (a: Int, b: Int) => a + b).collect.toSet
       assert(neighborDegreeSums === Set((0: Vid, n)) ++ (1 to n).map(x => (x: Vid, 0)))
+      // outerJoinVertices preserving type
+      val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
+      val newReverseStar =
+        reverseStar.outerJoinVertices(messages) { (vid, a, bOpt) => a + bOpt.getOrElse("") }
+      assert(newReverseStar.vertices.map(_._2).collect.toSet ===
+        (0 to n).map(x => "v%d".format(x)).toSet)
     }
   }
 

From 752c0106e8d935875a06ec21b05de2c1f3dbc56a Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Fri, 20 Dec 2013 01:06:49 -0800
Subject: [PATCH 2/3] Test EdgePartition2D

---
 .../org/apache/spark/graph/GraphSuite.scala | 28 +++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)

diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index 6494ef8900..e6c19dbc40 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -83,8 +83,32 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       // partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into
       // the same partition
       assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
-      // TODO(ankurdave): Test EdgePartition2D by checking the 2 * sqrt(p) bound on vertex
-      // replication
+      // partitionBy(EdgePartition2D) puts identical edges in the same partition
+      assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D)).count === 1)
+
+      // partitionBy(EdgePartition2D) ensures that vertices need only be replicated to 2 * sqrt(p)
+      // partitions
+      val n = 100
+      val p = 100
+      val verts = 1 to n
+      val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x =>
+        verts.filter(y => y % x == 0).map(y => (x: Vid, y: Vid))), p), 0)
+      assert(graph.edges.partitions.length === p)
+      val partitionedGraph = graph.partitionBy(EdgePartition2D)
+      assert(partitionedGraph.edges.partitions.length === p)
+      val bound = 2 * math.sqrt(p)
+      // Each vertex should be replicated to at most 2 * sqrt(p) partitions
+      val partitionSets = partitionedGraph.edges.partitionsRDD.mapPartitions { iter =>
+        val part = iter.next()._2
+        Iterator((part.srcIds ++ part.dstIds).toSet)
+      }.collect
+      assert(verts.forall(id => partitionSets.count(_.contains(id)) <= bound))
+      // This should not be true for the default hash partitioning
+      val partitionSetsUnpartitioned = graph.edges.partitionsRDD.mapPartitions { iter =>
+        val part = iter.next()._2
+        Iterator((part.srcIds ++ part.dstIds).toSet)
+      }.collect
+      assert(verts.exists(id => partitionSetsUnpartitioned.count(_.contains(id)) > bound))
     }
   }
 

From efc765cf1a287c398e3321c374263a740200fe89 Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Fri, 20 Dec 2013 01:30:33 -0800
Subject: [PATCH 3/3] Test all methods on EdgePartition

---
 .../spark/graph/impl/EdgePartitionSuite.scala | 41 ++++++++++++++++---
 1 file changed, 36 insertions(+), 5 deletions(-)

diff --git a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
index caedb55ea2..a52a5653e2 100644
--- a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
@@ -4,15 +4,46 @@ import scala.util.Random
 
 import org.scalatest.FunSuite
 
-import org.apache.spark.SparkContext
-import org.apache.spark.graph.Graph._
 import org.apache.spark.graph._
-import org.apache.spark.rdd._
-
 
 class EdgePartitionSuite extends FunSuite {
 
-  test("sort") {
+  test("reverse") {
+    val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
+    val reversedEdges = List(Edge(0, 2, 0), Edge(1, 0, 0), Edge(2, 1, 0))
+    val builder = new EdgePartitionBuilder[Int]
+    for (e <- edges) {
+      builder.add(e.srcId, e.dstId, e.attr)
+    }
+    val edgePartition = builder.toEdgePartition
+    assert(edgePartition.reverse.iterator.map(_.copy()).toList === reversedEdges)
+    assert(edgePartition.reverse.reverse.iterator.map(_.copy()).toList === edges)
+  }
+
+  test("map") {
+    val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
+    val builder = new EdgePartitionBuilder[Int]
+    for (e <- edges) {
+      builder.add(e.srcId, e.dstId, e.attr)
+    }
+    val edgePartition = builder.toEdgePartition
+    assert(edgePartition.map(e => e.srcId + e.dstId).iterator.map(_.copy()).toList ===
+      edges.map(e => e.copy(attr = e.srcId + e.dstId)))
+  }
+
+  test("groupEdges") {
+    val edges = List(
+      Edge(0, 1, 1), Edge(1, 2, 2), Edge(2, 0, 4), Edge(0, 1, 8), Edge(1, 2, 16), Edge(2, 0, 32))
+    val groupedEdges = List(Edge(0, 1, 9), Edge(1, 2, 18), Edge(2, 0, 36))
+    val builder = new EdgePartitionBuilder[Int]
+    for (e <- edges) {
+      builder.add(e.srcId, e.dstId, e.attr)
+    }
+    val edgePartition = builder.toEdgePartition
+    assert(edgePartition.groupEdges(_ + _).iterator.map(_.copy()).toList === groupedEdges)
+  }
+
+  test("indexIterator") {
     val edgesFrom0 = List(Edge(0, 1, 0))
     val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
    val sortedEdges = edgesFrom0 ++ edgesFrom1
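
A note on the 2 * sqrt(p) bound tested in PATCH 2/3: EdgePartition2D treats the
p edge partitions as a sqrt(p) x sqrt(p) grid and places each edge in the cell
indexed by a hash of its source vertex on one axis and its destination vertex
on the other. All out-edges of a vertex then share a single row and all
in-edges share a single column, so each vertex is replicated to at most
2 * sqrt(p) partitions. The standalone sketch below illustrates the bound with
a bare modulus standing in for the hash; EdgePartition2DSketch is illustrative
only, not code from these patches, and GraphX's actual strategy differs in
detail (e.g. it mixes the vertex ids before taking the modulus).

    // A minimal sketch of 2D edge partitioning, under the simplifying
    // assumption that a vertex id is "hashed" by a plain modulus.
    object EdgePartition2DSketch {
      def getPartition(src: Long, dst: Long, numParts: Int): Int = {
        val ceilSqrt = math.ceil(math.sqrt(numParts)).toInt
        val row = (src % ceilSqrt).toInt // shared by every edge leaving `src`
        val col = (dst % ceilSqrt).toInt // shared by every edge entering `dst`
        (row * ceilSqrt + col) % numParts
      }

      def main(args: Array[String]): Unit = {
        val p = 100
        // Every edge incident on vertex 7 falls in row 7 or column 7 of the
        // 10 x 10 grid: at most 2 * sqrt(p) = 20 of the 100 partitions.
        val parts = (0L until 1000L).flatMap { other =>
          Seq(getPartition(7L, other, p), getPartition(other, 7L, p))
        }.toSet
        assert(parts.size <= 2 * math.sqrt(p))
        println(s"vertex 7 touches ${parts.size} of $p partitions")
      }
    }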