From 0f137e8b75497e61f8d9fec98896cd912f27c3ed Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Mon, 16 Dec 2013 23:52:34 -0800 Subject: [PATCH] Reimplement Graph.mask using innerJoin --- .../org/apache/spark/graph/EdgeRDD.scala | 23 ++++++++++- .../scala/org/apache/spark/graph/Graph.scala | 2 +- .../spark/graph/impl/EdgePartition.scala | 34 ++++++++++++++++ .../apache/spark/graph/impl/GraphImpl.scala | 39 +++---------------- .../org/apache/spark/graph/GraphSuite.scala | 23 +++++++++-- 5 files changed, 83 insertions(+), 38 deletions(-) diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala index ee368ebb41..63858db2ef 100644 --- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala @@ -44,7 +44,7 @@ class EdgeRDD[@specialized ED: ClassManifest]( override def cache(): EdgeRDD[ED] = persist() def mapEdgePartitions[ED2: ClassManifest](f: EdgePartition[ED] => EdgePartition[ED2]) - : EdgeRDD[ED2]= { + : EdgeRDD[ED2] = { new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter => val (pid, ep) = iter.next() Iterator(Tuple2(pid, f(ep))) @@ -60,6 +60,27 @@ class EdgeRDD[@specialized ED: ClassManifest]( } } + def zipEdgePartitions[ED2: ClassManifest, ED3: ClassManifest] + (other: EdgeRDD[ED2]) + (f: (EdgePartition[ED], EdgePartition[ED2]) => EdgePartition[ED3]): EdgeRDD[ED3] = { + new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, preservesPartitioning = true) { + (thisIter, otherIter) => + val (pid, thisEPart) = thisIter.next() + val (_, otherEPart) = otherIter.next() + Iterator(Tuple2(pid, f(thisEPart, otherEPart))) + }) + } + + def innerJoin[ED2: ClassManifest, ED3: ClassManifest] + (other: EdgeRDD[ED2]) + (f: (Vid, Vid, ED, ED2) => ED3): EdgeRDD[ED3] = { + val ed2Manifest = classManifest[ED2] + val ed3Manifest = classManifest[ED3] + zipEdgePartitions(other) { (thisEPart, otherEPart) => + thisEPart.innerJoin(otherEPart)(f)(ed2Manifest, ed3Manifest) + } + } + def collectVids(): RDD[Vid] = { partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) } } diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala index e544650963..e8fa8e611c 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala @@ -48,7 +48,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * along with their vertex data. * */ - val edges: RDD[Edge[ED]] + val edges: EdgeRDD[ED] /** * Get the edges with the vertex data associated with the adjacent diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala index bfdafcc542..e97522feae 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala @@ -98,6 +98,40 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) builder.toEdgePartition } + /** + * Apply `f` to all edges present in both `this` and `other` and return a new EdgePartition + * containing the resulting edges. + * + * If there are multiple edges with the same src and dst in `this`, `f` will be invoked once for + * each edge, but each time it may be invoked on any corresponding edge in `other`. + * + * If there are multiple edges with the same src and dst in `other`, `f` will only be invoked + * once. + */ + def innerJoin[ED2: ClassManifest, ED3: ClassManifest] + (other: EdgePartition[ED2]) + (f: (Vid, Vid, ED, ED2) => ED3): EdgePartition[ED3] = { + val builder = new EdgePartitionBuilder[ED3] + var i = 0 + var j = 0 + // For i = index of each edge in `this`... + while (i < size && j < other.size) { + val srcId = this.srcIds(i) + val dstId = this.dstIds(i) + // ... forward j to the index of the corresponding edge in `other`, and... + while (j < other.size && other.srcIds(j) < srcId) { j += 1 } + if (j < other.size && other.srcIds(j) == srcId) { + while (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) < dstId) { j += 1 } + if (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) == dstId) { + // ... run `f` on the matching edge + builder.add(srcId, dstId, f(srcId, dstId, this.data(i), other.data(j))) + } + } + i += 1 + } + builder.toEdgePartition + } + /** * The number of edges in this partition * diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 2fe02718e9..e7f975253a 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -216,7 +216,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( } // end of subgraph override def mask[VD2: ClassManifest, ED2: ClassManifest] ( - other: Graph[VD2, ED2]) : Graph[VD, ED] = GraphImpl.mask(this, other) + other: Graph[VD2, ED2]): Graph[VD, ED] = { + val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v } + val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v } + new GraphImpl(newVerts, newEdges) + + } override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = { ClosureCleaner.clean(merge) @@ -379,38 +384,6 @@ object GraphImpl { new EdgeRDD(edges) } - def mask[VD: ClassManifest, ED: ClassManifest, VD2: ClassManifest, ED2: ClassManifest] ( - thisGraph: Graph[VD, ED], otherGraph: Graph[VD2, ED2]) : Graph[VD, ED] = { - // basically vertices.join(other.vertices) - // written this way to take advantage of fast join in VertexSetRDDs - val newVTable = VertexSetRDD( - thisGraph.vertices.leftJoin(otherGraph.vertices)((vid, v, w) => if (w.isEmpty) None else Some(v)) - .filter{case (vid, opt) => !opt.isEmpty} - .map{case (vid, opt) => (vid, opt.get)} - ) - - // TODO(amatsukawa): safer way to downcast? case matching perhaps? - val thisImpl = thisGraph.asInstanceOf[GraphImpl[VD, ED]] - val otherImpl = otherGraph.asInstanceOf[GraphImpl[VD2, ED2]] - val newETable = thisImpl.eTable.zipPartitions(otherImpl.eTable) { - // extract two edge partitions, keep all edges in in this partition that is - // also in the other partition - (thisIter, otherIter) => - val (_, otherEPart) = otherIter.next() - val otherEdges = otherEPart.iterator.map(e => (e.srcId, e.dstId)).toSet - val (pid, thisEPart) = thisIter.next() - val newEPartBuilder = new EdgePartitionBuilder[ED] - thisEPart.foreach { e => - if (otherEdges.contains((e.srcId, e.dstId))) - newEPartBuilder.add(e.srcId, e.dstId, e.attr) - } - Iterator((pid, newEPartBuilder.toEdgePartition)) - }.partitionBy(thisImpl.eTable.partitioner.get) - - val newVertexPlacement = new VertexPlacement(newETable, newVTable) - new GraphImpl(newVTable, newETable, newVertexPlacement, thisImpl.partitioner) - } - private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest]( edges: EdgeRDD[ED], defaultVertexAttr: VD): GraphImpl[VD, ED] = { diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala index fae6eb5525..f6bb201a83 100644 --- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala @@ -5,7 +5,9 @@ import scala.util.Random import org.scalatest.FunSuite import org.apache.spark.SparkContext +import org.apache.spark.graph.Graph._ import org.apache.spark.graph.LocalSparkContext._ +import org.apache.spark.graph.impl.EdgePartition import org.apache.spark.graph.impl.EdgePartitionBuilder import org.apache.spark.rdd._ @@ -183,7 +185,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } - test("projectGraph") { + test("mask") { withSpark(new SparkContext("local", "test")) { sc => val n = 5 val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x))) @@ -207,7 +209,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } - test ("filterGraph") { + test ("filter") { withSpark(new SparkContext("local", "test")) { sc => val n = 5 val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x))) @@ -215,7 +217,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { val graph: Graph[Int, Int] = Graph(vertices, edges) val filteredGraph = graph.filter( graph => { - val degrees: VertexSetRDD[Int] = graph.outDegrees + val degrees: VertexRDD[Int] = graph.outDegrees graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)} }, vpred = (vid: Vid, deg:Int) => deg > 0 @@ -278,4 +280,19 @@ class GraphSuite extends FunSuite with LocalSparkContext { assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0) assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1) } + + test("EdgePartition.innerJoin") { + def makeEdgePartition[A: ClassManifest](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = { + val builder = new EdgePartitionBuilder[A] + for ((src, dst, attr) <- xs) { builder.add(src: Vid, dst: Vid, attr) } + builder.toEdgePartition + } + val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0)) + val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0)) + val a = makeEdgePartition(aList) + val b = makeEdgePartition(bList) + + assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.map(_.copy()).toList === + List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0))) + } }