commit
ac70b8f234
|
@ -83,8 +83,32 @@ class GraphSuite extends FunSuite with LocalSparkContext {
|
|||
// partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into
|
||||
// the same partition
|
||||
assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
|
||||
// TODO(ankurdave): Test EdgePartition2D by checking the 2 * sqrt(p) bound on vertex
|
||||
// replication
|
||||
// partitionBy(EdgePartition2D) puts identical edges in the same partition
|
||||
assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D)).count === 1)
|
||||
|
||||
// partitionBy(EdgePartition2D) ensures that vertices need only be replicated to 2 * sqrt(p)
|
||||
// partitions
|
||||
val n = 100
|
||||
val p = 100
|
||||
val verts = 1 to n
|
||||
val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x =>
|
||||
verts.filter(y => y % x == 0).map(y => (x: Vid, y: Vid))), p), 0)
|
||||
assert(graph.edges.partitions.length === p)
|
||||
val partitionedGraph = graph.partitionBy(EdgePartition2D)
|
||||
assert(graph.edges.partitions.length === p)
|
||||
val bound = 2 * math.sqrt(p)
|
||||
// Each vertex should be replicated to at most 2 * sqrt(p) partitions
|
||||
val partitionSets = partitionedGraph.edges.partitionsRDD.mapPartitions { iter =>
|
||||
val part = iter.next()._2
|
||||
Iterator((part.srcIds ++ part.dstIds).toSet)
|
||||
}.collect
|
||||
assert(verts.forall(id => partitionSets.count(_.contains(id)) <= bound))
|
||||
// This should not be true for the default hash partitioning
|
||||
val partitionSetsUnpartitioned = graph.edges.partitionsRDD.mapPartitions { iter =>
|
||||
val part = iter.next()._2
|
||||
Iterator((part.srcIds ++ part.dstIds).toSet)
|
||||
}.collect
|
||||
assert(verts.exists(id => partitionSetsUnpartitioned.count(_.contains(id)) > bound))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -114,7 +138,12 @@ class GraphSuite extends FunSuite with LocalSparkContext {
|
|||
}
|
||||
|
||||
test("mapTriplets") {
|
||||
// TODO(ankurdave): Write the test
|
||||
withSpark { sc =>
|
||||
val n = 5
|
||||
val star = starGraph(sc, n)
|
||||
assert(star.mapTriplets(et => et.srcAttr + et.dstAttr).edges.collect.toSet ===
|
||||
(1L to n).map(x => Edge(0, x, "vv")).toSet)
|
||||
}
|
||||
}
|
||||
|
||||
test("reverse") {
|
||||
|
@ -223,12 +252,19 @@ class GraphSuite extends FunSuite with LocalSparkContext {
|
|||
withSpark { sc =>
|
||||
val n = 5
|
||||
val reverseStar = starGraph(sc, n).reverse
|
||||
// outerJoinVertices changing type
|
||||
val reverseStarDegrees =
|
||||
reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
|
||||
val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
|
||||
et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
|
||||
(a: Int, b: Int) => a + b).collect.toSet
|
||||
assert(neighborDegreeSums === Set((0: Vid, n)) ++ (1 to n).map(x => (x: Vid, 0)))
|
||||
// outerJoinVertices preserving type
|
||||
val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
|
||||
val newReverseStar =
|
||||
reverseStar.outerJoinVertices(messages) { (vid, a, bOpt) => a + bOpt.getOrElse("") }
|
||||
assert(newReverseStar.vertices.map(_._2).collect.toSet ===
|
||||
(0 to n).map(x => "v%d".format(x)).toSet)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4,15 +4,46 @@ import scala.util.Random
|
|||
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.graph.Graph._
|
||||
import org.apache.spark.graph._
|
||||
import org.apache.spark.rdd._
|
||||
|
||||
|
||||
class EdgePartitionSuite extends FunSuite {
|
||||
|
||||
test("sort") {
|
||||
test("reverse") {
|
||||
val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
|
||||
val reversedEdges = List(Edge(0, 2, 0), Edge(1, 0, 0), Edge(2, 1, 0))
|
||||
val builder = new EdgePartitionBuilder[Int]
|
||||
for (e <- edges) {
|
||||
builder.add(e.srcId, e.dstId, e.attr)
|
||||
}
|
||||
val edgePartition = builder.toEdgePartition
|
||||
assert(edgePartition.reverse.iterator.map(_.copy()).toList === reversedEdges)
|
||||
assert(edgePartition.reverse.reverse.iterator.map(_.copy()).toList === edges)
|
||||
}
|
||||
|
||||
test("map") {
|
||||
val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
|
||||
val builder = new EdgePartitionBuilder[Int]
|
||||
for (e <- edges) {
|
||||
builder.add(e.srcId, e.dstId, e.attr)
|
||||
}
|
||||
val edgePartition = builder.toEdgePartition
|
||||
assert(edgePartition.map(e => e.srcId + e.dstId).iterator.map(_.copy()).toList ===
|
||||
edges.map(e => e.copy(attr = e.srcId + e.dstId)))
|
||||
}
|
||||
|
||||
test("groupEdges") {
|
||||
val edges = List(
|
||||
Edge(0, 1, 1), Edge(1, 2, 2), Edge(2, 0, 4), Edge(0, 1, 8), Edge(1, 2, 16), Edge(2, 0, 32))
|
||||
val groupedEdges = List(Edge(0, 1, 9), Edge(1, 2, 18), Edge(2, 0, 36))
|
||||
val builder = new EdgePartitionBuilder[Int]
|
||||
for (e <- edges) {
|
||||
builder.add(e.srcId, e.dstId, e.attr)
|
||||
}
|
||||
val edgePartition = builder.toEdgePartition
|
||||
assert(edgePartition.groupEdges(_ + _).iterator.map(_.copy()).toList === groupedEdges)
|
||||
}
|
||||
|
||||
test("indexIterator") {
|
||||
val edgesFrom0 = List(Edge(0, 1, 0))
|
||||
val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
|
||||
val sortedEdges = edgesFrom0 ++ edgesFrom1
|
||||
|
|
Loading…
Reference in a new issue