diff --git a/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala b/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala
index e787fe4e2c..f200f7cbf6 100644
--- a/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala
+++ b/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala
@@ -17,10 +17,10 @@ class EdgeWithVerticesPartition(idx: Int, val eTablePartition: Partition) extend
  * A RDD that brings together edge data with its associated vertex data.
  */
 private[graph]
-class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
+class EdgeWithVerticesRDD[VD: ClassManifest, ED: ClassManifest](
     @transient vTable: RDD[(Vid, (VD, Array[Pid]))],
     eTable: RDD[(Pid, EdgePartition[ED])])
-  extends RDD[VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]](eTable.context, Nil) {
+  extends RDD[(VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]])](eTable.context, Nil) {
 
   @transient
   private val shuffleDependency = {
@@ -48,7 +48,7 @@ class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
     eTable.preferredLocations(s.asInstanceOf[EdgeWithVerticesPartition].eTablePartition)
 
   override def compute(s: Partition, context: TaskContext)
-    : Iterator[VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]] = {
+    : Iterator[(VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]])] = {
 
     val split = s.asInstanceOf[EdgeWithVerticesPartition]
 
@@ -81,6 +81,6 @@ class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
         e
       }
     }
-    (vmap, iter)
+    Iterator((vmap, iter))
   }
 }
diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala
index 6e662ab1fb..5a1e06acdd 100644
--- a/graph/src/main/scala/spark/graph/Graph.scala
+++ b/graph/src/main/scala/spark/graph/Graph.scala
@@ -5,7 +5,8 @@ import scala.collection.mutable.ArrayBuffer
 
 import it.unimi.dsi.fastutil.ints.IntArrayList
 
-import spark.{ClosureCleaner, HashPartitioner, SparkContext, RDD}
+import spark.{ClosureCleaner, HashPartitioner, RDD}
+import spark.SparkContext
 import spark.SparkContext._
 import spark.graph.Graph.EdgePartition
 import spark.storage.StorageLevel
@@ -13,7 +14,10 @@ import spark.storage.StorageLevel
 
 case class Vertex[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD] (
     var id: Vid = 0,
-    var data: VD = nullValue[VD])
+    var data: VD = nullValue[VD]) {
+
+  def this(tuple: Tuple2[Vid, VD]) = this(tuple._1, tuple._2)
+}
 
 
 case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
@@ -37,37 +41,102 @@ class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Doubl
 
 /**
  * A Graph RDD that supports computation on graphs.
  */
-class Graph[VD: Manifest, ED: Manifest](
-    val rawVertices: RDD[Vertex[VD]],
-    val rawEdges: RDD[Edge[ED]]) {
+class Graph[VD: ClassManifest, ED: ClassManifest] protected (
+    _rawVertices: RDD[Vertex[VD]],
+    _rawEdges: RDD[Edge[ED]],
+    _rawVTable: RDD[(Vid, (VD, Array[Pid]))],
+    _rawETable: RDD[(Pid, EdgePartition[ED])]) {
+
+  def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = this(vertices, edges, null, null)
+
+  protected var _cached = false
+
+  def cache(): Graph[VD, ED] = {
+    eTable.cache()
+    vTable.cache()
+    _cached = true
+    this
+  }
 
   var numEdgePartitions = 5
   var numVertexPartitions = 5
 
-  val vertexPartitioner = new HashPartitioner(numVertexPartitions)
+  protected val vertexPartitioner = new HashPartitioner(numVertexPartitions)
 
-  val edgePartitioner = new HashPartitioner(numEdgePartitions)
+  protected val edgePartitioner = new HashPartitioner(numEdgePartitions)
 
-  lazy val eTable: RDD[(Pid, EdgePartition[ED])] = Graph.createETable(
-    rawEdges, numEdgePartitions)
+  protected lazy val eTable: RDD[(Pid, EdgePartition[ED])] = {
+    if (_rawETable == null) {
+      Graph.createETable(_rawEdges, numEdgePartitions)
+    } else {
+      _rawETable
+    }
+  }
 
-  lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = Graph.createVTable(
-    rawVertices, eTable, numVertexPartitions)
+  protected lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = {
+    if (_rawVTable == null) {
+      Graph.createVTable(_rawVertices, eTable, numVertexPartitions)
+    } else {
+      _rawVTable
+    }
+  }
 
-  def vertices(): RDD[Vertex[VD]] = vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
+  def vertices: RDD[Vertex[VD]] = {
+    if (!_cached && _rawVertices != null) {
+      _rawVertices
+    } else {
+      vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
+    }
+  }
 
-  def edges(): RDD[Edge[ED]] = eTable.mapPartitions { iter => iter.next._2.iterator }
+  def edges: RDD[Edge[ED]] = {
+    if (!_cached && _rawEdges != null) {
+      _rawEdges
+    } else {
+      eTable.mapPartitions { iter => iter.next._2.iterator }
+    }
+  }
 
-  def edgesWithVertices(): RDD[EdgeWithVertices[VD, ED]] = {
-    (new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions { case(vmap, iter) => iter }
+  def edgesWithVertices: RDD[EdgeWithVertices[VD, ED]] = {
+    (new EdgeWithVerticesRDD[VD, ED](vTable, eTable)).mapPartitions { part => part.next._2 }
+  }
+
+  def mapVertices[VD2: ClassManifest](f: (Vertex[VD]) => Vertex[VD2]) = {
+    ClosureCleaner.clean(f)
+    new Graph(vertices.map(f), edges)
+  }
+
+  def mapEdges[ED2: ClassManifest](f: (Edge[ED]) => Edge[ED2]) = {
+    ClosureCleaner.clean(f)
+    new Graph(vertices, edges.map(f))
+  }
+
+  def updateVertices[U: ClassManifest](
+      updates: RDD[(Vid, U)],
+      updateFunc: (Vertex[VD], Seq[U]) => VD)
+    : Graph[VD, ED] = {
+
+    ClosureCleaner.clean(updateFunc)
+
+    val joined: RDD[(Vid, ((VD, Array[Pid]), Option[Seq[U]]))] =
+      vTable.leftOuterJoin(updates.groupByKey(vertexPartitioner))
+
+    val newVTable = (joined.mapPartitions({ iter =>
+      iter.map { case (vid, ((vdata, pids), updates)) =>
+        val newVdata = if (updates.isDefined) updateFunc(Vertex(vid, vdata), updates.get) else vdata
+        (vid, (newVdata, pids))
+      }
+    }, preservesPartitioning = true)).cache()
+
+    new Graph(null, null, newVTable, eTable)
   }
 
   def mapPartitions[U: ClassManifest](
-      f: (VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]) => Iterator[U],
+      f: (VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]]) => Iterator[U],
       preservesPartitioning: Boolean = false): RDD[U] = {
     (new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions({ part =>
       val (vmap, iter) = part.next()
-      iter.mapPartitions(f)
+      f(vmap, iter)
     }, preservesPartitioning)
   }
 
@@ -76,11 +145,48 @@ class Graph[VD: Manifest, ED: Manifest](
 
 object Graph {
 
+  /**
+   * Load an edge list from file initializing the Graph RDD
+   */
+  def textFile[ED: ClassManifest](sc: SparkContext,
+    fname: String, edgeParser: Array[String] => ED) = {
+
+    // Parse the edge data table
+    val edges = sc.textFile(fname).map { line =>
+      val lineArray = line.split("\\s+")
+      if(lineArray.length < 2) {
+        println("Invalid line: " + line)
+        assert(false)
+      }
+      val source = lineArray(0)
+      val target = lineArray(1)
+      val tail = lineArray.drop(2)
+      val edata = edgeParser(tail)
+      Edge(source.trim.toInt, target.trim.toInt, edata)
+    }.cache()
+
+    // Parse the vertex data table
+    val vertices = edges.flatMap { edge => List((edge.src, 1), (edge.dst, 1)) }
+      .reduceByKey(_ + _)
+      .map(new Vertex(_))
+
+    val graph = new Graph[Int, ED](vertices, edges)
+    graph.cache()
+
+    println("Loaded graph:" +
+      "\n\t#edges: " + graph.edges.count +
+      "\n\t#vertices: " + graph.vertices.count)
+
+    graph
+  }
+
+
   /**
    * A partition of edges in 3 large columnar arrays.
    */
   private[graph]
-  class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: Manifest] {
+  class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest]
+  {
     val srcIds: IntArrayList = new IntArrayList
     val dstIds: IntArrayList = new IntArrayList
     // TODO: Specialize data.
@@ -104,7 +210,7 @@ object Graph {
       private var edge = new Edge[ED]
       private var pos = 0
 
-      override def hasNext: Boolean = pos < size
+      override def hasNext: Boolean = pos < EdgePartition.this.size
 
       override def next(): Edge[ED] = {
         edge.src = srcIds.get(pos)
@@ -117,7 +223,7 @@ object Graph {
   }
 
   private[graph]
-  def createVTable[VD: Manifest, ED: Manifest](
+  def createVTable[VD: ClassManifest, ED: ClassManifest](
       vertices: RDD[Vertex[VD]],
       eTable: RDD[(Pid, EdgePartition[ED])],
       numPartitions: Int) = {
@@ -145,7 +251,6 @@ object Graph {
         case (vdata, None) => (vdata, Array.empty[Pid])
         case (vdata, Some(pids)) => (vdata, pids.toArray)
       }
-      .cache()
   }
 
   /**
@@ -157,7 +262,7 @@ object Graph {
    * containing all the edges in a partition.
   */
  private[graph]
-  def createETable[ED: Manifest](edges: RDD[Edge[ED]], numPartitions: Int)
+  def createETable[ED: ClassManifest](edges: RDD[Edge[ED]], numPartitions: Int)
    : RDD[(Pid, EdgePartition[ED])] = {
 
    edges.map { e =>
@@ -170,7 +275,6 @@ object Graph {
         iter.foreach { case (_, (src, dst, data)) => edgePartition.add(src, dst, data) }
         Iterator((pid, edgePartition))
       }, preservesPartitioning = true)
-      .cache()
   }
 
   /**
diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala
index d90fdc3369..ce5b3f6e19 100644
--- a/graph/src/main/scala/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/spark/graph/GraphLab.scala
@@ -1,45 +1,57 @@
 package spark.graph
 
+import scala.collection.JavaConversions._
+
+import spark.RDD
+
 
 object GraphLab {
 
-  def iterateGAS[A: Manifest, VD: Manifest, ED: Manifest](
+  def iterateGAS[A: ClassManifest, VD: ClassManifest, ED: ClassManifest](
     graph: Graph[VD, ED],
     gather: (Vid, EdgeWithVertices[VD, ED]) => A,
     merge: (A, A) => A,
     default: A,
     apply: (Vertex[VD], A) => VD,
     numIter: Int,
-    gatherEdges: EdgeDirection = EdgeDirection.In) = {
+    gatherEdges: EdgeDirection.EdgeDirection = EdgeDirection.In) = {
 
-    val g = new Graph[(VD, A), ED](graph.rawVertices.map(v => (v, default)), graph.rawEdges)
+    var g = graph.mapVertices(v => Vertex(v.id, VDataWithAcc(v.data, default))).cache()
 
     var i = 0
     while (i < numIter) {
 
-      val gatherTable = g.mapPartitions { case(vmap, iter) =>
+      val accUpdates: RDD[(Vid, A)] = g.mapPartitions({ case(vmap, iter) =>
         val edgeSansAcc = new EdgeWithVertices[VD, ED]()
-        iter.map { edge: EdgeWithVertices[(VD, A), ED] =>
+        iter.map { edge: EdgeWithVertices[VDataWithAcc[VD, A], ED] =>
           edgeSansAcc.data = edge.data
-          edgeSansAcc.src.data = edge.src.data._1
-          edgeSansAcc.dst.data = edge.dst.data._1
+          edgeSansAcc.src.data = edge.src.data.vdata
+          edgeSansAcc.dst.data = edge.dst.data.vdata
           edgeSansAcc.src.id = edge.src.id
           edgeSansAcc.dst.id = edge.dst.id
           if (gatherEdges == EdgeDirection.In || gatherEdges == EdgeDirection.Both) {
-            edge.dst.data._2 = merge(edge.dst.data._2, gather(edgeSansAcc.dst.id, edgeSansAcc))
+            edge.dst.data.acc = merge(edge.dst.data.acc, gather(edgeSansAcc.dst.id, edgeSansAcc))
           }
           if (gatherEdges == EdgeDirection.Out || gatherEdges == EdgeDirection.Both) {
-            edge.src.data._2 = merge(edge.src.data._2, gather(edgeSansAcc.src.id, edgeSansAcc))
+            edge.src.data.acc = merge(edge.src.data.acc, gather(edgeSansAcc.src.id, edgeSansAcc))
           }
         }
-        vmap.int2ObjectEntrySet().fastIterator().map{ case (vid, (vdata, acc)) => (vid, acc) }
-      }.reduceByKey(graph.vertexPartitioner, false)
+        vmap.int2ObjectEntrySet().fastIterator().map{ entry =>
+          (entry.getIntKey(), entry.getValue().acc)
+        }
+      })(classManifest[(Int, A)])
 
-      gatherTable
+      def applyFunc(v: Vertex[VDataWithAcc[VD, A]], updates: Seq[A]): VDataWithAcc[VD, A] = {
+        VDataWithAcc(apply(Vertex(v.id, v.data.vdata), updates.reduce(merge)), default)
+      }
+      g = g.updateVertices(accUpdates, applyFunc).cache()
 
       i += 1
     }
   }
 }
+
+
+private[graph]
+sealed case class VDataWithAcc[VD: ClassManifest, A](var vdata: VD, var acc: A)
diff --git a/graph/src/main/scala/spark/graph/package.scala b/graph/src/main/scala/spark/graph/package.scala
index e900a27b27..6db050e6e1 100644
--- a/graph/src/main/scala/spark/graph/package.scala
+++ b/graph/src/main/scala/spark/graph/package.scala
@@ -4,8 +4,8 @@ package object graph {
 
   type Vid = Int
   type Pid = Int
-  type Status = Boolean
-  type VertexHashMap = it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap
+
+  type VertexHashMap[T] = it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap[T]
 
   /**
    * Return the default null-like value for a data type T.