diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala index ee368ebb41..63858db2ef 100644 --- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala @@ -44,7 +44,7 @@ class EdgeRDD[@specialized ED: ClassManifest]( override def cache(): EdgeRDD[ED] = persist() def mapEdgePartitions[ED2: ClassManifest](f: EdgePartition[ED] => EdgePartition[ED2]) - : EdgeRDD[ED2]= { + : EdgeRDD[ED2] = { new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter => val (pid, ep) = iter.next() Iterator(Tuple2(pid, f(ep))) @@ -60,6 +60,27 @@ class EdgeRDD[@specialized ED: ClassManifest]( } } + def zipEdgePartitions[ED2: ClassManifest, ED3: ClassManifest] + (other: EdgeRDD[ED2]) + (f: (EdgePartition[ED], EdgePartition[ED2]) => EdgePartition[ED3]): EdgeRDD[ED3] = { + new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, preservesPartitioning = true) { + (thisIter, otherIter) => + val (pid, thisEPart) = thisIter.next() + val (_, otherEPart) = otherIter.next() + Iterator(Tuple2(pid, f(thisEPart, otherEPart))) + }) + } + + def innerJoin[ED2: ClassManifest, ED3: ClassManifest] + (other: EdgeRDD[ED2]) + (f: (Vid, Vid, ED, ED2) => ED3): EdgeRDD[ED3] = { + val ed2Manifest = classManifest[ED2] + val ed3Manifest = classManifest[ED3] + zipEdgePartitions(other) { (thisEPart, otherEPart) => + thisEPart.innerJoin(otherEPart)(f)(ed2Manifest, ed3Manifest) + } + } + def collectVids(): RDD[Vid] = { partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) } } diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala index 0dc5ec8b24..e8fa8e611c 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala @@ -48,7 +48,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * along with their vertex data. * */ - val edges: RDD[Edge[ED]] + val edges: EdgeRDD[ED] /** * Get the edges with the vertex data associated with the adjacent @@ -197,6 +197,14 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true), vpred: (Vid, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED] + /** + * Subgraph of this graph with only vertices and edges from the other graph. + * @param other the graph to project this graph onto + * @return a graph with vertices and edges that exists in both the current graph and other, + * with vertex and edge data from the current graph. + */ + def mask[VD2: ClassManifest, ED2: ClassManifest](other: Graph[VD2, ED2]): Graph[VD, ED] + /** * This function merges multiple edges between two vertices into a single Edge. For correct * results, the graph must have been partitioned using partitionBy. diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala index 7b9eb88b7b..091c778275 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala @@ -237,4 +237,35 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { graph.outerJoinVertices(table)(uf) } + /** + * Filter the graph by computing some values to filter on, and applying the predicates. + * + * @param preprocess a function to compute new vertex and edge data before filtering + * @param epred edge pred to filter on after preprocess, see more details under Graph#subgraph + * @param vpred vertex pred to filter on after prerocess, see more details under Graph#subgraph + * @tparam VD2 vertex type the vpred operates on + * @tparam ED2 edge type the epred operates on + * @return a subgraph of the orginal graph, with its data unchanged + * + * @example This function can be used to filter the graph based on some property, without + * changing the vertex and edge values in your program. For example, we could remove the vertices + * in a graph with 0 outdegree + * + * {{{ + * graph.filter( + * graph => { + * val degrees: VertexSetRDD[Int] = graph.outDegrees + * graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)} + * }, + * vpred = (vid: Vid, deg:Int) => deg > 0 + * ) + * }}} + * + */ + def filter[VD2: ClassManifest, ED2: ClassManifest]( + preprocess: Graph[VD, ED] => Graph[VD2, ED2], + epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true, + vpred: (Vid, VD2) => Boolean = (v:Vid, d:VD2) => true): Graph[VD, ED] = { + graph.mask(preprocess(graph).subgraph(epred, vpred)) + } } // end of GraphOps diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala index bfdafcc542..e97522feae 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala @@ -98,6 +98,40 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) builder.toEdgePartition } + /** + * Apply `f` to all edges present in both `this` and `other` and return a new EdgePartition + * containing the resulting edges. + * + * If there are multiple edges with the same src and dst in `this`, `f` will be invoked once for + * each edge, but each time it may be invoked on any corresponding edge in `other`. + * + * If there are multiple edges with the same src and dst in `other`, `f` will only be invoked + * once. + */ + def innerJoin[ED2: ClassManifest, ED3: ClassManifest] + (other: EdgePartition[ED2]) + (f: (Vid, Vid, ED, ED2) => ED3): EdgePartition[ED3] = { + val builder = new EdgePartitionBuilder[ED3] + var i = 0 + var j = 0 + // For i = index of each edge in `this`... + while (i < size && j < other.size) { + val srcId = this.srcIds(i) + val dstId = this.dstIds(i) + // ... forward j to the index of the corresponding edge in `other`, and... + while (j < other.size && other.srcIds(j) < srcId) { j += 1 } + if (j < other.size && other.srcIds(j) == srcId) { + while (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) < dstId) { j += 1 } + if (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) == dstId) { + // ... run `f` on the matching edge + builder.add(srcId, dstId, f(srcId, dstId, this.data(i), other.data(j))) + } + } + i += 1 + } + builder.toEdgePartition + } + /** * The number of edges in this partition * diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 0adc350187..e7f975253a 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -215,6 +215,14 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( new GraphImpl(newVTable, newETable) } // end of subgraph + override def mask[VD2: ClassManifest, ED2: ClassManifest] ( + other: Graph[VD2, ED2]): Graph[VD, ED] = { + val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v } + val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v } + new GraphImpl(newVerts, newEdges) + + } + override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = { ClosureCleaner.clean(merge) val newETable = edges.mapEdgePartitions(_.groupEdges(merge)) diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala index a85a31f79d..f6bb201a83 100644 --- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala @@ -5,7 +5,9 @@ import scala.util.Random import org.scalatest.FunSuite import org.apache.spark.SparkContext +import org.apache.spark.graph.Graph._ import org.apache.spark.graph.LocalSparkContext._ +import org.apache.spark.graph.impl.EdgePartition import org.apache.spark.graph.impl.EdgePartitionBuilder import org.apache.spark.rdd._ @@ -183,6 +185,53 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("mask") { + withSpark(new SparkContext("local", "test")) { sc => + val n = 5 + val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x))) + val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x))) + val graph: Graph[Int, Int] = Graph(vertices, edges) + + val subgraph = graph.subgraph( + e => e.dstId != 4L, + (vid, vdata) => vid != 3L + ).mapVertices((vid, vdata) => -1).mapEdges(e => -1) + + val projectedGraph = graph.mask(subgraph) + + val v = projectedGraph.vertices.collect().toSet + assert(v === Set((0,0), (1,1), (2,2), (4,4), (5,5))) + + // the map is necessary because of object-reuse in the edge iterator + val e = projectedGraph.edges.map(e => Edge(e.srcId, e.dstId, e.attr)).collect().toSet + assert(e === Set(Edge(0,1,1), Edge(0,2,2), Edge(0,5,5))) + + } + } + + test ("filter") { + withSpark(new SparkContext("local", "test")) { sc => + val n = 5 + val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x))) + val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x))) + val graph: Graph[Int, Int] = Graph(vertices, edges) + val filteredGraph = graph.filter( + graph => { + val degrees: VertexRDD[Int] = graph.outDegrees + graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)} + }, + vpred = (vid: Vid, deg:Int) => deg > 0 + ) + + val v = filteredGraph.vertices.collect().toSet + assert(v === Set((0,0))) + + // the map is necessary because of object-reuse in the edge iterator + val e = filteredGraph.edges.map(e => Edge(e.srcId, e.dstId, e.attr)).collect().toSet + assert(e.isEmpty) + } + } + test("VertexSetRDD") { withSpark(new SparkContext("local", "test")) { sc => val n = 100 @@ -231,4 +280,19 @@ class GraphSuite extends FunSuite with LocalSparkContext { assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0) assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1) } + + test("EdgePartition.innerJoin") { + def makeEdgePartition[A: ClassManifest](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = { + val builder = new EdgePartitionBuilder[A] + for ((src, dst, attr) <- xs) { builder.add(src: Vid, dst: Vid, attr) } + builder.toEdgePartition + } + val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0)) + val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0)) + val a = makeEdgePartition(aList) + val b = makeEdgePartition(bList) + + assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.map(_.copy()).toList === + List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0))) + } }