Oh wow it finally compiles!

This commit is contained in:
Reynold Xin 2013-04-03 23:12:01 +08:00
parent d63c895945
commit cb0efe92d1
4 changed files with 115 additions and 38 deletions

View file

@ -17,10 +17,10 @@ class EdgeWithVerticesPartition(idx: Int, val eTablePartition: Partition) extend
* A RDD that brings together edge data with its associated vertex data.
*/
private[graph]
class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
class EdgeWithVerticesRDD[VD: ClassManifest, ED: ClassManifest](
@transient vTable: RDD[(Vid, (VD, Array[Pid]))],
eTable: RDD[(Pid, EdgePartition[ED])])
extends RDD[VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]](eTable.context, Nil) {
extends RDD[(VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]])](eTable.context, Nil) {
@transient
private val shuffleDependency = {
@ -48,7 +48,7 @@ class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
eTable.preferredLocations(s.asInstanceOf[EdgeWithVerticesPartition].eTablePartition)
override def compute(s: Partition, context: TaskContext)
: Iterator[VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]] = {
: Iterator[(VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]])] = {
val split = s.asInstanceOf[EdgeWithVerticesPartition]
@ -81,6 +81,6 @@ class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
e
}
}
(vmap, iter)
Iterator((vmap, iter))
}
}

View file

@ -5,7 +5,7 @@ import scala.collection.mutable.ArrayBuffer
import it.unimi.dsi.fastutil.ints.IntArrayList
import spark.{ClosureCleaner, HashPartitioner, SparkContext, RDD}
import spark.{ClosureCleaner, HashPartitioner, RDD}
import spark.SparkContext._
import spark.graph.Graph.EdgePartition
import spark.storage.StorageLevel
@ -37,37 +37,102 @@ class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Doubl
/**
* A Graph RDD that supports computation on graphs.
*/
class Graph[VD: Manifest, ED: Manifest](
val rawVertices: RDD[Vertex[VD]],
val rawEdges: RDD[Edge[ED]]) {
class Graph[VD: ClassManifest, ED: ClassManifest] protected (
_rawVertices: RDD[Vertex[VD]],
_rawEdges: RDD[Edge[ED]],
_rawVTable: RDD[(Vid, (VD, Array[Pid]))],
_rawETable: RDD[(Pid, EdgePartition[ED])]) {
def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = this(vertices, edges, null, null)
protected var _cached = false
def cache(): Graph[VD, ED] = {
eTable.cache()
vTable.cache()
_cached = true
this
}
var numEdgePartitions = 5
var numVertexPartitions = 5
val vertexPartitioner = new HashPartitioner(numVertexPartitions)
protected val vertexPartitioner = new HashPartitioner(numVertexPartitions)
val edgePartitioner = new HashPartitioner(numEdgePartitions)
protected val edgePartitioner = new HashPartitioner(numEdgePartitions)
lazy val eTable: RDD[(Pid, EdgePartition[ED])] = Graph.createETable(
rawEdges, numEdgePartitions)
protected lazy val eTable: RDD[(Pid, EdgePartition[ED])] = {
if (_rawETable == null) {
Graph.createETable(_rawEdges, numEdgePartitions)
} else {
_rawETable
}
}
lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = Graph.createVTable(
rawVertices, eTable, numVertexPartitions)
protected lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = {
if (_rawVTable == null) {
Graph.createVTable(_rawVertices, eTable, numVertexPartitions)
} else {
_rawVTable
}
}
def vertices(): RDD[Vertex[VD]] = vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
def vertices: RDD[Vertex[VD]] = {
if (!_cached && _rawVertices != null) {
_rawVertices
} else {
vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
}
}
def edges(): RDD[Edge[ED]] = eTable.mapPartitions { iter => iter.next._2.iterator }
def edges: RDD[Edge[ED]] = {
if (!_cached && _rawEdges != null) {
_rawEdges
} else {
eTable.mapPartitions { iter => iter.next._2.iterator }
}
}
def edgesWithVertices(): RDD[EdgeWithVertices[VD, ED]] = {
(new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions { case(vmap, iter) => iter }
def edgesWithVertices: RDD[EdgeWithVertices[VD, ED]] = {
(new EdgeWithVerticesRDD[VD, ED](vTable, eTable)).mapPartitions { part => part.next._2 }
}
def mapVertices[VD2: ClassManifest](f: (Vertex[VD]) => Vertex[VD2]) = {
ClosureCleaner.clean(f)
new Graph(vertices.map(f), edges)
}
def mapEdges[ED2: ClassManifest](f: (Edge[ED]) => Edge[ED2]) = {
ClosureCleaner.clean(f)
new Graph(vertices, edges.map(f))
}
def updateVertices[U: ClassManifest](
updates: RDD[(Vid, U)],
updateFunc: (Vertex[VD], Seq[U]) => VD)
: Graph[VD, ED] = {
ClosureCleaner.clean(updateFunc)
val joined: RDD[(Vid, ((VD, Array[Pid]), Option[Seq[U]]))] =
vTable.leftOuterJoin(updates.groupByKey(vertexPartitioner))
val newVTable = (joined.mapPartitions({ iter =>
iter.map { case (vid, ((vdata, pids), updates)) =>
val newVdata = if (updates.isDefined) updateFunc(Vertex(vid, vdata), updates.get) else vdata
(vid, (newVdata, pids))
}
}, preservesPartitioning = true)).cache()
new Graph(null, null, newVTable, eTable)
}
def mapPartitions[U: ClassManifest](
f: (VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]) => Iterator[U],
f: (VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = {
(new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions({ part =>
val (vmap, iter) = part.next()
iter.mapPartitions(f)
f(vmap, iter)
}, preservesPartitioning)
}
@ -80,7 +145,8 @@ object Graph {
* A partition of edges in 3 large columnar arrays.
*/
private[graph]
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: Manifest] {
class EdgePartition [@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED:ClassManifest]
{
val srcIds: IntArrayList = new IntArrayList
val dstIds: IntArrayList = new IntArrayList
// TODO: Specialize data.
@ -117,7 +183,7 @@ object Graph {
}
private[graph]
def createVTable[VD: Manifest, ED: Manifest](
def createVTable[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[Vertex[VD]],
eTable: RDD[(Pid, EdgePartition[ED])],
numPartitions: Int) = {
@ -145,7 +211,6 @@ object Graph {
case (vdata, None) => (vdata, Array.empty[Pid])
case (vdata, Some(pids)) => (vdata, pids.toArray)
}
.cache()
}
/**
@ -157,7 +222,7 @@ object Graph {
* containing all the edges in a partition.
*/
private[graph]
def createETable[ED: Manifest](edges: RDD[Edge[ED]], numPartitions: Int)
def createETable[ED: ClassManifest](edges: RDD[Edge[ED]], numPartitions: Int)
: RDD[(Pid, EdgePartition[ED])] = {
edges.map { e =>
@ -170,7 +235,6 @@ object Graph {
iter.foreach { case (_, (src, dst, data)) => edgePartition.add(src, dst, data) }
Iterator((pid, edgePartition))
}, preservesPartitioning = true)
.cache()
}
}

View file

@ -1,45 +1,58 @@
package spark.graph
import scala.collection.JavaConversions._
import spark.RDD
class GraphLab {
def iterateGAS[A: Manifest, VD: Manifest, ED: Manifest](
def iterateGAS[A: ClassManifest, VD: ClassManifest, ED: ClassManifest](
graph: Graph[VD, ED],
gather: (Vid, EdgeWithVertices[VD, ED]) => A,
merge: (A, A) => A,
default: A,
apply: (Vertex[VD], A) => VD,
numIter: Int,
gatherEdges: EdgeDirection = EdgeDirection.In) = {
gatherEdges: EdgeDirection.EdgeDirection = EdgeDirection.In) = {
val g = new Graph[(VD, A), ED](graph.rawVertices.map(v => (v, default)), graph.rawEdges)
var g = graph.mapVertices(v => Vertex(v.id, VDataWithAcc(v.data, default))).cache()
var i = 0
while (i < numIter) {
val gatherTable = g.mapPartitions { case(vmap, iter) =>
val accUpdates: RDD[(Vid, A)] = g.mapPartitions({ case(vmap, iter) =>
val edgeSansAcc = new EdgeWithVertices[VD, ED]()
iter.map { edge: EdgeWithVertices[(VD, A), ED] =>
iter.map { edge: EdgeWithVertices[VDataWithAcc[VD, A], ED] =>
edgeSansAcc.data = edge.data
edgeSansAcc.src.data = edge.src.data._1
edgeSansAcc.dst.data = edge.dst.data._1
edgeSansAcc.src.data = edge.src.data.vdata
edgeSansAcc.dst.data = edge.dst.data.vdata
edgeSansAcc.src.id = edge.src.id
edgeSansAcc.dst.id = edge.dst.id
if (gatherEdges == EdgeDirection.In || gatherEdges == EdgeDirection.Both) {
edge.dst.data._2 = merge(edge.dst.data._2, gather(edgeSansAcc.dst.id, edgeSansAcc))
edge.dst.data.acc = merge(edge.dst.data.acc, gather(edgeSansAcc.dst.id, edgeSansAcc))
}
if (gatherEdges == EdgeDirection.Out || gatherEdges == EdgeDirection.Both) {
edge.src.data._2 = merge(edge.src.data._2, gather(edgeSansAcc.src.id, edgeSansAcc))
edge.src.data.acc = merge(edge.src.data.acc, gather(edgeSansAcc.src.id, edgeSansAcc))
}
}
vmap.int2ObjectEntrySet().fastIterator().map{ case (vid, (vdata, acc)) => (vid, acc) }
}.reduceByKey(graph.vertexPartitioner, false)
vmap.int2ObjectEntrySet().fastIterator().map{ entry =>
(entry.getIntKey(), entry.getValue().acc)
}
})(classManifest[(Int, A)])
gatherTable
def applyFunc(v: Vertex[VDataWithAcc[VD, A]], updates: Seq[A]): VDataWithAcc[VD, A] = {
VDataWithAcc(apply(Vertex(v.id, v.data.vdata), updates.reduce(merge)), default)
}
g = g.updateVertices(accUpdates, applyFunc).cache()
i += 1
}
}
}
private[graph]
sealed case class VDataWithAcc[VD: ClassManifest, A](var vdata: VD, var acc: A)

View file

@ -5,7 +5,7 @@ package object graph {
type Vid = Int
type Pid = Int
type Status = Boolean
type VertexHashMap = it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap
type VertexHashMap[T] = it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap[T]
/**
* Return the default null-like value for a data type T.