Merge branch 'VertexSetRDD_Tests' into AnalyticsCleanup

Joseph E. Gonzalez 2013-10-22 15:03:49 -07:00
commit be8269af07
2 changed files with 279 additions and 365 deletions

VertexSetRDD.scala

@@ -41,28 +41,62 @@ import org.apache.spark.storage.StorageLevel
 /**
- * The VertexSetIndex is an opaque type used to represent the organization
- * of values in an RDD
+ * The `VertexSetIndex` maintains the per-partition mapping from vertex id
+ * to the corresponding location in the per-partition values array.
+ * This class is meant to be an opaque type.
+ *
  */
 class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
+  /**
+   * The persist function behaves like the standard RDD persist.
+   */
   def persist(newLevel: StorageLevel): VertexSetIndex = {
     rdd.persist(newLevel)
     return this
   }
+
+  /**
+   * Returns the partitioner object of the underlying RDD. This is used
+   * by the VertexSetRDD to partition the values RDD.
+   */
   def partitioner: Partitioner = rdd.partitioner.get
-}
+} // end of VertexSetIndex

 /**
- * An VertexSetRDD[V] extends the RDD[(Vid,V)] by pre-indexing the keys and
- * organizing the values to enable faster join operations.
+ * A VertexSetRDD[V] extends the RDD[(Vid,V)] by ensuring that there is only
+ * one entry for each vertex and by pre-indexing the entries for fast, efficient
+ * joins.
  *
 * In addition to providing the basic RDD[(Vid,V)] functionality the VertexSetRDD
 * exposes an index member which can be used to "key" other VertexSetRDDs
  *
+ * @tparam V the vertex attribute associated with each vertex in the set.
+ *
+ * @param index the index which contains the vertex id information and is used
+ * to organize the values in the RDD.
+ * @param valuesRDD the values RDD contains the actual vertex attributes organized
+ * as an array within each partition.
+ *
+ * To construct a `VertexSetRDD` use the singleton object:
+ *
+ * @example Construct a `VertexSetRDD` from a plain RDD
+ * {{{
+ * // Construct an initial vertex set
+ * val someData: RDD[(Vid, SomeType)] = loadData(someFile)
+ * val vset = VertexSetRDD(someData)
+ * // If there were redundant values in someData we would use a reduceFunc
+ * val vset2 = VertexSetRDD(someData, reduceFunc)
+ * // Finally we can use the VertexSetRDD to index another dataset
+ * val otherData: RDD[(Vid, OtherType)] = loadData(otherFile)
+ * val vset3 = VertexSetRDD(otherData, vset.index)
+ * // Now we can construct very fast joins between the two sets
+ * val vset4: VertexSetRDD[(SomeType, OtherType)] = vset.leftJoin(vset3)
+ * }}}
+ *
  */
-class VertexSetRDD[V: ClassManifest](
+class VertexSetRDD[@specialized V: ClassManifest](
     @transient val index: VertexSetIndex,
     @transient val valuesRDD: RDD[ (IndexedSeq[V], BitSet) ])
   extends RDD[(Vid, V)](index.rdd.context,
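The constructor above splits each partition into three pieces: a shared VertexIdToIndexMap, a flat values array, and a BitSet marking which slots are live. Every operation below manipulates this triple. A minimal plain-Scala sketch of one partition (hypothetical data, no Spark needed):

    import scala.collection.mutable

    object PartitionSketch {
      def main(args: Array[String]): Unit = {
        // index: vertex id -> offset into the values array (shared across vertex sets)
        val index = mutable.Map(10L -> 0, 20L -> 1, 30L -> 2)
        // values: one attribute slot per index entry
        val values = IndexedSeq("a", "b", "c")
        // bitset: which slots are present in *this* vertex set
        val mask = mutable.BitSet(0, 2) // vertex 20 is absent from this set

        // Emulates compute() below: walk the index, keep set bits, emit (id, value)
        val pairs = index.iterator
          .filter { case (_, off) => mask(off) }
          .map { case (id, off) => (id, values(off)) }
        println(pairs.toList) // List((10,a), (30,c)) in hash order
      }
    }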
@@ -70,20 +104,23 @@ class VertexSetRDD[V: ClassManifest](
   /**
-   * Construct a new VertexSetRDD that is indexed by only the keys in the RDD
+   * Construct a new VertexSetRDD that is indexed by only the keys in the RDD.
+   * The resulting VertexSet will be based on a different index and can
+   * no longer be quickly joined with this RDD.
    */
   def reindex(): VertexSetRDD[V] = VertexSetRDD(this)

   /**
    * An internal representation which joins the block indices with the values
+   * This is used by the compute function to emulate RDD[(Vid, V)]
    */
   protected[spark] val tuples =
     new ZippedRDD(index.rdd.context, index.rdd, valuesRDD)

   /**
-   * The partitioner is defined by the index
+   * The partitioner is defined by the index.
    */
   override val partitioner = index.rdd.partitioner
@@ -95,7 +132,8 @@ class VertexSetRDD[V: ClassManifest](
   /**
-   * The preferred locations are computed based on the preferred locations of the tuples.
+   * The preferred locations are computed based on the preferred
+   * locations of the tuples.
    */
   override def getPreferredLocations(s: Partition): Seq[String] =
     tuples.getPreferredLocations(s)
@@ -110,75 +148,170 @@ class VertexSetRDD[V: ClassManifest](
     return this
   }

+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   override def persist(): VertexSetRDD[V] = persist(StorageLevel.MEMORY_ONLY)

   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   override def cache(): VertexSetRDD[V] = persist()

   /**
-   * Pass each value in the key-value pair RDD through a map function without changing the keys;
-   * this also retains the original RDD's partitioning.
+   * Provide the RDD[(K,V)] equivalent output.
    */
-  def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = {
-    val cleanF = index.rdd.context.clean(f)
-    val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] =
-      valuesRDD.mapPartitions(iter => iter.map{
-        case (values, bs) =>
-          val newValues = new Array[U](values.size)
-          for ( ind <- bs ) {
-            newValues(ind) = f(values(ind))
-          }
-          (newValues.toIndexedSeq, bs)
-      }, preservesPartitioning = true)
-    new VertexSetRDD[U](index, newValuesRDD)
-  }
+  override def compute(part: Partition, context: TaskContext): Iterator[(Vid, V)] = {
+    tuples.compute(part, context).flatMap { case (indexMap, (values, bs)) =>
+      // Walk the index to construct the key, value pairs
+      indexMap.iterator
+        // Extract rows with key value pairs and indicators
+        .map{ case (k, ind) => (bs(ind), k, ind) }
+        // Remove tuples that aren't actually present in the array
+        .filter( _._1 )
+        // Extract the pair (removing the indicator from the tuple)
+        .map( x => (x._2, values(x._3)) )
+    }
+  } // end of compute

   /**
-   * Pass each value in the key-value pair RDD through a map function without changing the keys;
-   * this also retains the original RDD's partitioning.
+   * Restrict the vertex set to the vertices satisfying the predicate,
+   * retaining the current index.
+   *
+   * @todo finish documenting
    */
-  def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = {
+  override def filter(f: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = {
     val cleanF = index.rdd.context.clean(f)
-    val newValues: RDD[ (IndexedSeq[U], BitSet) ] =
-      index.rdd.zipPartitions(valuesRDD){
+    val newValues = index.rdd.zipPartitions(valuesRDD){
       (keysIter, valuesIter) =>
       val index = keysIter.next()
       assert(keysIter.hasNext() == false)
       val (oldValues, bs) = valuesIter.next()
       assert(valuesIter.hasNext() == false)
       // Allocate the array to store the results into
-      val newValues: Array[U] = new Array[U](oldValues.size)
+      val newBS = new BitSet(oldValues.size)
       // Populate the new Values
       for( (k,i) <- index ) {
-        if (bs(i)) { newValues(i) = f(k, oldValues(i)) }
+        newBS(i) = bs(i) && cleanF( (k, oldValues(i)) )
       }
-      Array((newValues.toIndexedSeq, bs)).iterator
+      Array((oldValues, newBS)).iterator
     }
+    new VertexSetRDD[V](index, newValues)
+  } // end of filter
+
+  /**
+   * Pass each vertex attribute through a map function and retain
+   * the original RDD's partitioning and index.
+   *
+   * @tparam U the type returned by the map function
+   *
+   * @param f the function applied to each value in the RDD
+   * @return a new VertexSet with values obtained by applying `f` to each of the
+   * entries in the original VertexSet. The resulting VertexSetRDD retains the
+   * same index.
+   */
+  def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = {
+    val cleanF = index.rdd.context.clean(f)
+    val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] =
+      valuesRDD.mapPartitions(iter => iter.map{
+        case (values, bs) =>
+          /**
+           * @todo Consider using a view rather than creating a new array.
+           * This is already being done for join operations. It could reduce
+           * memory overhead but require additional recomputation.
+           */
+          val newValues = new Array[U](values.size)
+          for ( ind <- bs ) {
+            newValues(ind) = f(values(ind))
+          }
+          (newValues.toIndexedSeq, bs)
+      }, preservesPartitioning = true)
+    new VertexSetRDD[U](index, newValuesRDD)
+  } // end of mapValues
+
+  /**
+   * Pass each vertex attribute along with the vertex id through a
+   * map function and retain the original RDD's partitioning and index.
+   *
+   * @tparam U the type returned by the map function
+   *
+   * @param f the function applied to each vertex id and vertex
+   * attribute in the RDD
+   * @return a new VertexSet with values obtained by applying `f` to each of the
+   * entries in the original VertexSet. The resulting VertexSetRDD retains the
+   * same index.
+   */
+  def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = {
+    val cleanF = index.rdd.context.clean(f)
+    val newValues: RDD[ (IndexedSeq[U], BitSet) ] =
+      index.rdd.zipPartitions(valuesRDD){
+        (keysIter, valuesIter) =>
+        val index = keysIter.next()
+        assert(keysIter.hasNext() == false)
+        val (oldValues, bs) = valuesIter.next()
+        assert(valuesIter.hasNext() == false)
+        /**
+         * @todo Consider using a view rather than creating a new array.
+         * This is already being done for join operations. It could reduce
+         * memory overhead but require additional recomputation.
+         */
+        // Allocate the array to store the results into
+        val newValues: Array[U] = new Array[U](oldValues.size)
+        // Populate the new Values
+        for( (k,i) <- index ) {
+          if (bs(i)) { newValues(i) = f(k, oldValues(i)) }
+        }
+        Array((newValues.toIndexedSeq, bs)).iterator
+      }
     new VertexSetRDD[U](index, newValues)
-  }
+  } // end of mapValuesWithKeys
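Note that filter above never copies the values array: it only recomputes the bitset, so the result still shares its index and value layout with the original set. A plain-Scala sketch of that partition-level step (hypothetical data):

    import scala.collection.mutable

    object FilterSketch {
      def main(args: Array[String]): Unit = {
        val index = Map(10L -> 0, 20L -> 1, 30L -> 2) // vertex id -> offset
        val values = IndexedSeq(1, 4, 9)              // one attribute per offset
        val bs = mutable.BitSet(0, 1, 2)              // all three vertices present

        val pred = (kv: (Long, Int)) => kv._2 % 2 == 1 // keep odd attributes
        val newBS = new mutable.BitSet()
        for ((k, i) <- index) {                        // mirrors the loop in filter
          if (bs(i) && pred((k, values(i)))) newBS += i
        }
        println(newBS) // BitSet(0, 2): vertices 10 and 30 survive; values untouched
      }
    }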
+  /**
+   * Inner join this VertexSet with another VertexSet which has the same index.
+   * This function will fail if both VertexSets do not share the same index.
+   * The resulting vertex set will only contain vertices that are in both this
+   * and the other vertex set.
+   *
+   * @tparam W the attribute type of the other VertexSet
+   *
+   * @param other the other VertexSet with which to join.
+   * @return a VertexSetRDD containing only the vertices in both this and the
+   * other VertexSet and with tuple attributes.
+   */
   def zipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,W)] = {
     if(index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
-    val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){
-      (thisIter, otherIter) =>
-      val (thisValues, thisBS) = thisIter.next()
-      assert(!thisIter.hasNext)
-      val (otherValues, otherBS) = otherIter.next()
-      assert(!otherIter.hasNext)
-      val newBS = thisBS & otherBS
-      val newValues = thisValues.view.zip(otherValues)
-      Iterator((newValues.toIndexedSeq, newBS))
-    }
+    val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] =
+      valuesRDD.zipPartitions(other.valuesRDD){
+        (thisIter, otherIter) =>
+        val (thisValues, thisBS) = thisIter.next()
+        assert(!thisIter.hasNext)
+        val (otherValues, otherBS) = otherIter.next()
+        assert(!otherIter.hasNext)
+        val newBS = thisBS & otherBS
+        val newValues = thisValues.view.zip(otherValues)
+        Iterator((newValues.toIndexedSeq, newBS))
+      }
     new VertexSetRDD(index, newValuesRDD)
   }
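Because both sets share one index, matching vertices occupy the same offset in both value arrays, so zipJoin reduces to a bitset intersection plus a lazy zip, with no hashing or shuffling. Roughly, per partition (plain Scala, hypothetical data):

    import scala.collection.mutable

    object ZipJoinSketch {
      def main(args: Array[String]): Unit = {
        val thisValues  = IndexedSeq("a", "b", "c")
        val thisBS      = mutable.BitSet(0, 1)  // this set holds offsets 0 and 1
        val otherValues = IndexedSeq(1, 2, 3)
        val otherBS     = mutable.BitSet(1, 2)  // other set holds offsets 1 and 2

        val newBS  = thisBS & otherBS           // intersection: offset 1 only
        val joined = thisValues.view.zip(otherValues).toIndexedSeq
        val live   = newBS.toList.map(i => joined(i))
        println(live)                           // List((b,2))
      }
    }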
+  /**
+   * Left join this VertexSet with another VertexSet which has the same index.
+   * This function will fail if both VertexSets do not share the same index.
+   * The resulting vertex set contains an entry for each vertex in this set.
+   * If the other VertexSet is missing any vertex in this VertexSet then a
+   * `None` attribute is generated.
+   *
+   * @tparam W the attribute type of the other VertexSet
+   *
+   * @param other the other VertexSet with which to join.
+   * @return a VertexSetRDD containing all the vertices in this VertexSet
+   * with `None` attributes used for vertices missing in the other VertexSet.
+   */
   def leftZipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,Option[W])] = {
     if(index != other.index) {
       throw new SparkException("A leftZipJoin can only be applied to RDDs with the same index!")
@@ -195,41 +328,63 @@ class VertexSetRDD[V: ClassManifest](
       Iterator((newValues.toIndexedSeq, thisBS))
     }
     new VertexSetRDD(index, newValuesRDD)
-  }
+  } // end of leftZipJoin

+  /**
+   * Left join this VertexSet with an RDD containing vertex attribute pairs.
+   * If the other RDD is backed by a VertexSet with the same index then the
+   * efficient leftZipJoin implementation is used.
+   * The resulting vertex set contains an entry for each vertex in this set.
+   * If the other VertexSet is missing any vertex in this VertexSet then a
+   * `None` attribute is generated.
+   *
+   * @tparam W the attribute type of the other VertexSet
+   *
+   * @param other the other VertexSet with which to join.
+   * @param merge the function used to combine duplicate vertex attributes
+   * @return a VertexSetRDD containing all the vertices in this VertexSet
+   * with `None` attributes used for vertices missing in the other VertexSet.
+   */
   def leftJoin[W: ClassManifest](
       other: RDD[(Vid,W)], merge: (W,W) => W = (a:W, b:W) => a):
     VertexSetRDD[(V, Option[W])] = {
     val cleanMerge = index.rdd.context.clean(merge)
+    // Test if the other vertex set is a VertexSetRDD to choose the optimal
+    // join strategy
     other match {
+      // If the other set is a VertexSetRDD and shares the same index as
+      // this vertex set then we use the much more efficient leftZipJoin
       case other: VertexSetRDD[_] if index == other.index => {
         leftZipJoin(other)
       }
       case _ => {
-        // Get the partitioner from the index
-        val partitioner = index.rdd.partitioner match {
-          case Some(p) => p
-          case None => throw new SparkException("An index must have a partitioner.")
-        }
-        // Shuffle the other RDD using the partitioner for this index
+        // Otherwise we treat the other RDD as a collection of
+        // vertex-attribute pairs.
+        // If necessary shuffle the other RDD using the partitioner
+        // for this VertexSet
         val otherShuffled =
-          if (other.partitioner == Some(partitioner)) other
-          else other.partitionBy(partitioner)
+          if (other.partitioner == partitioner) other
+          else other.partitionBy(partitioner.get)
+        // Compute the new values RDD
         val newValues: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] =
-          index.rdd.zipPartitions(valuesRDD, other) {
+          index.rdd.zipPartitions(valuesRDD, otherShuffled) {
           (thisIndexIter, thisIter, tuplesIter) =>
+          // Get the index and values for this RDD
          val index = thisIndexIter.next()
          assert(!thisIndexIter.hasNext)
          val (thisValues, thisBS) = thisIter.next()
          assert(!thisIter.hasNext)
+          // Create a new array to store the values in the resulting VertexSet
          val newW = new Array[W](thisValues.size)
          // track which values are matched with values in other
          val wBS = new BitSet(thisValues.size)
+          // Loop over all the tuples that have vertices in this VertexSet.
          for( (k, w) <- tuplesIter if index.contains(k) ) {
            val ind = index.get(k)
+            // Not all the vertex ids in the index are in this VertexSet.
+            // If there is a vertex in this set then record the other value
            if(thisBS(ind)) {
              if(wBS(ind)) {
                newW(ind) = cleanMerge(newW(ind), w)
@@ -238,31 +393,33 @@ class VertexSetRDD[V: ClassManifest](
               wBS(ind) = true
             }
           }
-          }
+          } // end of for loop over tuples
+          // Some vertices in this vertex set may not have a corresponding
+          // tuple in the join and so a None value should be returned.
          val otherOption = newW.view.zipWithIndex
            .map{ (x: (W, Int)) => if(wBS(x._2)) Option(x._1) else None }
+          // the final values are the zip of the values in this RDD along with
+          // the values in the other
          val newValues = thisValues.view.zip(otherOption)
          Iterator((newValues.toIndexedSeq, thisBS))
        } // end of newValues
        new VertexSetRDD(index, newValues)
      }
    }
-  }
+  } // end of leftJoin
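If the other RDD carries several entries for one vertex, leftJoin folds them together with merge, which is why the wBS bitset tracks which offsets have already been seen. A plain-Scala sketch of that accumulation loop (hypothetical data):

    import scala.collection.mutable

    object LeftJoinSketch {
      def main(args: Array[String]): Unit = {
        val index  = Map(10L -> 0, 20L -> 1)
        val thisBS = mutable.BitSet(0, 1)
        val tuples = Seq((10L, 3), (10L, 4), (99L, 7)) // duplicate 10L, unknown 99L
        val merge  = (a: Int, b: Int) => a + b

        val newW = new Array[Int](2)
        val wBS  = new mutable.BitSet()
        for ((k, w) <- tuples if index.contains(k)) {  // 99L is dropped here
          val ind = index(k)
          if (thisBS(ind)) {
            if (wBS(ind)) newW(ind) = merge(newW(ind), w)
            else { newW(ind) = w; wBS += ind }
          }
        }
        // Vector(Some(7), None): 3 merged with 4 for vertex 10, no tuple for 20
        println((0 to 1).map(i => if (wBS(i)) Some(newW(i)) else None))
      }
    }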
   /**
-   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
-   * list of values for that key in `this` as well as `other`.
+   * For each key k in `this` or `other`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this` as well as `other`.
    */
   def cogroup[W: ClassManifest](other: RDD[(Vid, W)], partitioner: Partitioner):
     VertexSetRDD[(Seq[V], Seq[W])] = {
     //RDD[(K, (Seq[V], Seq[W]))] = {
     other match {
       case other: VertexSetRDD[_] if index == other.index => {
-        // if both RDDs share exactly the same index and therefore the same super set of keys
-        // then we simply merge the value RDDs.
+        // if both RDDs share exactly the same index and therefore the same
+        // super set of keys then we simply merge the value RDDs.
         // However it is possible that both RDDs are missing a value for a given key in
         // which case the returned RDD should have a null value
         val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
@@ -272,10 +429,12 @@ class VertexSetRDD[V: ClassManifest](
           assert(!thisIter.hasNext)
           val (otherValues, otherBS) = otherIter.next()
           assert(!otherIter.hasNext)
+          /**
+           * @todo consider implementing this with a view as in leftJoin to
+           * reduce array allocations
+           */
           val newValues = new Array[(Seq[V], Seq[W])](thisValues.size)
           val newBS = thisBS | otherBS
           for( ind <- newBS ) {
             val a = if (thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V]
             val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W]
@@ -400,12 +559,6 @@ class VertexSetRDD[V: ClassManifest](
             newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
           }
         }
-        // // Finalize the new values array
-        // val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] =
-        //   newValues.view.map{
-        //     case null => null
-        //     case (s, ab) => Seq((s, ab.toSeq))
-        //   }.toSeq
         Iterator( (newIndex, (newValues.toIndexedSeq, newBS)) )
       }).cache()
@@ -417,228 +570,37 @@ class VertexSetRDD[V: ClassManifest](
       new VertexSetRDD[(Seq[V], Seq[W])](new VertexSetIndex(newIndex), newValues)
       }
     }
-  }
+  } // end of cogroup
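In the shared-index fast path, cogroup unions the two bitsets and wraps each side in a Seq, empty where that side has no entry. In miniature (plain Scala, hypothetical data):

    import scala.collection.mutable

    object CogroupSketch {
      def main(args: Array[String]): Unit = {
        val thisValues  = IndexedSeq("a", "b", "c")
        val thisBS      = mutable.BitSet(0, 1)
        val otherValues = IndexedSeq(1, 2, 3)
        val otherBS     = mutable.BitSet(1, 2)

        val newBS = thisBS | otherBS                   // union of both sets
        val groups = newBS.toList.map { ind =>
          val a = if (thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[String]
          val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[Int]
          (a, b)
        }
        println(groups) // List((List(a),List()), (List(b),List(2)), (List(),List(3)))
      }
    }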
-  //
-  // def zipJoinToRDD[W: ClassManifest](other: VertexSetRDD[K,W]): RDD[(K,(V,W))] = {
-  //   if(index != other.index) {
-  //     throw new SparkException("ZipJoinRDD can only be applied to RDDs with the same index!")
-  //   }
-  //   index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
-  //     (thisIndexIter, thisIter, otherIter) =>
-  //     val index = thisIndexIter.next()
-  //     assert(!thisIndexIter.hasNext)
-  //     val (thisValues, thisBS) = thisIter.next()
-  //     assert(!thisIter.hasNext)
-  //     val (otherValues, otherBS) = otherIter.next()
-  //     assert(!otherIter.hasNext)
-  //     val newBS = thisBS & otherBS
-  //     index.iterator.filter{ case (k,i) => newBS(i) }.map{
-  //       case (k,i) => (k, (thisValues(i), otherValues(i)))
-  //     }
-  //   }
-  // }
-
-  /* This is probably useful but we are not using it
-  def zipJoinWithKeys[W: ClassManifest, Z: ClassManifest](
-      other: RDD[(K,W)])(
-      f: (K, V, W) => Z,
-      merge: (Z,Z) => Z = (a:Z, b:Z) => a):
-    VertexSetRDD[K,Z] = {
-    val cleanF = index.rdd.context.clean(f)
-    val cleanMerge = index.rdd.context.clean(merge)
-    other match {
-      case other: VertexSetRDD[_, _] if index == other.index => {
-        val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
-          (thisIndexIter, thisIter, otherIter) =>
-          val index = thisIndexIter.next()
-          assert(!thisIndexIter.hasNext)
-          val (thisValues, thisBS) = thisIter.next()
-          assert(!thisIter.hasNext)
-          val (otherValues, otherBS) = otherIter.next()
-          assert(!otherIter.hasNext)
-          val newValues = new Array[Z](thisValues.size)
-          val newBS = thisBS & otherBS
-          for( (k,i) <- index ) {
-            if (newBS(i)) {
-              newValues(i) = cleanF(k, thisValues(i), otherValues(i))
-            }
-          }
-          List((newValues, newBS)).iterator
-        }
-        new VertexSetRDD(index, newValues)
-      }
-      case _ => {
-        // Get the partitioner from the index
-        val partitioner = index.rdd.partitioner match {
-          case Some(p) => p
-          case None => throw new SparkException("An index must have a partitioner.")
-        }
-        // Shuffle the other RDD using the partitioner for this index
-        val otherShuffled =
-          if (other.partitioner == Some(partitioner)) other
-          else other.partitionBy(partitioner)
-        val newValues = index.rdd.zipPartitions(valuesRDD, other) {
-          (thisIndexIter, thisIter, tuplesIter) =>
-          val index = thisIndexIter.next()
-          assert(!thisIndexIter.hasNext)
-          val (thisValues, thisBS) = thisIter.next()
-          assert(!thisIter.hasNext)
-          val newValues = new Array[Z](thisValues.size)
-          // track which values are matched with values in other
-          val tempBS = new BitSet(thisValues.size)
-          for( (k, w) <- tuplesIter if index.contains(k) ) {
-            val ind = index.get(k)
-            if(thisBS(ind)) {
-              val result = cleanF(k, thisValues(ind), w)
-              if(tempBS(ind)) {
-                newValues(ind) = cleanMerge(newValues(ind), result)
-              } else {
-                newValues(ind) = result
-                tempBS(ind) = true
-              }
-            }
-          }
-          List((newValues, tempBS)).iterator
-        } // end of newValues
-        new VertexSetRDD(index, newValues)
-      }
-    }
-  }
-  */
-
-  /*
-  def zipJoinLeftWithKeys[W: ClassManifest, Z: ClassManifest](
-      other: RDD[(K,W)])(
-      f: (K, V, Option[W]) => Z,
-      merge: (Z,Z) => Z = (a:Z, b:Z) => a):
-    VertexSetRDD[K,Z] = {
-    val cleanF = index.rdd.context.clean(f)
-    val cleanMerge = index.rdd.context.clean(merge)
-    other match {
-      case other: VertexSetRDD[_, _] if index == other.index => {
-        val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
-          (thisIndexIter, thisIter, otherIter) =>
-          val index = thisIndexIter.next()
-          assert(!thisIndexIter.hasNext)
-          val (thisValues, thisBS) = thisIter.next()
-          assert(!thisIter.hasNext)
-          val (otherValues, otherBS) = otherIter.next()
-          assert(!otherIter.hasNext)
-          val newValues = new Array[Z](thisValues.size)
-          for( (k,i) <- index ) {
-            if (thisBS(i)) {
-              val otherVal = if(otherBS(i)) Some(otherValues(i)) else None
-              newValues(i) = cleanF(k, thisValues(i), otherVal)
-            }
-          }
-          List((newValues, thisBS)).iterator
-        }
-        new VertexSetRDD(index, newValues)
-      }
-      case _ => {
-        // Get the partitioner from the index
-        val partitioner = index.rdd.partitioner match {
-          case Some(p) => p
-          case None => throw new SparkException("An index must have a partitioner.")
-        }
-        // Shuffle the other RDD using the partitioner for this index
-        val otherShuffled =
-          if (other.partitioner == Some(partitioner)) other
-          else other.partitionBy(partitioner)
-        val newValues = index.rdd.zipPartitions(valuesRDD, other) {
-          (thisIndexIter, thisIter, tuplesIter) =>
-          val index = thisIndexIter.next()
-          assert(!thisIndexIter.hasNext)
-          val (thisValues, thisBS) = thisIter.next()
-          assert(!thisIter.hasNext)
-          val newValues = new Array[Z](thisValues.size)
-          // track which values are matched with values in other
-          val tempBS = new BitSet(thisValues.size)
-          for( (k, w) <- tuplesIter if index.contains(k) ) {
-            val ind = index.get(k)
-            if(thisBS(ind)) {
-              val result = cleanF(k, thisValues(ind), Option(w))
-              if(tempBS(ind)) {
-                newValues(ind) = cleanMerge(newValues(ind), result)
-              } else {
-                newValues(ind) = result
-                tempBS(ind) = true
-              }
-            }
-          }
-          // Process remaining keys in left join
-          for( (k,ind) <- index if thisBS(ind) && !tempBS(ind)) {
-            newValues(ind) = cleanF(k, thisValues(ind), None)
-          }
-          List((newValues, thisBS)).iterator
-        } // end of newValues
-        new VertexSetRDD(index, newValues)
-      }
-    }
-  }
-  */
-
-  override def filter(f: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = {
-    val cleanF = index.rdd.context.clean(f)
-    val newValues = index.rdd.zipPartitions(valuesRDD){
-      (keysIter, valuesIter) =>
-      val index = keysIter.next()
-      assert(keysIter.hasNext() == false)
-      val (oldValues, bs) = valuesIter.next()
-      assert(valuesIter.hasNext() == false)
-      // Allocate the array to store the results into
-      val newBS = new BitSet(oldValues.size)
-      // Populate the new Values
-      for( (k,i) <- index ) {
-        newBS(i) = bs(i) && cleanF( (k, oldValues(i)) )
-      }
-      Array((oldValues, newBS)).iterator
-    }
-    new VertexSetRDD[V](index, newValues)
-  }
-
-  /**
-   * Provide the RDD[(K,V)] equivalent output.
-   */
-  override def compute(part: Partition, context: TaskContext): Iterator[(Vid, V)] = {
-    tuples.compute(part, context).flatMap { case (indexMap, (values, bs)) =>
-      // Walk the index to construct the key, value pairs
-      indexMap.iterator
-        // Extract rows with key value pairs and indicators
-        .map{ case (k, ind) => (bs(ind), k, ind) }
-        // Remove tuples that aren't actually present in the array
-        .filter( _._1 )
-        // Extract the pair (removing the indicator from the tuple)
-        .map( x => (x._2, values(x._3)) )
-    }
-  }
 } // End of VertexSetRDD
+/**
+ * The VertexSetRDD singleton is used to construct VertexSets.
+ */
 object VertexSetRDD {
+  /**
+   * Construct a vertex set from an RDD of vertex-attribute pairs.
+   * Duplicate entries are removed arbitrarily.
+   *
+   * @tparam V the vertex attribute type
+   *
+   * @param rdd the collection of vertex-attribute pairs
+   */
   def apply[V: ClassManifest](rdd: RDD[(Vid,V)]): VertexSetRDD[V] =
     apply(rdd, (a:V, b:V) => a )
+  /**
+   * Construct a vertex set from an RDD of vertex-attribute pairs where
+   * duplicate entries are merged using the reduceFunc.
+   *
+   * @tparam V the vertex attribute type
+   *
+   * @param rdd the collection of vertex-attribute pairs
+   * @param reduceFunc the function used to merge attributes of duplicate
+   * vertices.
+   */
   def apply[V: ClassManifest](
       rdd: RDD[(Vid,V)], reduceFunc: (V, V) => V): VertexSetRDD[V] = {
     // Preaggregate and shuffle if necessary
@@ -669,61 +631,29 @@ object VertexSetRDD {
     val values: RDD[(IndexedSeq[V], BitSet)] =
       groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
     new VertexSetRDD[V](new VertexSetIndex(index), values)
-  }
+  } // end of apply

+  /**
+   * Construct a vertex set organized by the given index, dropping
+   * duplicate entries arbitrarily.
+   *
+   * @todo finish documenting
+   */
   def apply[V: ClassManifest](
       rdd: RDD[(Vid,V)], index: VertexSetIndex): VertexSetRDD[V] =
     apply(rdd, index, (a:V,b:V) => a)

+  /**
+   * Construct a vertex set organized by the given index, merging
+   * duplicate entries with reduceFunc.
+   *
+   * @todo finish documenting
+   */
   def apply[V: ClassManifest](
       rdd: RDD[(Vid,V)], index: VertexSetIndex,
       reduceFunc: (V, V) => V): VertexSetRDD[V] =
     apply(rdd,index, (v:V) => v, reduceFunc, reduceFunc)
-  // {
-  //   // Get the index Partitioner
-  //   val partitioner = index.rdd.partitioner match {
-  //     case Some(p) => p
-  //     case None => throw new SparkException("An index must have a partitioner.")
-  //   }
-  //   // Preaggregate and shuffle if necessary
-  //   val partitioned =
-  //     if (rdd.partitioner != Some(partitioner)) {
-  //       // Preaggregation.
-  //       val aggregator = new Aggregator[K, V, V](v => v, reduceFunc, reduceFunc)
-  //       rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
-  //     } else {
-  //       rdd
-  //     }
-  //   // Use the index to build the new values table
-  //   val values = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
-  //     // There is only one map
-  //     val index = indexIter.next()
-  //     assert(!indexIter.hasNext())
-  //     val values = new Array[V](index.size)
-  //     val bs = new BitSet(index.size)
-  //     for ((k,v) <- tblIter) {
-  //       if (!index.contains(k)) {
-  //         throw new SparkException("Error: Trying to bind an external index " +
-  //           "to an RDD which contains keys that are not in the index.")
-  //       }
-  //       val ind = index(k)
-  //       if (bs(ind)) {
-  //         values(ind) = reduceFunc(values(ind), v)
-  //       } else {
-  //         values(ind) = v
-  //         bs(ind) = true
-  //       }
-  //     }
-  //     List((values, bs)).iterator
-  //   })
-  //   new VertexSetRDD[K,V](index, values)
-  // } // end of apply
+  /**
+   * Construct a vertex set from an RDD using an existing index and
+   * combiner functions.
+   *
+   * @todo finish documenting
+   */
   def apply[V: ClassManifest, C: ClassManifest](
       rdd: RDD[(Vid,V)],
       index: VertexSetIndex,
@@ -774,6 +704,8 @@ object VertexSetRDD {
   /**
   * Construct an index of the unique values in a given RDD.
+   *
+   * @todo finish documenting
   */
   def makeIndex(keys: RDD[Vid],
       partitioner: Option[Partitioner] = None): VertexSetIndex = {
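Taken together these constructors support the pattern from the class comment: build one vertex set, then pass its index when building the next so both are co-partitioned and join cheaply. A hedged usage sketch against this commit's API (assumes a SparkContext `sc` and this graph package on the classpath; the data is made up):

    // Sketch only: `users` and `ages` are hypothetical inputs.
    val users: RDD[(Vid, String)] =
      sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (1L, "alice2")))
    // Duplicate ids are merged with the reduce function (here: keep the first)
    val names = VertexSetRDD(users, (a: String, b: String) => a)
    // Reusing names.index keeps ages co-partitioned and index-aligned with names
    val ages = VertexSetRDD(sc.parallelize(Seq((1L, 23), (2L, 31))), names.index)
    // leftZipJoin applies, since both sets now share the same index
    val joined: VertexSetRDD[(Int, Option[String])] = ages.leftJoin(names)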

GraphSuite.scala

@@ -72,40 +72,22 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }

-  // test("graph partitioner") {
-  //   sc = new SparkContext("local", "test")
-  //   val vertices = sc.parallelize(Seq(Vertex(1, "one"), Vertex(2, "two")))
-  //   val edges = sc.parallelize(Seq(Edge(1, 2, "onlyedge")))
-  //   var g = Graph(vertices, edges)
-  //
-  //   g = g.withPartitioner(4, 7)
-  //   assert(g.numVertexPartitions === 4)
-  //   assert(g.numEdgePartitions === 7)
-  //
-  //   g = g.withVertexPartitioner(5)
-  //   assert(g.numVertexPartitions === 5)
-  //
-  //   g = g.withEdgePartitioner(8)
-  //   assert(g.numEdgePartitions === 8)
-  //
-  //   g = g.mapVertices(x => x)
-  //   assert(g.numVertexPartitions === 5)
-  //   assert(g.numEdgePartitions === 8)
-  //
-  //   g = g.mapEdges(x => x)
-  //   assert(g.numVertexPartitions === 5)
-  //   assert(g.numEdgePartitions === 8)
-  //
-  //   val updates = sc.parallelize(Seq((1, " more")))
-  //   g = g.updateVertices(
-  //     updates,
-  //     (v, u: Option[String]) => if (u.isDefined) v.data + u.get else v.data)
-  //   assert(g.numVertexPartitions === 5)
-  //   assert(g.numEdgePartitions === 8)
-  //
-  //   g = g.reverse
-  //   assert(g.numVertexPartitions === 5)
-  //   assert(g.numEdgePartitions === 8)
-  //
-  // }
+  test("VertexSetRDD") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val a = sc.parallelize((0 to 100).map(x => (x.toLong, x.toLong)), 5)
+      val b = VertexSetRDD(a).mapValues(x => -x)
+      assert(b.leftJoin(a)
+        .mapValues(x => x._1 + x._2.get).map(x => x._2).reduce(_ + _) === 0)
+      val c = VertexSetRDD(a, b.index)
+      assert(b.leftJoin(c)
+        .mapValues(x => x._1 + x._2.get).map(x => x._2).reduce(_ + _) === 0)
+      val d = c.filter(q => ((q._2 % 2) == 0))
+      val e = a.filter(q => ((q._2 % 2) == 0))
+      assert(d.count === e.count)
+      assert(b.zipJoin(c).mapValues(x => x._1 + x._2)
+        .map(x => x._2).reduce(_ + _) === 0)
+    }
+  }
 }
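The zero sums in this test hold by construction: b stores the negation of a, so each joined vertex contributes x + (-x) = 0, and the two filter counts agree because both sides keep exactly the even attributes, 51 of the 101 values in 0 to 100. The arithmetic in plain Scala:

    (0 to 100).map(x => x + (-x)).sum == 0  // the leftJoin and zipJoin sums
    (0 to 100).count(_ % 2 == 0) == 51      // d.count === e.count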