Merge branch 'subgraph-test' of github.com:ankurdave/graphx into clean1

Conflicts:
	graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
Reynold Xin, 2013-11-30 14:48:43 -08:00
commit 8e790b7f7a
2 changed files with 13 additions and 3 deletions

graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala

@@ -2,7 +2,7 @@ package org.apache.spark.graph.impl
 import org.apache.spark.util.collection.{BitSet, PrimitiveKeyOpenHashMap}
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.graph._
@@ -184,8 +184,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
     for ((k, v) <- this.iterator) {
       hashMap.setMerge(k, v, arbitraryMerge)
     }
-    // TODO: Is this a bug? Why are we using index.getBitSet here?
-    new VertexPartition(hashMap.keySet, hashMap._values, index.getBitSet)
+    new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet)
   }
 
   def iterator: Iterator[(Vid, VD)] = mask.iterator.map(ind => (index.getValue(ind), values(ind)))
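
Note (not part of the commit): the VertexPartition change above swaps the mask argument from index.getBitSet to hashMap.keySet.getBitSet, so the mask describes slots of the freshly built hash map rather than of the original index, which is what the removed TODO was questioning. A minimal, self-contained Scala sketch of that invariant, using plain collections and a hypothetical MaskInvariantSketch object instead of Spark's internal PrimitiveKeyOpenHashMap:

import scala.collection.mutable

// Hypothetical stand-in for VertexPartition's (index, values, mask) triple.
// Shows why the mask must come from the same index that positions the
// values, not from an older index.
object MaskInvariantSketch {
  def main(args: Array[String]): Unit = {
    // Mask left over from an "old" index that had three live slots.
    val oldMask = mutable.BitSet(0, 1, 2)

    // A merge produces a fresh map holding only two keys.
    val merged    = mutable.LinkedHashMap(1L -> "a", 3L -> "c")
    val newIndex  = merged.keys.toArray
    val newValues = merged.values.toArray

    // Correct: derive the mask from the new index (the analogue of
    // hashMap.keySet.getBitSet); every marked slot then has a value.
    val newMask = mutable.BitSet(newIndex.indices: _*)
    assert(newMask.forall(_ < newValues.length))

    // Reusing oldMask (the analogue of index.getBitSet) marks slot 2 as
    // live even though newValues has only two entries.
    assert(!oldMask.forall(_ < newValues.length))

    println(newIndex.zip(newValues).mkString(", "))
  }
}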

GraphSuite.scala

@@ -134,4 +134,15 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("subgraph") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val n = 10
+      val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), "defaultValue")
+      val subgraph = star.subgraph(vpred = (vid, attr) => vid % 2 == 0)
+      assert(subgraph.vertices.collect().toSet ===
+        (0 to n / 2).map(x => (x * 2, "defaultValue")).toSet)
+      assert(subgraph.edges.collect().toSet === (1 to n / 2).map(x => Edge(0, x * 2)).toSet)
+    }
+  }
+
 }