Dan Crankshaw 2013-11-13 23:31:14 +00:00
commit 958d7213a5
8 changed files with 283 additions and 289 deletions

View file

@@ -27,7 +27,7 @@ the challenges of graph construction and transformation and provide
limited fault-tolerance and support for interactive analysis.
<p align="center">
<img src="https://raw.github.com/jegonzal/graphx/Documentation/docs/img/data_parallel_vs_graph_parallel.png" />
<img src="https://raw.github.com/amplab/graphx/master/docs/img/data_parallel_vs_graph_parallel.png" />
</p>
@@ -47,7 +47,7 @@ Finally, by exploiting the Scala foundation of Spark, we enable users
to interactively load, transform, and compute on massive graphs.
<p align="center">
<img src="https://raw.github.com/jegonzal/graphx/Documentation/docs/img/tables_and_graphs.png" />
<img src="https://raw.github.com/amplab/graphx/master/docs/img/tables_and_graphs.png" />
</p>
## Examples

View file

@@ -41,9 +41,11 @@ class BitSet(numBits: Int) {
val wordIndex = bitIndex >> 6 // divide by 64
var i = 0
while(i < wordIndex) { words(i) = -1; i += 1 }
// Set the remaining bits
val mask = ~(-1L << (bitIndex & 0x3f))
words(wordIndex) |= mask
if(wordIndex < words.size) {
// Set the remaining bits (note that the mask could still be zero)
val mask = ~(-1L << (bitIndex & 0x3f))
words(wordIndex) |= mask
}
}
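To make the mask arithmetic concrete, here is a standalone sketch (illustrative values, mirroring but not part of the code above):
val bitIndex = 70
val wordIndex = bitIndex >> 6           // 70 / 64 = 1: the word holding bit 70
val mask = ~(-1L << (bitIndex & 0x3f))  // 70 & 0x3f = 6, so bits 0-5 are set
// The new bounds check matters when bitIndex is a multiple of 64: then
// (bitIndex & 0x3f) == 0, so mask == 0, and wordIndex can equal words.size.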

View file

@@ -18,6 +18,7 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[EdgePartition[Object]])
kryo.register(classOf[BitSet])
kryo.register(classOf[VertexIdToIndexMap])
kryo.register(classOf[VertexAttributeBlock[Object]])
// This avoids a large number of hash table lookups.
kryo.setReferences(false)
}
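For context, a minimal sketch of the registrator pattern this file follows (the class name and registered type below are illustrative, not part of this commit):
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class ExampleRegistrator extends KryoRegistrator {
  def registerClasses(kryo: Kryo) {
    // Registering classes up front lets Kryo emit compact ids instead of
    // full class names in the serialized stream.
    kryo.register(classOf[Array[Long]])
    // Turning off reference tracking skips a per-object identity lookup;
    // safe as long as the serialized object graphs contain no cycles.
    kryo.setReferences(false)
  }
}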

View file

@@ -25,6 +25,7 @@ import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHa
import org.apache.spark.graph.impl.AggregationMsg
import org.apache.spark.graph.impl.MsgRDDFunctions._
/**
* The `VertexSetIndex` maintains the per-partition mapping from
* vertex id to the corresponding location in the per-partition values
@@ -88,7 +89,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
extends RDD[(Vid, V)](index.rdd.context,
List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
/**
* Construct a new VertexSetRDD that is indexed by only the keys in the RDD.
* The resulting VertexSet will be based on a different index and can
@@ -96,7 +96,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
*/
def reindex(): VertexSetRDD[V] = VertexSetRDD(this)
/**
* An internal representation which joins the block indices with the values
* This is used by the compute function to emulate RDD[(Vid, V)]
@@ -104,19 +103,16 @@ class VertexSetRDD[@specialized V: ClassManifest](
protected[spark] val tuples =
new ZippedRDD(index.rdd.context, index.rdd, valuesRDD)
/**
* The partitioner is defined by the index.
*/
override val partitioner = index.rdd.partitioner
/**
* The actual partitions are defined by the tuples.
*/
override def getPartitions: Array[Partition] = tuples.getPartitions
/**
* The preferred locations are computed based on the preferred
* locations of the tuples.
@@ -124,7 +120,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
override def getPreferredLocations(s: Partition): Seq[String] =
tuples.getPreferredLocations(s)
/**
* Caching a VertexSetRDD causes the index and values to be cached separately.
*/
@@ -134,15 +129,12 @@ class VertexSetRDD[@specialized V: ClassManifest](
return this
}
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
override def persist(): VertexSetRDD[V] = persist(StorageLevel.MEMORY_ONLY)
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
override def cache(): VertexSetRDD[V] = persist()
/**
* Provide the RDD[(K,V)] equivalent output.
*/
@@ -152,7 +144,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
}
} // end of compute
/**
* Restrict the vertex set to the set of vertices satisfying the
* given predicate.
@@ -190,7 +181,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
new VertexSetRDD[V](index, newValues)
} // end of filter
/**
* Pass each vertex attribute through a map function and retain the
* original RDD's partitioning and index.
@@ -214,7 +204,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
new VertexSetRDD[U](index, newValuesRDD)
} // end of mapValues
/**
* Pass each vertex attribute along with the vertex id through a map
* function and retain the original RDD's partitioning and index.
@@ -247,8 +236,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
/**
* @todo update docs to reflect function argument
*
* Inner join this VertexSet with another VertexSet that has the
* same index. This function will fail if the two VertexSets do not
* share the same index. The resulting vertex set will only contain
@@ -257,6 +244,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
* @tparam W the attribute type of the other VertexSet
*
* @param other the other VertexSet with which to join.
* @param f the function mapping a vertex id and its attributes in
* this and the other vertex set to a new vertex attribute.
* @return a VertexSetRDD containing only the vertices in both this
* and the other VertexSet and with tuple attributes.
*
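A hypothetical usage sketch of `zipJoin` (the constructor is the one shown later in this commit; since both operands must share an index, the second set is derived from the first via `mapValues`):
import org.apache.spark.SparkContext
import org.apache.spark.graph._

val sc = new SparkContext("local", "zipJoinExample")
val ranks = VertexSetRDD(sc.parallelize(Seq((1L, 1.0), (2L, 2.0))))
val doubled = ranks.mapValues(r => r * 2.0)             // keeps the index of ranks
val sums = ranks.zipJoin(doubled)((vid, a, b) => a + b) // (1L, 3.0), (2L, 6.0)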
@@ -287,13 +276,16 @@ class VertexSetRDD[@specialized V: ClassManifest](
/**
* @todo document
* Inner join this VertexSet with another VertexSet that has the
* same index. This function will fail if the two VertexSets do not
* share the same index.
*
* @param other
* @param f
* @tparam W
* @tparam Z
* @return
* @param other the vertex set to join with this vertex set
* @param f the function mapping a vertex id and its attributes in
* this and the other vertex set to a collection of tuples.
* @tparam W the type of the other vertex set attributes
* @tparam Z the type of the tuples emitted by `f`
* @return an RDD containing the tuples emitted by `f`
*/
def zipJoinFlatMap[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V,W) => Iterator[Z]):
RDD[Z] = {
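Continuing the sketch above, `zipJoinFlatMap` can emit any number of records per matched vertex, which is exactly how the replication code later in this commit uses it:
val pids = ranks.mapValues(_ => Array(0, 1))            // same index as ranks
val msgs = ranks.zipJoinFlatMap(pids) { (vid, rank, ps) =>
  ps.iterator.map(pid => (pid, vid, rank))              // one record per (pid, vid)
}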
@@ -316,8 +308,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
/**
* @todo update docs to reflect function argument
* Left join this VertexSet with another VertexSet that has the
* same index. This function will fail if the two VertexSets do not
* share the same index. The resulting vertex set contains an entry
@@ -327,6 +317,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
* @tparam W the attribute type of the other VertexSet
*
* @param other the other VertexSet with which to join.
* @param f the function mapping a vertex id and its attributes in
* this and the other vertex set to a new vertex attribute.
* @return a VertexSetRDD containing all the vertices in this
* VertexSet, with `None` attributes for vertices missing in the
* other VertexSet.
@@ -368,11 +360,12 @@ class VertexSetRDD[@specialized V: ClassManifest](
* @tparam W the attribute type of the other VertexSet
*
* @param other the other VertexSet with which to join.
* @param f the function mapping a vertex id and its attributes in
* this and the other vertex set to a new vertex attribute.
* @param merge the function used to combine duplicate vertex
* attributes
* @return a VertexSetRDD containing all the vertices in this
* VertexSet with `None` attributes used for Vertices missing in the
* other VertexSet.
* VertexSet, with the attribute emitted by `f`.
*
*/
def leftJoin[W: ClassManifest, Z: ClassManifest](other: RDD[(Vid,W)])
@@ -396,181 +389,9 @@ class VertexSetRDD[@specialized V: ClassManifest](
}
} // end of leftJoin
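A hypothetical sketch of this `leftJoin` variant. The full parameter list is cut off by the hunk, so the types `f: (Vid, V, Option[W]) => Z` and `merge: (W, W) => W` are assumptions taken from the docs above:
val labels = sc.parallelize(Seq((1L, "a"), (1L, "b")))  // note the duplicate key
val labeled = ranks.leftJoin(labels)(
  (vid: Long, rank: Double, opt: Option[String]) => opt.getOrElse("none"),
  (x: String, y: String) => x)                          // merge keeps one duplicate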
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this` as well as `other`.
*/
/*
def cogroup[W: ClassManifest](other: RDD[(Vid, W)], partitioner: Partitioner):
VertexSetRDD[(Seq[V], Seq[W])] = {
//RDD[(K, (Seq[V], Seq[W]))] = {
other match {
case other: VertexSetRDD[_] if index == other.index => {
// if both RDDs share exactly the same index and therefore the same
// super set of keys then we simply merge the value RDDs.
// However it is possible that both RDDs are missing a value for a given key in
// which case the returned RDD should have a null value
val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
valuesRDD.zipPartitions(other.valuesRDD){
(thisIter, otherIter) =>
val (thisValues, thisBS) = thisIter.next()
assert(!thisIter.hasNext)
val (otherValues, otherBS) = otherIter.next()
assert(!otherIter.hasNext)
/**
* @todo consider implementing this with a view as in leftJoin to
* reduce array allocations
*/
val newValues = new Array[(Seq[V], Seq[W])](thisValues.size)
val newBS = thisBS | otherBS
var ind = newBS.nextSetBit(0)
while(ind >= 0) {
val a = if (thisBS.get(ind)) Seq(thisValues(ind)) else Seq.empty[V]
val b = if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W]
newValues(ind) = (a, b)
ind = newBS.nextSetBit(ind+1)
}
Iterator((newValues.toIndexedSeq, newBS))
}
new VertexSetRDD(index, newValues)
}
case other: VertexSetRDD[_]
if index.rdd.partitioner == other.index.rdd.partitioner => {
// If both RDDs are indexed using different indices but with the same partitioners
// then we need to first merge the indices and then use the merged index to
// merge the values.
val newIndex =
index.rdd.zipPartitions(other.index.rdd)(
(thisIter, otherIter) => {
val thisIndex = thisIter.next()
assert(!thisIter.hasNext)
val otherIndex = otherIter.next()
assert(!otherIter.hasNext)
// Merge the keys
val newIndex = new VertexIdToIndexMap(thisIndex.capacity + otherIndex.capacity)
var ind = thisIndex.nextPos(0)
while(ind >= 0) {
newIndex.fastAdd(thisIndex.getValue(ind))
ind = thisIndex.nextPos(ind+1)
}
ind = otherIndex.nextPos(0) // reuse the ind declared above
while(ind >= 0) {
newIndex.fastAdd(otherIndex.getValue(ind))
ind = otherIndex.nextPos(ind+1)
}
List(newIndex).iterator
}).cache()
// Use the new index along with the indices of this and the other RDD to merge the values
val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
newIndex.zipPartitions(tuples, other.tuples)(
(newIndexIter, thisTuplesIter, otherTuplesIter) => {
// Get the new index for this partition
val newIndex = newIndexIter.next()
assert(!newIndexIter.hasNext)
// Get the corresponding indices and values for this and the other VertexSetRDD
val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
assert(!thisTuplesIter.hasNext)
val (otherIndex, (otherValues, otherBS)) = otherTuplesIter.next()
assert(!otherTuplesIter.hasNext)
// Preallocate the new Values array
val newValues = new Array[(Seq[V], Seq[W])](newIndex.size)
val newBS = new BitSet(newIndex.size)
// Lookup the sequences in both submaps
for ((k,ind) <- newIndex) {
// Get the left key
val a = if (thisIndex.contains(k)) {
val ind = thisIndex.get(k)
if(thisBS.get(ind)) Seq(thisValues(ind)) else Seq.empty[V]
} else Seq.empty[V]
// Get the right key
val b = if (otherIndex.contains(k)) {
val ind = otherIndex.get(k)
if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W]
} else Seq.empty[W]
// If at least one key was present then we generate a tuple.
if (!a.isEmpty || !b.isEmpty) {
newValues(ind) = (a, b)
newBS.set(ind)
}
}
Iterator((newValues.toIndexedSeq, newBS))
})
new VertexSetRDD(new VertexSetIndex(newIndex), newValues)
}
case _ => {
// Get the partitioner from the index
val partitioner = index.rdd.partitioner match {
case Some(p) => p
case None => throw new SparkException("An index must have a partitioner.")
}
// Shuffle the other RDD using the partitioner for this index
val otherShuffled =
if (other.partitioner == Some(partitioner)) {
other
} else {
other.partitionBy(partitioner)
}
// Join the other RDD with this RDD building a new valueset and new index on the fly
val groups = tuples.zipPartitions(otherShuffled)(
(thisTuplesIter, otherTuplesIter) => {
// Get the corresponding indices and values for this VertexSetRDD
val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
assert(!thisTuplesIter.hasNext())
// Construct a new index
val newIndex = thisIndex.clone().asInstanceOf[VertexIdToIndexMap]
// Construct a new array Buffer to store the values
val newValues = ArrayBuffer.fill[ (Seq[V], Seq[W]) ](thisValues.size)(null)
val newBS = new BitSet(thisValues.size)
// populate the newValues with the values in this VertexSetRDD
for ((k,i) <- thisIndex) {
if (thisBS.get(i)) {
newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W])
newBS.set(i)
}
}
// Now iterate through the other tuples updating the map
for ((k,w) <- otherTuplesIter){
if (newIndex.contains(k)) {
val ind = newIndex.get(k)
if(newBS.get(ind)) {
newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w)
} else {
// If the other key was in the index but not in the values
// of this indexed RDD then create a new values entry for it
newBS.set(ind)
newValues(ind) = (Seq.empty[V], ArrayBuffer(w))
}
} else {
// update the index
val ind = newIndex.size
newIndex.put(k, ind)
newBS.set(ind)
// Update the values
newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
}
}
Iterator( (newIndex, (newValues.toIndexedSeq, newBS)) )
}).cache()
// Extract the index and values from the above RDD
val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
new VertexSetRDD[(Seq[V], Seq[W])](new VertexSetIndex(newIndex), newValues)
}
}
} // end of cogroup
*/
} // End of VertexSetRDD
/**
* The VertexSetRDD singleton is used to construct VertexSets
*/
@@ -627,7 +448,6 @@ object VertexSetRDD {
new VertexSetRDD[V](new VertexSetIndex(index), values)
} // end of apply
/**
* Construct a vertex set from an RDD using an existing index.
*
@@ -642,7 +462,6 @@ object VertexSetRDD {
rdd: RDD[(Vid,V)], index: VertexSetIndex): VertexSetRDD[V] =
apply(rdd, index, (a:V,b:V) => a)
/**
* Construct a vertex set from an RDD using an existing index and a
* user defined `combiner` to merge duplicate vertices.
@@ -659,8 +478,17 @@ object VertexSetRDD {
reduceFunc: (V, V) => V): VertexSetRDD[V] =
apply(rdd,index, (v:V) => v, reduceFunc, reduceFunc)
def aggregate[V: ClassManifest](
/**
* Construct a vertex set from an RDD of AggregationMsgs
*
* @tparam V the vertex attribute type
* @param rdd the RDD containing vertices
* @param index the index which must be a superset of the vertices
* in the RDD
* @param reduceFunc the user defined reduce function used to merge
* duplicate vertex attributes.
*/
private[spark] def aggregate[V: ClassManifest](
rdd: RDD[AggregationMsg[V]], index: VertexSetIndex,
reduceFunc: (V, V) => V): VertexSetRDD[V] = {
@@ -696,7 +524,6 @@ object VertexSetRDD {
new VertexSetRDD(index, values)
}
/**
* Construct a vertex set from an RDD using an existing index and a
* user defined `combiner` to merge duplicate vertices.
@@ -767,7 +594,6 @@ object VertexSetRDD {
new VertexSetRDD(index, values)
} // end of apply
/**
* Construct an index of the unique vertices. The resulting index
* can be used to build VertexSets over subsets of the vertices in

View file

@@ -12,6 +12,7 @@ import org.apache.spark.util.ClosureCleaner
import org.apache.spark.graph._
import org.apache.spark.graph.impl.GraphImpl._
import org.apache.spark.graph.impl.MsgRDDFunctions._
import org.apache.spark.graph.util.BytecodeUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap}
@@ -21,9 +22,9 @@ import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHa
* The Iterator type returned when constructing edge triplets
*/
class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
val vidToIndex: VertexIdToIndexMap,
val vertexArray: Array[VD],
val edgePartition: EdgePartition[ED]) extends Iterator[EdgeTriplet[VD, ED]] {
private var pos = 0
private val et = new EdgeTriplet[VD, ED]
@@ -62,27 +63,25 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
/**
* A Graph RDD that supports computation on graphs.
*
* @param localVidMap Stores the location of vertex attributes after they are
* replicated. Within each partition, localVidMap holds a map from vertex ID to
* the index where that vertex's attribute is stored. This index refers to the
* arrays in the same partition in the variants of
* [[VTableReplicatedValues]]. Therefore, localVidMap can be reused across
* changes to the vertex attributes.
*/
class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
@transient val vTable: VertexSetRDD[VD],
@transient val vid2pid: VertexSetRDD[Array[Pid]],
@transient val vid2pid: Vid2Pid,
@transient val localVidMap: RDD[(Pid, VertexIdToIndexMap)],
@transient val eTable: RDD[(Pid, EdgePartition[ED])] )
extends Graph[VD, ED] {
def this() = this(null, null, null, null)
/**
* (localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap]) is a version of the
* vertex data after it is replicated. Within each partition, it holds a map
* from vertex ID to the index where that vertex's attribute is stored. This
* index refers to an array in the same partition in vTableReplicatedValues.
*
* (vTableReplicatedValues: VertexSetRDD[Pid, Array[VD]]) holds the vertex data
* and is arranged as described above.
*/
@transient val vTableReplicatedValues: RDD[(Pid, Array[VD])] =
createVTableReplicated(vTable, vid2pid, localVidMap)
@transient val vTableReplicatedValues: VTableReplicatedValues[VD] =
new VTableReplicatedValues(vTable, vid2pid, localVidMap)
/** Return an RDD of vertices. */
@transient override val vertices = vTable
@@ -94,7 +93,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
/** Return an RDD that brings together each edge with its source and destination vertices. */
@transient override val triplets: RDD[EdgeTriplet[VD, ED]] =
makeTriplets(localVidMap, vTableReplicatedValues, eTable)
makeTriplets(localVidMap, vTableReplicatedValues.bothAttrs, eTable)
override def persist(newLevel: StorageLevel): Graph[VD, ED] = {
eTable.persist(newLevel)
@@ -110,15 +109,22 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def statistics: Map[String, Any] = {
val numVertices = this.numVertices
val numEdges = this.numEdges
val replicationRatio =
vid2pid.map(kv => kv._2.size).sum / vTable.count
val replicationRatioBothAttrs =
vid2pid.bothAttrs.map(kv => kv._2.size).sum / numVertices
val replicationRatioSrcAttrOnly =
vid2pid.srcAttrOnly.map(kv => kv._2.size).sum / numVertices
val replicationRatioDstAttrOnly =
vid2pid.dstAttrOnly.map(kv => kv._2.size).sum / numVertices
val loadArray =
eTable.map{ case (pid, epart) => epart.data.size }.collect.map(x => x.toDouble / numEdges)
val minLoad = loadArray.min
val maxLoad = loadArray.max
Map(
"Num Vertices" -> numVertices, "Num Edges" -> numEdges,
"Replication" -> replicationRatio, "Load Array" -> loadArray,
"Replication (both)" -> replicationRatioBothAttrs,
"Replication (src only)" -> replicationRatioSrcAttrOnly,
"Replication (dest only)" -> replicationRatioDstAttrOnly,
"Load Array" -> loadArray,
"Min Load" -> minLoad, "Max Load" -> maxLoad)
}
@@ -161,18 +167,18 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
traverseLineage(vTable, " ", visited)
visited += (vTable.id -> "vTable")
println("\n\nvid2pid -----------------------------------------")
traverseLineage(vid2pid, " ", visited)
visited += (vid2pid.id -> "vid2pid")
visited += (vid2pid.valuesRDD.id -> "vid2pid.values")
println("\n\nvid2pid.bothAttrs -------------------------------")
traverseLineage(vid2pid.bothAttrs, " ", visited)
visited += (vid2pid.bothAttrs.id -> "vid2pid")
visited += (vid2pid.bothAttrs.valuesRDD.id -> "vid2pid.bothAttrs")
println("\n\nlocalVidMap -------------------------------------")
traverseLineage(localVidMap, " ", visited)
visited += (localVidMap.id -> "localVidMap")
println("\n\nvTableReplicatedValues --------------------------")
traverseLineage(vTableReplicatedValues, " ", visited)
visited += (vTableReplicatedValues.id -> "vTableReplicatedValues")
println("\n\nvTableReplicatedValues.bothAttrs ----------------")
traverseLineage(vTableReplicatedValues.bothAttrs, " ", visited)
visited += (vTableReplicatedValues.bothAttrs.id -> "vTableReplicatedValues.bothAttrs")
println("\n\ntriplets ----------------------------------------")
traverseLineage(triplets, " ", visited)
@@ -232,7 +238,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// Construct the Vid2Pid map. Here we assume that the filter operation
// behaves deterministically.
// @todo reindex the vertex and edge tables
val newVid2Pid = createVid2Pid(newETable, newVTable.index)
val newVid2Pid = new Vid2Pid(newETable, newVTable.index)
val newVidMap = createLocalVidMap(newETable)
new GraphImpl(newVTable, newVid2Pid, newVidMap, newETable)
@@ -328,7 +334,7 @@ object GraphImpl {
*
*/
val etable = createETable(edges)
val vid2pid = createVid2Pid(etable, vtable.index)
val vid2pid = new Vid2Pid(etable, vtable.index)
val localVidMap = createLocalVidMap(etable)
new GraphImpl(vtable, vid2pid, localVidMap, etable)
}
@@ -367,24 +373,9 @@ object GraphImpl {
}, preservesPartitioning = true).cache()
}
protected def createVid2Pid[ED: ClassManifest](
eTable: RDD[(Pid, EdgePartition[ED])],
vTableIndex: VertexSetIndex): VertexSetRDD[Array[Pid]] = {
val preAgg = eTable.mapPartitions { iter =>
val (pid, edgePartition) = iter.next()
val vSet = new VertexSet
edgePartition.foreach(e => {vSet.add(e.srcId); vSet.add(e.dstId)})
vSet.iterator.map { vid => (vid.toLong, pid) }
}.partitionBy(vTableIndex.rdd.partitioner.get)
VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
(p: Pid) => ArrayBuffer(p),
(ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
(a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
.mapValues(a => a.toArray).cache()
}
protected def createLocalVidMap[ED: ClassManifest](eTable: RDD[(Pid, EdgePartition[ED])]):
RDD[(Pid, VertexIdToIndexMap)] = {
private def createLocalVidMap(
eTable: RDD[(Pid, EdgePartition[ED])] forSome { type ED }
): RDD[(Pid, VertexIdToIndexMap)] = {
eTable.mapPartitions( _.map{ case (pid, epart) =>
val vidToIndex = new VertexIdToIndexMap
epart.foreach{ e =>
@@ -395,36 +386,6 @@ object GraphImpl {
}, preservesPartitioning = true).cache()
}
protected def createVTableReplicated[VD: ClassManifest](
vTable: VertexSetRDD[VD],
vid2pid: VertexSetRDD[Array[Pid]],
replicationMap: RDD[(Pid, VertexIdToIndexMap)]):
RDD[(Pid, Array[VD])] = {
// Join vid2pid and vTable, generate a shuffle dependency on the joined
// result, and get the shuffle id so we can use it on the slave.
val msgsByPartition = vTable.zipJoinFlatMap(vid2pid) { (vid, vdata, pids) =>
// TODO(rxin): reuse VertexBroadcastMessage
pids.iterator.map { pid =>
new VertexBroadcastMsg[VD](pid, vid, vdata)
}
}.partitionBy(replicationMap.partitioner.get).cache()
replicationMap.zipPartitions(msgsByPartition){
(mapIter, msgsIter) =>
val (pid, vidToIndex) = mapIter.next()
assert(!mapIter.hasNext)
// Populate the vertex array using the vidToIndex map
val vertexArray = new Array[VD](vidToIndex.capacity)
for (msg <- msgsIter) {
val ind = vidToIndex.getPos(msg.vid) & OpenHashSet.POSITION_MASK
vertexArray(ind) = msg.data
}
Iterator((pid, vertexArray))
}.cache()
// @todo assert edge table has partitioner
}
def makeTriplets[VD: ClassManifest, ED: ClassManifest](
localVidMap: RDD[(Pid, VertexIdToIndexMap)],
vTableReplicatedValues: RDD[(Pid, Array[VD]) ],
@@ -441,7 +402,7 @@
def mapTriplets[VD: ClassManifest, ED: ClassManifest, ED2: ClassManifest](
g: GraphImpl[VD, ED],
f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){
val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues.bothAttrs){
(edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
val (pid, edgePartition) = edgePartitionIter.next()
val (_, vidToIndex) = vidToIndexIter.next()
@@ -467,8 +428,16 @@ object GraphImpl {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
// For each vertex, replicate its attribute only to partitions where it is
// in the relevant position in an edge.
val mapFuncUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
val mapFuncUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
// Map and preaggregate
val preAgg = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){
val preAgg = g.eTable.zipPartitions(
g.localVidMap,
g.vTableReplicatedValues.get(mapFuncUsesSrcAttr, mapFuncUsesDstAttr)
){
(edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
val (_, edgePartition) = edgePartitionIter.next()
val (_, vidToIndex) = vidToIndexIter.next()
@@ -487,8 +456,12 @@ object GraphImpl {
edgePartition.foreach { e =>
et.set(e)
et.srcAttr = vmap(e.srcId)
et.dstAttr = vmap(e.dstId)
if (mapFuncUsesSrcAttr) {
et.srcAttr = vmap(e.srcId)
}
if (mapFuncUsesDstAttr) {
et.dstAttr = vmap(e.dstId)
}
// TODO(rxin): rewrite the foreach using a simple while loop to speed things up.
// Also given we are only allowing zero, one, or two messages, we can completely unroll
// the for loop.
@@ -591,4 +564,13 @@ object GraphImpl {
math.abs((lower, higher).hashCode()) % numParts
}
private def accessesVertexAttr[VD: ClassManifest, ED: ClassManifest](
closure: AnyRef, attrName: String): Boolean = {
try {
BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)
} catch {
case _: ClassNotFoundException => true // if we don't know, be conservative
}
}
} // end of object GraphImpl
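To illustrate the effect of the bytecode check, a hypothetical map function that never reads `dstAttr` (assuming some `graph: Graph[Int, Int]`); only source attributes would be replicated to edge partitions:
// accessesVertexAttr reports srcAttr = true, dstAttr = false here, so the
// srcAttrOnly replica set is selected and dstAttr is never shipped.
val msgs = graph.mapReduceTriplets(
  et => Array((et.dstId, et.srcAttr)),   // reads srcAttr only
  (a: Int, b: Int) => a + b)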

View file

@@ -0,0 +1,84 @@
package org.apache.spark.graph.impl
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.collection.{OpenHashSet, PrimitiveKeyOpenHashMap}
import org.apache.spark.graph._
import org.apache.spark.graph.impl.MsgRDDFunctions._
/**
* Stores the vertex attribute values after they are replicated. See
* the description of localVidMap in [[GraphImpl]].
*/
class VTableReplicatedValues[VD: ClassManifest](
vTable: VertexSetRDD[VD],
vid2pid: Vid2Pid,
localVidMap: RDD[(Pid, VertexIdToIndexMap)]) {
val bothAttrs: RDD[(Pid, Array[VD])] =
VTableReplicatedValues.createVTableReplicated(vTable, vid2pid, localVidMap, true, true)
val srcAttrOnly: RDD[(Pid, Array[VD])] =
VTableReplicatedValues.createVTableReplicated(vTable, vid2pid, localVidMap, true, false)
val dstAttrOnly: RDD[(Pid, Array[VD])] =
VTableReplicatedValues.createVTableReplicated(vTable, vid2pid, localVidMap, false, true)
val noAttrs: RDD[(Pid, Array[VD])] =
VTableReplicatedValues.createVTableReplicated(vTable, vid2pid, localVidMap, false, false)
def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[(Pid, Array[VD])] =
(includeSrcAttr, includeDstAttr) match {
case (true, true) => bothAttrs
case (true, false) => srcAttrOnly
case (false, true) => dstAttrOnly
case (false, false) => noAttrs
}
}
class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD])
object VTableReplicatedValues {
protected def createVTableReplicated[VD: ClassManifest](
vTable: VertexSetRDD[VD],
vid2pid: Vid2Pid,
localVidMap: RDD[(Pid, VertexIdToIndexMap)],
includeSrcAttr: Boolean,
includeDstAttr: Boolean): RDD[(Pid, Array[VD])] = {
val pid2vid = vid2pid.getPid2Vid(includeSrcAttr, includeDstAttr)
val msgsByPartition = pid2vid.zipPartitions(vTable.index.rdd, vTable.valuesRDD) {
(pid2vidIter, indexIter, valuesIter) =>
val pid2vid = pid2vidIter.next()
val index = indexIter.next()
val values = valuesIter.next()
val vmap = new PrimitiveKeyOpenHashMap(index, values._1)
// Send each partition the vertex attributes it wants
val output = new Array[(Pid, VertexAttributeBlock[VD])](pid2vid.size)
for (pid <- 0 until pid2vid.size) {
val block = new VertexAttributeBlock(pid2vid(pid), pid2vid(pid).map(vid => vmap(vid)))
output(pid) = (pid, block)
}
output.iterator
}.partitionBy(localVidMap.partitioner.get).cache()
localVidMap.zipPartitions(msgsByPartition){
(mapIter, msgsIter) =>
val (pid, vidToIndex) = mapIter.next()
assert(!mapIter.hasNext)
// Populate the vertex array using the vidToIndex map
val vertexArray = new Array[VD](vidToIndex.capacity)
for ((_, block) <- msgsIter) {
for (i <- 0 until block.vids.size) {
val vid = block.vids(i)
val attr = block.attrs(i)
val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
vertexArray(ind) = attr
}
}
Iterator((pid, vertexArray))
}.cache()
}
}
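The shuffle format here is just a pair of parallel arrays. A standalone sketch of the pack/unpack idea, with a plain Map standing in for `vidToIndex` (pure Scala, no Spark):
// Pack: for one destination partition, ship vertex ids and attributes as two
// parallel arrays rather than many (vid, attr) pairs.
val vids  = Array(1L, 5L, 9L)
val attrs = Array("a", "e", "i")
// Unpack: walk both arrays in lockstep, writing each attribute into the slot
// the destination's vid-to-index map assigns to its vid.
val slotOf = Map(1L -> 0, 5L -> 1, 9L -> 2)
val vertexArray = new Array[String](slotOf.size)
for (i <- 0 until vids.length) vertexArray(slotOf(vids(i))) = attrs(i)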

View file

@@ -0,0 +1,87 @@
package org.apache.spark.graph.impl
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuilder
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graph._
/**
* Stores, for each vertex, the edge partitions that reference it, in the four
* variants GraphImpl needs (depending on whether source and/or destination
* vertex attributes are required).
*/
class Vid2Pid(
eTable: RDD[(Pid, EdgePartition[ED])] forSome { type ED },
vTableIndex: VertexSetIndex) {
val bothAttrs: VertexSetRDD[Array[Pid]] = createVid2Pid(true, true)
val srcAttrOnly: VertexSetRDD[Array[Pid]] = createVid2Pid(true, false)
val dstAttrOnly: VertexSetRDD[Array[Pid]] = createVid2Pid(false, true)
val noAttrs: VertexSetRDD[Array[Pid]] = createVid2Pid(false, false)
val pid2VidBothAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(bothAttrs)
val pid2VidSrcAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(srcAttrOnly)
val pid2VidDstAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(dstAttrOnly)
val pid2VidNoAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(noAttrs)
def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): VertexSetRDD[Array[Pid]] =
(includeSrcAttr, includeDstAttr) match {
case (true, true) => bothAttrs
case (true, false) => srcAttrOnly
case (false, true) => dstAttrOnly
case (false, false) => noAttrs
}
def getPid2Vid(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] =
(includeSrcAttr, includeDstAttr) match {
case (true, true) => pid2VidBothAttrs
case (true, false) => pid2VidSrcAttrOnly
case (false, true) => pid2VidDstAttrOnly
case (false, false) => pid2VidNoAttrs
}
def persist(newLevel: StorageLevel) {
bothAttrs.persist(newLevel)
srcAttrOnly.persist(newLevel)
dstAttrOnly.persist(newLevel)
noAttrs.persist(newLevel)
}
private def createVid2Pid(
includeSrcAttr: Boolean,
includeDstAttr: Boolean): VertexSetRDD[Array[Pid]] = {
val preAgg = eTable.mapPartitions { iter =>
val (pid, edgePartition) = iter.next()
val vSet = new VertexSet
if (includeSrcAttr || includeDstAttr) {
edgePartition.foreach(e => {
if (includeSrcAttr) vSet.add(e.srcId)
if (includeDstAttr) vSet.add(e.dstId)
})
}
vSet.iterator.map { vid => (vid.toLong, pid) }
}
VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
(p: Pid) => ArrayBuffer(p),
(ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
(a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
.mapValues(a => a.toArray).cache()
}
/**
* Creates an intermediate pid2vid structure that tells each partition of the
* vertex data where it should go.
*/
private def createPid2Vid(vid2pid: VertexSetRDD[Array[Pid]]): RDD[Array[Array[Vid]]] = {
val numPartitions = vid2pid.partitions.size
vid2pid.mapPartitions { iter =>
val pid2vidLocal = Array.fill[ArrayBuilder[Vid]](numPartitions)(ArrayBuilder.make[Vid])
for ((vid, pids) <- iter) {
pids.foreach { pid => pid2vidLocal(pid) += vid }
}
Iterator(pid2vidLocal.map(_.result))
}
}
}
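A standalone sketch of the inversion performed by createPid2Vid (pure Scala, no Spark): turn a vid -> pids mapping into, for each partition, the array of vids it needs.
import scala.collection.mutable.ArrayBuilder

val numPartitions = 3
val vid2pids = Seq((1L, Array(0, 2)), (2L, Array(1)), (3L, Array(0, 1)))
val pid2vid = Array.fill(numPartitions)(ArrayBuilder.make[Long])
for ((vid, pids) <- vid2pids; pid <- pids) pid2vid(pid) += vid
val result = pid2vid.map(_.result())
// result(0) == Array(1L, 3L); result(1) == Array(2L, 3L); result(2) == Array(1L)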

View file

@@ -33,6 +33,18 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
test("mapReduceTriplets") {
withSpark(new SparkContext("local", "test")) { sc =>
val n = 3
val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))))
val neighborDegreeSums = star.mapReduceTriplets(
edge => Array((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
(a: Int, b: Int) => a + b)
assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
}
}
test("aggregateNeighbors") {
withSpark(new SparkContext("local", "test")) { sc =>
val n = 3
@@ -87,6 +99,6 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(b.zipJoin(c)((id, b, c) => b + c).map(x => x._2).reduce(_+_) === 0)
}
}
}
}