Integrated IndexedRDD into graph design.

This commit is contained in:
Joseph E. Gonzalez 2013-10-13 19:42:32 -07:00
parent fa2f87ca63
commit 494472a6cc
14 changed files with 749 additions and 540 deletions

View file

@ -49,6 +49,32 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
}
/**
 * Pass each value in the key-value pair RDD through a map function that also receives the
 * key, without changing the keys; this also retains the original RDD's partitioning.
 *
 * The result reuses this RDD's index (`self.index`); only the value blocks are rebuilt.
 *
 * @param f function applied to every (key, value) pair to produce the new value
 * @return an IndexedRDD over the same index holding the mapped values
 */
override def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): RDD[(K, U)] = {
  // Clean the closure on the driver and make sure the cleaned function is the one the task
  // closure below captures (previously `cleanF` was computed but the raw `f` was used).
  val cleanF = self.index.rdd.context.clean(f)
  val mappedValues = self.index.rdd.zipPartitions(self.valuesRDD){ (keysIter, valuesIter) =>
    // The asserts enforce that each partition carries exactly one index block and exactly
    // one values block.
    val index = keysIter.next()
    assert(!keysIter.hasNext)
    val oldValues = valuesIter.next()
    assert(!valuesIter.hasNext)
    // Allocate the array to store the results into; slots with no old values stay null
    val newValues: Array[Seq[U]] = new Array[Seq[U]](oldValues.size)
    // Populate the new values: `i` is the slot that the index assigns to key `k`
    for ((k, i) <- index) {
      if (oldValues(i) != null) {
        newValues(i) = oldValues(i).map(v => cleanF(k, v))
      }
    }
    Array(newValues.toSeq).iterator
  }
  new IndexedRDD[K,U](self.index, mappedValues)
}
/**
* Pass each value in the key-value pair RDD through a flatMap function without changing the
* keys; this also retains the original RDD's partitioning.

View file

@ -53,6 +53,8 @@ class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockI
rdd.persist(newLevel)
return this
}
// Partitioner of the RDD backing this index.
// NOTE(review): `.get` presumably throws when `rdd.partitioner` is empty — confirm that an
// RDDIndex is only ever built over an RDD that has a partitioner.
def partitioner: Partitioner = rdd.partitioner.get
}
@ -85,6 +87,9 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
override val partitioner = index.rdd.partitioner
/**
* The actual partitions are defined by the tuples.
*/

View file

@ -393,6 +393,15 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
new MappedValuesRDD(self, cleanF)
}
/**
 * Pass each value in the key-value pair RDD through a map function that also receives the
 * key, without changing the keys; this also retains the original RDD's partitioning.
 *
 * @param f function applied to every (key, value) pair to produce the new value
 * @return an RDD with the same keys and the mapped values
 */
def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): RDD[(K, U)] = {
  // Keys are passed through untouched, so the existing partitioner remains valid. A plain
  // `map` does not carry the partitioner over, which contradicted the contract documented
  // above; mapPartitions with preservesPartitioning = true keeps it.
  self.mapPartitions(
    iter => iter.map { case (k, v) => (k, f(k, v)) },
    preservesPartitioning = true)
}
/**
* Pass each value in the key-value pair RDD through a flatMap function without changing the
* keys; this also retains the original RDD's partitioning.

View file

@ -44,9 +44,9 @@ object Analytics extends Logging {
numIter: Int,
resetProb: Double = 0.15) = {
// Compute the out degree of each vertex
val pagerankGraph = graph.leftJoinVertices[Int, (Int, Double)](graph.outDegrees,
(vertex, deg) => (deg.getOrElse(0), 1.0)
)
val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
(vid, vdata, deg) => (deg.getOrElse(0), 1.0)
}
println("Vertex Replication: " + pagerankGraph.replication)
@ -59,11 +59,11 @@ object Analytics extends Logging {
Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)(
(vertex, a: Double) => (vertex.data._1, (resetProb + (1.0 - resetProb) * a)), // apply
(vid, data, a: Double) => (data._1, (resetProb + (1.0 - resetProb) * a)), // apply
(me_id, edge) => Some(edge.src.data._2 / edge.src.data._1), // gather
(a: Double, b: Double) => a + b, // merge
1.0,
numIter).mapVertices{ case Vertex(id, (outDeg, r)) => r }
numIter).mapVertices{ case (id, (outDeg, r)) => r }
}
/**
@ -74,18 +74,19 @@ object Analytics extends Logging {
maxIter: Int = Integer.MAX_VALUE,
resetProb: Double = 0.15) = {
// Compute the out degree of each vertex
val pagerankGraph = graph.leftJoinVertices[Int, (Int, Double, Double)](graph.outDegrees,
(vertex, degIter) => (degIter.sum, 1.0, 1.0)
)
val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
(id, data, degIter) => (degIter.sum, 1.0, 1.0)
}
// Run PageRank
GraphLab.iterate(pagerankGraph)(
(me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
(a: Double, b: Double) => a + b,
(vertex, a: Option[Double]) =>
(vertex.data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), vertex.data._2), // apply
(id, data, a: Option[Double]) =>
(data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply
(me_id, edge) => math.abs(edge.src.data._3 - edge.src.data._2) > tol, // scatter
maxIter).mapVertices { case Vertex(vid, data) => data._2 }
maxIter).mapVertices { case (vid, data) => data._2 }
}
@ -96,12 +97,12 @@ object Analytics extends Logging {
* that vertex.
*/
def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]) = {
val ccGraph = graph.mapVertices { case Vertex(vid, _) => vid }
val ccGraph = graph.mapVertices { case (vid, _) => vid }
GraphLab.iterate(ccGraph)(
(me_id, edge) => edge.otherVertex(me_id).data, // gather
(a: Vid, b: Vid) => math.min(a, b), // merge
(v, a: Option[Vid]) => math.min(v.data, a.getOrElse(Long.MaxValue)), // apply
(id, data, a: Option[Vid]) => math.min(data, a.getOrElse(Long.MaxValue)), // apply
(me_id, edge) => (edge.vertex(me_id).data < edge.otherVertex(me_id).data), // scatter
gatherDirection = EdgeDirection.Both, scatterDirection = EdgeDirection.Both
)

View file

@ -2,6 +2,7 @@ package org.apache.spark.graph
import org.apache.spark.rdd.RDD
import org.apache.spark.util.ClosureCleaner
@ -33,7 +34,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
*
* @todo should vertices return tuples instead of vertex objects?
*/
def vertices: RDD[Vertex[VD]]
def vertices: RDD[(Vid,VD)]
/**
* Get the Edges and their data as an RDD. The entries in the RDD contain
@ -101,7 +102,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* }}}
*
*/
def mapVertices[VD2: ClassManifest](map: Vertex[VD] => VD2): Graph[VD2, ED]
def mapVertices[VD2: ClassManifest](map: (Vid, VD) => VD2): Graph[VD2, ED]
/**
* Construct a new graph where the value of each edge is transformed by
@ -149,13 +150,13 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
/**
* Remove edges connecting vertices that are not in the graph.
*
* @todo remove this function and ensure that for a graph G=(V,E):
* if (u,v) in E then u in V and v in V
*/
def correctEdges(): Graph[VD, ED]
// /**
// * Remove edges connecting vertices that are not in the graph.
// *
// * @todo remove this function and ensure that for a graph G=(V,E):
// * if (u,v) in E then u in V and v in V
// */
// def correctEdges(): Graph[VD, ED]
/**
* Construct a new graph with all the edges reversed. If this graph contains
@ -183,8 +184,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* @return the subgraph containing only the vertices and edges that satisfy the
* predicates.
*/
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (_ => true),
vpred: Vertex[VD] => Boolean = (_ => true) ): Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
vpred: (Vid, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED]
// /**
@ -200,51 +201,55 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
def mapReduceTriplets[A: ClassManifest](
mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
reduceFunc: (A, A) => A)
: RDD[(Vid, A)]
/**
* This function is used to compute a statistic for the neighborhood of each
* vertex.
*
* This is one of the core functions in the Graph API in that enables
* neighborhood level computation. For example this function can be used to
* count neighbors satisfying a predicate or implement PageRank.
*
* @note The returned RDD may contain fewer entries than there are vertices
* in the graph. This is because some vertices may not have neighbors or the
* map function may return None for all neighbors.
*
* @param mapFunc the function applied to each edge adjacent to each vertex.
* The mapFunc can optionally return None in which case it does not
* contribute to the final sum.
* @param mergeFunc the function used to merge the results of each map
* operation.
* @param direction the direction of edges to consider (e.g., In, Out, Both).
* @tparam VD2 The returned type of the aggregation operation.
*
* @return A Spark.RDD containing tuples of vertex identifiers and the
* resulting value. Note that the returned RDD may contain fewer vertices
* than in the original graph since some vertices may not have neighbors or
* the map function could return None for all neighbors.
*
* @example We can use this function to compute the average follower age for
* each user
* {{{
* val graph: Graph[Int,Int] = loadGraph()
* val averageFollowerAge: RDD[(Int, Int)] =
* graph.aggregateNeighbors[(Int,Double)](
* (vid, edge) => (edge.otherVertex(vid).data, 1),
* (a, b) => (a._1 + b._1, a._2 + b._2),
* EdgeDirection.In)
* .mapValues{ case (sum,followers) => sum.toDouble / followers}
* }}}
*
*/
def aggregateNeighbors[A: ClassManifest](
mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
mergeFunc: (A, A) => A,
direction: EdgeDirection)
: Graph[(VD, Option[A]), ED]
// /**
// * This function is used to compute a statistic for the neighborhood of each
// * vertex.
// *
// * This is one of the core functions in the Graph API in that enables
// * neighborhood level computation. For example this function can be used to
// * count neighbors satisfying a predicate or implement PageRank.
// *
// * @note The returned RDD may contain fewer entries than there are vertices
// * in the graph. This is because some vertices may not have neighbors or the
// * map function may return None for all neighbors.
// *
// * @param mapFunc the function applied to each edge adjacent to each vertex.
// * The mapFunc can optionally return None in which case it does not
// * contribute to the final sum.
// * @param mergeFunc the function used to merge the results of each map
// * operation.
// * @param direction the direction of edges to consider (e.g., In, Out, Both).
// * @tparam VD2 The returned type of the aggregation operation.
// *
// * @return A Spark.RDD containing tuples of vertex identifiers and the
// * resulting value. Note that the returned RDD may contain fewer vertices
// * than in the original graph since some vertices may not have neighbors or
// * the map function could return None for all neighbors.
// *
// * @example We can use this function to compute the average follower age for
// * each user
// * {{{
// * val graph: Graph[Int,Int] = loadGraph()
// * val averageFollowerAge: RDD[(Int, Int)] =
// * graph.aggregateNeighbors[(Int,Double)](
// * (vid, edge) => (edge.otherVertex(vid).data, 1),
// * (a, b) => (a._1 + b._1, a._2 + b._2),
// * EdgeDirection.In)
// * .mapValues{ case (sum,followers) => sum.toDouble / followers}
// * }}}
// *
// */
// def aggregateNeighbors[A: ClassManifest](
// mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
// mergeFunc: (A, A) => A,
// direction: EdgeDirection)
// : Graph[(VD, Option[A]), ED]
/**
* This function is used to compute a statistic for the neighborhood of each
@ -291,9 +296,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
def aggregateNeighbors[A: ClassManifest](
mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
reduceFunc: (A, A) => A,
default: A, // Should this be a function or a value?
direction: EdgeDirection)
: Graph[(VD, Option[A]), ED]
: RDD[(Vid, A)]
/**
@ -328,9 +332,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* }}}
* @todo Is leftJoinVertices the right name?
*/
def leftJoinVertices[U: ClassManifest, VD2: ClassManifest](
table: RDD[(Vid, U)],
mapFunc: (Vertex[VD], Option[U]) => VD2)
def outerJoinVertices[U: ClassManifest, VD2: ClassManifest](table: RDD[(Vid, U)])
(mapFunc: (Vid, VD, Option[U]) => VD2)
: Graph[VD2, ED]
/**
@ -366,10 +369,15 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* graph.joinVertices(tbl)( (v, row) => row )
* }}}
*/
def joinVertices[U: ClassManifest](
table: RDD[(Vid, U)],
mapFunc: (Vertex[VD], U) => VD)
: Graph[VD, ED]
def joinVertices[U: ClassManifest](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD)
: Graph[VD, ED] = {
  ClosureCleaner.clean(mapFunc)
  // Adapt mapFunc to the outer-join callback: a vertex with a matching row in `table` is
  // rewritten through mapFunc, a vertex without one keeps its current data.
  val keepOrUpdate: (Vid, VD, Option[U]) => VD = (vid, current, joined) =>
    joined.map(row => mapFunc(vid, current, row)).getOrElse(current)
  outerJoinVertices(table)(keepOrUpdate)
}
// Save a copy of the GraphOps object so there is always one unique GraphOps object
// for a given Graph object, and thus the lazy vals in GraphOps would work as intended.
@ -391,16 +399,16 @@ object Graph {
rawEdges.map { case (s, t) => Edge(s, t, 1) }
}
// Determine unique vertices
val vertices: RDD[Vertex[Int]] = edges.flatMap{ case Edge(s, t, cnt) => Array((s, 1), (t, 1)) }
.reduceByKey(_ + _)
.map{ case (id, deg) => Vertex(id, deg) }
val vertices: RDD[(Vid, Int)] =
edges.flatMap{ case Edge(s, t, cnt) => Array((s, 1), (t, 1)) }.reduceByKey(_ + _)
// Return graph
new GraphImpl(vertices, edges)
GraphImpl(vertices, edges)
}
def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]): Graph[VD, ED] = {
new GraphImpl(vertices, edges)
vertices: RDD[(Vid,VD)], edges: RDD[Edge[ED]]): Graph[VD, ED] = {
GraphImpl(vertices, edges)
}
implicit def graphToGraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) = g.ops

View file

@ -36,7 +36,7 @@ object GraphLab {
def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A,
mergeFunc: (A, A) => A,
applyFunc: (Vertex[VD], Option[A]) => VD,
applyFunc: (Vid, VD, Option[A]) => VD,
scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean,
numIter: Int = Integer.MAX_VALUE,
gatherDirection: EdgeDirection = EdgeDirection.In,
@ -45,7 +45,7 @@ object GraphLab {
// Add an active attribute to all vertices to track convergence.
var activeGraph: Graph[(Boolean, VD), ED] = graph.mapVertices {
case Vertex(id, data) => (true, data)
case (id, data) => (true, data)
}.cache()
// The gather function wrapper strips the active attribute and
@ -64,9 +64,9 @@ object GraphLab {
// The apply function wrapper strips the vertex of the active attribute
// and only invokes the apply function on active vertices
def apply(v: Vertex[((Boolean, VD), Option[A])]): (Boolean, VD) = {
val ((active, vData), accum) = v.data
if (active) (true, applyFunc(Vertex(v.id, vData), accum))
def apply(vid: Vid, data: (Boolean, VD), accum: Option[A]): (Boolean, VD) = {
val (active, vData) = data
if (active) (true, applyFunc(vid, vData, accum))
else (false, vData)
}
@ -89,9 +89,9 @@ object GraphLab {
}
// Used to set the active status of vertices for the next round
def applyActive(v: Vertex[((Boolean, VD), Option[Boolean])]): (Boolean, VD) = {
val ((prevActive, vData), newActive) = v.data
(newActive.getOrElse(false), vData)
def applyActive(vid: Vid, data: (Boolean, VD), newActive: Boolean): (Boolean, VD) = {
val (prevActive, vData) = data
(newActive, vData)
}
// Main Loop ---------------------------------------------------------------------
@ -99,29 +99,32 @@ object GraphLab {
var numActive = activeGraph.numVertices
while (i < numIter && numActive > 0) {
val gathered: Graph[((Boolean, VD), Option[A]), ED] =
// Gather
val gathered: RDD[(Vid, A)] =
activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection)
val applied: Graph[(Boolean, VD), ED] = gathered.mapVertices(apply).cache()
// Apply
activeGraph = activeGraph.outerJoinVertices(gathered)(apply).cache()
activeGraph = applied.cache()
// Scatter is basically a gather in the opposite direction so we reverse the edge direction
// activeGraph: Graph[(Boolean, VD), ED]
val scattered: Graph[((Boolean, VD), Option[Boolean]), ED] =
val scattered: RDD[(Vid, Boolean)] =
activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
val newActiveGraph: Graph[(Boolean, VD), ED] =
scattered.mapVertices(applyActive)
activeGraph = newActiveGraph.cache()
activeGraph = activeGraph.joinVertices(scattered)(applyActive).cache()
numActive = activeGraph.vertices.map(v => if (v.data._1) 1 else 0).reduce(_ + _)
// Calculate the number of active vertices
numActive = activeGraph.vertices.map{
case (vid, data) => if (data._1) 1 else 0
}.reduce(_ + _)
println("Number active vertices: " + numActive)
i += 1
}
// Remove the active attribute from the vertex data before returning the graph
activeGraph.mapVertices(v => v.data._2)
activeGraph.mapVertices{case (vid, data) => data._2 }
}
}

View file

@ -48,7 +48,7 @@ object GraphLoader {
def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
val vertices = edges.flatMap { edge => List((edge.src, 1), (edge.dst, 1)) }
.reduceByKey(_ + _)
.map{ case (vid, degree) => Vertex(vid, degree) }
new GraphImpl[Int, ED](vertices, edges)
.map{ case (vid, degree) => (vid, degree) }
GraphImpl(vertices, edges)
}
}

View file

@ -1,7 +1,7 @@
package org.apache.spark.graph
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
class GraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) {
@ -16,22 +16,18 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) {
lazy val degrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.Both)
def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = {
val graph: Graph[(VD, Option[Array[Vid]]), ED] = g.aggregateNeighbors(
val nbrs = g.aggregateNeighbors[Array[Vid]](
(vid, edge) => Some(Array(edge.otherVertex(vid).id)),
(a, b) => a ++ b,
edgeDirection)
graph.vertices.map(v => {
val (_, neighborIds) = v.data
(v.id, neighborIds.getOrElse(Array()))
})
g.vertices.leftOuterJoin(nbrs).mapValues{
case (_, Some(nbrs)) => nbrs
case (_, None) => Array.empty[Vid]
}
}
private def degreesRDD(edgeDirection: EdgeDirection): RDD[(Vid, Int)] = {
val degreeGraph: Graph[(VD, Option[Int]), ED] =
g.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection)
degreeGraph.vertices.map(v => {
val (_, degree) = v.data
(v.id, degree.getOrElse(0))
})
g.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection)
}
}

View file

@ -6,7 +6,7 @@ import org.apache.spark.rdd.RDD
object Pregel {
def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
vprog: (Vertex[VD], A) => VD,
vprog: (Vid, VD, A) => VD,
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
mergeMsg: (A, A) => A,
initialMsg: A,
@ -19,25 +19,26 @@ object Pregel {
def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertex(vid).id, edge)
def runProg(vertexWithMsgs: Vertex[(VD, Option[A])]): VD = {
val (vData, msg) = vertexWithMsgs.data
val v = Vertex(vertexWithMsgs.id, vData)
def runProg(id: Vid, data: (VD, Option[A]) ): VD = {
val (vData, msg) = data
msg match {
case Some(m) => vprog(v, m)
case None => v.data
case Some(m) => vprog(id, vData, m)
case None => vData
}
}
var graphWithMsgs: Graph[(VD, Option[A]), ED] =
g.mapVertices(v => (v.data, Some(initialMsg)))
// Receive the first set of messages
g.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
while (i < numIter) {
val newGraph: Graph[VD, ED] = graphWithMsgs.mapVertices(runProg).cache()
graphWithMsgs = newGraph.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In)
// compute the messages
val messages = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In)
// receive the messages
g = g.joinVertices(messages)(vprog)
// count the iteration
i += 1
}
graphWithMsgs.mapVertices(vertexWithMsgs => vertexWithMsgs.data match {
case (vData, _) => vData
})
// Return the final graph
g
}
}

View file

@ -1,9 +1,6 @@
package org.apache.spark.graph.impl
import scala.collection.mutable.ArrayBuilder
import it.unimi.dsi.fastutil.ints.IntArrayList
import org.apache.spark.graph._
@ -11,29 +8,43 @@ import org.apache.spark.graph._
* A partition of edges in 3 large columnar arrays.
*/
private[graph]
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] {
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest](
val srcIds: Array[Vid],
val dstIds: Array[Vid],
val data: Array[ED]
){
private var _data: Array[ED] = _
private var _dataBuilder = ArrayBuilder.make[ED]
// private var _data: Array[ED] = _
// private var _dataBuilder = ArrayBuilder.make[ED]
val srcIds = new VertexArrayList
val dstIds = new VertexArrayList
// var srcIds = new VertexArrayList
// var dstIds = new VertexArrayList
def data: Array[ED] = _data
def reverse: EdgePartition[ED] = new EdgePartition(dstIds, srcIds, data)
/** Add a new edge to the partition. */
def add(src: Vid, dst: Vid, d: ED) {
srcIds.add(src)
dstIds.add(dst)
_dataBuilder += d
def map[ED2: ClassManifest](f: Edge[ED] => ED2): EdgePartition[ED2] = {
val newData = new Array[ED2](data.size)
val edge = new Edge[ED]()
for(i <- 0 until data.size){
edge.src = srcIds(i)
edge.dst = dstIds(i)
edge.data = data(i)
newData(i) = f(edge)
}
new EdgePartition(srcIds, dstIds, newData)
}
def trim() {
srcIds.trim()
dstIds.trim()
_data = _dataBuilder.result()
def foreach(f: Edge[ED] => Unit) {
val edge = new Edge[ED]
for(i <- 0 until data.size){
edge.src = srcIds(i)
edge.dst = dstIds(i)
edge.data = data(i)
f(edge)
}
}
def size: Int = srcIds.size
def iterator = new Iterator[Edge[ED]] {
@ -43,11 +54,13 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
override def hasNext: Boolean = pos < EdgePartition.this.size
override def next(): Edge[ED] = {
edge.src = srcIds.get(pos)
edge.dst = dstIds.get(pos)
edge.data = _data(pos)
edge.src = srcIds(pos)
edge.dst = dstIds(pos)
edge.data = data(pos)
pos += 1
edge
}
}
}

View file

@ -0,0 +1,31 @@
package org.apache.spark.graph.impl
import scala.collection.mutable.ArrayBuilder
import org.apache.spark.graph._
/**
 * Incrementally accumulates edges and then freezes them into an EdgePartition.
 *
 * The three buffers are kept in lock-step: `add` appends to all of them, so entry `i` of
 * each buffer describes the i-th edge that was added.
 */
private[graph]
class EdgePartitionBuilder[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
ED: ClassManifest] {

  val srcIds = new VertexArrayList
  val dstIds = new VertexArrayList
  var dataBuilder = ArrayBuilder.make[ED]

  /** Add a new edge to the partition. */
  def add(src: Vid, dst: Vid, d: ED): Unit = {
    srcIds.add(src)
    dstIds.add(dst)
    dataBuilder += d
  }

  /** Build an EdgePartition from the edges added so far. */
  def toEdgePartition: EdgePartition[ED] = {
    val sources = srcIds.toLongArray()
    val targets = dstIds.toLongArray()
    val attrs = dataBuilder.result()
    new EdgePartition(sources, targets, attrs)
  }
}

View file

@ -1,112 +1,112 @@
package org.apache.spark.graph.impl
// package org.apache.spark.graph.impl
import scala.collection.mutable
// import scala.collection.mutable
import org.apache.spark.Aggregator
import org.apache.spark.Partition
import org.apache.spark.SparkEnv
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.Dependency
import org.apache.spark.OneToOneDependency
import org.apache.spark.ShuffleDependency
import org.apache.spark.SparkContext._
import org.apache.spark.graph._
// import org.apache.spark.Aggregator
// import org.apache.spark.Partition
// import org.apache.spark.SparkEnv
// import org.apache.spark.TaskContext
// import org.apache.spark.rdd.RDD
// import org.apache.spark.Dependency
// import org.apache.spark.OneToOneDependency
// import org.apache.spark.ShuffleDependency
// import org.apache.spark.SparkContext._
// import org.apache.spark.graph._
private[graph]
class EdgeTripletPartition(idx: Int, val vPart: Partition, val ePart: Partition)
extends Partition {
override val index: Int = idx
override def hashCode(): Int = idx
}
// private[graph]
// class EdgeTripletPartition(idx: Int, val vPart: Partition, val ePart: Partition)
// extends Partition {
// override val index: Int = idx
// override def hashCode(): Int = idx
// }
/**
* A RDD that brings together edge data with its associated vertex data.
*/
private[graph]
class EdgeTripletRDD[VD: ClassManifest, ED: ClassManifest](
vTableReplicated: RDD[(Vid, VD)],
eTable: RDD[(Pid, EdgePartition[ED])])
extends RDD[(VertexHashMap[VD], Iterator[EdgeTriplet[VD, ED]])](eTable.context, Nil) {
// /**
// * A RDD that brings together edge data with its associated vertex data.
// */
// private[graph]
// class EdgeTripletRDD[VD: ClassManifest, ED: ClassManifest](
// vTableReplicated: IndexedRDD[Pid, VertexHashMap[VD]],
// eTable: IndexedRDD[Pid, EdgePartition[ED]])
// extends RDD[(VertexHashMap[VD], Iterator[EdgeTriplet[VD, ED]])](eTable.context, Nil) {
//println("ddshfkdfhds" + vTableReplicated.partitioner.get.numPartitions)
//println("9757984589347598734549" + eTable.partitioner.get.numPartitions)
// //println("ddshfkdfhds" + vTableReplicated.partitioner.get.numPartitions)
// //println("9757984589347598734549" + eTable.partitioner.get.numPartitions)
assert(vTableReplicated.partitioner == eTable.partitioner)
// assert(vTableReplicated.partitioner == eTable.partitioner)
override def getDependencies: List[Dependency[_]] = {
List(new OneToOneDependency(eTable), new OneToOneDependency(vTableReplicated))
}
// override def getDependencies: List[Dependency[_]] = {
// List(new OneToOneDependency(eTable), new OneToOneDependency(vTableReplicated))
// }
override def getPartitions = Array.tabulate[Partition](eTable.partitions.size) {
i => new EdgeTripletPartition(i, eTable.partitions(i), vTableReplicated.partitions(i))
}
// override def getPartitions = Array.tabulate[Partition](eTable.partitions.size) {
// i => new EdgeTripletPartition(i, eTable.partitions(i), vTableReplicated.partitions(i))
// }
override val partitioner = eTable.partitioner
// override val partitioner = eTable.partitioner
override def getPreferredLocations(s: Partition) =
eTable.preferredLocations(s.asInstanceOf[EdgeTripletPartition].ePart)
// override def getPreferredLocations(s: Partition) =
// eTable.preferredLocations(s.asInstanceOf[EdgeTripletPartition].ePart)
override def compute(s: Partition, context: TaskContext)
: Iterator[(VertexHashMap[VD], Iterator[EdgeTriplet[VD, ED]])] = {
// override def compute(s: Partition, context: TaskContext)
// : Iterator[(VertexHashMap[VD], Iterator[EdgeTriplet[VD, ED]])] = {
val split = s.asInstanceOf[EdgeTripletPartition]
// val split = s.asInstanceOf[EdgeTripletPartition]
// Fetch the vertices and put them in a hashmap.
// TODO: use primitive hashmaps for primitive VD types.
val vmap = new VertexHashMap[VD]//(1000000)
vTableReplicated.iterator(split.vPart, context).foreach { v => vmap.put(v._1, v._2) }
// // Fetch the vertices and put them in a hashmap.
// // TODO: use primitive hashmaps for primitive VD types.
// val vmap = new VertexHashMap[VD]//(1000000)
// vTableReplicated.iterator(split.vPart, context).foreach { v => vmap.put(v._1, v._2) }
val (pid, edgePartition) = eTable.iterator(split.ePart, context).next()
.asInstanceOf[(Pid, EdgePartition[ED])]
// val (pid, edgePartition) = eTable.iterator(split.ePart, context).next()
// .asInstanceOf[(Pid, EdgePartition[ED])]
// Return an iterator that looks up the hash map to find matching vertices for each edge.
val iter = new Iterator[EdgeTriplet[VD, ED]] {
private var pos = 0
private val e = new EdgeTriplet[VD, ED]
e.src = new Vertex[VD]
e.dst = new Vertex[VD]
// // Return an iterator that looks up the hash map to find matching vertices for each edge.
// val iter = new Iterator[EdgeTriplet[VD, ED]] {
// private var pos = 0
// private val e = new EdgeTriplet[VD, ED]
// e.src = new Vertex[VD]
// e.dst = new Vertex[VD]
override def hasNext: Boolean = pos < edgePartition.size
override def next() = {
e.src.id = edgePartition.srcIds.getLong(pos)
// assert(vmap.containsKey(e.src.id))
e.src.data = vmap.get(e.src.id)
// override def hasNext: Boolean = pos < edgePartition.size
// override def next() = {
// e.src.id = edgePartition.srcIds.getLong(pos)
// // assert(vmap.containsKey(e.src.id))
// e.src.data = vmap.get(e.src.id)
e.dst.id = edgePartition.dstIds.getLong(pos)
// assert(vmap.containsKey(e.dst.id))
e.dst.data = vmap.get(e.dst.id)
// e.dst.id = edgePartition.dstIds.getLong(pos)
// // assert(vmap.containsKey(e.dst.id))
// e.dst.data = vmap.get(e.dst.id)
//println("Iter called: " + pos)
e.data = edgePartition.data(pos)
pos += 1
e
}
// //println("Iter called: " + pos)
// e.data = edgePartition.data(pos)
// pos += 1
// e
// }
override def toList: List[EdgeTriplet[VD, ED]] = {
val lb = new mutable.ListBuffer[EdgeTriplet[VD,ED]]
for (i <- (0 until edgePartition.size)) {
val currentEdge = new EdgeTriplet[VD, ED]
currentEdge.src = new Vertex[VD]
currentEdge.dst = new Vertex[VD]
currentEdge.src.id = edgePartition.srcIds.getLong(i)
// assert(vmap.containsKey(e.src.id))
currentEdge.src.data = vmap.get(currentEdge.src.id)
// override def toList: List[EdgeTriplet[VD, ED]] = {
// val lb = new mutable.ListBuffer[EdgeTriplet[VD,ED]]
// for (i <- (0 until edgePartition.size)) {
// val currentEdge = new EdgeTriplet[VD, ED]
// currentEdge.src = new Vertex[VD]
// currentEdge.dst = new Vertex[VD]
// currentEdge.src.id = edgePartition.srcIds.getLong(i)
// // assert(vmap.containsKey(e.src.id))
// currentEdge.src.data = vmap.get(currentEdge.src.id)
currentEdge.dst.id = edgePartition.dstIds.getLong(i)
// assert(vmap.containsKey(e.dst.id))
currentEdge.dst.data = vmap.get(currentEdge.dst.id)
// currentEdge.dst.id = edgePartition.dstIds.getLong(i)
// // assert(vmap.containsKey(e.dst.id))
// currentEdge.dst.data = vmap.get(currentEdge.dst.id)
currentEdge.data = edgePartition.data(i)
//println("Iter: " + pos + " " + e.src.id + " " + e.dst.id + " " + e.data)
//println("List: " + i + " " + currentEdge.src.id + " " + currentEdge.dst.id + " " + currentEdge.data)
lb += currentEdge
}
lb.toList
}
}
Iterator((vmap, iter))
}
}
// currentEdge.data = edgePartition.data(i)
// //println("Iter: " + pos + " " + e.src.id + " " + e.dst.id + " " + e.data)
// //println("List: " + i + " " + currentEdge.src.id + " " + currentEdge.dst.id + " " + currentEdge.data)
// lb += currentEdge
// }
// lb.toList
// }
// }
// Iterator((vmap, iter))
// }
// }

View file

@ -2,12 +2,18 @@ package org.apache.spark.graph.impl
import scala.collection.JavaConversions._
import scala.collection.mutable
import org.apache.spark.SparkContext._
import org.apache.spark.Partitioner
import org.apache.spark.HashPartitioner
import org.apache.spark.util.ClosureCleaner
import org.apache.spark.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.IndexedRDD
import org.apache.spark.rdd.RDDIndex
import org.apache.spark.graph._
import org.apache.spark.graph.impl.GraphImpl._
@ -18,112 +24,224 @@ import org.apache.spark.graph.impl.MessageToPartitionRDDFunctions._
* A Graph RDD that supports computation on graphs.
*/
class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
val numVertexPartitions: Int,
val numEdgePartitions: Int,
_rawVertices: RDD[Vertex[VD]],
_rawEdges: RDD[Edge[ED]],
_rawVTable: RDD[(Vid, (VD, Array[Pid]))],
_rawETable: RDD[(Pid, EdgePartition[ED])])
val vTable: IndexedRDD[Vid, VD],
val vid2pid: IndexedRDD[Vid, Pid],
val eTable: IndexedRDD[Pid, EdgePartition[ED]])
extends Graph[VD, ED] {
def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = {
this(vertices.partitions.size, edges.partitions.size, vertices, edges, null, null)
/**
* The vTableReplicated is a version of the vertex data after it is
* replicated.
*
* For every (vid, pid) pair in vid2pid the vertex's data is sent to partition `pid`; the
* records are then partitioned with the edge table's partitioner and collected into one
* VertexHashMap per partition, indexed with the edge table's index.
*/
val vTableReplicated: IndexedRDD[Pid, VertexHashMap[VD]] = {
// Join vid2pid and vTable, generate a shuffle dependency on the joined
// result, and get the shuffle id so we can use it on the slave.
vTable.cogroup(vid2pid)
.flatMap { case (vid, (vdatas, pids)) =>
// Emit one message per partition id registered for this vertex.
// NOTE(review): `vdatas.head` presumably throws if a vid appears in vid2pid but has no
// entry in vTable — confirm that invariant is guaranteed by the callers.
pids.iterator.map {
pid => MessageToPartition(pid, (vid, vdatas.head))
}
}
// NOTE(review): `.get` presumably throws when eTable has no partitioner (see the @todo).
.partitionBy(eTable.partitioner.get) //@todo assert edge table has partitioner
.mapPartitionsWithIndex( (pid, iter) => {
// Build the hashmap for each partition
val vmap = new VertexHashMap[VD]
for( msg <- iter ) { vmap.put(msg.data._1, msg.data._2) }
// Exactly one (pid, vmap) record is produced per partition.
Array((pid, vmap)).iterator
}, preservesPartitioning = true)
.indexed(eTable.index)
}
def withPartitioner(numVertexPartitions: Int, numEdgePartitions: Int): Graph[VD, ED] = {
if (_cached) {
new GraphImpl(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable)
.cache()
} else {
new GraphImpl(numVertexPartitions, numEdgePartitions, _rawVertices, _rawEdges, null, null)
}
}
def withVertexPartitioner(numVertexPartitions: Int) = {
withPartitioner(numVertexPartitions, numEdgePartitions)
}
def withEdgePartitioner(numEdgePartitions: Int) = {
withPartitioner(numVertexPartitions, numEdgePartitions)
}
protected var _cached = false
// def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = {
// this(vertices.partitions.size, edges.partitions.size, vertices, edges, null, null)
// }
// def withPartitioner(numVertexPartitions: Int, numEdgePartitions: Int): Graph[VD, ED] = {
// if (_cached) {
// new GraphImpl(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable)
// .cache()
// } else {
// new GraphImpl(numVertexPartitions, numEdgePartitions, _rawVertices, _rawEdges, null, null)
// }
// }
// def withVertexPartitioner(numVertexPartitions: Int) = {
// withPartitioner(numVertexPartitions, numEdgePartitions)
// }
// def withEdgePartitioner(numEdgePartitions: Int) = {
// withPartitioner(numVertexPartitions, numEdgePartitions)
// }
/**
 * Calls cache() on every table backing this graph -- the edge table, the
 * vertex-to-partition table, the vertex table and the replicated vertex
 * view -- and returns this same graph.
 */
override def cache(): Graph[VD, ED] = {
eTable.cache()
vid2pid.cache()
vTable.cache()
// Remember that cache() has been requested on this graph.
_cached = true
// @todo: should we cache the replicated data?
vTableReplicated.cache()
this
}
/**
 * Replication factor of the graph: the total number of partition ids
 * recorded across all vertices in vid2pid, divided by the number of
 * entries in vTable.
 */
override def replication(): Double = {
  // Group the partition ids by vertex and add up the group sizes.
  val rep = vid2pid.groupByKey().map(kv => kv._2.size).sum
  rep / vTable.count
}
/**
 * Returns, for every entry of the edge table, the size of that edge
 * partition's data, collected into a local array.
 */
override def balance(): Array[Int] = {
  eTable.map{ case (_, epart) => epart.data.size }.collect
}
/**
 * Returns a graph whose edge partitions are each replaced by their
 * `reverse`; the vertex table and vertex-to-partition table are shared
 * with this graph unchanged.
 */
override def reverse: Graph[VD, ED] = {
  // mapValues is declared to return a plain RDD, hence the cast back to IndexedRDD.
  val etable = eTable.mapValues( _.reverse ).asInstanceOf[IndexedRDD[Pid, EdgePartition[ED]]]
  new GraphImpl(vTable, vid2pid, etable)
}
/** Return a RDD of vertices, as (vertex id, vertex value) pairs; this is the vertex table itself. */
override def vertices: RDD[(Vid, VD)] = vTable
/**
 * Return a RDD of edges.
 *
 * Only the first entry of each edge-table partition is read.
 * NOTE(review): this assumes every partition holds exactly one
 * (pid, EdgePartition) entry -- confirm.
 */
override def edges: RDD[Edge[ED]] = {
  eTable.mapPartitions { iter => iter.next()._2.iterator }
}
/**
 * Return a RDD that brings edges with its source and destination vertices together.
 *
 * The per-partition iterator reuses one EdgeTriplet (and its two Vertex
 * objects) for every call to next(), overwriting its fields each time, so a
 * returned element is only valid until the iterator is advanced. toList is
 * overridden to allocate a fresh triplet per edge instead.
 */
override def triplets: RDD[EdgeTriplet[VD, ED]] = {
  vTableReplicated.join(eTable)
    .mapPartitions{ iter =>
      // Each partition is expected to carry exactly one joined entry.
      val (pid, (vmap, edgePartition)) = iter.next()
      assert(iter.hasNext == false)
      // Return an iterator that looks up the hash map to find matching
      // vertices for each edge.
      new Iterator[EdgeTriplet[VD, ED]] {
        private var pos = 0
        // Single triplet instance, mutated in place by next().
        private val e = new EdgeTriplet[VD, ED]
        e.src = new Vertex[VD]
        e.dst = new Vertex[VD]

        override def hasNext: Boolean = pos < edgePartition.size

        override def next() = {
          e.src.id = edgePartition.srcIds(pos)
          // assert(vmap.containsKey(e.src.id))
          e.src.data = vmap.get(e.src.id)
          e.dst.id = edgePartition.dstIds(pos)
          // assert(vmap.containsKey(e.dst.id))
          e.dst.data = vmap.get(e.dst.id)
          //println("Iter called: " + pos)
          e.data = edgePartition.data(pos)
          pos += 1
          e
        }

        // Builds the list from the whole edge partition with a new triplet
        // per edge, independent of the shared instance and of pos.
        override def toList: List[EdgeTriplet[VD, ED]] = {
          val lb = new mutable.ListBuffer[EdgeTriplet[VD,ED]]
          for (i <- (0 until edgePartition.size)) {
            val currentEdge = new EdgeTriplet[VD, ED]
            currentEdge.src = new Vertex[VD]
            currentEdge.dst = new Vertex[VD]
            currentEdge.src.id = edgePartition.srcIds(i)
            // assert(vmap.containsKey(e.src.id))
            currentEdge.src.data = vmap.get(currentEdge.src.id)
            currentEdge.dst.id = edgePartition.dstIds(i)
            // assert(vmap.containsKey(e.dst.id))
            currentEdge.dst.data = vmap.get(currentEdge.dst.id)
            currentEdge.data = edgePartition.data(i)
            lb += currentEdge
          }
          lb.toList
        }
      } // end of iterator
    } // end of map partition
}
/**
 * Returns a graph in which every vertex value is replaced by
 * f(vertex id, old value); the vertex-to-partition table and the edge
 * table are shared with this graph unchanged.
 */
override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
  // mapValuesWithKeys is declared to return a plain RDD, hence the cast back to IndexedRDD.
  val newVTable = vTable.mapValuesWithKeys((vid, data) => f(vid, data))
    .asInstanceOf[IndexedRDD[Vid, VD2]]
  new GraphImpl(newVTable, vid2pid, eTable)
}
/**
 * Returns a graph in which every edge partition is replaced by the result
 * of mapping f over it; the vertex table and vertex-to-partition table are
 * shared with this graph unchanged.
 */
override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = {
  // mapValues is declared to return a plain RDD, hence the cast back to IndexedRDD.
  val newETable = eTable.mapValues(eBlock => eBlock.map(f))
    .asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]]
  new GraphImpl(vTable, vid2pid, newETable)
}
/**
 * Returns a graph in which every edge partition is replaced by the result
 * of mapping f over its edges, where f sees each edge as a triplet carrying
 * the source and destination vertex values looked up in the replicated
 * vertex map. The vertex tables are shared with this graph unchanged.
 *
 * A single EdgeTriplet is reused for all edges of a partition, so f must
 * not retain the triplet it is given.
 */
override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2):
  Graph[VD, ED2] = {
  val newETable = eTable.join(vTableReplicated).mapValues{
    case (edgePartition, vmap) =>
      // One triplet per partition, overwritten for every edge.
      val et = new EdgeTriplet[VD, ED]
      et.src = new Vertex[VD]
      et.dst = new Vertex[VD]
      edgePartition.map{e =>
        et.data = e.data
        et.src.id = e.src
        et.src.data = vmap(e.src)
        et.dst.id = e.dst
        et.dst.data = vmap(e.dst)
        f(et)
      }
  }.asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]]
  new GraphImpl(vTable, vid2pid, newETable)
}
override def correctEdges(): Graph[VD, ED] = {
val sc = vertices.context
val vset = sc.broadcast(vertices.map(_.id).collect().toSet)
val newEdges = edges.filter(e => vset.value.contains(e.src) && vset.value.contains(e.dst))
Graph(vertices, newEdges)
}
// override def correctEdges(): Graph[VD, ED] = {
// val sc = vertices.context
// val vset = sc.broadcast(vertices.map(_.id).collect().toSet)
// val newEdges = edges.filter(e => vset.value.contains(e.src) && vset.value.contains(e.dst))
// Graph(vertices, newEdges)
// }
override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (_ => true),
vpred: Vertex[VD] => Boolean = (_ => true) ): Graph[VD, ED] = {
override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
vpred: (Vid, VD) => Boolean = ((a,b) => true) ): Graph[VD, ED] = {
/// @todo: The following code behaves deterministically on each
/// vertex predicate but uses additional space. Should we switch to
/// this version?
// val predGraph = mapVertices(v => (v.data, vpred(v)))
// val newETable = predGraph.triplets.filter(t =>
// if(v.src.data._2 && v.dst.data._2) {
// val src = Vertex(t.src.id, t.src.data._1)
// val dst = Vertex(t.dst.id, t.dst.data._1)
// epred(new EdgeTriplet[VD, ED](src, dst, t.data))
// } else { false })
// val newVTable = predGraph.vertices.filter(v => v.data._1)
// .map(v => (v.id, v.data._1)).indexed()
// Reuse the partitioner (but not the index) from this graph
val newVTable = vertices.filter(v => vpred(v._1, v._2)).indexed(vTable.index.partitioner)
// Restrict the set of vertices to those that satisfy the vertex predicate
val newVertices = vertices.filter(vpred)
// Restrict the set of edges to those that satisfy the vertex and the edge predicate.
val newEdges = triplets.filter(t => vpred(t.src) && vpred(t.dst) && epred(t))
.map( t => Edge(t.src.id, t.dst.id, t.data) )
val newETable = createETable(
triplets.filter(
t => vpred( t.src.id, t.src.data ) && vpred( t.dst.id, t.dst.data ) && epred(t)
)
.map( t => Edge(t.src.id, t.dst.id, t.data) ),
eTable.index.partitioner.numPartitions
)
new GraphImpl(newVertices, newEdges)
// Construct the Vid2Pid map. Here we assume that the filter operation
// behaves deterministically.
// @todo reindex the vertex and edge tables
val newVid2Pid = createVid2Pid(newETable, newVTable.index)
new GraphImpl(newVTable, newVid2Pid, newETable)
}
@ -135,10 +253,10 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// TODO(crankshaw) is there a better way to do this using RDD.groupBy()
// functions?
override def groupEdgeTriplets[ED2: ClassManifest](f: Iterator[EdgeTriplet[VD,ED]] => ED2 ):
override def groupEdgeTriplets[ED2: ClassManifest](
f: Iterator[EdgeTriplet[VD,ED]] => ED2 ): Graph[VD,ED2] = {
//override def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ):
Graph[VD,ED2] = {
// I think that
// myRDD.mapPartitions { part =>
// val (vmap, edges) = part.next()
@ -169,7 +287,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
.mapValues { ts: List[EdgeTriplet[VD, ED]] => f(ts.toIterator) }
// convert the resulting map back to a list of tuples
.toList
// TODO(crankshaw) needs an iterator over the tuples? Why can't I map over the list?
// TODO(crankshaw) needs an iterator over the tuples?
// Why can't I map over the list?
.toIterator
// map over those tuples that contain src and dst info plus the
// new edge data to make my new edges
@ -185,7 +304,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// and http://stackoverflow.com/questions/6998676/converting-a-scala-map-to-a-list
}
newGraph(vertices, newEdges)
// @todo eliminate the need to call createETable
val newETable = createETable(newEdges,
eTable.index.partitioner.numPartitions)
new GraphImpl(vTable, vid2pid, newETable)
}
@ -202,11 +326,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
.toList
.toIterator
.map { case ((src, dst), data) => Edge(src, dst, data) }
}
newGraph(vertices, newEdges)
// @todo eliminate the need to call createETable
val newETable = createETable(newEdges,
eTable.index.partitioner.numPartitions)
new GraphImpl(vTable, vid2pid, newETable)
}
@ -215,156 +340,90 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// Lower level transformation methods
//////////////////////////////////////////////////////////////////////////////////////////////////
override def aggregateNeighbors[A: ClassManifest](
/**
 * Applies mapFunc to every edge triplet and combines, with reduceFunc, all
 * values emitted for the same vertex id.
 *
 * Values are first combined inside each edge partition in a local hash map;
 * the per-partition results are then indexed with the vertex table's index
 * and reduced by key. The triplet passed to mapFunc is a single object
 * reused for every edge of a partition, so mapFunc must not retain it.
 */
override def mapReduceTriplets[A: ClassManifest](
    mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
    reduceFunc: (A, A) => A)
  : RDD[(Vid, A)] = {
  ClosureCleaner.clean(mapFunc)
  ClosureCleaner.clean(reduceFunc)

  val partialAggregates = vTableReplicated.join(eTable).flatMap {
    case (_, (vmap, edgePartition)) =>
      val localAgg = new VertexHashMap[A]
      // Scratch triplet, refilled for each edge.
      val triplet = new EdgeTriplet[VD, ED]
      triplet.src = new Vertex[VD]
      triplet.dst = new Vertex[VD]
      edgePartition.foreach { edge =>
        triplet.data = edge.data
        triplet.src.id = edge.src
        triplet.src.data = vmap(edge.src)
        triplet.dst.id = edge.dst
        triplet.dst.data = vmap(edge.dst)
        mapFunc(triplet).foreach { case (vid, msg) =>
          // Fold into the existing value for vid, or start with msg.
          val merged =
            if (localAgg.containsKey(vid)) reduceFunc(localAgg.get(vid), msg) else msg
          localAgg.put(vid, merged)
        }
      }
      // Return the aggregate map
      localAgg.long2ObjectEntrySet().fastIterator().map { entry =>
        (entry.getLongKey(), entry.getValue())
      }
  }

  partialAggregates.indexed(vTable.index).reduceByKey(reduceFunc)
}
/**
 * Aggregates values over the neighborhood of each vertex by delegating to
 * mapReduceTriplets.
 *
 * For each triplet, mapFunc is called with the destination vertex id when
 * dir is In or Both, and with the source vertex id when dir is Out or Both.
 * Every Some(value) it returns is emitted for that vertex id; None emits
 * nothing. Values for the same vertex id are combined with reduceFunc, so
 * vertices that receive no value do not appear in the result.
 */
def aggregateNeighbors[A: ClassManifest](
    mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
    reduceFunc: (A, A) => A,
    dir: EdgeDirection)
  : RDD[(Vid, A)] = {
  ClosureCleaner.clean(mapFunc)
  ClosureCleaner.clean(reduceFunc)

  // Define a new map function over edge triplets
  def mf(et: EdgeTriplet[VD,ED]): Array[(Vid, A)] = {
    // Compute the message to the dst vertex
    val dstA =
      if (dir == EdgeDirection.In || dir == EdgeDirection.Both) {
        mapFunc(et.dst.id, et)
      } else { Option.empty[A] }
    // Compute the message to the source vertex
    val srcA =
      if (dir == EdgeDirection.Out || dir == EdgeDirection.Both) {
        mapFunc(et.src.id, et)
      } else { Option.empty[A] }
    // construct the return array
    (srcA, dstA) match {
      case (None, None) => Array.empty[(Vid, A)]
      case (Some(src),None) => Array((et.src.id, src))
      case (None, Some(dst)) => Array((et.dst.id, dst))
      case (Some(src), Some(dst)) =>
        Array((et.src.id, src), (et.dst.id, dst))
    }
  }

  mapReduceTriplets(mf, reduceFunc)
}
/**
* Same as aggregateNeighbors but map function can return none and there is no default value.
* As a consequence, the resulting table may be much smaller than the set of vertices.
*/
override def aggregateNeighbors[A: ClassManifest](
mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
reduceFunc: (A, A) => A,
gatherDirection: EdgeDirection): Graph[(VD, Option[A]), ED] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
val newVTable = vTableReplicated.mapPartitions({ part =>
part.map { v => (v._1, MutableTuple2(v._2, Option.empty[A])) }
}, preservesPartitioning = true)
val newVertices: RDD[(Vid, A)] =
new EdgeTripletRDD[MutableTuple2[VD, Option[A]], ED](newVTable, eTable)
.mapPartitions { part =>
val (vmap, edges) = part.next()
val edgeSansAcc = new EdgeTriplet[VD, ED]()
edgeSansAcc.src = new Vertex[VD]
edgeSansAcc.dst = new Vertex[VD]
edges.foreach { e: EdgeTriplet[MutableTuple2[VD, Option[A]], ED] =>
edgeSansAcc.data = e.data
edgeSansAcc.src.data = e.src.data._1
edgeSansAcc.dst.data = e.dst.data._1
edgeSansAcc.src.id = e.src.id
edgeSansAcc.dst.id = e.dst.id
if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) {
e.dst.data._2 =
if (e.dst.data._2.isEmpty) {
mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
} else {
val tmp = mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
if (!tmp.isEmpty) Some(reduceFunc(e.dst.data._2.get, tmp.get)) else e.dst.data._2
}
}
if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) {
e.src.data._2 =
if (e.src.data._2.isEmpty) {
mapFunc(edgeSansAcc.src.id, edgeSansAcc)
} else {
val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc)
if (!tmp.isEmpty) Some(reduceFunc(e.src.data._2.get, tmp.get)) else e.src.data._2
}
}
}
vmap.long2ObjectEntrySet().fastIterator().filter(!_.getValue()._2.isEmpty).map{ entry =>
(entry.getLongKey(), entry.getValue()._2)
}
}
.map{ case (vid, aOpt) => (vid, aOpt.get) }
.combineByKey((v: A) => v, reduceFunc, null, vertexPartitioner, false)
this.leftJoinVertices(newVertices, (v: Vertex[VD], a: Option[A]) => (v.data, a))
}
override def leftJoinVertices[U: ClassManifest, VD2: ClassManifest](
updates: RDD[(Vid, U)],
updateF: (Vertex[VD], Option[U]) => VD2)
override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest]
(updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2)
: Graph[VD2, ED] = {
ClosureCleaner.clean(updateF)
val newVTable = vTable.leftOuterJoin(updates).mapPartitions({ iter =>
iter.map { case (vid, ((vdata, pids), update)) =>
val newVdata = updateF(Vertex(vid, vdata), update)
(vid, (newVdata, pids))
}
}, preservesPartitioning = true).cache()
new GraphImpl(newVTable.partitions.length, eTable.partitions.length, null, null, newVTable, eTable)
}
override def joinVertices[U: ClassManifest](
updates: RDD[(Vid, U)],
updateF: (Vertex[VD], U) => VD)
: Graph[VD, ED] = {
ClosureCleaner.clean(updateF)
val newVTable = vTable.leftOuterJoin(updates).mapPartitions({ iter =>
iter.map { case (vid, ((vdata, pids), update)) =>
if (update.isDefined) {
val newVdata = updateF(Vertex(vid, vdata), update.get)
(vid, (newVdata, pids))
} else {
(vid, (vdata, pids))
}
}
}, preservesPartitioning = true).cache()
new GraphImpl(newVTable.partitions.length, eTable.partitions.length, null, null, newVTable, eTable)
val newVTable = vTable.leftOuterJoin(updates).mapValuesWithKeys{
case (vid, (data, other)) => updateF(vid, data, other)
}.asInstanceOf[IndexedRDD[Vid,VD2]]
new GraphImpl(newVTable, vid2pid, eTable)
}
@ -372,49 +431,130 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// Internals hidden from callers
//////////////////////////////////////////////////////////////////////////////////////////////////
// TODO: Support non-hash partitioning schemes.
protected val vertexPartitioner = new HashPartitioner(numVertexPartitions)
protected val edgePartitioner = new HashPartitioner(numEdgePartitions)
/** Create a new graph but keep the current partitioning scheme. */
protected def newGraph[VD2: ClassManifest, ED2: ClassManifest](
vertices: RDD[Vertex[VD2]], edges: RDD[Edge[ED2]]): Graph[VD2, ED2] = {
(new GraphImpl[VD2, ED2](vertices, edges)).withPartitioner(numVertexPartitions, numEdgePartitions)
}
protected lazy val eTable: RDD[(Pid, EdgePartition[ED])] = {
if (_rawETable == null) {
createETable(_rawEdges, numEdgePartitions)
} else {
_rawETable
}
}
protected lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = {
if (_rawVTable == null) {
createVTable(_rawVertices, eTable, numVertexPartitions)
} else {
_rawVTable
}
}
// /** Create a new graph but keep the current partitioning scheme. */
// protected def newGraph[VD2: ClassManifest, ED2: ClassManifest](
// vertices: RDD[Vertex[VD2]], edges: RDD[Edge[ED2]]): Graph[VD2, ED2] = {
// (new GraphImpl[VD2, ED2](vertices, edges)).withPartitioner(numVertexPartitions, numEdgePartitions)
// }
protected lazy val vTableReplicated: RDD[(Vid, VD)] = {
// Join vid2pid and vTable, generate a shuffle dependency on the joined result, and get
// the shuffle id so we can use it on the slave.
vTable
.flatMap { case (vid, (vdata, pids)) =>
pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) }
}
.partitionBy(edgePartitioner)
.mapPartitions({ part =>
part.map { message => (message.data._1, message.data._2) }
}, preservesPartitioning = true)
}
// protected lazy val eTable: RDD[(Pid, EdgePartition[ED])] = {
// if (_rawETable == null) {
// createETable(_rawEdges, numEdgePartitions)
// } else {
// _rawETable
// }
// }
// protected lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = {
// if (_rawVTable == null) {
// createVTable(_rawVertices, eTable, numVertexPartitions)
// } else {
// _rawVTable
// }
// }
// protected lazy val vTableReplicated: RDD[(Vid, VD)] = {
// // Join vid2pid and vTable, generate a shuffle dependency on the joined result, and get
// // the shuffle id so we can use it on the slave.
// vTable
// .flatMap { case (vid, (vdata, pids)) =>
// pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) }
// }
// .partitionBy(edgePartitioner)
// .mapPartitions({ part =>
// part.map { message => (message.data._1, message.data._2) }
// }, preservesPartitioning = true)
// }
}
object GraphImpl {
/**
 * Builds a graph from vertex and edge RDDs, taking the number of vertex and
 * edge partitions from the default parallelism of each RDD's context.
 */
def apply[VD: ClassManifest, ED: ClassManifest](
    vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]]):
  GraphImpl[VD,ED] = {
  val numVPart = vertices.context.defaultParallelism
  val numEPart = edges.context.defaultParallelism
  apply(vertices, edges, numVPart, numEPart)
}
/**
 * Builds a graph from vertex and edge RDDs with explicit partition counts.
 *
 * The vertices are indexed into numVPart partitions, the edges are packed
 * into an edge table of numEPart partitions, and the vertex-to-partition
 * table is derived from that edge table using the vertex table's index.
 */
def apply[VD: ClassManifest, ED: ClassManifest](
    vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]],
    numVPart: Int, numEPart: Int): GraphImpl[VD,ED] = {
  val vertexTable = vertices.indexed(numVPart)
  val edgeTable = createETable(edges, numEPart)
  val vertexToPartition = createVid2Pid(edgeTable, vertexTable.index)
  new GraphImpl(vertexTable, vertexToPartition, edgeTable)
}
/**
 * Build the edge table: an indexed RDD with a single key-value pair per
 * partition, keyed by partition id and holding an EdgePartition with all
 * edges assigned to that partition. This layout is much more efficient for
 * Java heap storage than the normal edges data structure
 * (RDD[(Vid, Vid, ED)]).
 */
protected def createETable[ED: ClassManifest](
    edges: RDD[Edge[ED]], numPartitions: Int)
  : IndexedRDD[Pid, EdgePartition[ED]] = {
  val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt

  // Tag every edge with the partition chosen by the 2D partition function.
  // (edgePartitionFunction1D and canonicalEdgePartitionFunction2D are the
  // alternatives that were tried here.)
  // Should we be using 3-tuple or an optimized class
  val tagged = edges.map { edge =>
    val target: Pid = edgePartitionFunction2D(edge.src, edge.dst, numPartitions, ceilSqrt)
    MessageToPartition(target, (edge.src, edge.dst, edge.data))
  }

  // Shuffle by target partition, then pack each partition's edges into one block.
  val blocks = tagged
    .partitionBy(new HashPartitioner(numPartitions))
    .mapPartitionsWithIndex({ (pid, messages) =>
      val builder = new EdgePartitionBuilder[ED]
      messages.foreach { message =>
        val (src, dst, attr) = message.data
        builder.add(src, dst, attr)
      }
      Iterator((pid, builder.toEdgePartition))
    }, preservesPartitioning = true)

  blocks.indexed()
}
/**
 * Builds the vertex-to-partition table: for each edge partition, every
 * vertex id that occurs as the source or destination of one of its edges is
 * paired with that partition's id, and the pairs are indexed with
 * vTableIndex.
 *
 * Only the first entry of each edge-table partition is read.
 */
protected def createVid2Pid[ED: ClassManifest](
    eTable: IndexedRDD[Pid, EdgePartition[ED]],
    vTableIndex: RDDIndex[Vid]): IndexedRDD[Vid, Pid] = {
  val vidPidPairs = eTable.mapPartitions { entries =>
    val (pid, edgePartition) = entries.next()
    // Collect the endpoint ids of every edge in this partition.
    val touched = new VertexSet
    edgePartition.foreach { edge =>
      touched.add(edge.src)
      touched.add(edge.dst)
    }
    touched.iterator.map { vid => (vid.toLong, pid) }
  }
  vidPidPairs.indexed(vTableIndex)
}
protected def edgePartitionFunction1D(src: Vid, dst: Vid, numParts: Pid): Pid = {
val mixingPrime: Vid = 1125899906842597L
@ -500,70 +640,44 @@ object GraphImpl {
}
/**
* Create the edge table RDD, which is much more efficient for Java heap storage than the
* normal edges data structure (RDD[(Vid, Vid, ED)]).
*
* The edge table contains multiple partitions, and each partition contains only one RDD
* key-value pair: the key is the partition id, and the value is an EdgePartition object
* containing all the edges in a partition.
*/
protected def createETable[ED: ClassManifest](edges: RDD[Edge[ED]], numPartitions: Int)
: RDD[(Pid, EdgePartition[ED])] = {
val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt
edges
.map { e =>
// Random partitioning based on the source vertex id.
// val part: Pid = edgePartitionFunction1D(e.src, e.dst, numPartitions)
val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
//val part: Pid = canonicalEdgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
// Should we be using 3-tuple or an optimized class
MessageToPartition(part, (e.src, e.dst, e.data))
// (math.abs(e.src) % numPartitions, (e.src, e.dst, e.data))
}
.partitionBy(new HashPartitioner(numPartitions))
.mapPartitionsWithIndex({ (pid, iter) =>
val edgePartition = new EdgePartition[ED]
iter.foreach { message =>
val data = message.data
edgePartition.add(data._1, data._2, data._3)
}
edgePartition.trim()
Iterator((pid, edgePartition))
}, preservesPartitioning = true)
}
protected def createVTable[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[Vertex[VD]],
eTable: RDD[(Pid, EdgePartition[ED])],
numPartitions: Int)
: RDD[(Vid, (VD, Array[Pid]))] = {
val partitioner = new HashPartitioner(numPartitions)
// protected def createVTable[VD: ClassManifest, ED: ClassManifest](
// eTable: IndexedRDD[Pid, EdgePartition[ED]],
// vid2pid: Index
// vertices: RDD[Vertex[VD]],
// A key-value RDD. The key is a vertex id, and the value is a list of
// partitions that contains edges referencing the vertex.
val vid2pid : RDD[(Vid, Seq[Pid])] = eTable.mapPartitions { iter =>
val (pid, edgePartition) = iter.next()
val vSet = new VertexSet
var i = 0
while (i < edgePartition.srcIds.size) {
vSet.add(edgePartition.srcIds.getLong(i))
vSet.add(edgePartition.dstIds.getLong(i))
i += 1
}
vSet.iterator.map { vid => (vid.toLong, pid) }
}.groupByKey(partitioner)
// default: VD) : IndexedRDD[Vid, VD] = {
vertices
.map { v => (v.id, v.data) }
.partitionBy(partitioner)
.leftOuterJoin(vid2pid)
.mapValues {
case (vdata, None) => (vdata, Array.empty[Pid])
case (vdata, Some(pids)) => (vdata, pids.toArray)
}
}
// // Compute all the vertices in the edge table.
// val vid2pid = createVid2Pid(eTable)
// // Compute all the
// vertices.map(v => (v.id, v.data)).cogroup(vids)
// // A key-value RDD. The key is a vertex id, and the value is a list of
// // partitions that contains edges referencing the vertex.
// val vid2pid : RDD[(Vid, Seq[Pid])] = eTable.mapPartitions { iter =>
// val (pid, edgePartition) = iter.next()
// val vSet = new VertexSet
// var i = 0
// while (i < edgePartition.srcIds.size) {
// vSet.add(edgePartition.srcIds.getLong(i))
// vSet.add(edgePartition.dstIds.getLong(i))
// i += 1
// }
// vSet.iterator.map { vid => (vid.toLong, pid) }
// }.groupByKey(partitioner)
// vertices
// .map { v => (v.id, v.data) }
// .partitionBy(partitioner)
// .leftOuterJoin(vid2pid)
// .mapValues {
// case (vdata, None) => (vdata, Array.empty[Pid])
// case (vdata, Some(pids)) => (vdata, pids.toArray)
// }
// }
}

View file

@ -37,7 +37,7 @@ object GraphGenerators {
val host = "local[4]"
val sc = new SparkContext(host, "Lognormal graph generator")
val lnGraph = lognormalGraph(sc, 10000)
val lnGraph = logNormalGraph(sc, 10000)
val rmat = rmatGraph(sc, 1000, 3000)
@ -69,19 +69,21 @@ object GraphGenerators {
// Right now it just generates a bunch of edges where
// the edge data is the weight (default 1)
def lognormalGraph(sc: SparkContext, numVertices: Int): GraphImpl[Int, Int] = {
def logNormalGraph(sc: SparkContext, numVertices: Int): GraphImpl[Int, Int] = {
// based on Pregel settings
val mu = 4
val sigma = 1.3
//val vertsAndEdges = (0 until numVertices).flatMap { src => {
val vertices = (0 until numVertices).flatMap { src =>
Array(Vertex(src, sampleLogNormal(mu, sigma, numVertices))) }
val edges = vertices.flatMap( { v =>
generateRandomEdges(v.id.toInt, v.data, numVertices) })
val vertices: RDD[(Vid, Int)] = sc.parallelize(0 until numVertices).map{
src => (src, sampleLogNormal(mu, sigma, numVertices))
}
val edges = vertices.flatMap{
v => generateRandomEdges(v._1.toInt, v._2, numVertices)
}
new GraphImpl[Int, Int](sc.parallelize(vertices), sc.parallelize(edges))
GraphImpl(vertices, edges)
//println("Vertices:")
//for (v <- vertices) {
// println(v.id)
@ -161,8 +163,8 @@ object GraphGenerators {
val vertices = edges.flatMap { edge => List((edge.src, 1)) }
.reduceByKey(_ + _)
.map{ case (vid, degree) => Vertex(vid, degree) }
new GraphImpl[Int, ED](vertices, edges)
.map{ case (vid, degree) => (vid, degree) }
GraphImpl(vertices, edges)
}
/**