diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
index b3f1fa768c..cb75da6c21 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
@@ -41,28 +41,62 @@ import org.apache.spark.storage.StorageLevel
 
 /**
- * The VertexSetIndex is an opaque type used to represent the organization
- * of values in an RDD
+ * The `VertexSetIndex` maintains the per-partition mapping from vertex id
+ * to the corresponding location in the per-partition values array.
+ * This class is meant to be an opaque type.
+ *
  */
 class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
+  /**
+   * The persist function behaves like the standard RDD persist
+   */
   def persist(newLevel: StorageLevel): VertexSetIndex = {
     rdd.persist(newLevel)
     return this
   }
+
+  /**
+   * Returns the partitioner object of the underlying RDD. This is used
+   * by the VertexSetRDD to partition the values RDD.
+   */
   def partitioner: Partitioner = rdd.partitioner.get
-}
+} // end of VertexSetIndex
 
 
 /**
- * An VertexSetRDD[V] extends the RDD[(Vid,V)] by pre-indexing the keys and
- * organizing the values to enable faster join operations.
+ * A VertexSetRDD[V] extends the RDD[(Vid,V)] by ensuring that there is only
+ * one entry for each vertex and by pre-indexing the entries for fast, efficient
+ * joins.
  *
  * In addition to providing the basic RDD[(Vid,V)] functionality the VertexSetRDD
  * exposes an index member which can be used to "key" other VertexSetRDDs
 *
+ * @tparam V the vertex attribute associated with each vertex in the set.
+ *
+ * @param index the index which contains the vertex id information and is used
+ * to organize the values in the RDD.
+ * @param valuesRDD the values RDD contains the actual vertex attributes organized
+ * as an array within each partition.
+ *
+ * To construct a `VertexSetRDD` use the singleton object:
+ *
+ * @example Construct a `VertexSetRDD` from a plain RDD
+ * {{{
+ * // Construct an initial vertex set
+ * val someData: RDD[(Vid, SomeType)] = loadData(someFile)
+ * val vset = VertexSetRDD(someData)
+ * // If there were redundant values in someData we would use a reduceFunc
+ * val vset2 = VertexSetRDD(someData, reduceFunc)
+ * // Finally we can use the VertexSetRDD to index another dataset
+ * val otherData: RDD[(Vid, OtherType)] = loadData(otherFile)
+ * val vset3 = VertexSetRDD(otherData, vset.index)
+ * // Now we can construct very fast joins between the two sets
+ * val vset4: VertexSetRDD[(SomeType, Option[OtherType])] = vset.leftJoin(vset3)
+ * }}}
+ *
 */
-class VertexSetRDD[V: ClassManifest](
+class VertexSetRDD[@specialized V: ClassManifest](
     @transient val index: VertexSetIndex,
     @transient val valuesRDD: RDD[ (IndexedSeq[V], BitSet) ])
   extends RDD[(Vid, V)](index.rdd.context,
@@ -70,20 +104,23 @@ class VertexSetRDD[V: ClassManifest](
 
   /**
-   * Construct a new VertexSetRDD that is indexed by only the keys in the RDD
+   * Construct a new VertexSetRDD that is indexed by only the keys in the RDD.
+   * The resulting VertexSet will be based on a different index and can
+   * no longer be quickly joined with this RDD.
   */
  def reindex(): VertexSetRDD[V] = VertexSetRDD(this)

  /**
   * An internal representation which joins the block indices with the values
+   * This is used by the compute function to emulate RDD[(Vid, V)]
   */
  protected[spark] val tuples =
    new ZippedRDD(index.rdd.context, index.rdd, valuesRDD)

  /**
-   * The partitioner is defined by the index
+   * The partitioner is defined by the index.
   */
  override val partitioner = index.rdd.partitioner

@@ -95,7 +132,8 @@ class VertexSetRDD[V: ClassManifest](

  /**
-   * The preferred locations are computed based on the preferred locations of the tuples.
+   * The preferred locations are computed based on the preferred
+   * locations of the tuples.
   */
  override def getPreferredLocations(s: Partition): Seq[String] =
    tuples.getPreferredLocations(s)

@@ -110,75 +148,170 @@ class VertexSetRDD[V: ClassManifest](
    return this
  }

+
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  override def persist(): VertexSetRDD[V] = persist(StorageLevel.MEMORY_ONLY)

+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  override def cache(): VertexSetRDD[V] = persist()

-
  /**
-   * Pass each value in the key-value pair RDD through a map function without changing the keys;
-   * this also retains the original RDD's partitioning.
+   * Provide the RDD[(K,V)] equivalent output.
   */
-  def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = {
-    val cleanF = index.rdd.context.clean(f)
-    val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] =
-      valuesRDD.mapPartitions(iter => iter.map{
-        case (values, bs) =>
-          val newValues = new Array[U](values.size)
-          for ( ind <- bs ) {
-            newValues(ind) = f(values(ind))
-          }
-          (newValues.toIndexedSeq, bs)
-        }, preservesPartitioning = true)
-    new VertexSetRDD[U](index, newValuesRDD)
-  }
+  override def compute(part: Partition, context: TaskContext): Iterator[(Vid, V)] = {
+    tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) =>
+      // Walk the index to construct the key, value pairs
+      indexMap.iterator
+        // Extract rows with key value pairs and indicators
+        .map{ case (k, ind) => (bs(ind), k, ind) }
+        // Remove tuples that aren't actually present in the array
+        .filter( _._1 )
+        // Extract the pair (removing the indicator from the tuple)
+        .map( x => (x._2, values(x._3) ) )
+    }
+  } // end of compute

  /**
-   * Pass each value in the key-value pair RDD through a map function without changing the keys;
-   * this also retains the original RDD's partitioning.
+   * @todo finish documenting
   */
-  def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = {
+  override def filter(f: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = {
    val cleanF = index.rdd.context.clean(f)
-    val newValues: RDD[ (IndexedSeq[U], BitSet) ] =
-      index.rdd.zipPartitions(valuesRDD){
+    val newValues = index.rdd.zipPartitions(valuesRDD){
      (keysIter, valuesIter) =>
      val index = keysIter.next()
      assert(keysIter.hasNext() == false)
      val (oldValues, bs) = valuesIter.next()
      assert(valuesIter.hasNext() == false)
-      // Allocate the array to store the results into
-      val newValues: Array[U] = new Array[U](oldValues.size)
+      // Allocate the array to store the results into
+      val newBS = new BitSet(oldValues.size)
      // Populate the new Values
      for( (k,i) <- index ) {
-        if (bs(i)) { newValues(i) = f(k, oldValues(i)) }
+        newBS(i) = bs(i) && cleanF( (k, oldValues(i)) )
      }
-      Array((newValues.toIndexedSeq, bs)).iterator
+      Array((oldValues, newBS)).iterator
    }
+    new VertexSetRDD[V](index, newValues)
+  } // end of filter
+
+
+  /**
+   * Pass each vertex attribute through a map function and retain
+   * the original RDD's partitioning and index.
+   *
+   * @tparam U the type returned by the map function
+   *
+   * @param f the function applied to each value in the RDD
+   * @return a new VertexSet with values obtained by applying `f` to each of the
+   * entries in the original VertexSet. The resulting VertexSetRDD retains the
+   * same index.
+   */
+  def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = {
+    val cleanF = index.rdd.context.clean(f)
+    val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] =
+      valuesRDD.mapPartitions(iter => iter.map{
+        case (values, bs) =>
+          /**
+           * @todo Consider using a view rather than creating a new array.
+           * This is already being done for join operations. It could reduce
+           * memory overhead but require additional recomputation.
+           */
+          val newValues = new Array[U](values.size)
+          for ( ind <- bs ) {
+            newValues(ind) = f(values(ind))
+          }
+          (newValues.toIndexedSeq, bs)
+        }, preservesPartitioning = true)
+    new VertexSetRDD[U](index, newValuesRDD)
+  } // end of mapValues
+
+
+  /**
+   * Pass each vertex attribute along with the vertex id through a
+   * map function and retain the original RDD's partitioning and index.
+   *
+   * @tparam U the type returned by the map function
+   *
+   * @param f the function applied to each vertex id and vertex
+   * attribute in the RDD
+   * @return a new VertexSet with values obtained by applying `f` to each of the
+   * entries in the original VertexSet. The resulting VertexSetRDD retains the
+   * same index.
+   */
+  def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = {
+    val cleanF = index.rdd.context.clean(f)
+    val newValues: RDD[ (IndexedSeq[U], BitSet) ] =
+      index.rdd.zipPartitions(valuesRDD){
+        (keysIter, valuesIter) =>
+        val index = keysIter.next()
+        assert(keysIter.hasNext() == false)
+        val (oldValues, bs) = valuesIter.next()
+        assert(valuesIter.hasNext() == false)
+        /**
+         * @todo Consider using a view rather than creating a new array.
+         * This is already being done for join operations. It could reduce
+         * memory overhead but require additional recomputation.
+         */
+        // Allocate the array to store the results into
+        val newValues: Array[U] = new Array[U](oldValues.size)
+        // Populate the new Values
+        for( (k,i) <- index ) {
+          if (bs(i)) { newValues(i) = f(k, oldValues(i)) }
+        }
+        Array((newValues.toIndexedSeq, bs)).iterator
+      }
    new VertexSetRDD[U](index, newValues)
-  }
+  } // end of mapValuesWithKeys
 
 
+  /**
+   * Inner join this VertexSet with another VertexSet which has the same Index.
+   * This function will fail if the two VertexSets do not share the same index.
+   * The resulting vertex set will only contain vertices that are in both this
+   * and the other vertex set.
+   *
+   * @tparam W the attribute type of the other VertexSet
+   *
+   * @param other the other VertexSet with which to join.
+   * @return a VertexSetRDD containing only the vertices in both this and the
+   * other VertexSet and with tuple attributes.
+   *
+   */
  def zipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,W)] = {
    if(index != other.index) {
      throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
    }
-    val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){
-      (thisIter, otherIter) =>
-      val (thisValues, thisBS) = thisIter.next()
-      assert(!thisIter.hasNext)
-      val (otherValues, otherBS) = otherIter.next()
-      assert(!otherIter.hasNext)
-      val newBS = thisBS & otherBS
-      val newValues = thisValues.view.zip(otherValues)
-      Iterator((newValues.toIndexedSeq, newBS))
-    }
+    val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] =
+      valuesRDD.zipPartitions(other.valuesRDD){
+        (thisIter, otherIter) =>
+        val (thisValues, thisBS) = thisIter.next()
+        assert(!thisIter.hasNext)
+        val (otherValues, otherBS) = otherIter.next()
+        assert(!otherIter.hasNext)
+        val newBS = thisBS & otherBS
+        val newValues = thisValues.view.zip(otherValues)
+        Iterator((newValues.toIndexedSeq, newBS))
+      }
    new VertexSetRDD(index, newValuesRDD)
  }
 
 
+  /**
+   * Left join this VertexSet with another VertexSet which has the same Index.
+   * This function will fail if the two VertexSets do not share the same index.
+   * The resulting vertex set contains an entry for each vertex in this set.
+   * If the other VertexSet is missing any vertex in this VertexSet then a
+   * `None` attribute is generated.
+   *
+   * @tparam W the attribute type of the other VertexSet
+   *
+   * @param other the other VertexSet with which to join.
+   * @return a VertexSetRDD containing all the vertices in this VertexSet
+   * with `None` attributes used for vertices missing in the other VertexSet.
+   *
+   */
  def leftZipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,Option[W])] = {
    if(index != other.index) {
      throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
@@ -195,41 +328,63 @@ class VertexSetRDD[V: ClassManifest](
      Iterator((newValues.toIndexedSeq, thisBS))
    }
    new VertexSetRDD(index, newValuesRDD)
-  }
-
+  } // end of leftZipJoin
+
 
+  /**
+   * Left join this VertexSet with an RDD containing vertex attribute pairs.
+   * If the other RDD is backed by a VertexSet with the same index then the
+   * efficient leftZipJoin implementation is used.
+   * The resulting vertex set contains an entry for each vertex in this set.
+   * If the other VertexSet is missing any vertex in this VertexSet then a
+   * `None` attribute is generated.
+   *
+   * @tparam W the attribute type of the other VertexSet
+   *
+   * @param other the other VertexSet with which to join.
+   * @param merge the function used to combine duplicate vertex attributes
+   * @return a VertexSetRDD containing all the vertices in this VertexSet
+   * with `None` attributes used for vertices missing in the other VertexSet.
+   *
+   */
  def leftJoin[W: ClassManifest](
    other: RDD[(Vid,W)], merge: (W,W) => W = (a:W, b:W) => a):
    VertexSetRDD[(V, Option[W]) ] = {
    val cleanMerge = index.rdd.context.clean(merge)
-
+    // Test if the other RDD is a VertexSetRDD to choose the optimal
+    // join strategy
    other match {
+      // If the other set is a VertexSetRDD and shares the same index as
+      // this vertex set then we use the much more efficient leftZipJoin
      case other: VertexSetRDD[_] if index == other.index => {
        leftZipJoin(other)
      }
      case _ => {
-        // Get the partitioner from the index
-        val partitioner = index.rdd.partitioner match {
-          case Some(p) => p
-          case None => throw new SparkException("An index must have a partitioner.")
-        }
-        // Shuffle the other RDD using the partitioner for this index
+        // Otherwise we treat the other RDD as a collection of
+        // vertex-attribute pairs.
+        // If necessary shuffle the other RDD using the partitioner
+        // for this VertexSet
        val otherShuffled =
-          if (other.partitioner == Some(partitioner)) other
-          else other.partitionBy(partitioner)
+          if (other.partitioner == partitioner) other
+          else other.partitionBy(partitioner.get)
+        // Compute the new values RDD
        val newValues: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] =
-          index.rdd.zipPartitions(valuesRDD, other) {
+          index.rdd.zipPartitions(valuesRDD, otherShuffled) {
          (thisIndexIter, thisIter, tuplesIter) =>
+          // Get the Index and values for this RDD
          val index = thisIndexIter.next()
          assert(!thisIndexIter.hasNext)
          val (thisValues, thisBS) = thisIter.next()
          assert(!thisIter.hasNext)
+          // Create a new array to store the values in the resulting VertexSet
          val newW = new Array[W](thisValues.size)
          // track which values are matched with values in other
          val wBS = new BitSet(thisValues.size)
+          // Loop over all the tuples that have vertices in this VertexSet.
          for( (k, w) <- tuplesIter if index.contains(k) ) {
            val ind = index.get(k)
+            // Not all the vertex ids in the index are in this VertexSet.
+            // If there is a vertex in this set then record the other value
            if(thisBS(ind)) {
              if(wBS(ind)) {
                newW(ind) = cleanMerge(newW(ind), w)
@@ -238,31 +393,33 @@ class VertexSetRDD[V: ClassManifest](
                wBS(ind) = true
              }
            }
-          }
-
+          } // end of for loop over tuples
+          // Some vertices in this vertex set may not have a corresponding
+          // tuple in the join and so a None value should be returned.
          val otherOption = newW.view.zipWithIndex
            .map{ (x: (W, Int)) => if(wBS(x._2)) Option(x._1) else None }
+          // the final values are the zip of the values in this RDD along with
+          // the values in the other
          val newValues = thisValues.view.zip(otherOption)
-
          Iterator((newValues.toIndexedSeq, thisBS))
        } // end of newValues
        new VertexSetRDD(index, newValues)
      }
    }
-  }
+  } // end of leftJoin
 
 
  /**
-   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
-   * list of values for that key in `this` as well as `other`.
+   * For each key k in `this` or `other`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this` as well as `other`.
   */
  def cogroup[W: ClassManifest](other: RDD[(Vid, W)], partitioner: Partitioner):
  VertexSetRDD[(Seq[V], Seq[W])] = {
    //RDD[(K, (Seq[V], Seq[W]))] = {
    other match {
      case other: VertexSetRDD[_] if index == other.index => {
-        // if both RDDs share exactly the same index and therefore the same super set of keys
-        // then we simply merge the value RDDs.
+        // if both RDDs share exactly the same index and therefore the same
+        // super set of keys then we simply merge the value RDDs.
        // However it is possible that both RDDs are missing a value for a given key in
        // which case the returned RDD should have a null value
        val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
@@ -272,10 +429,12 @@ class VertexSetRDD[V: ClassManifest](
            assert(!thisIter.hasNext)
            val (otherValues, otherBS) = otherIter.next()
            assert(!otherIter.hasNext)
-
+            /**
+             * @todo consider implementing this with a view as in leftJoin to
+             * reduce array allocations
+             */
            val newValues = new Array[(Seq[V], Seq[W])](thisValues.size)
            val newBS = thisBS | otherBS
-
            for( ind <- newBS ) {
              val a = if (thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V]
              val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W]
@@ -400,12 +559,6 @@ class VertexSetRDD[V: ClassManifest](
              newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
            }
          }
-          // // Finalize the new values array
-          // val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] =
-          //   newValues.view.map{
-          //     case null => null
-          //     case (s, ab) => Seq((s, ab.toSeq))
-          //   }.toSeq
          Iterator( (newIndex, (newValues.toIndexedSeq, newBS)) )
        }).cache()
@@ -417,228 +570,37 @@ class VertexSetRDD[V: ClassManifest](
        new VertexSetRDD[(Seq[V], Seq[W])](new VertexSetIndex(newIndex), newValues)
      }
    }
-  }
-
-
-  //
-  // def zipJoinToRDD[W: ClassManifest](other: VertexSetRDD[K,W]): RDD[(K,(V,W))] = {
-  //   if(index != other.index) {
-  //     throw new SparkException("ZipJoinRDD can only be applied to RDDs with the same index!")
-  //   }
-  //   index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
-  //     (thisIndexIter, thisIter, otherIter) =>
-  //     val index = thisIndexIter.next()
-  //     assert(!thisIndexIter.hasNext)
-  //     val (thisValues, thisBS) = thisIter.next()
-  //     assert(!thisIter.hasNext)
-  //     val (otherValues, otherBS) = otherIter.next()
-  //     assert(!otherIter.hasNext)
-  //     val newBS = thisBS & otherBS
-  //     index.iterator.filter{ case (k,i) => newBS(i) }.map{
-  //       case (k,i) => (k, (thisValues(i), otherValues(i)))
-  //     }
-  //   }
-  // }
-
-
-/* This is probably useful but we are not using it
-  def zipJoinWithKeys[W: ClassManifest, Z: ClassManifest](
-    other: RDD[(K,W)])(
-    f: (K, V, W) => Z,
-    merge: (Z,Z) => Z = (a:Z, b:Z) => a):
-    VertexSetRDD[K,Z] = {
-    val cleanF = index.rdd.context.clean(f)
-    val cleanMerge = index.rdd.context.clean(merge)
-    other match {
-      case other: VertexSetRDD[_, _] if index == other.index => {
-        val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
-          (thisIndexIter, thisIter, otherIter) =>
-          val index = thisIndexIter.next()
-          assert(!thisIndexIter.hasNext)
-          val (thisValues, thisBS) = thisIter.next()
-          assert(!thisIter.hasNext)
-          val (otherValues, otherBS) = otherIter.next()
-          assert(!otherIter.hasNext)
-          val newValues = new Array[Z](thisValues.size)
-          val newBS = thisBS & otherBS
-          for( (k,i) <- index ) {
-            if (newBS(i)) {
-              newValues(i) = cleanF(k, thisValues(i), otherValues(i))
-            }
-          }
-          List((newValues, newBS)).iterator
-        }
-        new VertexSetRDD(index, newValues)
-      }
-
-      case _ => {
-        // Get the partitioner from the index
-        val partitioner = index.rdd.partitioner match {
-          case Some(p) => p
-          case None => throw new SparkException("An index must have a partitioner.")
-        }
-        // Shuffle the other RDD using the partitioner for this index
-        val otherShuffled =
-          if (other.partitioner == Some(partitioner)) other
-          else other.partitionBy(partitioner)
-
-        val newValues = index.rdd.zipPartitions(valuesRDD, other) {
-          (thisIndexIter, thisIter, tuplesIter) =>
-          val index = thisIndexIter.next()
-          assert(!thisIndexIter.hasNext)
-          val (thisValues, thisBS) = thisIter.next()
-          assert(!thisIter.hasNext)
-
-          val newValues = new Array[Z](thisValues.size)
-          // track which values are matched with values in other
-          val tempBS = new BitSet(thisValues.size)
-
-          for( (k, w) <- tuplesIter if index.contains(k) ) {
-            val ind = index.get(k)
-            if(thisBS(ind)) {
-              val result = cleanF(k, thisValues(ind), w)
-              if(tempBS(ind)) {
-                newValues(ind) = cleanMerge(newValues(ind), result)
-              } else {
-                newValues(ind) = result
-                tempBS(ind) = true
-              }
-            }
-          }
-          List((newValues, tempBS)).iterator
-        } // end of newValues
-        new VertexSetRDD(index, newValues)
-      }
-    }
-  }
-*/
-
-/*
-  def zipJoinLeftWithKeys[W: ClassManifest, Z: ClassManifest](
-    other: RDD[(K,W)])(
-    f: (K, V, Option[W]) => Z,
-    merge: (Z,Z) => Z = (a:Z, b:Z) => a):
-    VertexSetRDD[K,Z] = {
-    val cleanF = index.rdd.context.clean(f)
-    val cleanMerge = index.rdd.context.clean(merge)
-    other match {
-      case other: VertexSetRDD[_, _] if index == other.index => {
-        val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
-          (thisIndexIter, thisIter, otherIter) =>
-          val index = thisIndexIter.next()
-          assert(!thisIndexIter.hasNext)
-          val (thisValues, thisBS) = thisIter.next()
-          assert(!thisIter.hasNext)
-          val (otherValues, otherBS) = otherIter.next()
-          assert(!otherIter.hasNext)
-          val newValues = new Array[Z](thisValues.size)
-          for( (k,i) <- index ) {
-            if (thisBS(i)) {
-              val otherVal = if(otherBS(i)) Some(otherValues(i)) else None
-              newValues(i) = cleanF(k, thisValues(i), otherVal)
-            }
-          }
-          List((newValues, thisBS)).iterator
-        }
-        new VertexSetRDD(index, newValues)
-      }
-
-      case _ => {
-        // Get the partitioner from the index
-        val partitioner = index.rdd.partitioner match {
-          case Some(p) => p
-          case None => throw new SparkException("An index must have a partitioner.")
-        }
-        // Shuffle the other RDD using the partitioner for this index
-        val otherShuffled =
-          if (other.partitioner == Some(partitioner)) other
-          else other.partitionBy(partitioner)
-        val newValues = index.rdd.zipPartitions(valuesRDD, other) {
-          (thisIndexIter, thisIter, tuplesIter) =>
-          val index = thisIndexIter.next()
-          assert(!thisIndexIter.hasNext)
-          val (thisValues, thisBS) = thisIter.next()
-          assert(!thisIter.hasNext)
-
-          val newValues = new Array[Z](thisValues.size)
-          // track which values are matched with values in other
-          val tempBS = new BitSet(thisValues.size)
-
-          for( (k, w) <- tuplesIter if index.contains(k) ) {
-            val ind = index.get(k)
-            if(thisBS(ind)) {
-              val result = cleanF(k, thisValues(ind), Option(w))
-              if(tempBS(ind)) {
-                newValues(ind) = cleanMerge(newValues(ind), result)
-              } else {
-                newValues(ind) = result
-                tempBS(ind) = true
-              }
-            }
-          }
-
-          // Process remaining keys in lef join
-          for( (k,ind) <- index if thisBS(ind) && !tempBS(ind)) {
-            newValues(ind) = cleanF(k, thisValues(ind), None)
-          }
-          List((newValues, thisBS)).iterator
-        } // end of newValues
-        new VertexSetRDD(index, newValues)
-      }
-    }
-  }
-
-*/
-
-
-
-
-  override def filter(f: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = {
-    val cleanF = index.rdd.context.clean(f)
-    val newValues = index.rdd.zipPartitions(valuesRDD){
-      (keysIter, valuesIter) =>
-      val index = keysIter.next()
-      assert(keysIter.hasNext() == false)
-      val (oldValues, bs) = valuesIter.next()
-      assert(valuesIter.hasNext() == false)
-      // Allocate the array to store the results into
-      val newBS = new BitSet(oldValues.size)
-      // Populate the new Values
-      for( (k,i) <- index ) {
-        newBS(i) = bs(i) && cleanF( (k, oldValues(i)) )
-      }
-      Array((oldValues, newBS)).iterator
-    }
-    new VertexSetRDD[V](index, newValues)
-  }
-
-
-  /**
-   * Provide the RDD[(K,V)] equivalent output.
-   */
-  override def compute(part: Partition, context: TaskContext): Iterator[(Vid, V)] = {
-    tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) =>
-      // Walk the index to construct the key, value pairs
-      indexMap.iterator
-        // Extract rows with key value pairs and indicators
-        .map{ case (k, ind) => (bs(ind), k, ind) }
-        // Remove tuples that aren't actually present in the array
-        .filter( _._1 )
-        // Extract the pair (removing the indicator from the tuple)
-        .map( x => (x._2, values(x._3) ) )
-    }
-  }
-
+  } // end of cogroup
 
 } // End of VertexSetRDD
 
 
-
+/**
+ * The VertexSetRDD singleton is used to construct VertexSets.
+ */
 object VertexSetRDD {
 
-
+  /**
+   * Construct a vertex set from an RDD of vertex-attribute pairs.
+   * Duplicate entries are removed arbitrarily.
+   *
+   * @tparam V the vertex attribute type
+   *
+   * @param rdd the collection of vertex-attribute pairs
+   */
  def apply[V: ClassManifest](rdd: RDD[(Vid,V)]): VertexSetRDD[V] =
    apply(rdd, (a:V, b:V) => a )
 
+  /**
+   * Construct a vertex set from an RDD of vertex-attribute pairs where
+   * duplicate entries are merged using the reduceFunc.
+   *
+   * @tparam V the vertex attribute type
+   *
+   * @param rdd the collection of vertex-attribute pairs
+   * @param reduceFunc the function used to merge attributes of duplicate
+   * vertices.
+   */
  def apply[V: ClassManifest](
    rdd: RDD[(Vid,V)], reduceFunc: (V, V) => V): VertexSetRDD[V] = {
    // Preaggregate and shuffle if necessary
@@ -669,61 +631,29 @@ object VertexSetRDD {
    val values: RDD[(IndexedSeq[V], BitSet)] =
      groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
    new VertexSetRDD[V](new VertexSetIndex(index), values)
-  }
-
+  } // end of apply
+
+  /**
+   * @todo finish documenting
+   */
  def apply[V: ClassManifest](
    rdd: RDD[(Vid,V)],
    index: VertexSetIndex): VertexSetRDD[V] =
    apply(rdd, index, (a:V,b:V) => a)
 
+  /**
+   * @todo finish documenting
+   */
  def apply[V: ClassManifest](
    rdd: RDD[(Vid,V)],
    index: VertexSetIndex,
    reduceFunc: (V, V) => V): VertexSetRDD[V] =
    apply(rdd,index, (v:V) => v, reduceFunc, reduceFunc)
-  // {
-  //   // Get the index Partitioner
-  //   val partitioner = index.rdd.partitioner match {
-  //     case Some(p) => p
-  //     case None => throw new SparkException("An index must have a partitioner.")
-  //   }
-  //   // Preaggregate and shuffle if necessary
-  //   val partitioned =
-  //     if (rdd.partitioner != Some(partitioner)) {
-  //       // Preaggregation.
-  //       val aggregator = new Aggregator[K, V, V](v => v, reduceFunc, reduceFunc)
-  //       rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
-  //     } else {
-  //       rdd
-  //     }
-
-  //   // Use the index to build the new values table
-  //   val values = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
-  //     // There is only one map
-  //     val index = indexIter.next()
-  //     assert(!indexIter.hasNext())
-  //     val values = new Array[V](index.size)
-  //     val bs = new BitSet(index.size)
-  //     for ((k,v) <- tblIter) {
-  //       if (!index.contains(k)) {
-  //         throw new SparkException("Error: Trying to bind an external index " +
-  //           "to an RDD which contains keys that are not in the index.")
-  //       }
-  //       val ind = index(k)
-  //       if (bs(ind)) {
-  //         values(ind) = reduceFunc(values(ind), v)
-  //       } else {
-  //         values(ind) = v
-  //         bs(ind) = true
-  //       }
-  //     }
-  //     List((values, bs)).iterator
-  //   })
-  //   new VertexSetRDD[K,V](index, values)
-  // } // end of apply
-
+
+  /**
+   * @todo finish documenting
+   */
  def apply[V: ClassManifest, C: ClassManifest](
    rdd: RDD[(Vid,V)],
    index: VertexSetIndex,
@@ -774,6 +704,8 @@ object VertexSetRDD {
 
  /**
   * Construct an index of the unique values in a given RDD.
+   *
+   * @todo finish documenting
   */
  def makeIndex(keys: RDD[Vid],
    partitioner: Option[Partitioner] = None): VertexSetIndex = {
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index 2d74ce92e2..f2b3d5bdfe 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -72,40 +72,22 @@ class GraphSuite extends FunSuite with LocalSparkContext {
    }
  }
 
-//  test("graph partitioner") {
-//    sc = new SparkContext("local", "test")
-//    val vertices = sc.parallelize(Seq(Vertex(1, "one"), Vertex(2, "two")))
-//    val edges = sc.parallelize(Seq(Edge(1, 2, "onlyedge")))
-//    var g = Graph(vertices, edges)
-//
-//    g = g.withPartitioner(4, 7)
-//    assert(g.numVertexPartitions === 4)
-//    assert(g.numEdgePartitions === 7)
-//
-//    g = g.withVertexPartitioner(5)
-//    assert(g.numVertexPartitions === 5)
-//
-//    g = g.withEdgePartitioner(8)
-//    assert(g.numEdgePartitions === 8)
-//
-//    g = g.mapVertices(x => x)
-//    assert(g.numVertexPartitions === 5)
-//    assert(g.numEdgePartitions === 8)
-//
-//    g = g.mapEdges(x => x)
-//    assert(g.numVertexPartitions === 5)
-//    assert(g.numEdgePartitions === 8)
-//
-//    val updates = sc.parallelize(Seq((1, " more")))
-//    g = g.updateVertices(
-//      updates,
-//      (v, u: Option[String]) => if (u.isDefined) v.data + u.get else v.data)
-//    assert(g.numVertexPartitions === 5)
-//    assert(g.numEdgePartitions === 8)
-//
-//    g = g.reverse
-//    assert(g.numVertexPartitions === 5)
-//    assert(g.numEdgePartitions === 8)
-//
-//  }
+
+  test("VertexSetRDD") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val a = sc.parallelize((0 to 100).map(x => (x.toLong, x.toLong)), 5)
+      val b = VertexSetRDD(a).mapValues(x => -x)
+      assert(b.leftJoin(a)
+        .mapValues(x => x._1 + x._2.get).map(x => x._2).reduce(_ + _) === 0)
+      val c = VertexSetRDD(a, b.index)
+      assert(b.leftJoin(c)
+        .mapValues(x => x._1 + x._2.get).map(x => x._2).reduce(_ + _) === 0)
+      val d = c.filter(q => ((q._2 % 2) == 0))
+      val e = a.filter(q => ((q._2 % 2) == 0))
+      assert(d.count === e.count)
+      assert(b.zipJoin(c).mapValues(x => x._1 + x._2)
+        .map(x => x._2).reduce(_ + _) === 0)
+    }
+  }
 
 }
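
For readers trying out the documented API, below is a minimal end-to-end usage sketch of the index-sharing workflow that the scaladoc `@example` and the new `GraphSuite` test both exercise. It is illustrative only and not part of the patch: it assumes a running SparkContext named `sc`, `import org.apache.spark.graph._` on the classpath, and that `Vid` is the `Long` vertex-id alias used throughout the graph package.

// Usage sketch (not part of the patch); assumes `sc: SparkContext` exists.
import org.apache.spark.SparkContext._ // pair-RDD implicits for mapValues on RDD[(Long, _)]
import org.apache.spark.graph._

// Build an indexed vertex set from plain (id, attribute) pairs; any
// duplicate ids would be merged by the default reduce function.
val pairs = sc.parallelize((0L until 100L).map(id => (id, 1.0)))
val vset = VertexSetRDD(pairs)

// mapValues reuses the existing index: no shuffle and no re-hashing.
val doubled = vset.mapValues(_ * 2.0)

// Indexing a second dataset with vset.index co-partitions the two sets,
// which lets leftJoin dispatch to the fast leftZipJoin path.
val other = VertexSetRDD(pairs.mapValues(_ + 1.0), vset.index)
val joined: VertexSetRDD[(Double, Option[Double])] = doubled.leftJoin(other)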
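
The per-partition representation that `compute`, `filter`, and the joins manipulate is a dense values array paired with a bitset marking which slots hold a live vertex; a filter only clears mask bits rather than compacting the array. A small standalone sketch of that encoding follows (plain Scala with hypothetical values, no Spark required):

// Representation sketch: a dense attribute array plus a liveness mask.
import scala.collection.mutable.BitSet

val values: IndexedSeq[String] = Vector("a", "b", "c", "d")
val live = BitSet(0, 2, 3) // slot 1 is not present in this vertex set

// filter-style operation: keep the values array, narrow only the mask.
val evenSlots = live.filter(slot => slot % 2 == 0) // BitSet(0, 2)

// compute-style walk: emit only the (slot, value) pairs whose bit is set.
val livePairs = (0 until values.size).filter(live).map(i => (i, values(i)))
// livePairs == Vector((0, "a"), (2, "c"), (3, "d"))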