Merge branch 'master' of github.com:amplab/graphx
This commit is contained in:
commit
4a670ef0ba
|
@ -27,7 +27,7 @@ the challenges of graph construction and transformation and provide
|
|||
limited fault-tolerance and support for interactive analysis.
|
||||
|
||||
<p align="center">
|
||||
<img src="https://raw.github.com/jegonzal/graphx/Documentation/docs/img/data_parallel_vs_graph_parallel.png" />
|
||||
<img src="https://raw.github.com/amplab/graphx/master/docs/img/data_parallel_vs_graph_parallel.png" />
|
||||
</p>
|
||||
|
||||
|
||||
|
@ -47,7 +47,7 @@ Finally, by exploiting the Scala foundation of Spark, we enable users
|
|||
to interactively load, transform, and compute on massive graphs.
|
||||
|
||||
<p align="center">
|
||||
<img src="https://raw.github.com/jegonzal/graphx/Documentation/docs/img/tables_and_graphs.png" />
|
||||
<img src="https://raw.github.com/amplab/graphx/master/docs/img/tables_and_graphs.png" />
|
||||
</p>
|
||||
|
||||
## Examples
|
||||
|
|
|
@ -18,6 +18,7 @@ class GraphKryoRegistrator extends KryoRegistrator {
|
|||
kryo.register(classOf[EdgePartition[Object]])
|
||||
kryo.register(classOf[BitSet])
|
||||
kryo.register(classOf[VertexIdToIndexMap])
|
||||
kryo.register(classOf[VertexAttributeBlock[Object]])
|
||||
// This avoids a large number of hash table lookups.
|
||||
kryo.setReferences(false)
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHa
|
|||
import org.apache.spark.graph.impl.AggregationMsg
|
||||
import org.apache.spark.graph.impl.MsgRDDFunctions._
|
||||
|
||||
|
||||
/**
|
||||
* The `VertexSetIndex` maintains the per-partition mapping from
|
||||
* vertex id to the corresponding location in the per-partition values
|
||||
|
@ -88,7 +89,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
|||
extends RDD[(Vid, V)](index.rdd.context,
|
||||
List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
|
||||
|
||||
|
||||
/**
|
||||
* Construct a new VertexSetRDD that is indexed by only the keys in the RDD.
|
||||
* The resulting VertexSet will be based on a different index and can
|
||||
|
@ -96,7 +96,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
|||
*/
|
||||
def reindex(): VertexSetRDD[V] = VertexSetRDD(this)
|
||||
|
||||
|
||||
/**
|
||||
* An internal representation which joins the block indices with the values
|
||||
* This is used by the compute function to emulate RDD[(Vid, V)]
|
||||
|
@ -104,19 +103,16 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
|||
protected[spark] val tuples =
|
||||
new ZippedRDD(index.rdd.context, index.rdd, valuesRDD)
|
||||
|
||||
|
||||
/**
|
||||
* The partitioner is defined by the index.
|
||||
*/
|
||||
override val partitioner = index.rdd.partitioner
|
||||
|
||||
|
||||
/**
|
||||
* The actual partitions are defined by the tuples.
|
||||
*/
|
||||
override def getPartitions: Array[Partition] = tuples.getPartitions
|
||||
|
||||
|
||||
/**
|
||||
* The preferred locations are computed based on the preferred
|
||||
* locations of the tuples.
|
||||
|
@ -124,7 +120,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
|||
override def getPreferredLocations(s: Partition): Seq[String] =
|
||||
tuples.getPreferredLocations(s)
|
||||
|
||||
|
||||
/**
|
||||
* Caching an VertexSetRDD causes the index and values to be cached separately.
|
||||
*/
|
||||
|
@ -134,15 +129,12 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
|||
return this
|
||||
}
|
||||
|
||||
|
||||
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
|
||||
override def persist(): VertexSetRDD[V] = persist(StorageLevel.MEMORY_ONLY)
|
||||
|
||||
|
||||
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
|
||||
override def cache(): VertexSetRDD[V] = persist()
|
||||
|
||||
|
||||
/**
|
||||
* Provide the RDD[(K,V)] equivalent output.
|
||||
*/
|
||||
|
@ -152,7 +144,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
|||
}
|
||||
} // end of compute
|
||||
|
||||
|
||||
/**
|
||||
* Restrict the vertex set to the set of vertices satisfying the
|
||||
* given predicate.
|
||||
|
@ -190,7 +181,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
|||
new VertexSetRDD[V](index, newValues)
|
||||
} // end of filter
|
||||
|
||||
|
||||
/**
|
||||
* Pass each vertex attribute through a map function and retain the
|
||||
* original RDD's partitioning and index.
|
||||
|
@ -214,7 +204,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
|||
new VertexSetRDD[U](index, newValuesRDD)
|
||||
} // end of mapValues
|
||||
|
||||
|
||||
/**
|
||||
* Pass each vertex attribute along with the vertex id through a map
|
||||
* function and retain the original RDD's partitioning and index.
|
||||
|
@ -247,8 +236,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
|||
|
||||
|
||||
/**
|
||||
* @todo update docs to reflect function argument
|
||||
*
|
||||
* Inner join this VertexSet with another VertexSet which has the
|
||||
* same Index. This function will fail if both VertexSets do not
|
||||
* share the same index. The resulting vertex set will only contain
|
||||
|
@ -257,6 +244,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
|||
* @tparam W the attribute type of the other VertexSet
|
||||
*
|
||||
* @param other the other VertexSet with which to join.
|
||||
* @param f the function mapping a vertex id and its attributes in
|
||||
* this and the other vertex set to a new vertex attribute.
|
||||
* @return a VertexSetRDD containing only the vertices in both this
|
||||
* and the other VertexSet and with tuple attributes.
|
||||
*
|
||||
|
@ -287,13 +276,16 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
|||
|
||||
|
||||
/**
|
||||
* @todo document
|
||||
* Inner join this VertexSet with another VertexSet which has the
|
||||
* same Index. This function will fail if both VertexSets do not
|
||||
* share the same index.
|
||||
*
|
||||
* @param other
|
||||
* @param f
|
||||
* @tparam W
|
||||
* @tparam Z
|
||||
* @return
|
||||
* @param other the vertex set to join with this vertex set
|
||||
* @param f the function mapping a vertex id and its attributes in
|
||||
* this and the other vertex set to a collection of tuples.
|
||||
* @tparam W the type of the other vertex set attributes
|
||||
* @tparam Z the type of the tuples emitted by `f`
|
||||
* @return an RDD containing the tuples emitted by `f`
|
||||
*/
|
||||
def zipJoinFlatMap[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V,W) => Iterator[Z]):
|
||||
RDD[Z] = {
|
||||
|
@ -316,8 +308,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
|||
|
||||
|
||||
/**
|
||||
* @todo update docs to reflect function argument
|
||||
|
||||
* Left join this VertexSet with another VertexSet which has the
|
||||
* same Index. This function will fail if both VertexSets do not
|
||||
* share the same index. The resulting vertex set contains an entry
|
||||
|
@ -327,6 +317,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
|||
* @tparam W the attribute type of the other VertexSet
|
||||
*
|
||||
* @param other the other VertexSet with which to join.
|
||||
* @param f the function mapping a vertex id and its attributes in
|
||||
* this and the other vertex set to a new vertex attribute.
|
||||
* @return a VertexSetRDD containing all the vertices in this
|
||||
* VertexSet with `None` attributes used for Vertices missing in the
|
||||
* other VertexSet.
|
||||
|
@ -368,11 +360,12 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
|||
* @tparam W the attribute type of the other VertexSet
|
||||
*
|
||||
* @param other the other VertexSet with which to join.
|
||||
* @param f the function mapping a vertex id and its attributes in
|
||||
* this and the other vertex set to a new vertex attribute.
|
||||
* @param merge the function used combine duplicate vertex
|
||||
* attributes
|
||||
* @return a VertexSetRDD containing all the vertices in this
|
||||
* VertexSet with `None` attributes used for Vertices missing in the
|
||||
* other VertexSet.
|
||||
* VertexSet with the attribute emitted by f.
|
||||
*
|
||||
*/
|
||||
def leftJoin[W: ClassManifest, Z: ClassManifest](other: RDD[(Vid,W)])
|
||||
|
@ -396,181 +389,9 @@ class VertexSetRDD[@specialized V: ClassManifest](
|
|||
}
|
||||
} // end of leftJoin
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* For each key k in `this` or `other`, return a resulting RDD that contains a
|
||||
* tuple with the list of values for that key in `this` as well as `other`.
|
||||
*/
|
||||
/*
|
||||
def cogroup[W: ClassManifest](other: RDD[(Vid, W)], partitioner: Partitioner):
|
||||
VertexSetRDD[(Seq[V], Seq[W])] = {
|
||||
//RDD[(K, (Seq[V], Seq[W]))] = {
|
||||
other match {
|
||||
case other: VertexSetRDD[_] if index == other.index => {
|
||||
// if both RDDs share exactly the same index and therefore the same
|
||||
// super set of keys then we simply merge the value RDDs.
|
||||
// However it is possible that both RDDs are missing a value for a given key in
|
||||
// which case the returned RDD should have a null value
|
||||
val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
|
||||
valuesRDD.zipPartitions(other.valuesRDD){
|
||||
(thisIter, otherIter) =>
|
||||
val (thisValues, thisBS) = thisIter.next()
|
||||
assert(!thisIter.hasNext)
|
||||
val (otherValues, otherBS) = otherIter.next()
|
||||
assert(!otherIter.hasNext)
|
||||
/**
|
||||
* @todo consider implementing this with a view as in leftJoin to
|
||||
* reduce array allocations
|
||||
*/
|
||||
val newValues = new Array[(Seq[V], Seq[W])](thisValues.size)
|
||||
val newBS = thisBS | otherBS
|
||||
|
||||
var ind = newBS.nextSetBit(0)
|
||||
while(ind >= 0) {
|
||||
val a = if (thisBS.get(ind)) Seq(thisValues(ind)) else Seq.empty[V]
|
||||
val b = if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W]
|
||||
newValues(ind) = (a, b)
|
||||
ind = newBS.nextSetBit(ind+1)
|
||||
}
|
||||
Iterator((newValues.toIndexedSeq, newBS))
|
||||
}
|
||||
new VertexSetRDD(index, newValues)
|
||||
}
|
||||
case other: VertexSetRDD[_]
|
||||
if index.rdd.partitioner == other.index.rdd.partitioner => {
|
||||
// If both RDDs are indexed using different indices but with the same partitioners
|
||||
// then we we need to first merge the indicies and then use the merged index to
|
||||
// merge the values.
|
||||
val newIndex =
|
||||
index.rdd.zipPartitions(other.index.rdd)(
|
||||
(thisIter, otherIter) => {
|
||||
val thisIndex = thisIter.next()
|
||||
assert(!thisIter.hasNext)
|
||||
val otherIndex = otherIter.next()
|
||||
assert(!otherIter.hasNext)
|
||||
// Merge the keys
|
||||
val newIndex = new VertexIdToIndexMap(thisIndex.capacity + otherIndex.capacity)
|
||||
var ind = thisIndex.nextPos(0)
|
||||
while(ind >= 0) {
|
||||
newIndex.fastAdd(thisIndex.getValue(ind))
|
||||
ind = thisIndex.nextPos(ind+1)
|
||||
}
|
||||
var ind = otherIndex.nextPos(0)
|
||||
while(ind >= 0) {
|
||||
newIndex.fastAdd(otherIndex.getValue(ind))
|
||||
ind = otherIndex.nextPos(ind+1)
|
||||
}
|
||||
List(newIndex).iterator
|
||||
}).cache()
|
||||
// Use the new index along with the this and the other indices to merge the values
|
||||
val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
|
||||
newIndex.zipPartitions(tuples, other.tuples)(
|
||||
(newIndexIter, thisTuplesIter, otherTuplesIter) => {
|
||||
// Get the new index for this partition
|
||||
val newIndex = newIndexIter.next()
|
||||
assert(!newIndexIter.hasNext)
|
||||
// Get the corresponding indicies and values for this and the other VertexSetRDD
|
||||
val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
|
||||
assert(!thisTuplesIter.hasNext)
|
||||
val (otherIndex, (otherValues, otherBS)) = otherTuplesIter.next()
|
||||
assert(!otherTuplesIter.hasNext)
|
||||
// Preallocate the new Values array
|
||||
val newValues = new Array[(Seq[V], Seq[W])](newIndex.size)
|
||||
val newBS = new BitSet(newIndex.size)
|
||||
|
||||
// Lookup the sequences in both submaps
|
||||
for ((k,ind) <- newIndex) {
|
||||
// Get the left key
|
||||
val a = if (thisIndex.contains(k)) {
|
||||
val ind = thisIndex.get(k)
|
||||
if(thisBS.get(ind)) Seq(thisValues(ind)) else Seq.empty[V]
|
||||
} else Seq.empty[V]
|
||||
// Get the right key
|
||||
val b = if (otherIndex.contains(k)) {
|
||||
val ind = otherIndex.get(k)
|
||||
if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W]
|
||||
} else Seq.empty[W]
|
||||
// If at least one key was present then we generate a tuple.
|
||||
if (!a.isEmpty || !b.isEmpty) {
|
||||
newValues(ind) = (a, b)
|
||||
newBS.set(ind)
|
||||
}
|
||||
}
|
||||
Iterator((newValues.toIndexedSeq, newBS))
|
||||
})
|
||||
new VertexSetRDD(new VertexSetIndex(newIndex), newValues)
|
||||
}
|
||||
case _ => {
|
||||
// Get the partitioner from the index
|
||||
val partitioner = index.rdd.partitioner match {
|
||||
case Some(p) => p
|
||||
case None => throw new SparkException("An index must have a partitioner.")
|
||||
}
|
||||
// Shuffle the other RDD using the partitioner for this index
|
||||
val otherShuffled =
|
||||
if (other.partitioner == Some(partitioner)) {
|
||||
other
|
||||
} else {
|
||||
other.partitionBy(partitioner)
|
||||
}
|
||||
// Join the other RDD with this RDD building a new valueset and new index on the fly
|
||||
val groups = tuples.zipPartitions(otherShuffled)(
|
||||
(thisTuplesIter, otherTuplesIter) => {
|
||||
// Get the corresponding indicies and values for this VertexSetRDD
|
||||
val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
|
||||
assert(!thisTuplesIter.hasNext())
|
||||
// Construct a new index
|
||||
val newIndex = thisIndex.clone().asInstanceOf[VertexIdToIndexMap]
|
||||
// Construct a new array Buffer to store the values
|
||||
val newValues = ArrayBuffer.fill[ (Seq[V], Seq[W]) ](thisValues.size)(null)
|
||||
val newBS = new BitSet(thisValues.size)
|
||||
// populate the newValues with the values in this VertexSetRDD
|
||||
for ((k,i) <- thisIndex) {
|
||||
if (thisBS.get(i)) {
|
||||
newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W])
|
||||
newBS.set(i)
|
||||
}
|
||||
}
|
||||
// Now iterate through the other tuples updating the map
|
||||
for ((k,w) <- otherTuplesIter){
|
||||
if (newIndex.contains(k)) {
|
||||
val ind = newIndex.get(k)
|
||||
if(newBS.get(ind)) {
|
||||
newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w)
|
||||
} else {
|
||||
// If the other key was in the index but not in the values
|
||||
// of this indexed RDD then create a new values entry for it
|
||||
newBS.set(ind)
|
||||
newValues(ind) = (Seq.empty[V], ArrayBuffer(w))
|
||||
}
|
||||
} else {
|
||||
// update the index
|
||||
val ind = newIndex.size
|
||||
newIndex.put(k, ind)
|
||||
newBS.set(ind)
|
||||
// Update the values
|
||||
newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
|
||||
}
|
||||
}
|
||||
Iterator( (newIndex, (newValues.toIndexedSeq, newBS)) )
|
||||
}).cache()
|
||||
|
||||
// Extract the index and values from the above RDD
|
||||
val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
|
||||
val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
|
||||
groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
|
||||
|
||||
new VertexSetRDD[(Seq[V], Seq[W])](new VertexSetIndex(newIndex), newValues)
|
||||
}
|
||||
}
|
||||
} // end of cogroup
|
||||
*/
|
||||
|
||||
} // End of VertexSetRDD
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* The VertexSetRDD singleton is used to construct VertexSets
|
||||
*/
|
||||
|
@ -627,7 +448,6 @@ object VertexSetRDD {
|
|||
new VertexSetRDD[V](new VertexSetIndex(index), values)
|
||||
} // end of apply
|
||||
|
||||
|
||||
/**
|
||||
* Construct a vertex set from an RDD using an existing index.
|
||||
*
|
||||
|
@ -642,7 +462,6 @@ object VertexSetRDD {
|
|||
rdd: RDD[(Vid,V)], index: VertexSetIndex): VertexSetRDD[V] =
|
||||
apply(rdd, index, (a:V,b:V) => a)
|
||||
|
||||
|
||||
/**
|
||||
* Construct a vertex set from an RDD using an existing index and a
|
||||
* user defined `combiner` to merge duplicate vertices.
|
||||
|
@ -659,8 +478,17 @@ object VertexSetRDD {
|
|||
reduceFunc: (V, V) => V): VertexSetRDD[V] =
|
||||
apply(rdd,index, (v:V) => v, reduceFunc, reduceFunc)
|
||||
|
||||
|
||||
def aggregate[V: ClassManifest](
|
||||
/**
|
||||
* Construct a vertex set from an RDD of AggregationMsgs
|
||||
*
|
||||
* @tparam V the vertex attribute type
|
||||
* @param rdd the rdd containing vertices
|
||||
* @param index the index which must be a superset of the vertices
|
||||
* in RDD
|
||||
* @param reduceFunc the user defined reduce function used to merge
|
||||
* duplicate vertex attributes.
|
||||
*/
|
||||
private[spark] def aggregate[V: ClassManifest](
|
||||
rdd: RDD[AggregationMsg[V]], index: VertexSetIndex,
|
||||
reduceFunc: (V, V) => V): VertexSetRDD[V] = {
|
||||
|
||||
|
@ -696,7 +524,6 @@ object VertexSetRDD {
|
|||
new VertexSetRDD(index, values)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Construct a vertex set from an RDD using an existing index and a
|
||||
* user defined `combiner` to merge duplicate vertices.
|
||||
|
@ -767,7 +594,6 @@ object VertexSetRDD {
|
|||
new VertexSetRDD(index, values)
|
||||
} // end of apply
|
||||
|
||||
|
||||
/**
|
||||
* Construct an index of the unique vertices. The resulting index
|
||||
* can be used to build VertexSets over subsets of the vertices in
|
||||
|
|
|
@ -12,6 +12,7 @@ import org.apache.spark.util.ClosureCleaner
|
|||
import org.apache.spark.graph._
|
||||
import org.apache.spark.graph.impl.GraphImpl._
|
||||
import org.apache.spark.graph.impl.MsgRDDFunctions._
|
||||
import org.apache.spark.graph.util.BytecodeUtils
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap}
|
||||
|
@ -62,27 +63,25 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
|
|||
|
||||
/**
|
||||
* A Graph RDD that supports computation on graphs.
|
||||
*
|
||||
* @param localVidMap Stores the location of vertex attributes after they are
|
||||
* replicated. Within each partition, localVidMap holds a map from vertex ID to
|
||||
* the index where that vertex's attribute is stored. This index refers to the
|
||||
* arrays in the same partition in the variants of
|
||||
* [[VTableReplicatedValues]]. Therefore, localVidMap can be reused across
|
||||
* changes to the vertex attributes.
|
||||
*/
|
||||
class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
||||
@transient val vTable: VertexSetRDD[VD],
|
||||
@transient val vid2pid: VertexSetRDD[Array[Pid]],
|
||||
@transient val vid2pid: Vid2Pid,
|
||||
@transient val localVidMap: RDD[(Pid, VertexIdToIndexMap)],
|
||||
@transient val eTable: RDD[(Pid, EdgePartition[ED])] )
|
||||
extends Graph[VD, ED] {
|
||||
|
||||
def this() = this(null, null, null, null)
|
||||
|
||||
/**
|
||||
* (localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap]) is a version of the
|
||||
* vertex data after it is replicated. Within each partition, it holds a map
|
||||
* from vertex ID to the index where that vertex's attribute is stored. This
|
||||
* index refers to an array in the same partition in vTableReplicatedValues.
|
||||
*
|
||||
* (vTableReplicatedValues: VertexSetRDD[Pid, Array[VD]]) holds the vertex data
|
||||
* and is arranged as described above.
|
||||
*/
|
||||
@transient val vTableReplicatedValues: RDD[(Pid, Array[VD])] =
|
||||
createVTableReplicated(vTable, vid2pid, localVidMap)
|
||||
@transient val vTableReplicatedValues: VTableReplicatedValues[VD] =
|
||||
new VTableReplicatedValues(vTable, vid2pid, localVidMap)
|
||||
|
||||
/** Return a RDD of vertices. */
|
||||
@transient override val vertices = vTable
|
||||
|
@ -94,7 +93,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
|
||||
/** Return a RDD that brings edges with its source and destination vertices together. */
|
||||
@transient override val triplets: RDD[EdgeTriplet[VD, ED]] =
|
||||
makeTriplets(localVidMap, vTableReplicatedValues, eTable)
|
||||
makeTriplets(localVidMap, vTableReplicatedValues.bothAttrs, eTable)
|
||||
|
||||
override def persist(newLevel: StorageLevel): Graph[VD, ED] = {
|
||||
eTable.persist(newLevel)
|
||||
|
@ -110,15 +109,22 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
override def statistics: Map[String, Any] = {
|
||||
val numVertices = this.numVertices
|
||||
val numEdges = this.numEdges
|
||||
val replicationRatio =
|
||||
vid2pid.map(kv => kv._2.size).sum / vTable.count
|
||||
val replicationRatioBothAttrs =
|
||||
vid2pid.bothAttrs.map(kv => kv._2.size).sum / numVertices
|
||||
val replicationRatioSrcAttrOnly =
|
||||
vid2pid.srcAttrOnly.map(kv => kv._2.size).sum / numVertices
|
||||
val replicationRatioDstAttrOnly =
|
||||
vid2pid.dstAttrOnly.map(kv => kv._2.size).sum / numVertices
|
||||
val loadArray =
|
||||
eTable.map{ case (pid, epart) => epart.data.size }.collect.map(x => x.toDouble / numEdges)
|
||||
val minLoad = loadArray.min
|
||||
val maxLoad = loadArray.max
|
||||
Map(
|
||||
"Num Vertices" -> numVertices, "Num Edges" -> numEdges,
|
||||
"Replication" -> replicationRatio, "Load Array" -> loadArray,
|
||||
"Replication (both)" -> replicationRatioBothAttrs,
|
||||
"Replication (src only)" -> replicationRatioSrcAttrOnly,
|
||||
"Replication (dest only)" -> replicationRatioDstAttrOnly,
|
||||
"Load Array" -> loadArray,
|
||||
"Min Load" -> minLoad, "Max Load" -> maxLoad)
|
||||
}
|
||||
|
||||
|
@ -161,18 +167,18 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
traverseLineage(vTable, " ", visited)
|
||||
visited += (vTable.id -> "vTable")
|
||||
|
||||
println("\n\nvid2pid -----------------------------------------")
|
||||
traverseLineage(vid2pid, " ", visited)
|
||||
visited += (vid2pid.id -> "vid2pid")
|
||||
visited += (vid2pid.valuesRDD.id -> "vid2pid.values")
|
||||
println("\n\nvid2pid.bothAttrs -------------------------------")
|
||||
traverseLineage(vid2pid.bothAttrs, " ", visited)
|
||||
visited += (vid2pid.bothAttrs.id -> "vid2pid")
|
||||
visited += (vid2pid.bothAttrs.valuesRDD.id -> "vid2pid.bothAttrs")
|
||||
|
||||
println("\n\nlocalVidMap -------------------------------------")
|
||||
traverseLineage(localVidMap, " ", visited)
|
||||
visited += (localVidMap.id -> "localVidMap")
|
||||
|
||||
println("\n\nvTableReplicatedValues --------------------------")
|
||||
traverseLineage(vTableReplicatedValues, " ", visited)
|
||||
visited += (vTableReplicatedValues.id -> "vTableReplicatedValues")
|
||||
println("\n\nvTableReplicatedValues.bothAttrs ----------------")
|
||||
traverseLineage(vTableReplicatedValues.bothAttrs, " ", visited)
|
||||
visited += (vTableReplicatedValues.bothAttrs.id -> "vTableReplicatedValues.bothAttrs")
|
||||
|
||||
println("\n\ntriplets ----------------------------------------")
|
||||
traverseLineage(triplets, " ", visited)
|
||||
|
@ -232,7 +238,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
|||
// Construct the Vid2Pid map. Here we assume that the filter operation
|
||||
// behaves deterministically.
|
||||
// @todo reindex the vertex and edge tables
|
||||
val newVid2Pid = createVid2Pid(newETable, newVTable.index)
|
||||
val newVid2Pid = new Vid2Pid(newETable, newVTable.index)
|
||||
val newVidMap = createLocalVidMap(newETable)
|
||||
|
||||
new GraphImpl(newVTable, newVid2Pid, localVidMap, newETable)
|
||||
|
@ -328,7 +334,7 @@ object GraphImpl {
|
|||
*
|
||||
*/
|
||||
val etable = createETable(edges)
|
||||
val vid2pid = createVid2Pid(etable, vtable.index)
|
||||
val vid2pid = new Vid2Pid(etable, vtable.index)
|
||||
val localVidMap = createLocalVidMap(etable)
|
||||
new GraphImpl(vtable, vid2pid, localVidMap, etable)
|
||||
}
|
||||
|
@ -367,24 +373,9 @@ object GraphImpl {
|
|||
}, preservesPartitioning = true).cache()
|
||||
}
|
||||
|
||||
protected def createVid2Pid[ED: ClassManifest](
|
||||
eTable: RDD[(Pid, EdgePartition[ED])],
|
||||
vTableIndex: VertexSetIndex): VertexSetRDD[Array[Pid]] = {
|
||||
val preAgg = eTable.mapPartitions { iter =>
|
||||
val (pid, edgePartition) = iter.next()
|
||||
val vSet = new VertexSet
|
||||
edgePartition.foreach(e => {vSet.add(e.srcId); vSet.add(e.dstId)})
|
||||
vSet.iterator.map { vid => (vid.toLong, pid) }
|
||||
}.partitionBy(vTableIndex.rdd.partitioner.get)
|
||||
VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
|
||||
(p: Pid) => ArrayBuffer(p),
|
||||
(ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
|
||||
(a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
|
||||
.mapValues(a => a.toArray).cache()
|
||||
}
|
||||
|
||||
protected def createLocalVidMap[ED: ClassManifest](eTable: RDD[(Pid, EdgePartition[ED])]):
|
||||
RDD[(Pid, VertexIdToIndexMap)] = {
|
||||
private def createLocalVidMap(
|
||||
eTable: RDD[(Pid, EdgePartition[ED])] forSome { type ED }
|
||||
): RDD[(Pid, VertexIdToIndexMap)] = {
|
||||
eTable.mapPartitions( _.map{ case (pid, epart) =>
|
||||
val vidToIndex = new VertexIdToIndexMap
|
||||
epart.foreach{ e =>
|
||||
|
@ -395,36 +386,6 @@ object GraphImpl {
|
|||
}, preservesPartitioning = true).cache()
|
||||
}
|
||||
|
||||
protected def createVTableReplicated[VD: ClassManifest](
|
||||
vTable: VertexSetRDD[VD],
|
||||
vid2pid: VertexSetRDD[Array[Pid]],
|
||||
replicationMap: RDD[(Pid, VertexIdToIndexMap)]):
|
||||
RDD[(Pid, Array[VD])] = {
|
||||
// Join vid2pid and vTable, generate a shuffle dependency on the joined
|
||||
// result, and get the shuffle id so we can use it on the slave.
|
||||
val msgsByPartition = vTable.zipJoinFlatMap(vid2pid) { (vid, vdata, pids) =>
|
||||
// TODO(rxin): reuse VertexBroadcastMessage
|
||||
pids.iterator.map { pid =>
|
||||
new VertexBroadcastMsg[VD](pid, vid, vdata)
|
||||
}
|
||||
}.partitionBy(replicationMap.partitioner.get).cache()
|
||||
|
||||
replicationMap.zipPartitions(msgsByPartition){
|
||||
(mapIter, msgsIter) =>
|
||||
val (pid, vidToIndex) = mapIter.next()
|
||||
assert(!mapIter.hasNext)
|
||||
// Populate the vertex array using the vidToIndex map
|
||||
val vertexArray = new Array[VD](vidToIndex.capacity)
|
||||
for (msg <- msgsIter) {
|
||||
val ind = vidToIndex.getPos(msg.vid) & OpenHashSet.POSITION_MASK
|
||||
vertexArray(ind) = msg.data
|
||||
}
|
||||
Iterator((pid, vertexArray))
|
||||
}.cache()
|
||||
|
||||
// @todo assert edge table has partitioner
|
||||
}
|
||||
|
||||
def makeTriplets[VD: ClassManifest, ED: ClassManifest](
|
||||
localVidMap: RDD[(Pid, VertexIdToIndexMap)],
|
||||
vTableReplicatedValues: RDD[(Pid, Array[VD]) ],
|
||||
|
@ -441,7 +402,7 @@ object GraphImpl {
|
|||
def mapTriplets[VD: ClassManifest, ED: ClassManifest, ED2: ClassManifest](
|
||||
g: GraphImpl[VD, ED],
|
||||
f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
|
||||
val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){
|
||||
val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues.bothAttrs){
|
||||
(edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
|
||||
val (pid, edgePartition) = edgePartitionIter.next()
|
||||
val (_, vidToIndex) = vidToIndexIter.next()
|
||||
|
@ -467,8 +428,16 @@ object GraphImpl {
|
|||
ClosureCleaner.clean(mapFunc)
|
||||
ClosureCleaner.clean(reduceFunc)
|
||||
|
||||
// For each vertex, replicate its attribute only to partitions where it is
|
||||
// in the relevant position in an edge.
|
||||
val mapFuncUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
|
||||
val mapFuncUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
|
||||
|
||||
// Map and preaggregate
|
||||
val preAgg = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){
|
||||
val preAgg = g.eTable.zipPartitions(
|
||||
g.localVidMap,
|
||||
g.vTableReplicatedValues.get(mapFuncUsesSrcAttr, mapFuncUsesDstAttr)
|
||||
){
|
||||
(edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
|
||||
val (_, edgePartition) = edgePartitionIter.next()
|
||||
val (_, vidToIndex) = vidToIndexIter.next()
|
||||
|
@ -487,8 +456,12 @@ object GraphImpl {
|
|||
|
||||
edgePartition.foreach { e =>
|
||||
et.set(e)
|
||||
if (mapFuncUsesSrcAttr) {
|
||||
et.srcAttr = vmap(e.srcId)
|
||||
}
|
||||
if (mapFuncUsesDstAttr) {
|
||||
et.dstAttr = vmap(e.dstId)
|
||||
}
|
||||
// TODO(rxin): rewrite the foreach using a simple while loop to speed things up.
|
||||
// Also given we are only allowing zero, one, or two messages, we can completely unroll
|
||||
// the for loop.
|
||||
|
@ -591,4 +564,13 @@ object GraphImpl {
|
|||
math.abs((lower, higher).hashCode()) % numParts
|
||||
}
|
||||
|
||||
private def accessesVertexAttr[VD: ClassManifest, ED: ClassManifest](
|
||||
closure: AnyRef, attrName: String): Boolean = {
|
||||
try {
|
||||
BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)
|
||||
} catch {
|
||||
case _: ClassNotFoundException => true // if we don't know, be conservative
|
||||
}
|
||||
}
|
||||
|
||||
} // end of object GraphImpl
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
package org.apache.spark.graph.impl
|
||||
|
||||
import org.apache.spark.SparkContext._
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.util.collection.{OpenHashSet, PrimitiveKeyOpenHashMap}
|
||||
|
||||
import org.apache.spark.graph._
|
||||
import org.apache.spark.graph.impl.MsgRDDFunctions._
|
||||
|
||||
/**
|
||||
* Stores the vertex attribute values after they are replicated. See
|
||||
* the description of localVidMap in [[GraphImpl]].
|
||||
*/
|
||||
class VTableReplicatedValues[VD: ClassManifest](
|
||||
vTable: VertexSetRDD[VD],
|
||||
vid2pid: Vid2Pid,
|
||||
localVidMap: RDD[(Pid, VertexIdToIndexMap)]) {
|
||||
|
||||
val bothAttrs: RDD[(Pid, Array[VD])] =
|
||||
VTableReplicatedValues.createVTableReplicated(vTable, vid2pid, localVidMap, true, true)
|
||||
val srcAttrOnly: RDD[(Pid, Array[VD])] =
|
||||
VTableReplicatedValues.createVTableReplicated(vTable, vid2pid, localVidMap, true, false)
|
||||
val dstAttrOnly: RDD[(Pid, Array[VD])] =
|
||||
VTableReplicatedValues.createVTableReplicated(vTable, vid2pid, localVidMap, false, true)
|
||||
val noAttrs: RDD[(Pid, Array[VD])] =
|
||||
VTableReplicatedValues.createVTableReplicated(vTable, vid2pid, localVidMap, false, false)
|
||||
|
||||
|
||||
def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[(Pid, Array[VD])] =
|
||||
(includeSrcAttr, includeDstAttr) match {
|
||||
case (true, true) => bothAttrs
|
||||
case (true, false) => srcAttrOnly
|
||||
case (false, true) => dstAttrOnly
|
||||
case (false, false) => noAttrs
|
||||
}
|
||||
}
|
||||
|
||||
class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD])
|
||||
|
||||
object VTableReplicatedValues {
|
||||
protected def createVTableReplicated[VD: ClassManifest](
|
||||
vTable: VertexSetRDD[VD],
|
||||
vid2pid: Vid2Pid,
|
||||
localVidMap: RDD[(Pid, VertexIdToIndexMap)],
|
||||
includeSrcAttr: Boolean,
|
||||
includeDstAttr: Boolean): RDD[(Pid, Array[VD])] = {
|
||||
|
||||
val pid2vid = vid2pid.getPid2Vid(includeSrcAttr, includeDstAttr)
|
||||
|
||||
val msgsByPartition = pid2vid.zipPartitions(vTable.index.rdd, vTable.valuesRDD) {
|
||||
(pid2vidIter, indexIter, valuesIter) =>
|
||||
val pid2vid = pid2vidIter.next()
|
||||
val index = indexIter.next()
|
||||
val values = valuesIter.next()
|
||||
val vmap = new PrimitiveKeyOpenHashMap(index, values._1)
|
||||
|
||||
// Send each partition the vertex attributes it wants
|
||||
val output = new Array[(Pid, VertexAttributeBlock[VD])](pid2vid.size)
|
||||
for (pid <- 0 until pid2vid.size) {
|
||||
val block = new VertexAttributeBlock(pid2vid(pid), pid2vid(pid).map(vid => vmap(vid)))
|
||||
output(pid) = (pid, block)
|
||||
}
|
||||
output.iterator
|
||||
}.partitionBy(localVidMap.partitioner.get).cache()
|
||||
|
||||
localVidMap.zipPartitions(msgsByPartition){
|
||||
(mapIter, msgsIter) =>
|
||||
val (pid, vidToIndex) = mapIter.next()
|
||||
assert(!mapIter.hasNext)
|
||||
// Populate the vertex array using the vidToIndex map
|
||||
val vertexArray = new Array[VD](vidToIndex.capacity)
|
||||
for ((_, block) <- msgsIter) {
|
||||
for (i <- 0 until block.vids.size) {
|
||||
val vid = block.vids(i)
|
||||
val attr = block.attrs(i)
|
||||
val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
|
||||
vertexArray(ind) = attr
|
||||
}
|
||||
}
|
||||
Iterator((pid, vertexArray))
|
||||
}.cache()
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,87 @@
|
|||
package org.apache.spark.graph.impl
|
||||
|
||||
import scala.collection.JavaConversions._
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.collection.mutable.ArrayBuilder
|
||||
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
|
||||
import org.apache.spark.graph._
|
||||
|
||||
/**
|
||||
* Stores the layout of vertex attributes for GraphImpl.
|
||||
*/
|
||||
class Vid2Pid(
|
||||
eTable: RDD[(Pid, EdgePartition[ED])] forSome { type ED },
|
||||
vTableIndex: VertexSetIndex) {
|
||||
|
||||
val bothAttrs: VertexSetRDD[Array[Pid]] = createVid2Pid(true, true)
|
||||
val srcAttrOnly: VertexSetRDD[Array[Pid]] = createVid2Pid(true, false)
|
||||
val dstAttrOnly: VertexSetRDD[Array[Pid]] = createVid2Pid(false, true)
|
||||
val noAttrs: VertexSetRDD[Array[Pid]] = createVid2Pid(false, false)
|
||||
|
||||
val pid2VidBothAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(bothAttrs)
|
||||
val pid2VidSrcAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(srcAttrOnly)
|
||||
val pid2VidDstAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(dstAttrOnly)
|
||||
val pid2VidNoAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(noAttrs)
|
||||
|
||||
def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): VertexSetRDD[Array[Pid]] =
|
||||
(includeSrcAttr, includeDstAttr) match {
|
||||
case (true, true) => bothAttrs
|
||||
case (true, false) => srcAttrOnly
|
||||
case (false, true) => dstAttrOnly
|
||||
case (false, false) => noAttrs
|
||||
}
|
||||
|
||||
def getPid2Vid(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] =
|
||||
(includeSrcAttr, includeDstAttr) match {
|
||||
case (true, true) => pid2VidBothAttrs
|
||||
case (true, false) => pid2VidSrcAttrOnly
|
||||
case (false, true) => pid2VidDstAttrOnly
|
||||
case (false, false) => pid2VidNoAttrs
|
||||
}
|
||||
|
||||
def persist(newLevel: StorageLevel) {
|
||||
bothAttrs.persist(newLevel)
|
||||
srcAttrOnly.persist(newLevel)
|
||||
dstAttrOnly.persist(newLevel)
|
||||
noAttrs.persist(newLevel)
|
||||
}
|
||||
|
||||
private def createVid2Pid(
|
||||
includeSrcAttr: Boolean,
|
||||
includeDstAttr: Boolean): VertexSetRDD[Array[Pid]] = {
|
||||
val preAgg = eTable.mapPartitions { iter =>
|
||||
val (pid, edgePartition) = iter.next()
|
||||
val vSet = new VertexSet
|
||||
if (includeSrcAttr || includeDstAttr) {
|
||||
edgePartition.foreach(e => {
|
||||
if (includeSrcAttr) vSet.add(e.srcId)
|
||||
if (includeDstAttr) vSet.add(e.dstId)
|
||||
})
|
||||
}
|
||||
vSet.iterator.map { vid => (vid.toLong, pid) }
|
||||
}
|
||||
VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
|
||||
(p: Pid) => ArrayBuffer(p),
|
||||
(ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
|
||||
(a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
|
||||
.mapValues(a => a.toArray).cache()
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an intermediate pid2vid structure that tells each partition of the
|
||||
* vertex data where it should go.
|
||||
*/
|
||||
private def createPid2Vid(vid2pid: VertexSetRDD[Array[Pid]]): RDD[Array[Array[Vid]]] = {
|
||||
val numPartitions = vid2pid.partitions.size
|
||||
vid2pid.mapPartitions { iter =>
|
||||
val pid2vidLocal = Array.fill[ArrayBuilder[Vid]](numPartitions)(ArrayBuilder.make[Vid])
|
||||
for ((vid, pids) <- iter) {
|
||||
pids.foreach { pid => pid2vidLocal(pid) += vid }
|
||||
}
|
||||
Iterator(pid2vidLocal.map(_.result))
|
||||
}
|
||||
}
|
||||
}
|
|
@ -33,6 +33,18 @@ class GraphSuite extends FunSuite with LocalSparkContext {
|
|||
}
|
||||
}
|
||||
|
||||
test("mapReduceTriplets") {
|
||||
withSpark(new SparkContext("local", "test")) { sc =>
|
||||
val n = 3
|
||||
val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))))
|
||||
|
||||
val neighborDegreeSums = star.mapReduceTriplets(
|
||||
edge => Array((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
|
||||
(a: Int, b: Int) => a + b)
|
||||
assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
|
||||
}
|
||||
}
|
||||
|
||||
test("aggregateNeighbors") {
|
||||
withSpark(new SparkContext("local", "test")) { sc =>
|
||||
val n = 3
|
||||
|
|
Loading…
Reference in a new issue