merged with trunk

Author: Joseph E. Gonzalez
Date:   2013-04-03 08:47:49 -07:00
Commit: c649073b5f

4 changed files with 157 additions and 41 deletions

EdgeWithVerticesRDD.scala

@@ -17,10 +17,10 @@ class EdgeWithVerticesPartition(idx: Int, val eTablePartition: Partition) extend
  * A RDD that brings together edge data with its associated vertex data.
  */
 private[graph]
-class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
+class EdgeWithVerticesRDD[VD: ClassManifest, ED: ClassManifest](
     @transient vTable: RDD[(Vid, (VD, Array[Pid]))],
     eTable: RDD[(Pid, EdgePartition[ED])])
-  extends RDD[VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]](eTable.context, Nil) {
+  extends RDD[(VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]])](eTable.context, Nil) {

   @transient
   private val shuffleDependency = {
@@ -48,7 +48,7 @@ class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
     eTable.preferredLocations(s.asInstanceOf[EdgeWithVerticesPartition].eTablePartition)

   override def compute(s: Partition, context: TaskContext)
-    : Iterator[VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]] = {
+    : Iterator[(VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]])] = {
     val split = s.asInstanceOf[EdgeWithVerticesPartition]
@@ -81,6 +81,6 @@ class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
         e
       }
     }
-    (vmap, iter)
+    Iterator((vmap, iter))
   }
 }
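
Note: the element type changes from an ill-formed two-parameter RDD to a real pair, so each partition now yields exactly one (vertex map, edge iterator) tuple and compute wraps it in an Iterator. A minimal consumption sketch, assuming the vTable and eTable values from the constructor above and caller-supplied VD/ED types (illustrative only, not part of this commit):

    // Each partition holds a single (VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]]) pair.
    val edgeRdd = new EdgeWithVerticesRDD[VD, ED](vTable, eTable)
    val edgesOnly: RDD[EdgeWithVertices[VD, ED]] = edgeRdd.mapPartitions { part =>
      val (vmap, iter) = part.next()   // the one pair emitted by compute()
      iter
    }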

Graph.scala

@@ -5,7 +5,8 @@ import scala.collection.mutable.ArrayBuffer
 import it.unimi.dsi.fastutil.ints.IntArrayList

-import spark.{ClosureCleaner, HashPartitioner, SparkContext, RDD}
+import spark.{ClosureCleaner, HashPartitioner, RDD}
+import spark.SparkContext
 import spark.SparkContext._
 import spark.graph.Graph.EdgePartition
 import spark.storage.StorageLevel
@@ -13,7 +14,10 @@ import spark.storage.StorageLevel
 case class Vertex[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD] (
   var id: Vid = 0,
-  var data: VD = nullValue[VD])
+  var data: VD = nullValue[VD]) {
+
+  def this(tuple: Tuple2[Vid, VD]) = this(tuple._1, tuple._2)
+}

 case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
@@ -37,37 +41,102 @@ class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Doubl
 /**
  * A Graph RDD that supports computation on graphs.
  */
-class Graph[VD: Manifest, ED: Manifest](
-    val rawVertices: RDD[Vertex[VD]],
-    val rawEdges: RDD[Edge[ED]]) {
+class Graph[VD: ClassManifest, ED: ClassManifest] protected (
+    _rawVertices: RDD[Vertex[VD]],
+    _rawEdges: RDD[Edge[ED]],
+    _rawVTable: RDD[(Vid, (VD, Array[Pid]))],
+    _rawETable: RDD[(Pid, EdgePartition[ED])]) {
+
+  def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = this(vertices, edges, null, null)
+
+  protected var _cached = false
+
+  def cache(): Graph[VD, ED] = {
+    eTable.cache()
+    vTable.cache()
+    _cached = true
+    this
+  }

   var numEdgePartitions = 5
   var numVertexPartitions = 5

-  val vertexPartitioner = new HashPartitioner(numVertexPartitions)
-  val edgePartitioner = new HashPartitioner(numEdgePartitions)
+  protected val vertexPartitioner = new HashPartitioner(numVertexPartitions)
+  protected val edgePartitioner = new HashPartitioner(numEdgePartitions)

-  lazy val eTable: RDD[(Pid, EdgePartition[ED])] = Graph.createETable(
-    rawEdges, numEdgePartitions)
+  protected lazy val eTable: RDD[(Pid, EdgePartition[ED])] = {
+    if (_rawETable == null) {
+      Graph.createETable(_rawEdges, numEdgePartitions)
+    } else {
+      _rawETable
+    }
+  }

-  lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = Graph.createVTable(
-    rawVertices, eTable, numVertexPartitions)
+  protected lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = {
+    if (_rawVTable == null) {
+      Graph.createVTable(_rawVertices, eTable, numVertexPartitions)
+    } else {
+      _rawVTable
+    }
+  }

-  def vertices(): RDD[Vertex[VD]] = vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
+  def vertices: RDD[Vertex[VD]] = {
+    if (!_cached && _rawVertices != null) {
+      _rawVertices
+    } else {
+      vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
+    }
+  }

-  def edges(): RDD[Edge[ED]] = eTable.mapPartitions { iter => iter.next._2.iterator }
+  def edges: RDD[Edge[ED]] = {
+    if (!_cached && _rawEdges != null) {
+      _rawEdges
+    } else {
+      eTable.mapPartitions { iter => iter.next._2.iterator }
+    }
+  }

-  def edgesWithVertices(): RDD[EdgeWithVertices[VD, ED]] = {
-    (new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions { case(vmap, iter) => iter }
+  def edgesWithVertices: RDD[EdgeWithVertices[VD, ED]] = {
+    (new EdgeWithVerticesRDD[VD, ED](vTable, eTable)).mapPartitions { part => part.next._2 }
+  }
+
+  def mapVertices[VD2: ClassManifest](f: (Vertex[VD]) => Vertex[VD2]) = {
+    ClosureCleaner.clean(f)
+    new Graph(vertices.map(f), edges)
+  }
+
+  def mapEdges[ED2: ClassManifest](f: (Edge[ED]) => Edge[ED2]) = {
+    ClosureCleaner.clean(f)
+    new Graph(vertices, edges.map(f))
+  }
+
+  def updateVertices[U: ClassManifest](
+      updates: RDD[(Vid, U)],
+      updateFunc: (Vertex[VD], Seq[U]) => VD)
+    : Graph[VD, ED] = {
+    ClosureCleaner.clean(updateFunc)
+
+    val joined: RDD[(Vid, ((VD, Array[Pid]), Option[Seq[U]]))] =
+      vTable.leftOuterJoin(updates.groupByKey(vertexPartitioner))
+
+    val newVTable = (joined.mapPartitions({ iter =>
+      iter.map { case (vid, ((vdata, pids), updates)) =>
+        val newVdata = if (updates.isDefined) updateFunc(Vertex(vid, vdata), updates.get) else vdata
+        (vid, (newVdata, pids))
+      }
+    }, preservesPartitioning = true)).cache()
+
+    new Graph(null, null, newVTable, eTable)
   }

   def mapPartitions[U: ClassManifest](
-      f: (VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]) => Iterator[U],
+      f: (VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]]) => Iterator[U],
       preservesPartitioning: Boolean = false): RDD[U] = {
     (new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions({ part =>
       val (vmap, iter) = part.next()
-      iter.mapPartitions(f)
+      f(vmap, iter)
     }, preservesPartitioning)
   }
@@ -76,11 +145,48 @@ class Graph[VD: Manifest, ED: Manifest](
 object Graph {

+  /**
+   * Load an edge list from file initializing the Graph RDD
+   */
+  def textFile[ED: ClassManifest](sc: SparkContext,
+    fname: String, edgeParser: Array[String] => ED) = {
+
+    // Parse the edge data table
+    val edges = sc.textFile(fname).map { line =>
+      val lineArray = line.split("\\s+")
+      if(lineArray.length < 2) {
+        println("Invalid line: " + line)
+        assert(false)
+      }
+      val source = lineArray(0)
+      val target = lineArray(1)
+      val tail = lineArray.drop(2)
+      val edata = edgeParser(tail)
+      Edge(source.trim.toInt, target.trim.toInt, edata)
+    }.cache()
+
+    // Parse the vertex data table
+    val vertices = edges.flatMap { edge => List((edge.src, 1), (edge.dst, 1)) }
+      .reduceByKey(_ + _)
+      .map(new Vertex(_))
+
+    val graph = new Graph[Int, ED](vertices, edges)
+    graph.cache()
+
+    println("Loaded graph:" +
+      "\n\t#edges: " + graph.edges.count +
+      "\n\t#vertices: " + graph.vertices.count)
+
+    graph
+  }
+
   /**
    * A partition of edges in 3 large columnar arrays.
    */
   private[graph]
-  class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: Manifest] {
+  class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED:ClassManifest]
+  {
     val srcIds: IntArrayList = new IntArrayList
     val dstIds: IntArrayList = new IntArrayList
     // TODO: Specialize data.
@@ -104,7 +210,7 @@ object Graph {
     private var edge = new Edge[ED]
     private var pos = 0

-    override def hasNext: Boolean = pos < size
+    override def hasNext: Boolean = pos < EdgePartition.this.size

     override def next(): Edge[ED] = {
       edge.src = srcIds.get(pos)
@@ -117,7 +223,7 @@ object Graph {
   }

   private[graph]
-  def createVTable[VD: Manifest, ED: Manifest](
+  def createVTable[VD: ClassManifest, ED: ClassManifest](
       vertices: RDD[Vertex[VD]],
       eTable: RDD[(Pid, EdgePartition[ED])],
       numPartitions: Int) = {
@@ -145,7 +251,6 @@ object Graph {
         case (vdata, None) => (vdata, Array.empty[Pid])
         case (vdata, Some(pids)) => (vdata, pids.toArray)
       }
-      .cache()
   }

   /**
@@ -157,7 +262,7 @@ object Graph {
    * containing all the edges in a partition.
    */
   private[graph]
-  def createETable[ED: Manifest](edges: RDD[Edge[ED]], numPartitions: Int)
+  def createETable[ED: ClassManifest](edges: RDD[Edge[ED]], numPartitions: Int)
     : RDD[(Pid, EdgePartition[ED])] = {

     edges.map { e =>
@@ -170,7 +275,6 @@ object Graph {
       iter.foreach { case (_, (src, dst, data)) => edgePartition.add(src, dst, data) }
       Iterator((pid, edgePartition))
     }, preservesPartitioning = true)
-      .cache()
   }

   /**
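
Note: taken together, the new textFile loader, the protected four-argument constructor, and updateVertices give Graph a load/cache/update cycle. A rough usage sketch against the API in this diff (sc is an existing SparkContext; the file path and edge parser are placeholders, not part of this commit):

    // Load a whitespace-separated edge list with unit edge weights, cache the tables,
    // then add the number of incoming messages to each destination vertex's data.
    val graph: Graph[Int, Double] = Graph.textFile(sc, "/path/to/edges.tsv", parts => 1.0).cache()
    val messages: RDD[(Vid, Int)] = graph.edges.map(e => (e.dst, 1))
    val updated: Graph[Int, Double] = graph.updateVertices(messages, (v, msgs) => v.data + msgs.sum)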

GraphLab.scala

@@ -1,45 +1,57 @@
 package spark.graph

+import scala.collection.JavaConversions._
+import spark.RDD

 object GraphLab {

-  def iterateGAS[A: Manifest, VD: Manifest, ED: Manifest](
+  def iterateGAS[A: ClassManifest, VD: ClassManifest, ED: ClassManifest](
     graph: Graph[VD, ED],
     gather: (Vid, EdgeWithVertices[VD, ED]) => A,
     merge: (A, A) => A,
     default: A,
     apply: (Vertex[VD], A) => VD,
     numIter: Int,
-    gatherEdges: EdgeDirection = EdgeDirection.In) = {
+    gatherEdges: EdgeDirection.EdgeDirection = EdgeDirection.In) = {

-    val g = new Graph[(VD, A), ED](graph.rawVertices.map(v => (v, default)), graph.rawEdges)
+    var g = graph.mapVertices(v => Vertex(v.id, VDataWithAcc(v.data, default))).cache()

     var i = 0
     while (i < numIter) {

-      val gatherTable = g.mapPartitions { case(vmap, iter) =>
+      val accUpdates: RDD[(Vid, A)] = g.mapPartitions({ case(vmap, iter) =>
         val edgeSansAcc = new EdgeWithVertices[VD, ED]()
-        iter.map { edge: EdgeWithVertices[(VD, A), ED] =>
+        iter.map { edge: EdgeWithVertices[VDataWithAcc[VD, A], ED] =>
           edgeSansAcc.data = edge.data
-          edgeSansAcc.src.data = edge.src.data._1
-          edgeSansAcc.dst.data = edge.dst.data._1
+          edgeSansAcc.src.data = edge.src.data.vdata
+          edgeSansAcc.dst.data = edge.dst.data.vdata
           edgeSansAcc.src.id = edge.src.id
           edgeSansAcc.dst.id = edge.dst.id
           if (gatherEdges == EdgeDirection.In || gatherEdges == EdgeDirection.Both) {
-            edge.dst.data._2 = merge(edge.dst.data._2, gather(edgeSansAcc.dst.id, edgeSansAcc))
+            edge.dst.data.acc = merge(edge.dst.data.acc, gather(edgeSansAcc.dst.id, edgeSansAcc))
           }
           if (gatherEdges == EdgeDirection.Out || gatherEdges == EdgeDirection.Both) {
-            edge.src.data._2 = merge(edge.src.data._2, gather(edgeSansAcc.src.id, edgeSansAcc))
+            edge.src.data.acc = merge(edge.src.data.acc, gather(edgeSansAcc.src.id, edgeSansAcc))
           }
         }
-        vmap.int2ObjectEntrySet().fastIterator().map{ case (vid, (vdata, acc)) => (vid, acc) }
-      }.reduceByKey(graph.vertexPartitioner, false)
+        vmap.int2ObjectEntrySet().fastIterator().map{ entry =>
+          (entry.getIntKey(), entry.getValue().acc)
+        }
+      })(classManifest[(Int, A)])

-      gatherTable
+      def applyFunc(v: Vertex[VDataWithAcc[VD, A]], updates: Seq[A]): VDataWithAcc[VD, A] = {
+        VDataWithAcc(apply(Vertex(v.id, v.data.vdata), updates.reduce(merge)), default)
+      }
+
+      g = g.updateVertices(accUpdates, applyFunc).cache()

       i += 1
     }
   }
 }
+
+private[graph]
+sealed case class VDataWithAcc[VD: ClassManifest, A](var vdata: VD, var acc: A)
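
Note: a hedged sketch of driving the reworked iterateGAS, counting in-neighbours for a few iterations (sc, the file path, and the parser are placeholders; illustrative only, not part of this commit):

    // In-degree counting: gather 1 from every in-edge, sum with merge, and replace the
    // vertex data with the gathered total each iteration.
    val graph: Graph[Int, Float] = Graph.textFile(sc, "/path/to/edges.tsv", parts => 1.0F)
    GraphLab.iterateGAS[Int, Int, Float](
      graph,
      gather = (vid, edge) => 1,    // one contribution per gathered edge
      merge = (a, b) => a + b,      // sum the contributions
      default = 0,
      apply = (vertex, total) => total,
      numIter = 3)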

package.scala

@@ -4,8 +4,8 @@ package object graph {
   type Vid = Int
   type Pid = Int
-  type Status = Boolean
-  type VertexHashMap = it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap
+  type VertexHashMap[T] = it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap[T]

   /**
    * Return the default null-like value for a data type T.
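
Note: the alias is now parameterized over the value type, so vertex maps carry their data type instead of erasing it to Object. A tiny sketch (assumes fastutil is on the classpath, as elsewhere in this module; illustrative only):

    // Typed vertex map: primitive int keys, values keep the vertex data type.
    val ranks: VertexHashMap[Double] = new it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap[Double]
    ranks.put(7, 0.85)
    val r: Double = ranks.get(7)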