From 0791581346517c8fa55540703f667f30abba73a0 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Sun, 30 Jun 2013 23:07:40 -0700
Subject: [PATCH 1/3] More bug fixes

---
 .../main/scala/spark/graph/Analytics.scala    |  2 +-
 graph/src/main/scala/spark/graph/Graph.scala  | 23 +++++++++++--------
 .../spark/graph/impl/EdgePartition.scala      | 14 +++++++----
 .../scala/spark/graph/impl/GraphImpl.scala    | 11 +++++----
 .../test/scala/spark/graph/GraphSuite.scala   | 23 +++++++++++++++++++
 5 files changed, 52 insertions(+), 21 deletions(-)

diff --git a/graph/src/main/scala/spark/graph/Analytics.scala b/graph/src/main/scala/spark/graph/Analytics.scala
index 8f76622db0..8acf863ff8 100644
--- a/graph/src/main/scala/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/spark/graph/Analytics.scala
@@ -51,7 +51,7 @@ object Analytics extends Logging {
       (me_id, edge) => Some(edge.src.data._2 / edge.src.data._1), // gather
       (a: Double, b: Double) => a + b, // merge
       1.0,
-      numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
+      numIter).mapVertices{ case Vertex(id, (outDeg, r)) => r }
   }

 //  /**
diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala
index ef058a1fb5..6724e4ede5 100644
--- a/graph/src/main/scala/spark/graph/Graph.scala
+++ b/graph/src/main/scala/spark/graph/Graph.scala
@@ -252,6 +252,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
    *
    * @tparam U the type of entry in the table of updates
    * @tparam VD2 the new vertex value type
+   *
    * @param table the table to join with the vertices in the graph. The table
    * should contain at most one entry for each vertex.
    * @param mapFunc the function used to compute the new vertex values. The
@@ -329,24 +330,26 @@ object Graph {
   import spark.graph.impl._
   import spark.SparkContext._

-  def apply(rawEdges: RDD[(Int,Int)], uniqueEdges: Boolean = true): Graph[Int, Int] = {
-    // Reduce to unique edges
-    val edges =
-      if(uniqueEdges) rawEdges.map{ case (s,t) => ((s,t),1) }.reduceByKey( _ + _ )
-        .map{ case ((s,t), cnt) => Edge(s,t,cnt) }
-      else rawEdges.map{ case (s,t) => Edge(s,t,1) }
+  def apply(rawEdges: RDD[(Int, Int)], uniqueEdges: Boolean = true): Graph[Int, Int] = {
+    // Reduce to unique edges.
+    val edges: RDD[Edge[Int]] =
+      if (uniqueEdges) {
+        rawEdges.map((_, 1)).reduceByKey(_ + _).map { case ((s, t), cnt) => Edge(s, t, cnt) }
+      } else {
+        rawEdges.map { case (s, t) => Edge(s, t, 1) }
+      }
     // Determine unique vertices
-    val vertices = edges.flatMap{ case Edge(s, t, cnt) => Array((s,1), (t,1)) }.reduceByKey( _ + _ )
+    val vertices: RDD[Vertex[Int]] = edges.flatMap{ case Edge(s, t, cnt) => Array((s, 1), (t, 1)) }
+      .reduceByKey(_ + _)
       .map{ case (id, deg) => Vertex(id, deg) }
     // Return graph
     new GraphImpl(vertices, edges)
   }

-  def apply[VD: ClassManifest, ED: ClassManifest](vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]): Graph[VD, ED] = {
+  def apply[VD: ClassManifest, ED: ClassManifest](
+      vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]): Graph[VD, ED] = {
     new GraphImpl(vertices, edges)
-
   }
-
   implicit def graphToGraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) = g.ops
 }
diff --git a/graph/src/main/scala/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/spark/graph/impl/EdgePartition.scala
index 0008534c0f..0e092541c9 100644
--- a/graph/src/main/scala/spark/graph/impl/EdgePartition.scala
+++ b/graph/src/main/scala/spark/graph/impl/EdgePartition.scala
@@ -1,6 +1,6 @@
 package spark.graph.impl

-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.ArrayBuilder

 import it.unimi.dsi.fastutil.ints.IntArrayList

@@ -13,21 +13,25 @@ import spark.graph._
 private[graph]
 class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] {

+  private var _data: Array[ED] = _
+  private var _dataBuilder = ArrayBuilder.make[ED]
+
   val srcIds: IntArrayList = new IntArrayList
   val dstIds: IntArrayList = new IntArrayList
-  // TODO: Specialize data.
-  val data: ArrayBuffer[ED] = new ArrayBuffer[ED]
+
+  def data: Array[ED] = _data

   /** Add a new edge to the partition. */
   def add(src: Vid, dst: Vid, d: ED) {
     srcIds.add(src)
     dstIds.add(dst)
-    data += d
+    _dataBuilder += d
   }

   def trim() {
     srcIds.trim()
     dstIds.trim()
+    _data = _dataBuilder.result()
   }

   def size: Int = srcIds.size
@@ -41,7 +45,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
     override def next(): Edge[ED] = {
       edge.src = srcIds.get(pos)
       edge.dst = dstIds.get(pos)
-      edge.data = data(pos)
+      edge.data = _data(pos)
       pos += 1
       edge
     }
diff --git a/graph/src/main/scala/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/spark/graph/impl/GraphImpl.scala
index e18a2f6b09..3f5cbc3d75 100644
--- a/graph/src/main/scala/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/spark/graph/impl/GraphImpl.scala
@@ -150,7 +150,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
   }

   /**
-   * Same as mapReduceNeighborhood but map function can return none and there is no default value.
+   * Same as aggregateNeighbors but map function can return none and there is no default value.
    * As a consequence, the resulting table may be much smaller than the set of vertices.
    */
   override def aggregateNeighbors[VD2: ClassManifest](
@@ -165,7 +165,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
         part.map { v => (v._1, MutableTuple2(v._2, Option.empty[VD2])) }
       }, preservesPartitioning = true)

-    (new EdgeTripletRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable))
+    new EdgeTripletRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable)
       .mapPartitions { part =>
         val (vmap, edges) = part.next()
         val edgeSansAcc = new EdgeTriplet[VD, ED]()
@@ -188,7 +188,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
           }
           if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) {
             e.dst.data._2 =
-              if (e.dst.data._2.isEmpty) {
+              if (e.src.data._2.isEmpty) {
                 mapFunc(edgeSansAcc.src.id, edgeSansAcc)
               } else {
                 val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc)
@@ -218,7 +218,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
         }
       }, preservesPartitioning = true).cache()

-    new GraphImpl(newVTable.partitions.size, eTable.partitions.size, null, null, newVTable, eTable)
+    new GraphImpl(newVTable.partitions.length, eTable.partitions.length, null, null, newVTable, eTable)
   }

   override def joinVertices[U: ClassManifest](
@@ -239,7 +239,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
         }
       }, preservesPartitioning = true).cache()

-    new GraphImpl(newVTable.partitions.size, eTable.partitions.size, null, null, newVTable, eTable)
+    new GraphImpl(newVTable.partitions.length, eTable.partitions.length, null, null, newVTable, eTable)
   }


@@ -307,6 +307,7 @@ object GraphImpl {
       .mapPartitionsWithIndex({ (pid, iter) =>
         val edgePartition = new EdgePartition[ED]
         iter.foreach { case (_, (src, dst, data)) => edgePartition.add(src, dst, data) }
+        edgePartition.trim()
         Iterator((pid, edgePartition))
       }, preservesPartitioning = true)
   }
diff --git a/graph/src/test/scala/spark/graph/GraphSuite.scala b/graph/src/test/scala/spark/graph/GraphSuite.scala
index 3d250aa18c..87c8c158af 100644
--- a/graph/src/test/scala/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/spark/graph/GraphSuite.scala
@@ -3,10 +3,33 @@ package spark.graph
 import org.scalatest.FunSuite

 import spark.SparkContext
+import spark.graph.impl.GraphImpl


 class GraphSuite extends FunSuite with LocalSparkContext {

+  test("aggregateNeighbors") {
+
+  }
+
+  test("joinVertices") {
+    sc = new SparkContext("local", "test")
+    val vertices = sc.parallelize(Seq(Vertex(1, "one"), Vertex(2, "two"), Vertex(3, "three")), 2)
+    val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo"))))
+    val g: Graph[String, String] = new GraphImpl(vertices, edges)
+
+    val tbl = sc.parallelize(Seq((1, 10), (2, 20)))
+    val g1 = g.joinVertices(tbl, (v: Vertex[String], u: Int) => v.data + u)
+
+    val v = g1.vertices.collect().sortBy(_.id)
+    assert(v(0).data === "one10")
+    assert(v(1).data === "two20")
+    assert(v(2).data === "three")
+
+    val e = g1.edges.collect()
+    assert(e(0).data === "onetwo")
+  }
+
   // test("graph partitioner") {
   //   sc = new SparkContext("local", "test")
   //   val vertices = sc.parallelize(Seq(Vertex(1, "one"), Vertex(2, "two")))

From 2943edf8eece20b21ff568b401cc5e8323ce9c07 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Mon, 1 Jul 2013 00:24:30 -0700
Subject: [PATCH 2/3] Fixed another bug ..

---
 graph/src/main/scala/spark/graph/impl/GraphImpl.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/graph/src/main/scala/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/spark/graph/impl/GraphImpl.scala
index 3f5cbc3d75..711446d8e0 100644
--- a/graph/src/main/scala/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/spark/graph/impl/GraphImpl.scala
@@ -110,7 +110,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
         part.map { v => (v._1, MutableTuple2(v._2, Option.empty[VD2])) }
       }, preservesPartitioning = true)

-    (new EdgeTripletRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable))
+    new EdgeTripletRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable)
       .mapPartitions { part =>
         val (vmap, edges) = part.next()
         val edgeSansAcc = new EdgeTriplet[VD, ED]()
@@ -187,7 +187,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
           }
         }
         if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) {
-          e.dst.data._2 =
+          e.src.data._2 =
             if (e.src.data._2.isEmpty) {
               mapFunc(edgeSansAcc.src.id, edgeSansAcc)
             } else {

From 2f2c7e6a294ae9297ae7c9b84cfc352384d373aa Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Mon, 1 Jul 2013 16:23:23 -0700
Subject: [PATCH 3/3] Added a correctEdges function.

---
 graph/src/main/scala/spark/graph/Graph.scala          | 1 +
 graph/src/main/scala/spark/graph/impl/GraphImpl.scala | 6 ++++++
 2 files changed, 7 insertions(+)

diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala
index 6724e4ede5..f51171f0d6 100644
--- a/graph/src/main/scala/spark/graph/Graph.scala
+++ b/graph/src/main/scala/spark/graph/Graph.scala
@@ -141,6 +141,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
   def mapTriplets[ED2: ClassManifest](
     map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

+  def correctEdges(): Graph[VD, ED]

   /**
    * Construct a new graph with all the edges reversed. If this graph contains
diff --git a/graph/src/main/scala/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/spark/graph/impl/GraphImpl.scala
index 711446d8e0..775d0686e7 100644
--- a/graph/src/main/scala/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/spark/graph/impl/GraphImpl.scala
@@ -91,6 +91,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     newGraph(vertices, triplets.map(e => Edge(e.src.id, e.dst.id, f(e))))
   }

+  override def correctEdges(): Graph[VD, ED] = {
+    val sc = vertices.context
+    val vset = sc.broadcast(vertices.map(_.id).collect().toSet)
+    val newEdges = edges.filter(e => vset.value.contains(e.src) && vset.value.contains(e.dst))
+    Graph(vertices, newEdges)
+  }

   //////////////////////////////////////////////////////////////////////////////////////////////////
   // Lower level transformation methods
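
A quick illustration of the correctEdges function added in PATCH 3/3: it keeps only the
edges whose source and destination ids both have a matching vertex, by broadcasting the
collected vertex-id set and filtering the edge RDD against it. The sketch below is written
in the style of GraphSuite above; the context name and the vertex/edge values are
illustrative, not part of the patches:

    import spark.SparkContext
    import spark.graph.{Edge, Graph, Vertex}

    val sc = new SparkContext("local", "test")
    val vertices = sc.parallelize(Seq(Vertex(1, "one"), Vertex(2, "two")))
    // The edge 2 -> 3 dangles: there is no vertex 3.
    val edges = sc.parallelize(Seq(Edge(1, 2, "onetwo"), Edge(2, 3, "twothree")))
    val g: Graph[String, String] = Graph(vertices, edges)

    // correctEdges drops the dangling edge; only 1 -> 2 survives.
    val corrected = g.correctEdges()
    assert(corrected.edges.collect().map(e => (e.src, e.dst)).toSeq == Seq((1, 2)))

Since correctEdges collects the vertex ids to the driver before broadcasting them, it is
appropriate only when the vertex-id set fits in driver memory.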