From cb0efe92d1e706472aa219617ec02df3b77dc9d3 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Wed, 3 Apr 2013 23:12:01 +0800
Subject: [PATCH] Oh wow it finally compiles!

---
 .../spark/graph/EdgeWithVerticesRDD.scala     |   8 +-
 graph/src/main/scala/spark/graph/Graph.scala  | 106 ++++++++++++++----
 .../src/main/scala/spark/graph/GraphLab.scala |  37 ++++--
 .../src/main/scala/spark/graph/package.scala  |   2 +-
 4 files changed, 115 insertions(+), 38 deletions(-)

diff --git a/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala b/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala
index e787fe4e2c..f200f7cbf6 100644
--- a/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala
+++ b/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala
@@ -17,10 +17,10 @@ class EdgeWithVerticesPartition(idx: Int, val eTablePartition: Partition) extend
  * A RDD that brings together edge data with its associated vertex data.
  */
 private[graph]
-class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
+class EdgeWithVerticesRDD[VD: ClassManifest, ED: ClassManifest](
     @transient vTable: RDD[(Vid, (VD, Array[Pid]))],
     eTable: RDD[(Pid, EdgePartition[ED])])
-  extends RDD[VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]](eTable.context, Nil) {
+  extends RDD[(VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]])](eTable.context, Nil) {
 
   @transient
   private val shuffleDependency = {
@@ -48,7 +48,7 @@ class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
     eTable.preferredLocations(s.asInstanceOf[EdgeWithVerticesPartition].eTablePartition)
 
   override def compute(s: Partition, context: TaskContext)
-    : Iterator[VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]] = {
+    : Iterator[(VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]])] = {
 
     val split = s.asInstanceOf[EdgeWithVerticesPartition]
@@ -81,6 +81,6 @@ class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
         e
       }
     }
-    (vmap, iter)
+    Iterator((vmap, iter))
   }
 }
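[Note] The element-type fix above is the crux of this file: RDD.compute must return an Iterator over the RDD's element type, so each partition now yields exactly one (vertex map, edge iterator) pair, wrapped in Iterator(...). A minimal consumer sketch, assuming a vTable and eTable of matching types are already in scope; illustrative only, not part of the patch:

    // Each partition holds a single (vmap, iter) element; unpack it
    // with part.next() and then stream the edges themselves.
    val ewv = new EdgeWithVerticesRDD[Double, Double](vTable, eTable)
    val numEdges: Long = ewv.mapPartitions { part =>
      val (vmap, iter) = part.next()  // the one element in this partition
      iter                            // the edges, with vertex data attached
    }.count()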
diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala
index 4f1b3b44d5..85345c28b0 100644
--- a/graph/src/main/scala/spark/graph/Graph.scala
+++ b/graph/src/main/scala/spark/graph/Graph.scala
@@ -5,7 +5,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import it.unimi.dsi.fastutil.ints.IntArrayList
 
-import spark.{ClosureCleaner, HashPartitioner, SparkContext, RDD}
+import spark.{ClosureCleaner, HashPartitioner, RDD}
 import spark.SparkContext._
 import spark.graph.Graph.EdgePartition
 import spark.storage.StorageLevel
@@ -37,37 +37,102 @@ class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Doubl
 
 /**
  * A Graph RDD that supports computation on graphs.
  */
-class Graph[VD: Manifest, ED: Manifest](
-    val rawVertices: RDD[Vertex[VD]],
-    val rawEdges: RDD[Edge[ED]]) {
+class Graph[VD: ClassManifest, ED: ClassManifest] protected (
+    _rawVertices: RDD[Vertex[VD]],
+    _rawEdges: RDD[Edge[ED]],
+    _rawVTable: RDD[(Vid, (VD, Array[Pid]))],
+    _rawETable: RDD[(Pid, EdgePartition[ED])]) {
+
+  def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = this(vertices, edges, null, null)
+
+  protected var _cached = false
+
+  def cache(): Graph[VD, ED] = {
+    eTable.cache()
+    vTable.cache()
+    _cached = true
+    this
+  }
 
   var numEdgePartitions = 5
   var numVertexPartitions = 5
 
-  val vertexPartitioner = new HashPartitioner(numVertexPartitions)
+  protected val vertexPartitioner = new HashPartitioner(numVertexPartitions)
 
-  val edgePartitioner = new HashPartitioner(numEdgePartitions)
+  protected val edgePartitioner = new HashPartitioner(numEdgePartitions)
 
-  lazy val eTable: RDD[(Pid, EdgePartition[ED])] = Graph.createETable(
-    rawEdges, numEdgePartitions)
+  protected lazy val eTable: RDD[(Pid, EdgePartition[ED])] = {
+    if (_rawETable == null) {
+      Graph.createETable(_rawEdges, numEdgePartitions)
+    } else {
+      _rawETable
+    }
+  }
 
-  lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = Graph.createVTable(
-    rawVertices, eTable, numVertexPartitions)
+  protected lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = {
+    if (_rawVTable == null) {
+      Graph.createVTable(_rawVertices, eTable, numVertexPartitions)
+    } else {
+      _rawVTable
+    }
+  }
 
-  def vertices(): RDD[Vertex[VD]] = vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
+  def vertices: RDD[Vertex[VD]] = {
+    if (!_cached && _rawVertices != null) {
+      _rawVertices
+    } else {
+      vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
+    }
+  }
 
-  def edges(): RDD[Edge[ED]] = eTable.mapPartitions { iter => iter.next._2.iterator }
+  def edges: RDD[Edge[ED]] = {
+    if (!_cached && _rawEdges != null) {
+      _rawEdges
+    } else {
+      eTable.mapPartitions { iter => iter.next._2.iterator }
+    }
+  }
 
-  def edgesWithVertices(): RDD[EdgeWithVertices[VD, ED]] = {
-    (new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions { case(vmap, iter) => iter }
+  def edgesWithVertices: RDD[EdgeWithVertices[VD, ED]] = {
+    (new EdgeWithVerticesRDD[VD, ED](vTable, eTable)).mapPartitions { part => part.next._2 }
+  }
+
+  def mapVertices[VD2: ClassManifest](f: (Vertex[VD]) => Vertex[VD2]) = {
+    ClosureCleaner.clean(f)
+    new Graph(vertices.map(f), edges)
+  }
+
+  def mapEdges[ED2: ClassManifest](f: (Edge[ED]) => Edge[ED2]) = {
+    ClosureCleaner.clean(f)
+    new Graph(vertices, edges.map(f))
+  }
+
+  def updateVertices[U: ClassManifest](
+      updates: RDD[(Vid, U)],
+      updateFunc: (Vertex[VD], Seq[U]) => VD)
+    : Graph[VD, ED] = {
+
+    ClosureCleaner.clean(updateFunc)
+
+    val joined: RDD[(Vid, ((VD, Array[Pid]), Option[Seq[U]]))] =
+      vTable.leftOuterJoin(updates.groupByKey(vertexPartitioner))
+
+    val newVTable = (joined.mapPartitions({ iter =>
+      iter.map { case (vid, ((vdata, pids), updates)) =>
+        val newVdata = if (updates.isDefined) updateFunc(Vertex(vid, vdata), updates.get) else vdata
+        (vid, (newVdata, pids))
+      }
+    }, preservesPartitioning = true)).cache()
+
+    new Graph(null, null, newVTable, eTable)
   }
 
   def mapPartitions[U: ClassManifest](
-      f: (VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]) => Iterator[U],
+      f: (VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]]) => Iterator[U],
       preservesPartitioning: Boolean = false): RDD[U] = {
     (new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions({ part =>
       val (vmap, iter) = part.next()
-      iter.mapPartitions(f)
+      f(vmap, iter)
     }, preservesPartitioning)
   }
@@ -80,7 +145,8 @@ object Graph {
    * A partition of edges in 3 large columnar arrays.
    */
   private[graph]
-  class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: Manifest] {
+  class EdgePartition [@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED:ClassManifest]
+  {
     val srcIds: IntArrayList = new IntArrayList
     val dstIds: IntArrayList = new IntArrayList
     // TODO: Specialize data.
@@ -117,7 +183,7 @@ object Graph {
   }
 
   private[graph]
-  def createVTable[VD: Manifest, ED: Manifest](
+  def createVTable[VD: ClassManifest, ED: ClassManifest](
       vertices: RDD[Vertex[VD]],
       eTable: RDD[(Pid, EdgePartition[ED])],
       numPartitions: Int) = {
@@ -145,7 +211,6 @@ object Graph {
         case (vdata, None) => (vdata, Array.empty[Pid])
         case (vdata, Some(pids)) => (vdata, pids.toArray)
       }
-      .cache()
   }
 
   /**
@@ -157,7 +222,7 @@ object Graph {
    * containing all the edges in a partition.
    */
   private[graph]
-  def createETable[ED: Manifest](edges: RDD[Edge[ED]], numPartitions: Int)
+  def createETable[ED: ClassManifest](edges: RDD[Edge[ED]], numPartitions: Int)
    : RDD[(Pid, EdgePartition[ED])] = {
 
    edges.map { e =>
@@ -170,7 +235,6 @@ object Graph {
      iter.foreach { case (_, (src, dst, data)) => edgePartition.add(src, dst, data) }
      Iterator((pid, edgePartition))
    }, preservesPartitioning = true)
-    .cache()
  }
}
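[Note] A quick sanity-check sketch of the reworked Graph surface: vertices/edges are now parameterless, cache() returns the graph so it chains, and updateVertices folds a batch of per-vertex messages into a new Graph that reuses the existing eTable. Everything below is illustrative and assumes things the patch does not show: a SparkContext sc, and Vertex(id, data) / Edge(src, dst, data) constructor shapes:

    // Build a tiny graph, cache it, then push one round of updates into it.
    val g: Graph[Double, Double] = new Graph(
      sc.parallelize(Seq(Vertex(1, 1.0), Vertex(2, 1.0))),
      sc.parallelize(Seq(Edge(1, 2, 1.0)))).cache()

    // Transform vertex data without touching the edge structure.
    val doubled = g.mapVertices(v => Vertex(v.id, v.data * 2.0))

    // Send 1.0 to every edge destination, then sum the messages per vertex.
    val msgs: RDD[(Vid, Double)] = g.edges.map(e => (e.dst, 1.0))
    val updated = g.updateVertices(msgs, (v, us: Seq[Double]) => v.data + us.sum)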
diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala
index ebf001937b..3d3ce99a4b 100644
--- a/graph/src/main/scala/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/spark/graph/GraphLab.scala
@@ -1,45 +1,58 @@
 package spark.graph
 
+import scala.collection.JavaConversions._
+
+import spark.RDD
+
 
 class GraphLab {
 
-  def iterateGAS[A: Manifest, VD: Manifest, ED: Manifest](
+  def iterateGAS[A: ClassManifest, VD: ClassManifest, ED: ClassManifest](
      graph: Graph[VD, ED],
      gather: (Vid, EdgeWithVertices[VD, ED]) => A,
      merge: (A, A) => A,
      default: A,
      apply: (Vertex[VD], A) => VD,
      numIter: Int,
-      gatherEdges: EdgeDirection = EdgeDirection.In) = {
+      gatherEdges: EdgeDirection.EdgeDirection = EdgeDirection.In) = {
 
-    val g = new Graph[(VD, A), ED](graph.rawVertices.map(v => (v, default)), graph.rawEdges)
+    var g = graph.mapVertices(v => Vertex(v.id, VDataWithAcc(v.data, default))).cache()
 
    var i = 0
    while (i < numIter) {
 
-      val gatherTable = g.mapPartitions { case(vmap, iter) =>
+      val accUpdates: RDD[(Vid, A)] = g.mapPartitions({ case(vmap, iter) =>
        val edgeSansAcc = new EdgeWithVertices[VD, ED]()
-        iter.map { edge: EdgeWithVertices[(VD, A), ED] =>
+        iter.map { edge: EdgeWithVertices[VDataWithAcc[VD, A], ED] =>
          edgeSansAcc.data = edge.data
-          edgeSansAcc.src.data = edge.src.data._1
-          edgeSansAcc.dst.data = edge.dst.data._1
+          edgeSansAcc.src.data = edge.src.data.vdata
+          edgeSansAcc.dst.data = edge.dst.data.vdata
          edgeSansAcc.src.id = edge.src.id
          edgeSansAcc.dst.id = edge.dst.id
          if (gatherEdges == EdgeDirection.In || gatherEdges == EdgeDirection.Both) {
-            edge.dst.data._2 = merge(edge.dst.data._2, gather(edgeSansAcc.dst.id, edgeSansAcc))
+            edge.dst.data.acc = merge(edge.dst.data.acc, gather(edgeSansAcc.dst.id, edgeSansAcc))
          }
          if (gatherEdges == EdgeDirection.Out || gatherEdges == EdgeDirection.Both) {
-            edge.src.data._2 = merge(edge.src.data._2, gather(edgeSansAcc.src.id, edgeSansAcc))
+            edge.src.data.acc = merge(edge.src.data.acc, gather(edgeSansAcc.src.id, edgeSansAcc))
          }
        }
-      vmap.int2ObjectEntrySet().fastIterator().map{ case (vid, (vdata, acc)) => (vid, acc) }
-    }.reduceByKey(graph.vertexPartitioner, false)
+        vmap.int2ObjectEntrySet().fastIterator().map{ entry =>
+          (entry.getIntKey(), entry.getValue().acc)
+        }
+      })(classManifest[(Int, A)])
 
-      gatherTable
+      def applyFunc(v: Vertex[VDataWithAcc[VD, A]], updates: Seq[A]): VDataWithAcc[VD, A] = {
+        VDataWithAcc(apply(Vertex(v.id, v.data.vdata), updates.reduce(merge)), default)
+      }
+      g = g.updateVertices(accUpdates, applyFunc).cache()
 
      i += 1
    }
  }
}
+
+
+private[graph]
+sealed case class VDataWithAcc[VD: ClassManifest, A](var vdata: VD, var acc: A)
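[Note] To see the new gather/merge/apply loop end to end, here is a hedged sketch that accumulates in-degrees: gather emits one unit per in-edge, merge sums them, and apply writes the total back as the vertex data. The cached Graph[Int, Double] value g is an assumption, and note that in this WIP version iterateGAS does not yet return the final graph:

    // Sketch: one gather/apply round computing in-degrees.
    // Type params: A = Int (messages), VD = Int (count), ED = Double.
    val gl = new GraphLab
    gl.iterateGAS[Int, Int, Double](
      g,                      // an existing, cached Graph[Int, Double]
      (vid, e) => 1,          // gather: one unit per gathered edge
      (a, b) => a + b,        // merge: sum the units
      0,                      // default accumulator value
      (v, sum) => sum,        // apply: the sum becomes the new vertex data
      numIter = 1)            // gatherEdges defaults to EdgeDirection.In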
diff --git a/graph/src/main/scala/spark/graph/package.scala b/graph/src/main/scala/spark/graph/package.scala
index e900a27b27..fa63e488fc 100644
--- a/graph/src/main/scala/spark/graph/package.scala
+++ b/graph/src/main/scala/spark/graph/package.scala
@@ -5,7 +5,7 @@ package object graph {
   type Vid = Int
   type Pid = Int
   type Status = Boolean
-  type VertexHashMap = it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap
+  type VertexHashMap[T] = it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap[T]
 
   /**
    * Return the default null-like value for a data type T.
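[Note] This last hunk is what lets EdgeWithVerticesRDD and Graph.mapPartitions above speak of VertexHashMap[VD] rather than a raw map. A tiny sketch of the now-generic alias, illustrative only and not part of the patch:

    // The alias now carries its value type, so lookups are typed.
    val vmap = new VertexHashMap[String]  // an Int2ObjectOpenHashMap[String]
    vmap.put(1, "a")
    val s: String = vmap.get(1)           // no asInstanceOf needed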