Specializing IndexedRDD as VertexSetRDD.

1) This allows the index map to be optimized for Vids
2) This makes the code more readable
2) The Graph API can now return VertexSetRDDs from operations that produce results for vertices
This commit is contained in:
Joseph E. Gonzalez 2013-10-18 18:45:10 -07:00
parent 0794bd7bc5
commit 5d01ebca3c
4 changed files with 212 additions and 209 deletions

View file

@ -28,7 +28,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* @see Vertex for the vertex type.
*
*/
val vertices: RDD[(Vid,VD)]
val vertices: VertexSetRDD[VD]
/**
* Get the Edges and their data as an RDD. The entries in the RDD contain
@ -257,7 +257,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
def mapReduceTriplets[A: ClassManifest](
mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
reduceFunc: (A, A) => A)
: RDD[(Vid, A)]
: VertexSetRDD[A]
/**

View file

@ -13,11 +13,11 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
lazy val numVertices: Long = graph.vertices.count()
lazy val inDegrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.In)
lazy val inDegrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.In)
lazy val outDegrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.Out)
lazy val outDegrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.Out)
lazy val degrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.Both)
lazy val degrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.Both)
/**
@ -62,7 +62,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
reduceFunc: (A, A) => A,
dir: EdgeDirection)
: RDD[(Vid, A)] = {
: VertexSetRDD[A] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
@ -94,20 +94,20 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
} // end of aggregateNeighbors
def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = {
def collectNeighborIds(edgeDirection: EdgeDirection) : VertexSetRDD[Array[Vid]] = {
val nbrs = graph.aggregateNeighbors[Array[Vid]](
(vid, edge) => Some(Array(edge.otherVertexId(vid))),
(a, b) => a ++ b,
edgeDirection)
graph.vertices.leftOuterJoin(nbrs).mapValues{
graph.vertices.leftZipJoin(nbrs).mapValues{
case (_, Some(nbrs)) => nbrs
case (_, None) => Array.empty[Vid]
}
}
private def degreesRDD(edgeDirection: EdgeDirection): RDD[(Vid, Int)] = {
private def degreesRDD(edgeDirection: EdgeDirection): VertexSetRDD[Int] = {
graph.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection)
}

View file

@ -39,47 +39,40 @@ import org.apache.spark.storage.StorageLevel
/**
* The BlockIndex is the internal map structure used inside the index
* of the IndexedRDD.
*/
class BlockIndex[@specialized K: ClassManifest] extends JHashMap[K,Int]
/**
* The RDDIndex is an opaque type used to represent the organization
* The VertexSetIndex is an opaque type used to represent the organization
* of values in an RDD
*/
class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockIndex[K]]) {
def persist(newLevel: StorageLevel): RDDIndex[K] = {
class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
def persist(newLevel: StorageLevel): VertexSetIndex = {
rdd.persist(newLevel)
return this
}
def partitioner: Partitioner = rdd.partitioner.get
}
/**
* An IndexedRDD[K,V] extends the RDD[(K,V)] by pre-indexing the keys and
* An VertexSetRDD[V] extends the RDD[(Vid,V)] by pre-indexing the keys and
* organizing the values to enable faster join operations.
*
* In addition to providing the basic RDD[(K,V)] functionality the IndexedRDD
* exposes an index member which can be used to "key" other IndexedRDDs
* In addition to providing the basic RDD[(Vid,V)] functionality the VertexSetRDD
* exposes an index member which can be used to "key" other VertexSetRDDs
*
*/
class IndexedRDD[K: ClassManifest, V: ClassManifest](
@transient val index: RDDIndex[K],
class VertexSetRDD[V: ClassManifest](
@transient val index: VertexSetIndex,
@transient val valuesRDD: RDD[ (IndexedSeq[V], BitSet) ])
extends RDD[(K, V)](index.rdd.context,
extends RDD[(Vid, V)](index.rdd.context,
List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
/**
* Construct a new IndexedRDD that is indexed by only the keys in the RDD
* Construct a new VertexSetRDD that is indexed by only the keys in the RDD
*/
def reindex(): IndexedRDD[K,V] = IndexedRDD(this)
def reindex(): VertexSetRDD[V] = VertexSetRDD(this)
/**
@ -109,20 +102,26 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
/**
* Caching an IndexedRDD causes the index and values to be cached separately.
* Caching an VertexSetRDD causes the index and values to be cached separately.
*/
override def persist(newLevel: StorageLevel): RDD[(K,V)] = {
override def persist(newLevel: StorageLevel): VertexSetRDD[V] = {
index.persist(newLevel)
valuesRDD.persist(newLevel)
return this
}
override def persist(): VertexSetRDD[V] = persist(StorageLevel.MEMORY_ONLY)
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
override def cache(): VertexSetRDD[V] = persist()
/**
* Pass each value in the key-value pair RDD through a map function without changing the keys;
* this also retains the original RDD's partitioning.
*/
def mapValues[U: ClassManifest](f: V => U): IndexedRDD[K, U] = {
def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = {
val cleanF = index.rdd.context.clean(f)
val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] =
valuesRDD.mapPartitions(iter => iter.map{
@ -133,7 +132,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
}
(newValues.toIndexedSeq, bs)
}, preservesPartitioning = true)
new IndexedRDD[K,U](index, newValuesRDD)
new VertexSetRDD[U](index, newValuesRDD)
}
@ -141,7 +140,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
* Pass each value in the key-value pair RDD through a map function without changing the keys;
* this also retains the original RDD's partitioning.
*/
def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): IndexedRDD[K, U] = {
def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = {
val cleanF = index.rdd.context.clean(f)
val newValues: RDD[ (IndexedSeq[U], BitSet) ] =
index.rdd.zipPartitions(valuesRDD){
@ -158,11 +157,11 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
}
Array((newValues.toIndexedSeq, bs)).iterator
}
new IndexedRDD[K,U](index, newValues)
new VertexSetRDD[U](index, newValues)
}
def zipJoin[W: ClassManifest](other: IndexedRDD[K,W]): IndexedRDD[K,(V,W)] = {
def zipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,W)] = {
if(index != other.index) {
throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
}
@ -176,11 +175,11 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
val newValues = thisValues.view.zip(otherValues)
Iterator((newValues.toIndexedSeq, newBS))
}
new IndexedRDD(index, newValuesRDD)
new VertexSetRDD(index, newValuesRDD)
}
def leftZipJoin[W: ClassManifest](other: IndexedRDD[K,W]): IndexedRDD[K,(V,Option[W])] = {
def leftZipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,Option[W])] = {
if(index != other.index) {
throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
}
@ -195,18 +194,18 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
val newValues = thisValues.view.zip(otherOption)
Iterator((newValues.toIndexedSeq, thisBS))
}
new IndexedRDD(index, newValuesRDD)
new VertexSetRDD(index, newValuesRDD)
}
def leftJoin[W: ClassManifest](
other: RDD[(K,W)], merge: (W,W) => W = (a:W, b:W) => a):
IndexedRDD[K, (V, Option[W]) ] = {
other: RDD[(Vid,W)], merge: (W,W) => W = (a:W, b:W) => a):
VertexSetRDD[(V, Option[W]) ] = {
val cleanMerge = index.rdd.context.clean(merge)
other match {
case other: IndexedRDD[_, _] if index == other.index => {
case other: VertexSetRDD[_] if index == other.index => {
leftZipJoin(other)
}
case _ => {
@ -247,21 +246,21 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
Iterator((newValues.toIndexedSeq, thisBS))
} // end of newValues
new IndexedRDD(index, newValues)
new VertexSetRDD(index, newValues)
}
}
}
/**
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner):
IndexedRDD[K, (Seq[V], Seq[W])] = {
def cogroup[W: ClassManifest](other: RDD[(Vid, W)], partitioner: Partitioner):
VertexSetRDD[(Seq[V], Seq[W])] = {
//RDD[(K, (Seq[V], Seq[W]))] = {
other match {
case other: IndexedRDD[_, _] if index == other.index => {
case other: VertexSetRDD[_] if index == other.index => {
// if both RDDs share exactly the same index and therefore the same super set of keys
// then we simply merge the value RDDs.
// However it is possible that both RDDs are missing a value for a given key in
@ -284,9 +283,9 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
}
Iterator((newValues.toIndexedSeq, newBS))
}
new IndexedRDD(index, newValues)
new VertexSetRDD(index, newValues)
}
case other: IndexedRDD[_, _]
case other: VertexSetRDD[_]
if index.rdd.partitioner == other.index.rdd.partitioner => {
// If both RDDs are indexed using different indices but with the same partitioners
// then we we need to first merge the indicies and then use the merged index to
@ -298,7 +297,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
assert(!thisIter.hasNext)
val otherIndex = otherIter.next()
assert(!otherIter.hasNext)
val newIndex = new BlockIndex[K]()
val newIndex = new VertexIdToIndexMap()
// @todo Merge only the keys that correspond to non-null values
// Merge the keys
newIndex.putAll(thisIndex)
@ -318,7 +317,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
// Get the new index for this partition
val newIndex = newIndexIter.next()
assert(!newIndexIter.hasNext)
// Get the corresponding indicies and values for this and the other IndexedRDD
// Get the corresponding indicies and values for this and the other VertexSetRDD
val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
assert(!thisTuplesIter.hasNext)
val (otherIndex, (otherValues, otherBS)) = otherTuplesIter.next()
@ -347,7 +346,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
}
Iterator((newValues.toIndexedSeq, newBS))
})
new IndexedRDD(new RDDIndex(newIndex), newValues)
new VertexSetRDD(new VertexSetIndex(newIndex), newValues)
}
case _ => {
// Get the partitioner from the index
@ -360,20 +359,20 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
if (other.partitioner == Some(partitioner)) {
other
} else {
new ShuffledRDD[K, W, (K,W)](other, partitioner)
other.partitionBy(partitioner)
}
// Join the other RDD with this RDD building a new valueset and new index on the fly
val groups = tuples.zipPartitions(otherShuffled)(
(thisTuplesIter, otherTuplesIter) => {
// Get the corresponding indicies and values for this IndexedRDD
// Get the corresponding indicies and values for this VertexSetRDD
val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
assert(!thisTuplesIter.hasNext())
// Construct a new index
val newIndex = thisIndex.clone().asInstanceOf[BlockIndex[K]]
val newIndex = thisIndex.clone().asInstanceOf[VertexIdToIndexMap]
// Construct a new array Buffer to store the values
val newValues = ArrayBuffer.fill[ (Seq[V], Seq[W]) ](thisValues.size)(null)
val newBS = new BitSet(thisValues.size)
// populate the newValues with the values in this IndexedRDD
// populate the newValues with the values in this VertexSetRDD
for ((k,i) <- thisIndex) {
if (thisBS(i)) {
newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W])
@ -415,14 +414,14 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
new IndexedRDD[K, (Seq[V], Seq[W])](new RDDIndex(newIndex), newValues)
new VertexSetRDD[(Seq[V], Seq[W])](new VertexSetIndex(newIndex), newValues)
}
}
}
//
// def zipJoinToRDD[W: ClassManifest](other: IndexedRDD[K,W]): RDD[(K,(V,W))] = {
// def zipJoinToRDD[W: ClassManifest](other: VertexSetRDD[K,W]): RDD[(K,(V,W))] = {
// if(index != other.index) {
// throw new SparkException("ZipJoinRDD can only be applied to RDDs with the same index!")
// }
@ -447,11 +446,11 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
other: RDD[(K,W)])(
f: (K, V, W) => Z,
merge: (Z,Z) => Z = (a:Z, b:Z) => a):
IndexedRDD[K,Z] = {
VertexSetRDD[K,Z] = {
val cleanF = index.rdd.context.clean(f)
val cleanMerge = index.rdd.context.clean(merge)
other match {
case other: IndexedRDD[_, _] if index == other.index => {
case other: VertexSetRDD[_, _] if index == other.index => {
val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
(thisIndexIter, thisIter, otherIter) =>
val index = thisIndexIter.next()
@ -469,7 +468,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
}
List((newValues, newBS)).iterator
}
new IndexedRDD(index, newValues)
new VertexSetRDD(index, newValues)
}
case _ => {
@ -508,7 +507,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
}
List((newValues, tempBS)).iterator
} // end of newValues
new IndexedRDD(index, newValues)
new VertexSetRDD(index, newValues)
}
}
}
@ -519,11 +518,11 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
other: RDD[(K,W)])(
f: (K, V, Option[W]) => Z,
merge: (Z,Z) => Z = (a:Z, b:Z) => a):
IndexedRDD[K,Z] = {
VertexSetRDD[K,Z] = {
val cleanF = index.rdd.context.clean(f)
val cleanMerge = index.rdd.context.clean(merge)
other match {
case other: IndexedRDD[_, _] if index == other.index => {
case other: VertexSetRDD[_, _] if index == other.index => {
val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
(thisIndexIter, thisIter, otherIter) =>
val index = thisIndexIter.next()
@ -541,7 +540,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
}
List((newValues, thisBS)).iterator
}
new IndexedRDD(index, newValues)
new VertexSetRDD(index, newValues)
}
case _ => {
@ -584,7 +583,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
}
List((newValues, thisBS)).iterator
} // end of newValues
new IndexedRDD(index, newValues)
new VertexSetRDD(index, newValues)
}
}
}
@ -593,7 +592,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
override def filter(f: Tuple2[K,V] => Boolean): RDD[(K,V)] = {
override def filter(f: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = {
val cleanF = index.rdd.context.clean(f)
val newValues = index.rdd.zipPartitions(valuesRDD){
(keysIter, valuesIter) =>
@ -609,14 +608,14 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
}
Array((oldValues, newBS)).iterator
}
new IndexedRDD[K,V](index, newValues)
new VertexSetRDD[V](index, newValues)
}
/**
* Provide the RDD[(K,V)] equivalent output.
*/
override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
override def compute(part: Partition, context: TaskContext): Iterator[(Vid, V)] = {
tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) =>
// Walk the index to construct the key, value pairs
indexMap.iterator
@ -629,27 +628,27 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
}
}
} // End of IndexedRDD
} // End of VertexSetRDD
object IndexedRDD {
object VertexSetRDD {
def apply[K: ClassManifest, V: ClassManifest](rdd: RDD[(K,V)]): IndexedRDD[K,V] =
def apply[V: ClassManifest](rdd: RDD[(Vid,V)]): VertexSetRDD[V] =
apply(rdd, (a:V, b:V) => a )
def apply[K: ClassManifest, V: ClassManifest](
rdd: RDD[(K,V)], reduceFunc: (V, V) => V): IndexedRDD[K,V] = {
def apply[V: ClassManifest](
rdd: RDD[(Vid,V)], reduceFunc: (V, V) => V): VertexSetRDD[V] = {
// Preaggregate and shuffle if necessary
// Preaggregation.
val aggregator = new Aggregator[K, V, V](v => v, reduceFunc, reduceFunc)
val aggregator = new Aggregator[Vid, V, V](v => v, reduceFunc, reduceFunc)
val partitioner = new HashPartitioner(rdd.partitions.size)
val preAgg = rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
val groups = preAgg.mapPartitions( iter => {
val indexMap = new BlockIndex[K]()
val indexMap = new VertexIdToIndexMap()
val values = new ArrayBuffer[V]
val bs = new BitSet
for ((k,v) <- iter) {
@ -669,19 +668,19 @@ object IndexedRDD {
val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true)
val values: RDD[(IndexedSeq[V], BitSet)] =
groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
new IndexedRDD[K,V](new RDDIndex(index), values)
new VertexSetRDD[V](new VertexSetIndex(index), values)
}
def apply[K: ClassManifest, V: ClassManifest](
rdd: RDD[(K,V)], index: RDDIndex[K]): IndexedRDD[K,V] =
def apply[V: ClassManifest](
rdd: RDD[(Vid,V)], index: VertexSetIndex): VertexSetRDD[V] =
apply(rdd, index, (a:V,b:V) => a)
def apply[K: ClassManifest, V: ClassManifest](
rdd: RDD[(K,V)], index: RDDIndex[K],
reduceFunc: (V, V) => V): IndexedRDD[K,V] =
def apply[V: ClassManifest](
rdd: RDD[(Vid,V)], index: VertexSetIndex,
reduceFunc: (V, V) => V): VertexSetRDD[V] =
apply(rdd,index, (v:V) => v, reduceFunc, reduceFunc)
// {
// // Get the index Partitioner
@ -721,16 +720,16 @@ object IndexedRDD {
// }
// List((values, bs)).iterator
// })
// new IndexedRDD[K,V](index, values)
// new VertexSetRDD[K,V](index, values)
// } // end of apply
def apply[K: ClassManifest, V: ClassManifest, C: ClassManifest](
rdd: RDD[(K,V)],
index: RDDIndex[K],
def apply[V: ClassManifest, C: ClassManifest](
rdd: RDD[(Vid,V)],
index: VertexSetIndex,
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): IndexedRDD[K,C] = {
mergeCombiners: (C, C) => C): VertexSetRDD[C] = {
// Get the index Partitioner
val partitioner = index.rdd.partitioner match {
case Some(p) => p
@ -740,7 +739,7 @@ object IndexedRDD {
val partitioned =
if (rdd.partitioner != Some(partitioner)) {
// Preaggregation.
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue,
val aggregator = new Aggregator[Vid, V, C](createCombiner, mergeValue,
mergeCombiners)
rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
} else {
@ -769,15 +768,15 @@ object IndexedRDD {
}
Iterator((values, bs))
})
new IndexedRDD(index, values)
new VertexSetRDD(index, values)
} // end of apply
/**
* Construct and index of the unique values in a given RDD.
*/
def makeIndex[K: ClassManifest](keys: RDD[K],
partitioner: Option[Partitioner] = None): RDDIndex[K] = {
def makeIndex(keys: RDD[Vid],
partitioner: Option[Partitioner] = None): VertexSetIndex = {
// @todo: I don't need the boolean its only there to be the second type since I want to shuffle a single RDD
// Ugly hack :-(. In order to partition the keys they must have values.
val tbl = keys.mapPartitions(_.map(k => (k, false)), true)
@ -786,7 +785,7 @@ object IndexedRDD {
case None => {
if (tbl.partitioner.isEmpty) {
// @todo: I don't need the boolean its only there to be the second type of the shuffle.
new ShuffledRDD[K, Boolean, (K, Boolean)](tbl, Partitioner.defaultPartitioner(tbl))
new ShuffledRDD[Vid, Boolean, (Vid, Boolean)](tbl, Partitioner.defaultPartitioner(tbl))
} else { tbl }
}
case Some(partitioner) =>
@ -794,7 +793,7 @@ object IndexedRDD {
}
val index = shuffledTbl.mapPartitions( iter => {
val indexMap = new BlockIndex[K]()
val indexMap = new VertexIdToIndexMap()
for ( (k,_) <- iter ){
if(!indexMap.contains(k)){
val ind = indexMap.size
@ -803,10 +802,10 @@ object IndexedRDD {
}
Iterator(indexMap)
}, true).cache
new RDDIndex(index)
new VertexSetIndex(index)
}
} // end of object IndexedRDD
} // end of object VertexSetRDD

View file

@ -66,17 +66,16 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
object EdgeTripletBuilder {
def makeTriplets[VD: ClassManifest, ED: ClassManifest](
localVidMap: IndexedRDD[Pid, VertexIdToIndexMap],
vTableReplicatedValues: IndexedRDD[Pid, Array[VD]],
eTable: IndexedRDD[Pid, EdgePartition[ED]]): RDD[EdgeTriplet[VD, ED]] = {
val iterFun = (iter: Iterator[(Pid, ((VertexIdToIndexMap, Array[VD]), EdgePartition[ED]))]) => {
val (pid, ((vidToIndex, vertexArray), edgePartition)) = iter.next()
assert(iter.hasNext == false)
localVidMap: RDD[(Pid, VertexIdToIndexMap)],
vTableReplicatedValues: RDD[(Pid, Array[VD]) ],
eTable: RDD[(Pid, EdgePartition[ED])]): RDD[EdgeTriplet[VD, ED]] = {
localVidMap.zipPartitions(vTableReplicatedValues, eTable) {
(vidMapIter, replicatedValuesIter, eTableIter) =>
val (_, vidToIndex) = vidMapIter.next()
val (_, vertexArray) = replicatedValuesIter.next()
val (_, edgePartition) = eTableIter.next()
new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition)
}
ClosureCleaner.clean(iterFun)
localVidMap.zipJoin(vTableReplicatedValues).zipJoin(eTable)
.mapPartitions( iterFun ) // end of map partition
}
}
@ -100,30 +99,30 @@ object EdgeTripletBuilder {
* A Graph RDD that supports computation on graphs.
*/
class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
@transient val vTable: IndexedRDD[Vid, VD],
@transient val vid2pid: IndexedRDD[Vid, Array[Pid]],
@transient val localVidMap: IndexedRDD[Pid, VertexIdToIndexMap],
@transient val eTable: IndexedRDD[Pid, EdgePartition[ED]])
@transient val vTable: VertexSetRDD[VD],
@transient val vid2pid: VertexSetRDD[Array[Pid]],
@transient val localVidMap: RDD[(Pid, VertexIdToIndexMap)],
@transient val eTable: RDD[(Pid, EdgePartition[ED])] )
extends Graph[VD, ED] {
// def this() = this(null,null,null)
/**
* (localVidMap: IndexedRDD[Pid, VertexIdToIndexMap]) is a version of the
* (localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap]) is a version of the
* vertex data after it is replicated. Within each partition, it holds a map
* from vertex ID to the index where that vertex's attribute is stored. This
* index refers to an array in the same partition in vTableReplicatedValues.
*
* (vTableReplicatedValues: IndexedRDD[Pid, Array[VD]]) holds the vertex data
* (vTableReplicatedValues: VertexSetRDD[Pid, Array[VD]]) holds the vertex data
* and is arranged as described above.
*/
@transient val vTableReplicatedValues =
@transient val vTableReplicatedValues: RDD[(Pid, Array[VD])] =
createVTableReplicated(vTable, vid2pid, localVidMap)
/** Return a RDD of vertices. */
@transient override val vertices: RDD[(Vid, VD)] = vTable
@transient override val vertices = vTable
/** Return a RDD of edges. */
@ -177,36 +176,40 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def reverse: Graph[VD, ED] = {
val etable = eTable.mapValues( _.reverse ).asInstanceOf[IndexedRDD[Pid, EdgePartition[ED]]]
new GraphImpl(vTable, vid2pid, localVidMap, etable)
val newEtable = eTable.mapPartitions( _.map{ case (pid, epart) => (pid, epart.reverse) },
preservesPartitioning = true)
new GraphImpl(vTable, vid2pid, localVidMap, newEtable)
}
override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
val newVTable = vTable.mapValuesWithKeys((vid, data) => f(vid, data))
.asInstanceOf[IndexedRDD[Vid, VD2]]
new GraphImpl(newVTable, vid2pid, localVidMap, eTable)
}
override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = {
val newETable = eTable.mapValues(eBlock => eBlock.map(f))
.asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]]
val newETable = eTable.mapPartitions(_.map{ case (pid, epart) => (pid, epart.map(f)) },
preservesPartitioning = true)
new GraphImpl(vTable, vid2pid, localVidMap, newETable)
}
override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2):
Graph[VD, ED2] = {
val newETable = eTable.zipJoin(localVidMap).zipJoin(vTableReplicatedValues).mapValues{
case ((edgePartition, vidToIndex), vertexArray) =>
val et = new EdgeTriplet[VD, ED]
edgePartition.map{e =>
et.set(e)
et.srcAttr = vertexArray(vidToIndex(e.srcId))
et.dstAttr = vertexArray(vidToIndex(e.dstId))
f(et)
}
}.asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]]
val newETable = eTable.zipPartitions(localVidMap, vTableReplicatedValues){
(edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
val (pid, edgePartition) = edgePartitionIter.next()
val (_, vidToIndex) = vidToIndexIter.next()
val (_, vertexArray) = vertexArrayIter.next()
val et = new EdgeTriplet[VD, ED]
val newEdgePartition = edgePartition.map{e =>
et.set(e)
et.srcAttr = vertexArray(vidToIndex(e.srcId))
et.dstAttr = vertexArray(vidToIndex(e.dstId))
f(et)
}
Iterator((pid, newEdgePartition))
}
new GraphImpl(vTable, vid2pid, localVidMap, newETable)
}
@ -238,7 +241,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// Reuse the partitioner (but not the index) from this graph
val newVTable =
IndexedRDD(vertices.filter(v => vpred(v._1, v._2)).partitionBy(vTable.index.partitioner))
VertexSetRDD(vertices.filter(v => vpred(v._1, v._2)).partitionBy(vTable.index.partitioner))
// Restrict the set of edges to those that satisfy the vertex and the edge predicate.
@ -309,53 +312,56 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def mapReduceTriplets[A: ClassManifest](
mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
reduceFunc: (A, A) => A)
: RDD[(Vid, A)] = {
: VertexSetRDD[A] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
// Map and preaggregate
val preAgg = localVidMap.zipJoin(vTableReplicatedValues).zipJoin(eTable).flatMap{
case (pid, ((vidToIndex, vertexArray), edgePartition)) =>
// We can reuse the vidToIndex map for aggregation here as well.
/** @todo Since this has the downside of not allowing "messages" to arbitrary
* vertices we should consider just using a fresh map.
*/
val msgArray = new Array[A](vertexArray.size)
val msgBS = new BitSet(vertexArray.size)
// Iterate over the partition
val et = new EdgeTriplet[VD, ED]
edgePartition.foreach{e =>
et.set(e)
et.srcAttr = vertexArray(vidToIndex(e.srcId))
et.dstAttr = vertexArray(vidToIndex(e.dstId))
mapFunc(et).foreach{ case (vid, msg) =>
// verify that the vid is valid
assert(vid == et.srcId || vid == et.dstId)
val ind = vidToIndex(vid)
// Populate the aggregator map
if(msgBS(ind)) {
msgArray(ind) = reduceFunc(msgArray(ind), msg)
} else {
msgArray(ind) = msg
msgBS(ind) = true
}
val preAgg = eTable.zipPartitions(localVidMap, vTableReplicatedValues){
(edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
val (pid, edgePartition) = edgePartitionIter.next()
val (_, vidToIndex) = vidToIndexIter.next()
val (_, vertexArray) = vertexArrayIter.next()
// We can reuse the vidToIndex map for aggregation here as well.
/** @todo Since this has the downside of not allowing "messages" to arbitrary
* vertices we should consider just using a fresh map.
*/
val msgArray = new Array[A](vertexArray.size)
val msgBS = new BitSet(vertexArray.size)
// Iterate over the partition
val et = new EdgeTriplet[VD, ED]
edgePartition.foreach{e =>
et.set(e)
et.srcAttr = vertexArray(vidToIndex(e.srcId))
et.dstAttr = vertexArray(vidToIndex(e.dstId))
mapFunc(et).foreach{ case (vid, msg) =>
// verify that the vid is valid
assert(vid == et.srcId || vid == et.dstId)
val ind = vidToIndex(vid)
// Populate the aggregator map
if(msgBS(ind)) {
msgArray(ind) = reduceFunc(msgArray(ind), msg)
} else {
msgArray(ind) = msg
msgBS(ind) = true
}
}
// Return the aggregate map
vidToIndex.long2IntEntrySet().fastIterator()
// Remove the entries that did not receive a message
.filter{ entry => msgBS(entry.getValue()) }
// Construct the actual pairs
.map{ entry =>
val vid = entry.getLongKey()
val ind = entry.getValue()
val msg = msgArray(ind)
(vid, msg)
}
}
// Return the aggregate map
vidToIndex.long2IntEntrySet().fastIterator()
// Remove the entries that did not receive a message
.filter{ entry => msgBS(entry.getValue()) }
// Construct the actual pairs
.map{ entry =>
val vid = entry.getLongKey()
val ind = entry.getValue()
val msg = msgArray(ind)
(vid, msg)
}
}.partitionBy(vTable.index.rdd.partitioner.get)
// do the final reduction reusing the index map
IndexedRDD(preAgg, vTable.index, reduceFunc)
VertexSetRDD(preAgg, vTable.index, reduceFunc)
}
@ -402,7 +408,7 @@ object GraphImpl {
defaultVertexAttr: VD,
mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = {
val vtable = IndexedRDD(vertices, mergeFunc)
val vtable = VertexSetRDD(vertices, mergeFunc)
/**
* @todo Verify that there are no edges that contain vertices
* that are not in vTable. This should probably be resolved:
@ -432,54 +438,54 @@ object GraphImpl {
* containing all the edges in a partition.
*/
protected def createETable[ED: ClassManifest](edges: RDD[Edge[ED]])
: IndexedRDD[Pid, EdgePartition[ED]] = {
// Get the number of partitions
val numPartitions = edges.partitions.size
val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt
IndexedRDD(edges.map { e =>
// Random partitioning based on the source vertex id.
// val part: Pid = edgePartitionFunction1D(e.srcId, e.dstId, numPartitions)
// val part: Pid = edgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
val part: Pid = randomVertexCut(e.srcId, e.dstId, numPartitions)
//val part: Pid = canonicalEdgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
: RDD[(Pid, EdgePartition[ED])] = {
// Get the number of partitions
val numPartitions = edges.partitions.size
val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt
edges.map { e =>
// Random partitioning based on the source vertex id.
// val part: Pid = edgePartitionFunction1D(e.srcId, e.dstId, numPartitions)
// val part: Pid = edgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
val part: Pid = randomVertexCut(e.srcId, e.dstId, numPartitions)
//val part: Pid = canonicalEdgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
// Should we be using 3-tuple or an optimized class
MessageToPartition(part, (e.srcId, e.dstId, e.attr))
// Should we be using 3-tuple or an optimized class
MessageToPartition(part, (e.srcId, e.dstId, e.attr))
}
.partitionBy(new HashPartitioner(numPartitions))
.mapPartitionsWithIndex( (pid, iter) => {
val builder = new EdgePartitionBuilder[ED]
iter.foreach { message =>
val data = message.data
builder.add(data._1, data._2, data._3)
}
.partitionBy(new HashPartitioner(numPartitions))
.mapPartitionsWithIndex({ (pid, iter) =>
val builder = new EdgePartitionBuilder[ED]
iter.foreach { message =>
val data = message.data
builder.add(data._1, data._2, data._3)
}
val edgePartition = builder.toEdgePartition
Iterator((pid, edgePartition))
}, preservesPartitioning = true))
val edgePartition = builder.toEdgePartition
Iterator((pid, edgePartition))
}, preservesPartitioning = true).cache()
}
protected def createVid2Pid[ED: ClassManifest](
eTable: IndexedRDD[Pid, EdgePartition[ED]],
vTableIndex: RDDIndex[Vid]): IndexedRDD[Vid, Array[Pid]] = {
eTable: RDD[(Pid, EdgePartition[ED])],
vTableIndex: VertexSetIndex): VertexSetRDD[Array[Pid]] = {
val preAgg = eTable.mapPartitions { iter =>
val (pid, edgePartition) = iter.next()
val vSet = new VertexSet
edgePartition.foreach(e => {vSet.add(e.srcId); vSet.add(e.dstId)})
vSet.iterator.map { vid => (vid.toLong, pid) }
}
IndexedRDD[Vid, Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
(p: Pid) => ArrayBuffer(p),
(ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
(a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
.mapValues(a => a.toArray).asInstanceOf[IndexedRDD[Vid, Array[Pid]]]
.mapValues(a => a.toArray).cache()
}
protected def createLocalVidMap[ED: ClassManifest](
eTable: IndexedRDD[Pid, EdgePartition[ED]]): IndexedRDD[Pid, VertexIdToIndexMap] = {
eTable.mapValues{ epart =>
val vidToIndex = new VertexIdToIndexMap()
protected def createLocalVidMap[ED: ClassManifest](eTable: RDD[(Pid, EdgePartition[ED])]):
RDD[(Pid, VertexIdToIndexMap)] = {
eTable.mapPartitions( _.map{ case (pid, epart) =>
val vidToIndex = new VertexIdToIndexMap
var i = 0
epart.foreach{ e =>
if(!vidToIndex.contains(e.srcId)) {
@ -491,16 +497,16 @@ object GraphImpl {
i += 1
}
}
vidToIndex
}
(pid, vidToIndex)
}, preservesPartitioning = true).cache()
}
protected def createVTableReplicated[VD: ClassManifest](
vTable: IndexedRDD[Vid, VD],
vid2pid: IndexedRDD[Vid, Array[Pid]],
replicationMap: IndexedRDD[Pid, VertexIdToIndexMap]):
IndexedRDD[Pid, Array[VD]] = {
vTable: VertexSetRDD[VD],
vid2pid: VertexSetRDD[Array[Pid]],
replicationMap: RDD[(Pid, VertexIdToIndexMap)]):
RDD[(Pid, Array[VD])] = {
// Join vid2pid and vTable, generate a shuffle dependency on the joined
// result, and get the shuffle id so we can use it on the slave.
val msgsByPartition = vTable.zipJoin(vid2pid)
@ -509,9 +515,9 @@ object GraphImpl {
}
.partitionBy(replicationMap.partitioner.get).cache()
val newValuesRDD = replicationMap.valuesRDD.zipPartitions(msgsByPartition){
replicationMap.zipPartitions(msgsByPartition){
(mapIter, msgsIter) =>
val (IndexedSeq(vidToIndex), bs) = mapIter.next()
val (pid, vidToIndex) = mapIter.next()
assert(!mapIter.hasNext)
// Populate the vertex array using the vidToIndex map
val vertexArray = new Array[VD](vidToIndex.size)
@ -519,14 +525,12 @@ object GraphImpl {
val ind = vidToIndex(msg.data._1)
vertexArray(ind) = msg.data._2
}
Iterator((IndexedSeq(vertexArray), bs))
}
new IndexedRDD(replicationMap.index, newValuesRDD)
Iterator((pid, vertexArray))
}.cache()
// @todo assert edge table has partitioner
// val localVidMap: IndexedRDD[Pid, VertexIdToIndexMap] =
// val localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap] =
// msgsByPartition.mapPartitionsWithIndex( (pid, iter) => {
// val vidToIndex = new VertexIdToIndexMap
// var i = 0
@ -537,7 +541,7 @@ object GraphImpl {
// Array((pid, vidToIndex)).iterator
// }, preservesPartitioning = true).indexed(eTable.index)
// val vTableReplicatedValues: IndexedRDD[Pid, Array[VD]] =
// val vTableReplicatedValues: VertexSetRDD[Pid, Array[VD]] =
// msgsByPartition.mapPartitionsWithIndex( (pid, iter) => {
// val vertexArray = ArrayBuilder.make[VD]
// for (msg <- iter) {