Documented the VertexSetRDD

This commit is contained in:
Joseph E. Gonzalez 2013-10-29 15:03:13 -07:00
parent ede329336d
commit 08c7b040d6

View file

@ -41,9 +41,9 @@ import org.apache.spark.storage.StorageLevel
/**
* The `VertexSetIndex` maintains the per-partition mapping from vertex id
* to the corresponding location in the per-partition values array.
* This class is meant to be an opaque type.
* The `VertexSetIndex` maintains the per-partition mapping from
* vertex id to the corresponding location in the per-partition values
* array. This class is meant to be an opaque type.
*
*/
class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
@ -56,8 +56,8 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
}
/**
* Returns the partitioner object of the underlying RDD. This is used
* by the VertexSetRDD to partition the values RDD.
* Returns the partitioner object of the underlying RDD. This is
* used by the VertexSetRDD to partition the values RDD.
*/
def partitioner: Partitioner = rdd.partitioner.get
} // end of VertexSetIndex
@ -65,19 +65,21 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
/**
* An VertexSetRDD[V] extends the RDD[(Vid,V)] by ensuring that there is only
* one entry for each vertex and by pre-indexing the entries for fast, efficient
* joins.
* An VertexSetRDD[V] extends the RDD[(Vid,V)] by ensuring that there
* is only one entry for each vertex and by pre-indexing the entries
* for fast, efficient joins.
*
* In addition to providing the basic RDD[(Vid,V)] functionality the VertexSetRDD
* exposes an index member which can be used to "key" other VertexSetRDDs
* In addition to providing the basic RDD[(Vid,V)] functionality the
* VertexSetRDD exposes an index member which can be used to "key"
* other VertexSetRDDs
*
* @tparam V the vertex attribute associated with each vertex in the set.
* @tparam V the vertex attribute associated with each vertex in the
* set.
*
* @param index the index which contains the vertex id information and is used
* to organize the values in the RDD.
* @param valuesRDD the values RDD contains the actual vertex attributes organized
* as an array within each partition.
* @param index the index which contains the vertex id information and
* is used to organize the values in the RDD.
* @param valuesRDD the values RDD contains the actual vertex
* attributes organized as an array within each partition.
*
* To construct a `VertexSetRDD` use the singleton object:
*
@ -175,10 +177,18 @@ class VertexSetRDD[@specialized V: ClassManifest](
/**
* @todo finish documenting
* Restrict the vertex set to the set of vertices satisfying the
* given predicate.
*
* @param pred the user defined predicate
*
* @note The vertex set preserves the original index structure
* which means that the returned RDD can be easily joined with
* the original vertex-set. Furthermore, the filter only
* modifies the bitmap index and so no new values are allocated.
*/
override def filter(f: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = {
val cleanF = index.rdd.context.clean(f)
override def filter(pred: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = {
val cleanF = index.rdd.context.clean(pred)
val newValues = index.rdd.zipPartitions(valuesRDD){
(keysIter, valuesIter) =>
val index = keysIter.next()
@ -198,15 +208,15 @@ class VertexSetRDD[@specialized V: ClassManifest](
/**
* Pass each vertex attribute through a map function and retain
* the original RDD's partitioning and index.
* Pass each vertex attribute through a map function and retain the
* original RDD's partitioning and index.
*
* @tparam U the type returned by the map function
*
* @param f the function applied to each value in the RDD
* @return a new VertexSet with values obtaind by applying `f` to each of the
* entries in the original VertexSet. The resulting VertexSetRDD retains the
* same index.
* @return a new VertexSet with values obtaind by applying `f` to
* each of the entries in the original VertexSet. The resulting
* VertexSetRDD retains the same index.
*/
def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = {
val cleanF = index.rdd.context.clean(f)
@ -214,9 +224,10 @@ class VertexSetRDD[@specialized V: ClassManifest](
valuesRDD.mapPartitions(iter => iter.map{
case (values, bs) =>
/**
* @todo Consider using a view rather than creating a new array.
* This is already being done for join operations. It could reduce
* memory overhead but require additional recomputation.
* @todo Consider using a view rather than creating a new
* array. This is already being done for join operations.
* It could reduce memory overhead but require additional
* recomputation.
*/
val newValues = new Array[U](values.size)
for ( ind <- bs ) {
@ -229,16 +240,16 @@ class VertexSetRDD[@specialized V: ClassManifest](
/**
* Pass each vertex attribute along with the vertex id through a
* map function and retain the original RDD's partitioning and index.
* Pass each vertex attribute along with the vertex id through a map
* function and retain the original RDD's partitioning and index.
*
* @tparam U the type returned by the map function
*
* @param f the function applied to each vertex id and vertex
* @param f the function applied to each vertex id and vertex
* attribute in the RDD
* @return a new VertexSet with values obtaind by applying `f` to each of the
* entries in the original VertexSet. The resulting VertexSetRDD retains the
* same index.
* @return a new VertexSet with values obtaind by applying `f` to
* each of the entries in the original VertexSet. The resulting
* VertexSetRDD retains the same index.
*/
def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = {
val cleanF = index.rdd.context.clean(f)
@ -267,16 +278,16 @@ class VertexSetRDD[@specialized V: ClassManifest](
/**
* Inner join this VertexSet with another VertexSet which has the same Index.
* This function will fail if both VertexSets do not share the same index.
* The resulting vertex set will only contain vertices that are in both this
* and the other vertex set.
* Inner join this VertexSet with another VertexSet which has the
* same Index. This function will fail if both VertexSets do not
* share the same index. The resulting vertex set will only contain
* vertices that are in both this and the other vertex set.
*
* @tparam W the attribute type of the other VertexSet
*
* @param other the other VertexSet with which to join.
* @return a VertexSetRDD containing only the vertices in both this and the
* other VertexSet and with tuple attributes.
* @param other the other VertexSet with which to join.
* @return a VertexSetRDD containing only the vertices in both this
* and the other VertexSet and with tuple attributes.
*
*/
def zipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,W)] = {
@ -299,17 +310,18 @@ class VertexSetRDD[@specialized V: ClassManifest](
/**
* Left join this VertexSet with another VertexSet which has the same Index.
* This function will fail if both VertexSets do not share the same index.
* The resulting vertex set contains an entry for each vertex in this set.
* If the other VertexSet is missing any vertex in this VertexSet then a
* `None` attribute is generated
* Left join this VertexSet with another VertexSet which has the
* same Index. This function will fail if both VertexSets do not
* share the same index. The resulting vertex set contains an entry
* for each vertex in this set. If the other VertexSet is missing
* any vertex in this VertexSet then a `None` attribute is generated
*
* @tparam W the attribute type of the other VertexSet
*
* @param other the other VertexSet with which to join.
* @return a VertexSetRDD containing all the vertices in this VertexSet
* with `None` attributes used for Vertices missing in the other VertexSet.
* @param other the other VertexSet with which to join.
* @return a VertexSetRDD containing all the vertices in this
* VertexSet with `None` attributes used for Vertices missing in the
* other VertexSet.
*
*/
def leftZipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,Option[W])] = {
@ -332,19 +344,21 @@ class VertexSetRDD[@specialized V: ClassManifest](
/**
* Left join this VertexSet with an RDD containing vertex attribute pairs.
* If the other RDD is backed by a VertexSet with the same index than the
* efficient leftZipJoin implementation is used.
* The resulting vertex set contains an entry for each vertex in this set.
* If the other VertexSet is missing any vertex in this VertexSet then a
* `None` attribute is generated
* Left join this VertexSet with an RDD containing vertex attribute
* pairs. If the other RDD is backed by a VertexSet with the same
* index than the efficient leftZipJoin implementation is used. The
* resulting vertex set contains an entry for each vertex in this
* set. If the other VertexSet is missing any vertex in this
* VertexSet then a `None` attribute is generated
*
* @tparam W the attribute type of the other VertexSet
*
* @param other the other VertexSet with which to join.
* @param merge the function used combine duplicate vertex attributes
* @return a VertexSetRDD containing all the vertices in this VertexSet
* with `None` attributes used for Vertices missing in the other VertexSet.
* @param merge the function used combine duplicate vertex
* attributes
* @return a VertexSetRDD containing all the vertices in this
* VertexSet with `None` attributes used for Vertices missing in the
* other VertexSet.
*
*/
def leftJoin[W: ClassManifest](
@ -581,8 +595,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
object VertexSetRDD {
/**
* Construct a vertex set from an RDD of vertex-attribute pairs.
* Duplicate entries are removed arbitrarily.
* Construct a vertex set from an RDD of vertex-attribute pairs.
* Duplicate entries are removed arbitrarily.
*
* @tparam V the vertex attribute type
*
@ -592,14 +606,14 @@ object VertexSetRDD {
apply(rdd, (a:V, b:V) => a )
/**
* Construct a vertex set from an RDD of vertex-attribute pairs where
* duplicate entries are merged using the reduceFunc
* Construct a vertex set from an RDD of vertex-attribute pairs
* where duplicate entries are merged using the reduceFunc
*
* @tparam V the vertex attribute type
*
* @param rdd the collection of vertex-attribute pairs
* @param reduceFunc the function used to merge attributes of duplicate
* vertices.
* @param reduceFunc the function used to merge attributes of
* duplicate vertices.
*/
def apply[V: ClassManifest](
rdd: RDD[(Vid,V)], reduceFunc: (V, V) => V): VertexSetRDD[V] = {
@ -635,7 +649,14 @@ object VertexSetRDD {
/**
* @todo finish documenting
* Construct a vertex set from an RDD using an existing index.
*
* @note duplicate vertices are discarded arbitrarily
*
* @tparam the vertex attribute type
* @param rdd the rdd containing vertices
* @param index the index which must be a superset of the vertices
* in RDD
*/
def apply[V: ClassManifest](
rdd: RDD[(Vid,V)], index: VertexSetIndex): VertexSetRDD[V] =
@ -643,7 +664,15 @@ object VertexSetRDD {
/**
* @todo finish documenting
* Construct a vertex set from an RDD using an existing index and a
* user defined `combiner` to merge duplicate vertices.
*
* @tparam the vertex attribute type
* @param rdd the rdd containing vertices
* @param index the index which must be a superset of the vertices
* in RDD
* @param reduceFunc the user defined reduce function used to merge
* duplicate vertex attributes.
*/
def apply[V: ClassManifest](
rdd: RDD[(Vid,V)], index: VertexSetIndex,
@ -652,7 +681,19 @@ object VertexSetRDD {
/**
* @todo finish documenting
* Construct a vertex set from an RDD using an existing index and a
* user defined `combiner` to merge duplicate vertices.
*
* @tparam the vertex attribute type
* @param rdd the rdd containing vertices
* @param index the index which must be a superset of the vertices
* in RDD
* @param createCombiner a user defined function to create a combiner
* from a vertex attribute
* @param mergeValue a user defined function to merge a vertex
* attribute into an existing combiner
* @param mergeCombiners a user defined function to merge combiners
*
*/
def apply[V: ClassManifest, C: ClassManifest](
rdd: RDD[(Vid,V)],
@ -703,9 +744,8 @@ object VertexSetRDD {
/**
* Construct and index of the unique values in a given RDD.
*
* @todo finish documenting
* Construct and index of the unique vertex ids. This can be used
* as an index when building a vertex set.
*/
def makeIndex(keys: RDD[Vid],
partitioner: Option[Partitioner] = None): VertexSetIndex = {