numVertexPartitions and numEdgePartitions are now part of the constructor and are immutable.

Also did some cleanups.
Reynold Xin 2013-04-05 12:54:59 +08:00
parent d5b0f4dfa6
commit 7134856351
2 changed files with 236 additions and 221 deletions


@@ -287,9 +287,7 @@ object Analytics {
println("======================================")
val sc = new SparkContext(host, "PageRank(" + fname + ")")
val graph = Graph.textFile(sc, fname, a => 1.0F)
graph.numVertexPartitions = numVPart
graph.numEdgePartitions = numEPart
val graph = Graph.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart)
val startTime = System.currentTimeMillis
val pr = Analytics.pagerank(graph, numIter)

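A minimal sketch of the caller-side change this hunk makes, reusing the names from the surrounding Analytics code (sc, fname, numVPart, numEPart):

// Before this commit: partition counts were mutable fields set after construction.
//   val graph = Graph.textFile(sc, fname, a => 1.0F)
//   graph.numVertexPartitions = numVPart
//   graph.numEdgePartitions = numEPart
// After: the counts are fixed at construction, and withPartitioner returns a new
// Graph configured with the requested counts.
val graph = Graph.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart)
// To override only one side and keep the default for the other:
// val g2 = Graph.textFile(sc, fname, a => 1.0F).withVertexPartitioner(numVPart)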

@@ -49,17 +49,40 @@ case class MutableTuple2[@specialized(Char, Int, Boolean, Byte, Long, Float, Dou
var _1: U, var _2: V)
/**
* A Graph RDD that supports computation on graphs.
*/
class Graph[VD: ClassManifest, ED: ClassManifest] protected (
val numVertexPartitions: Int,
val numEdgePartitions: Int,
_rawVertices: RDD[Vertex[VD]],
_rawEdges: RDD[Edge[ED]],
_rawVTable: RDD[(Vid, (VD, Array[Pid]))],
_rawETable: RDD[(Pid, EdgePartition[ED])]) {
def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = this(vertices, edges, null, null)
def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = {
this(
Graph.DEFAULT_NUM_VERTEX_PARTITIONS, Graph.DEFAULT_NUM_EDGE_PARTITIONS,
vertices, edges,
null, null)
}
def withPartitioner(numVertexPartitions: Int, numEdgePartitions: Int): Graph[VD, ED] = {
if (_cached) {
val newgraph = new Graph(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable)
newgraph.cache()
} else {
new Graph(numVertexPartitions, numEdgePartitions, _rawVertices, _rawEdges, null, null)
}
}
def withVertexPartitioner(numVertexPartitions: Int = Graph.DEFAULT_NUM_VERTEX_PARTITIONS) = {
withPartitioner(numVertexPartitions, numEdgePartitions)
}
def withEdgePartitioner(numEdgePartitions: Int = Graph.DEFAULT_NUM_EDGE_PARTITIONS) = {
withPartitioner(numVertexPartitions, numEdgePartitions)
}
protected var _cached = false
@@ -70,17 +93,32 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
this
}
/// Todo: Should these be set on construction and passed onto derived graphs? Yes.
var numEdgePartitions = 5
var numVertexPartitions = 5
/** Return an RDD of vertices. */
def vertices: RDD[Vertex[VD]] = {
if (!_cached && _rawVertices != null) {
_rawVertices
} else {
vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
}
}
/// Todo: Should these be passed onto derived graphs?
protected val vertexPartitioner = new HashPartitioner(numVertexPartitions)
protected val edgePartitioner = new HashPartitioner(numEdgePartitions)
/** Return an RDD of edges. */
def edges: RDD[Edge[ED]] = {
if (!_cached && _rawEdges != null) {
_rawEdges
} else {
eTable.mapPartitions { iter => iter.next._2.iterator }
}
}
lazy val numEdges = edges.count()
/** Return an RDD that brings together edges and their source and destination vertices. */
def edgesWithVertices: RDD[EdgeWithVertices[VD, ED]] = {
(new EdgeWithVerticesRDD(vTableReplicated, eTable)).mapPartitions { part => part.next._2 }
}
lazy val numVertices = vertices.count()
lazy val numEdges: Long = edges.count()
lazy val numVertices: Long = vertices.count()
lazy val inDegrees = mapReduceNeighborhood[Vid]((vid, edge) => 1, _+_, 0, EdgeDirection.In)
@@ -88,11 +126,164 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
lazy val degrees = mapReduceNeighborhood[Vid]((vid,edge) => 1, _+_, 0, EdgeDirection.Both)
/** Return a new graph with its edge directions reversed. */
lazy val reverse: Graph[VD,ED] = {
newGraph(vertices, edges.map{ case Edge(s, t, e) => Edge(t, s, e) })
}
edges
.map { case Edge(src, target, _) => (target, 1) }
.reduceByKey(vertexPartitioner, _ + _)
def collectNeighborIds(edgeDirection: EdgeDirection.EdgeDirection) : RDD[(Vid, Array[Vid])] = {
mapReduceNeighborhood[Array[Vid]](
(vid, edge) => Array(edge.otherVertex(vid).id),
(a, b) => a ++ b,
Array.empty[Vid],
edgeDirection)
}
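A hedged one-liner for collectNeighborIds on an assumed graph value g (not part of this commit):

// For every vertex, gather the ids of its in-neighbors into an array.
val inNbrs: RDD[(Vid, Array[Vid])] = g.collectNeighborIds(EdgeDirection.In)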
def mapVertices[VD2: ClassManifest](f: (Vertex[VD]) => Vertex[VD2]): Graph[VD2, ED] = {
newGraph(vertices.map(f), edges)
}
def mapEdges[ED2: ClassManifest](f: (Edge[ED]) => Edge[ED2]): Graph[VD, ED2] = {
newGraph(vertices, edges.map(f))
}
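A hedged usage sketch for these two transformations, assuming an illustrative graph g: Graph[Double, Double] (the attribute types are not from this commit):

// Double every vertex attribute; the graph structure is unchanged.
val scaled: Graph[Double, Double] = g.mapVertices(v => Vertex(v.id, v.data * 2.0))
// Replace every edge attribute with a unit weight.
val unit: Graph[Double, Double] = g.mapEdges(e => Edge(e.src, e.dst, 1.0))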
//////////////////////////////////////////////////////////////////////////////////////////////////
// Lower level transformation methods
//////////////////////////////////////////////////////////////////////////////////////////////////
def mapReduceNeighborhood[VD2: ClassManifest](
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => VD2,
reduceFunc: (VD2, VD2) => VD2,
default: VD2,
gatherDirection: EdgeDirection.EdgeDirection): RDD[(Vid, VD2)] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
val newVTable = vTableReplicated.mapPartitions({ part =>
part.map { v => (v._1, MutableTuple2(v._2, default)) }
}, preservesPartitioning = true)
(new EdgeWithVerticesRDD[MutableTuple2[VD, VD2], ED](newVTable, eTable))
.mapPartitions { part =>
val (vmap, edges) = part.next()
val edgeSansAcc = new EdgeWithVertices[VD, ED]()
edgeSansAcc.src = new Vertex[VD]
edgeSansAcc.dst = new Vertex[VD]
edges.foreach { e: EdgeWithVertices[MutableTuple2[VD, VD2], ED] =>
edgeSansAcc.data = e.data
edgeSansAcc.src.data = e.src.data._1
edgeSansAcc.dst.data = e.dst.data._1
edgeSansAcc.src.id = e.src.id
edgeSansAcc.dst.id = e.dst.id
if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) {
e.dst.data._2 = reduceFunc(e.dst.data._2, mapFunc(edgeSansAcc.dst.id, edgeSansAcc))
}
if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) {
e.src.data._2 = reduceFunc(e.src.data._2, mapFunc(edgeSansAcc.src.id, edgeSansAcc))
}
}
vmap.int2ObjectEntrySet().fastIterator().map{ entry =>
(entry.getIntKey(), entry.getValue()._2)
}
}
.combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false)
}
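For illustration, inDegrees above is exactly this pattern; a hedged sketch of two further calls on an assumed graph g (Float edge data is an assumption):

// One unit per in-edge, summed per destination vertex (same as inDegrees).
val inDeg: RDD[(Vid, Int)] =
  g.mapReduceNeighborhood[Int]((vid, edge) => 1, _ + _, 0, EdgeDirection.In)
// Sum of incoming edge attributes instead of a plain count.
val weightedInDeg: RDD[(Vid, Float)] =
  g.mapReduceNeighborhood[Float]((vid, edge) => edge.data, _ + _, 0.0F, EdgeDirection.In)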
/**
* Same as mapReduceNeighborhood, but the map function can return None and there is no default value.
* As a consequence, the resulting table may be much smaller than the set of vertices.
*/
def flatMapReduceNeighborhood[VD2: ClassManifest](
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2],
reduceFunc: (VD2, VD2) => VD2,
gatherDirection: EdgeDirection.EdgeDirection): RDD[(Vid, VD2)] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
val newVTable = vTableReplicated.mapPartitions({ part =>
part.map { v => (v._1, MutableTuple2(v._2, Option.empty[VD2])) }
}, preservesPartitioning = true)
(new EdgeWithVerticesRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable))
.mapPartitions { part =>
val (vmap, edges) = part.next()
val edgeSansAcc = new EdgeWithVertices[VD, ED]()
edgeSansAcc.src = new Vertex[VD]
edgeSansAcc.dst = new Vertex[VD]
edges.foreach { e: EdgeWithVertices[MutableTuple2[VD, Option[VD2]], ED] =>
edgeSansAcc.data = e.data
edgeSansAcc.src.data = e.src.data._1
edgeSansAcc.dst.data = e.dst.data._1
edgeSansAcc.src.id = e.src.id
edgeSansAcc.dst.id = e.dst.id
if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) {
e.dst.data._2 =
if (e.dst.data._2.isEmpty) {
mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
} else {
val tmp = mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
if (!tmp.isEmpty) Some(reduceFunc(e.dst.data._2.get, tmp.get)) else e.dst.data._2
}
}
if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) {
e.src.data._2 =
if (e.src.data._2.isEmpty) {
mapFunc(edgeSansAcc.src.id, edgeSansAcc)
} else {
val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc)
if (!tmp.isEmpty) Some(reduceFunc(e.src.data._2.get, tmp.get)) else e.src.data._2
}
}
}
vmap.int2ObjectEntrySet().fastIterator().filter{!_.getValue()._2.isEmpty}.map{ entry =>
(entry.getIntKey(), entry.getValue()._2)
}
}
.map{ case (vid, aOpt) => (vid, aOpt.get) }
.combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false)
}
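A hedged sketch of the Option-returning variant on an assumed graph g with Float edge data: vertices whose incident edges all map to None simply do not appear in the output, instead of receiving a default value.

// Sum only incoming edge weights above a threshold; vertices with no qualifying
// in-edges are absent from the result.
val heavyIn: RDD[(Vid, Float)] = g.flatMapReduceNeighborhood[Float](
  (vid, edge) => if (edge.data > 0.5F) Some(edge.data) else None,
  _ + _,
  EdgeDirection.In)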
def updateVertices[U: ClassManifest, VD2: ClassManifest](
updates: RDD[(Vid, U)],
updateFunc: (Vertex[VD], Option[U]) => VD2)
: Graph[VD2, ED] = {
ClosureCleaner.clean(updateFunc)
val newVTable = vTable.leftOuterJoin(updates).mapPartitions({ iter =>
iter.map { case (vid, ((vdata, pids), update)) =>
val newVdata = updateFunc(Vertex(vid, vdata), update)
(vid, (newVdata, pids))
}
}, preservesPartitioning = true).cache()
new Graph(numVertexPartitions, numEdgePartitions, null, null, newVTable, eTable)
}
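A hedged sketch of joining per-vertex messages back into the graph, in the style of the PageRank driver in Analytics; g and messages are illustrative names, not part of this commit:

// messages: RDD[(Vid, Double)] of contributions gathered from neighbors.
// Vertices that received no message fall back to a contribution of 0.0.
val updated: Graph[Double, Double] = g.updateVertices(messages,
  (vertex: Vertex[Double], msg: Option[Double]) => 0.15 + 0.85 * msg.getOrElse(0.0))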
// def mapPartitions[U: ClassManifest](
// f: (VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]]) => Iterator[U],
// preservesPartitioning: Boolean = false): RDD[U] = {
// (new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions({ part =>
// val (vmap, iter) = part.next()
// f(vmap, iter)
// }, preservesPartitioning)
// }
//////////////////////////////////////////////////////////////////////////////////////////////////
// Internals hidden from callers
//////////////////////////////////////////////////////////////////////////////////////////////////
// TODO: Support non-hash partitioning schemes.
protected val vertexPartitioner = new HashPartitioner(numVertexPartitions)
protected val edgePartitioner = new HashPartitioner(numEdgePartitions)
/** Create a new graph but keep the current partitioning scheme. */
protected def newGraph[VD2: ClassManifest, ED2: ClassManifest](
vertices: RDD[Vertex[VD2]], edges: RDD[Edge[ED2]]): Graph[VD2, ED2] = {
(new Graph[VD2, ED2](vertices, edges)).withPartitioner(numVertexPartitions, numEdgePartitions)
}
protected lazy val eTable: RDD[(Pid, EdgePartition[ED])] = {
if (_rawETable == null) {
@@ -120,182 +311,14 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
{ part => part.map { case(pid, (vid, vdata)) => (vid, vdata) } },
preservesPartitioning = true)
}
def vertices: RDD[Vertex[VD]] = {
if (!_cached && _rawVertices != null) {
_rawVertices
} else {
vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
}
}
def edges: RDD[Edge[ED]] = {
if (!_cached && _rawEdges != null) {
_rawEdges
} else {
eTable.mapPartitions { iter => iter.next._2.iterator }
}
}
def reverse: Graph[VD,ED] = {
new Graph(vertices, edges.map{ case Edge(s,t,e) => Edge(t,s,e) })
}
def edgesWithVertices: RDD[EdgeWithVertices[VD, ED]] = {
(new EdgeWithVerticesRDD(vTableReplicated, eTable)).mapPartitions { part => part.next._2 }
}
def mapVertices[VD2: ClassManifest](f: (Vertex[VD]) => Vertex[VD2]) = {
ClosureCleaner.clean(f)
new Graph(vertices.map(f), edges)
}
def mapEdges[ED2: ClassManifest](f: (Edge[ED]) => Edge[ED2]) = {
ClosureCleaner.clean(f)
new Graph(vertices, edges.map(f))
}
def mapReduceNeighborhood[VD2: ClassManifest](
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => VD2,
reduceFunc: (VD2, VD2) => VD2,
default: VD2,
gatherDirection: EdgeDirection.EdgeDirection): RDD[(Vid, VD2)] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
//ClosureCleaner.clean(default)
val newVTable = vTableReplicated.mapPartitions({ part =>
part.map { v => (v._1, MutableTuple2(v._2, default)) }
}, preservesPartitioning = true)
(new EdgeWithVerticesRDD[MutableTuple2[VD, VD2], ED](newVTable, eTable))
.mapPartitions { part =>
val (vmap, edges) = part.next()
val edgeSansAcc = new EdgeWithVertices[VD, ED]()
edgeSansAcc.src = new Vertex[VD]
edgeSansAcc.dst = new Vertex[VD]
edges.foreach { edge: EdgeWithVertices[MutableTuple2[VD, VD2], ED] =>
edgeSansAcc.data = edge.data
edgeSansAcc.src.data = edge.src.data._1
edgeSansAcc.dst.data = edge.dst.data._1
edgeSansAcc.src.id = edge.src.id
edgeSansAcc.dst.id = edge.dst.id
if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) {
edge.dst.data._2 = reduceFunc(edge.dst.data._2, mapFunc(edgeSansAcc.dst.id, edgeSansAcc))
}
if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) {
edge.src.data._2 = reduceFunc(edge.src.data._2, mapFunc(edgeSansAcc.src.id, edgeSansAcc))
}
}
vmap.int2ObjectEntrySet().fastIterator().map{ entry =>
(entry.getIntKey(), entry.getValue()._2)
}
}.combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false)
}
def collectNeighborIds(edgeDirection: EdgeDirection.EdgeDirection) : RDD[(Vid, Array[Vid])] = {
mapReduceNeighborhood[Array[Vid]](
(vid, edge) => Array(edge.otherVertex(vid).id),
(a,b) => a ++ b,
Array.empty[Vid],
edgeDirection)
}
/**
* Same as mapReduceNeighborhood but map function can return none and there is no default value.
* As a consequence the resulting table may be much smaller than the set of vertices.
*/
def flatMapReduceNeighborhood[VD2: ClassManifest](
mapFunc: (Vid, EdgeWithVertices[VD, ED]) => Option[VD2],
reduceFunc: (VD2, VD2) => VD2,
gatherDirection: EdgeDirection.EdgeDirection): RDD[(Vid, VD2)] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
val newVTable = vTableReplicated.mapPartitions({ part =>
part.map { v => (v._1, MutableTuple2(v._2, Option.empty[VD2])) }
}, preservesPartitioning = true)
(new EdgeWithVerticesRDD[MutableTuple2[VD, Option[VD2]], ED](newVTable, eTable))
.mapPartitions { part =>
val (vmap, edges) = part.next()
val edgeSansAcc = new EdgeWithVertices[VD, ED]()
edgeSansAcc.src = new Vertex[VD]
edgeSansAcc.dst = new Vertex[VD]
edges.foreach { edge: EdgeWithVertices[MutableTuple2[VD, Option[VD2]], ED] =>
edgeSansAcc.data = edge.data
edgeSansAcc.src.data = edge.src.data._1
edgeSansAcc.dst.data = edge.dst.data._1
edgeSansAcc.src.id = edge.src.id
edgeSansAcc.dst.id = edge.dst.id
if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) {
edge.dst.data._2 =
if(edge.dst.data._2.isEmpty) mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
else {
val tmp = mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
if(!tmp.isEmpty) Some(reduceFunc(edge.dst.data._2.get, tmp.get))
else edge.dst.data._2
}
}
if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) {
edge.dst.data._2 =
if(edge.dst.data._2.isEmpty) mapFunc(edgeSansAcc.src.id, edgeSansAcc)
else {
val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc)
if(!tmp.isEmpty) Some(reduceFunc(edge.src.data._2.get, tmp.get))
else edge.src.data._2
}
}
}
vmap.int2ObjectEntrySet().fastIterator().filter{!_.getValue()._2.isEmpty}.map{ entry =>
(entry.getIntKey(), entry.getValue()._2)
}
}
.map{ case (vid, aOpt) => (vid, aOpt.get) }
.combineByKey((v: VD2) => v, reduceFunc, null, vertexPartitioner, false)
}
def updateVertices[U: ClassManifest, VD2: ClassManifest](
updates: RDD[(Vid, U)],
updateFunc: (Vertex[VD], Option[U]) => VD2)
: Graph[VD2, ED] = {
ClosureCleaner.clean(updateFunc)
val newVTable = vTable.leftOuterJoin(updates).mapPartitions({ iter =>
iter.map { case (vid, ((vdata, pids), update)) =>
val newVdata = updateFunc(Vertex(vid, vdata), update)
(vid, (newVdata, pids))
}
}, preservesPartitioning = true).cache()
new Graph(null, null, newVTable, eTable)
}
// def mapPartitions[U: ClassManifest](
// f: (VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]]) => Iterator[U],
// preservesPartitioning: Boolean = false): RDD[U] = {
// (new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions({ part =>
// val (vmap, iter) = part.next()
// f(vmap, iter)
// }, preservesPartitioning)
// }
}
object Graph {
val DEFAULT_NUM_VERTEX_PARTITIONS = 5
val DEFAULT_NUM_EDGE_PARTITIONS = 5
/**
* Load an edge list from a file, initializing the Graph RDD.
*/
@@ -324,12 +347,11 @@ object Graph {
graph
}
def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): Graph[Int, ED] = {
val vertices = edges.flatMap { edge => List((edge.src, 1), (edge.dst, 1)) }
.reduceByKey(_ + _)
.map{ case (vid, degree) => Vertex(vid, degree) }
new Graph[Int, ED](vertices, edges).cache
(new Graph[Int, ED](vertices, edges))
}
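A hedged sketch of calling fromEdges: the vertex attribute ends up being the degree count computed by the flatMap/reduceByKey above (sc and the literal edges are illustrative):

// Build a graph from bare edges; each vertex's Int attribute is its degree.
val edgeRDD: RDD[Edge[Double]] = sc.parallelize(Seq(Edge(1, 2, 1.0), Edge(2, 3, 1.0)))
val g: Graph[Int, Double] = Graph.fromEdges(edgeRDD)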
/**
@@ -370,7 +392,7 @@ object Graph {
* A partition of edges in 3 large columnar arrays.
*/
private[graph]
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED:ClassManifest]
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest]
{
val srcIds: IntArrayList = new IntArrayList
val dstIds: IntArrayList = new IntArrayList
@@ -407,11 +429,34 @@ object Graph {
}
}
private[graph]
def createVTable[VD: ClassManifest, ED: ClassManifest](
/**
* Create the edge table RDD, which is much more efficient for Java heap storage than the
* normal edges data structure (RDD[(Vid, Vid, ED)]).
*
* The edge table contains multiple partitions, and each partition contains only one RDD
* key-value pair: the key is the partition id, and the value is an EdgePartition object
* containing all the edges in a partition.
*/
protected def createETable[ED: ClassManifest](edges: RDD[Edge[ED]], numPartitions: Int)
: RDD[(Pid, EdgePartition[ED])] = {
edges
.map { e =>
// Random partitioning based on the source vertex id.
(math.abs(e.src) % numPartitions, (e.src, e.dst, e.data))
}
.partitionBy(new HashPartitioner(numPartitions))
.mapPartitionsWithIndex({ (pid, iter) =>
val edgePartition = new Graph.EdgePartition[ED]
iter.foreach { case (_, (src, dst, data)) => edgePartition.add(src, dst, data) }
Iterator((pid, edgePartition))
}, preservesPartitioning = true)
}
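For orientation, a hedged sketch of consuming this layout from inside object Graph (edges and numPartitions assumed in scope): each RDD partition holds a single (pid, EdgePartition) pair, and the columnar arrays make per-partition statistics cheap.

val eTable = createETable(edges, numPartitions)
// Count edges per partition: srcIds, dstIds and the data column all have the same length.
val edgesPerPartition: RDD[(Pid, Int)] = eTable.map { case (pid, part) => (pid, part.srcIds.size) }
// Flattening back to plain edges mirrors Graph.edges:
// eTable.mapPartitions { iter => iter.next._2.iterator }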
protected def createVTable[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[Vertex[VD]],
eTable: RDD[(Pid, EdgePartition[ED])],
numPartitions: Int) = {
numPartitions: Int)
: RDD[(Vid, (VD, Array[Pid]))] = {
val partitioner = new HashPartitioner(numPartitions)
// A key-value RDD. The key is a vertex id, and the value is a list of
@@ -437,32 +482,4 @@ object Graph {
case (vdata, Some(pids)) => (vdata, pids.toArray)
}
}
/**
* Create the edge table RDD, which is much more efficient for Java heap storage than the
* normal edges data structure (RDD[(Vid, Vid, ED)]).
*
* The edge table contains multiple partitions, and each partition contains only one RDD
* key-value pair: the key is the partition id, and the value is an EdgePartition object
* containing all the edges in a partition.
*/
private[graph]
def createETable[ED: ClassManifest](edges: RDD[Edge[ED]], numPartitions: Int)
: RDD[(Pid, EdgePartition[ED])] = {
edges.map { e =>
// Random partitioning based on the source vertex id.
(math.abs(e.src) % numPartitions, (e.src, e.dst, e.data))
}
.partitionBy(new HashPartitioner(numPartitions))
.mapPartitionsWithIndex({ (pid, iter) =>
val edgePartition = new Graph.EdgePartition[ED]
iter.foreach { case (_, (src, dst, data)) => edgePartition.add(src, dst, data) }
Iterator((pid, edgePartition))
}, preservesPartitioning = true)
}
}