diff --git a/README.md b/README.md
index 7790139c8f..5b06d82225 100644
--- a/README.md
+++ b/README.md
@@ -27,7 +27,7 @@ the challenges of graph construction and transformation and provide limited
 fault-tolerance and support for interactive analysis.
 
-
+
 
@@ -47,7 +47,7 @@ Finally, by exploiting the Scala foundation of Spark, we enable users to
 interactively load, transform, and compute on massive graphs.
 
-
+
 
 ## Examples
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
index 82b9198e43..baf8099556 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
@@ -18,6 +18,7 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[EdgePartition[Object]])
     kryo.register(classOf[BitSet])
     kryo.register(classOf[VertexIdToIndexMap])
+    kryo.register(classOf[VertexAttributeBlock[Object]])
     // This avoids a large number of hash table lookups.
     kryo.setReferences(false)
   }
diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
index 62608e506d..401d5f29bc 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
@@ -25,6 +25,7 @@ import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHa
 import org.apache.spark.graph.impl.AggregationMsg
 import org.apache.spark.graph.impl.MsgRDDFunctions._
+
 /**
  * The `VertexSetIndex` maintains the per-partition mapping from
  * vertex id to the corresponding location in the per-partition values
@@ -88,7 +89,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
   extends RDD[(Vid, V)](index.rdd.context,
     List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
-
   /**
    * Construct a new VertexSetRDD that is indexed by only the keys in the RDD.
    * The resulting VertexSet will be based on a different index and can
@@ -96,7 +96,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
    */
   def reindex(): VertexSetRDD[V] = VertexSetRDD(this)
-
   /**
    * An internal representation which joins the block indices with the values.
    * This is used by the compute function to emulate RDD[(Vid, V)]
@@ -104,19 +103,16 @@ class VertexSetRDD[@specialized V: ClassManifest](
   protected[spark] val tuples =
     new ZippedRDD(index.rdd.context, index.rdd, valuesRDD)
-
   /**
    * The partitioner is defined by the index.
    */
   override val partitioner = index.rdd.partitioner
-
   /**
    * The actual partitions are defined by the tuples.
    */
   override def getPartitions: Array[Partition] = tuples.getPartitions
-
   /**
    * The preferred locations are computed based on the preferred
    * locations of the tuples.
@@ -124,7 +120,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
   override def getPreferredLocations(s: Partition): Seq[String] =
     tuples.getPreferredLocations(s)
-
   /**
    * Caching a VertexSetRDD causes the index and values to be cached separately.
    */
@@ -134,15 +129,12 @@ class VertexSetRDD[@specialized V: ClassManifest](
     return this
   }
-
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   override def persist(): VertexSetRDD[V] = persist(StorageLevel.MEMORY_ONLY)
-
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   override def cache(): VertexSetRDD[V] = persist()
-
   /**
    * Provide the RDD[(K,V)] equivalent output.
    */
@@ -152,7 +144,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
     }
   } // end of compute
-
   /**
    * Restrict the vertex set to the set of vertices satisfying the
    * given predicate.
@@ -190,7 +181,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
     new VertexSetRDD[V](index, newValues)
   } // end of filter
-
   /**
    * Pass each vertex attribute through a map function and retain the
    * original RDD's partitioning and index.
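Both `filter` above and `mapValues` below reuse the existing index rather than rebuilding it. A minimal sketch of what that buys (hypothetical values; assumes an existing SparkContext `sc`, with `Vid` an alias for `Long`):

```scala
// Illustration only, using the VertexSetRDD API shown in this patch.
val verts: VertexSetRDD[Int] =
  VertexSetRDD(sc.parallelize(Seq((1L, 10), (2L, 20), (3L, 30))))

// mapValues reuses the index of `verts`, so no shuffle or re-hashing occurs.
val doubled = verts.mapValues(attr => attr * 2)

// filter also keeps the index, only clearing bits in the per-partition
// BitSet, so `doubled` and `kept` can later be joined without data movement.
val kept = verts.filter { case (vid, attr) => attr >= 20 }
```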
@@ -214,7 +204,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
     new VertexSetRDD[U](index, newValuesRDD)
   } // end of mapValues
-
   /**
    * Pass each vertex attribute along with the vertex id through a map
    * function and retain the original RDD's partitioning and index.
@@ -247,8 +236,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
   /**
-   * @todo update docs to reflect function argument
-   *
    * Inner join this VertexSet with another VertexSet which has the
    * same Index. This function will fail if both VertexSets do not
    * share the same index. The resulting vertex set will only contain
@@ -257,6 +244,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * @tparam W the attribute type of the other VertexSet
    *
    * @param other the other VertexSet with which to join.
+   * @param f the function mapping a vertex id and its attributes in
+   *   this and the other vertex set to a new vertex attribute.
    * @return a VertexSetRDD containing only the vertices in both this
    * and the other VertexSet and with tuple attributes.
    *
@@ -287,13 +276,16 @@ class VertexSetRDD[@specialized V: ClassManifest](
   /**
-   * @todo document
+   * Inner join this VertexSet with another VertexSet which shares the
+   * same index. This function will fail if the two VertexSets do not
+   * share the same index.
    *
-   * @param other
-   * @param f
-   * @tparam W
-   * @tparam Z
-   * @return
+   * @param other the vertex set to join with this vertex set
+   * @param f the function mapping a vertex id and its attributes in
+   *   this and the other vertex set to a collection of tuples.
+   * @tparam W the type of the other vertex set attributes
+   * @tparam Z the type of the tuples emitted by `f`
+   * @return an RDD containing the tuples emitted by `f`
    */
   def zipJoinFlatMap[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V,W) => Iterator[Z]): RDD[Z] = {
@@ -316,8 +308,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
   /**
-   * @todo update docs to reflect function argument
-   *
    * Left join this VertexSet with another VertexSet which has the
    * same Index. This function will fail if both VertexSets do not
    * share the same index. The resulting vertex set contains an entry
@@ -327,6 +317,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * @tparam W the attribute type of the other VertexSet
    *
    * @param other the other VertexSet with which to join.
+   * @param f the function mapping a vertex id and its attributes in
+   *   this and the other vertex set to a new vertex attribute.
    * @return a VertexSetRDD containing all the vertices in this
    * VertexSet with `None` attributes used for Vertices missing in the
    * other VertexSet.
@@ -368,11 +360,12 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * @tparam W the attribute type of the other VertexSet
    *
    * @param other the other VertexSet with which to join.
+   * @param f the function mapping a vertex id and its attributes in
+   *   this and the other vertex set to a new vertex attribute.
    * @param merge the function used to combine duplicate vertex
    * attributes
    * @return a VertexSetRDD containing all the vertices in this
-   * VertexSet with `None` attributes used for Vertices missing in the
-   * other VertexSet.
+   * VertexSet, with the attribute emitted by `f`.
    *
    */
   def leftJoin[W: ClassManifest, Z: ClassManifest](other: RDD[(Vid,W)])
@@ -396,181 +389,9 @@ class VertexSetRDD[@specialized V: ClassManifest](
     }
   } // end of leftJoin
-
-
-
-  /**
-   * For each key k in `this` or `other`, return a resulting RDD that contains a
-   * tuple with the list of values for that key in `this` as well as `other`.
-   */
-  /*
-  def cogroup[W: ClassManifest](other: RDD[(Vid, W)], partitioner: Partitioner):
-  VertexSetRDD[(Seq[V], Seq[W])] = {
-    //RDD[(K, (Seq[V], Seq[W]))] = {
-    other match {
-      case other: VertexSetRDD[_] if index == other.index => {
-        // if both RDDs share exactly the same index and therefore the same
-        // super set of keys then we simply merge the value RDDs.
-        // However it is possible that both RDDs are missing a value for a given key in
-        // which case the returned RDD should have a null value
-        val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
-          valuesRDD.zipPartitions(other.valuesRDD){
-          (thisIter, otherIter) =>
-            val (thisValues, thisBS) = thisIter.next()
-            assert(!thisIter.hasNext)
-            val (otherValues, otherBS) = otherIter.next()
-            assert(!otherIter.hasNext)
-            /**
-             * @todo consider implementing this with a view as in leftJoin to
-             * reduce array allocations
-             */
-            val newValues = new Array[(Seq[V], Seq[W])](thisValues.size)
-            val newBS = thisBS | otherBS
-
-            var ind = newBS.nextSetBit(0)
-            while(ind >= 0) {
-              val a = if (thisBS.get(ind)) Seq(thisValues(ind)) else Seq.empty[V]
-              val b = if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W]
-              newValues(ind) = (a, b)
-              ind = newBS.nextSetBit(ind+1)
-            }
-            Iterator((newValues.toIndexedSeq, newBS))
-        }
-        new VertexSetRDD(index, newValues)
-      }
-      case other: VertexSetRDD[_]
-        if index.rdd.partitioner == other.index.rdd.partitioner => {
-        // If both RDDs are indexed using different indices but with the same partitioners
-        // then we we need to first merge the indicies and then use the merged index to
-        // merge the values.
-        val newIndex =
-          index.rdd.zipPartitions(other.index.rdd)(
-            (thisIter, otherIter) => {
-            val thisIndex = thisIter.next()
-            assert(!thisIter.hasNext)
-            val otherIndex = otherIter.next()
-            assert(!otherIter.hasNext)
-            // Merge the keys
-            val newIndex = new VertexIdToIndexMap(thisIndex.capacity + otherIndex.capacity)
-            var ind = thisIndex.nextPos(0)
-            while(ind >= 0) {
-              newIndex.fastAdd(thisIndex.getValue(ind))
-              ind = thisIndex.nextPos(ind+1)
-            }
-            var ind = otherIndex.nextPos(0)
-            while(ind >= 0) {
-              newIndex.fastAdd(otherIndex.getValue(ind))
-              ind = otherIndex.nextPos(ind+1)
-            }
-            List(newIndex).iterator
-          }).cache()
-        // Use the new index along with the this and the other indices to merge the values
-        val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
-          newIndex.zipPartitions(tuples, other.tuples)(
-            (newIndexIter, thisTuplesIter, otherTuplesIter) => {
-            // Get the new index for this partition
-            val newIndex = newIndexIter.next()
-            assert(!newIndexIter.hasNext)
-            // Get the corresponding indicies and values for this and the other VertexSetRDD
-            val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
-            assert(!thisTuplesIter.hasNext)
-            val (otherIndex, (otherValues, otherBS)) = otherTuplesIter.next()
-            assert(!otherTuplesIter.hasNext)
-            // Preallocate the new Values array
-            val newValues = new Array[(Seq[V], Seq[W])](newIndex.size)
-            val newBS = new BitSet(newIndex.size)
-
-            // Lookup the sequences in both submaps
-            for ((k,ind) <- newIndex) {
-              // Get the left key
-              val a = if (thisIndex.contains(k)) {
-                val ind = thisIndex.get(k)
-                if(thisBS.get(ind)) Seq(thisValues(ind)) else Seq.empty[V]
-              } else Seq.empty[V]
-              // Get the right key
-              val b = if (otherIndex.contains(k)) {
-                val ind = otherIndex.get(k)
-                if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W]
-              } else Seq.empty[W]
-              // If at least one key was present then we generate a tuple.
-              if (!a.isEmpty || !b.isEmpty) {
-                newValues(ind) = (a, b)
-                newBS.set(ind)
-              }
-            }
-            Iterator((newValues.toIndexedSeq, newBS))
-          })
-        new VertexSetRDD(new VertexSetIndex(newIndex), newValues)
-      }
-      case _ => {
-        // Get the partitioner from the index
-        val partitioner = index.rdd.partitioner match {
-          case Some(p) => p
-          case None => throw new SparkException("An index must have a partitioner.")
-        }
-        // Shuffle the other RDD using the partitioner for this index
-        val otherShuffled =
-          if (other.partitioner == Some(partitioner)) {
-            other
-          } else {
-            other.partitionBy(partitioner)
-          }
-        // Join the other RDD with this RDD building a new valueset and new index on the fly
-        val groups = tuples.zipPartitions(otherShuffled)(
-          (thisTuplesIter, otherTuplesIter) => {
-            // Get the corresponding indicies and values for this VertexSetRDD
-            val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
-            assert(!thisTuplesIter.hasNext())
-            // Construct a new index
-            val newIndex = thisIndex.clone().asInstanceOf[VertexIdToIndexMap]
-            // Construct a new array Buffer to store the values
-            val newValues = ArrayBuffer.fill[ (Seq[V], Seq[W]) ](thisValues.size)(null)
-            val newBS = new BitSet(thisValues.size)
-            // populate the newValues with the values in this VertexSetRDD
-            for ((k,i) <- thisIndex) {
-              if (thisBS.get(i)) {
-                newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W])
-                newBS.set(i)
-              }
-            }
-            // Now iterate through the other tuples updating the map
-            for ((k,w) <- otherTuplesIter){
-              if (newIndex.contains(k)) {
-                val ind = newIndex.get(k)
-                if(newBS.get(ind)) {
-                  newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w)
-                } else {
-                  // If the other key was in the index but not in the values
-                  // of this indexed RDD then create a new values entry for it
-                  newBS.set(ind)
-                  newValues(ind) = (Seq.empty[V], ArrayBuffer(w))
-                }
-              } else {
-                // update the index
-                val ind = newIndex.size
-                newIndex.put(k, ind)
-                newBS.set(ind)
-                // Update the values
-                newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
-              }
-            }
-            Iterator( (newIndex, (newValues.toIndexedSeq, newBS)) )
-          }).cache()
-
-        // Extract the index and values from the above RDD
-        val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
-        val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
-          groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
-
-        new VertexSetRDD[(Seq[V], Seq[W])](new VertexSetIndex(newIndex), newValues)
-      }
-    }
-  } // end of cogroup
-  */
-
 } // End of VertexSetRDD
-
 /**
  * The VertexSetRDD singleton is used to construct VertexSets
  */
@@ -627,7 +448,6 @@ object VertexSetRDD {
     new VertexSetRDD[V](new VertexSetIndex(index), values)
   } // end of apply
-
   /**
    * Construct a vertex set from an RDD using an existing index.
    *
@@ -642,7 +462,6 @@ object VertexSetRDD {
       rdd: RDD[(Vid,V)],
       index: VertexSetIndex): VertexSetRDD[V] =
     apply(rdd, index, (a:V,b:V) => a)
-
   /**
    * Construct a vertex set from an RDD using an existing index and a
    * user defined `combiner` to merge duplicate vertices.
@@ -659,8 +478,17 @@ object VertexSetRDD {
       reduceFunc: (V, V) => V): VertexSetRDD[V] =
     apply(rdd,index, (v:V) => v, reduceFunc, reduceFunc)
-
-  def aggregate[V: ClassManifest](
+  /**
+   * Construct a vertex set from an RDD of AggregationMsgs.
+   *
+   * @tparam V the vertex attribute type
+   * @param rdd the RDD of aggregation messages
+   * @param index the index, which must be a superset of the vertices
+   *   in the RDD
+   * @param reduceFunc the user defined reduce function used to merge
+   *   duplicate vertex attributes.
+   */
+  private[spark] def aggregate[V: ClassManifest](
       rdd: RDD[AggregationMsg[V]],
       index: VertexSetIndex,
       reduceFunc: (V, V) => V): VertexSetRDD[V] = {
@@ -696,7 +524,6 @@ object VertexSetRDD {
     new VertexSetRDD(index, values)
   }
-
   /**
    * Construct a vertex set from an RDD using an existing index and a
    * user defined `combiner` to merge duplicate vertices.
@@ -767,7 +594,6 @@ object VertexSetRDD {
     new VertexSetRDD(index, values)
   } // end of apply
-
   /**
    * Construct an index of the unique vertices. The resulting index
    * can be used to build VertexSets over subsets of the vertices in
    * the RDD.
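The newly documented `leftJoin` is easiest to read with a concrete call. A sketch only, assuming hypothetical `ranks: VertexSetRDD[Double]` and `updates: RDD[(Vid, Double)]`, and the curried `(other)(f, merge)` shape shown in the hunk above (treat the exact parameter list as approximate):

```scala
// Keep each vertex's old rank unless `updates` supplies a new one.
val newRanks = ranks.leftJoin(updates)(
  (vid: Vid, oldRank: Double, update: Option[Double]) =>
    update.getOrElse(oldRank),       // f: Option is None for missing vids
  (a: Double, b: Double) => a + b)   // merge: combine duplicate updates
```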
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index defe34dfc9..693bb888bc 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -12,6 +12,7 @@ import org.apache.spark.util.ClosureCleaner
 import org.apache.spark.graph._
 import org.apache.spark.graph.impl.GraphImpl._
 import org.apache.spark.graph.impl.MsgRDDFunctions._
+import org.apache.spark.graph.util.BytecodeUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap}
@@ -21,9 +22,9 @@ import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHa
  * The Iterator type returned when constructing edge triplets
  */
 class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
-  val vidToIndex: VertexIdToIndexMap,
-  val vertexArray: Array[VD],
-  val edgePartition: EdgePartition[ED]) extends Iterator[EdgeTriplet[VD, ED]] {
+    val vidToIndex: VertexIdToIndexMap,
+    val vertexArray: Array[VD],
+    val edgePartition: EdgePartition[ED]) extends Iterator[EdgeTriplet[VD, ED]] {
 
   private var pos = 0
   private val et = new EdgeTriplet[VD, ED]
@@ -62,27 +63,25 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
 
 /**
  * A Graph RDD that supports computation on graphs.
+ *
+ * @param localVidMap Stores the location of vertex attributes after they are
+ * replicated. Within each partition, localVidMap holds a map from vertex ID to
+ * the index where that vertex's attribute is stored. This index refers to the
+ * arrays in the same partition in the variants of
+ * [[VTableReplicatedValues]]. Therefore, localVidMap can be reused across
+ * changes to the vertex attributes.
  */
 class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     @transient val vTable: VertexSetRDD[VD],
-    @transient val vid2pid: VertexSetRDD[Array[Pid]],
+    @transient val vid2pid: Vid2Pid,
     @transient val localVidMap: RDD[(Pid, VertexIdToIndexMap)],
     @transient val eTable: RDD[(Pid, EdgePartition[ED])] )
   extends Graph[VD, ED] {
 
   def this() = this(null, null, null, null)
 
-  /**
-   * (localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap]) is a version of the
-   * vertex data after it is replicated. Within each partition, it holds a map
-   * from vertex ID to the index where that vertex's attribute is stored. This
-   * index refers to an array in the same partition in vTableReplicatedValues.
-   *
-   * (vTableReplicatedValues: VertexSetRDD[Pid, Array[VD]]) holds the vertex data
-   * and is arranged as described above.
-   */
-  @transient val vTableReplicatedValues: RDD[(Pid, Array[VD])] =
-    createVTableReplicated(vTable, vid2pid, localVidMap)
+  @transient val vTableReplicatedValues: VTableReplicatedValues[VD] =
+    new VTableReplicatedValues(vTable, vid2pid, localVidMap)
 
   /** Return a RDD of vertices. */
   @transient override val vertices = vTable
@@ -94,7 +93,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
   /** Return a RDD that brings edges together with their source and destination vertices. */
   @transient override val triplets: RDD[EdgeTriplet[VD, ED]] =
-    makeTriplets(localVidMap, vTableReplicatedValues, eTable)
+    makeTriplets(localVidMap, vTableReplicatedValues.bothAttrs, eTable)
 
   override def persist(newLevel: StorageLevel): Graph[VD, ED] = {
     eTable.persist(newLevel)
@@ -110,15 +109,22 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
   override def statistics: Map[String, Any] = {
     val numVertices = this.numVertices
     val numEdges = this.numEdges
-    val replicationRatio =
-      vid2pid.map(kv => kv._2.size).sum / vTable.count
+    val replicationRatioBothAttrs =
+      vid2pid.bothAttrs.map(kv => kv._2.size).sum / numVertices
+    val replicationRatioSrcAttrOnly =
+      vid2pid.srcAttrOnly.map(kv => kv._2.size).sum / numVertices
+    val replicationRatioDstAttrOnly =
+      vid2pid.dstAttrOnly.map(kv => kv._2.size).sum / numVertices
     val loadArray =
       eTable.map{ case (pid, epart) => epart.data.size }.collect.map(x => x.toDouble / numEdges)
     val minLoad = loadArray.min
     val maxLoad = loadArray.max
     Map(
       "Num Vertices" -> numVertices, "Num Edges" -> numEdges,
-      "Replication" -> replicationRatio, "Load Array" -> loadArray,
+      "Replication (both)" -> replicationRatioBothAttrs,
+      "Replication (src only)" -> replicationRatioSrcAttrOnly,
+      "Replication (dest only)" -> replicationRatioDstAttrOnly,
+      "Load Array" -> loadArray,
       "Min Load" -> minLoad, "Max Load" -> maxLoad)
   }
@@ -161,18 +167,18 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     traverseLineage(vTable, "  ", visited)
     visited += (vTable.id -> "vTable")
 
-    println("\n\nvid2pid -----------------------------------------")
-    traverseLineage(vid2pid, "  ", visited)
-    visited += (vid2pid.id -> "vid2pid")
-    visited += (vid2pid.valuesRDD.id -> "vid2pid.values")
+    println("\n\nvid2pid.bothAttrs -------------------------------")
+    traverseLineage(vid2pid.bothAttrs, "  ", visited)
+    visited += (vid2pid.bothAttrs.id -> "vid2pid")
+    visited += (vid2pid.bothAttrs.valuesRDD.id -> "vid2pid.bothAttrs")
 
     println("\n\nlocalVidMap -------------------------------------")
     traverseLineage(localVidMap, "  ", visited)
     visited += (localVidMap.id -> "localVidMap")
 
-    println("\n\nvTableReplicatedValues --------------------------")
-    traverseLineage(vTableReplicatedValues, "  ", visited)
-    visited += (vTableReplicatedValues.id -> "vTableReplicatedValues")
+    println("\n\nvTableReplicatedValues.bothAttrs ----------------")
+    traverseLineage(vTableReplicatedValues.bothAttrs, "  ", visited)
+    visited += (vTableReplicatedValues.bothAttrs.id -> "vTableReplicatedValues.bothAttrs")
 
     println("\n\ntriplets ----------------------------------------")
     traverseLineage(triplets, "  ", visited)
@@ -232,7 +238,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     // Construct the Vid2Pid map. Here we assume that the filter operation
     // behaves deterministically.
     // @todo reindex the vertex and edge tables
-    val newVid2Pid = createVid2Pid(newETable, newVTable.index)
+    val newVid2Pid = new Vid2Pid(newETable, newVTable.index)
     val newVidMap = createLocalVidMap(newETable)
 
     new GraphImpl(newVTable, newVid2Pid, localVidMap, newETable)
@@ -328,7 +334,7 @@ object GraphImpl {
    *
    */
     val etable = createETable(edges)
-    val vid2pid = createVid2Pid(etable, vtable.index)
+    val vid2pid = new Vid2Pid(etable, vtable.index)
     val localVidMap = createLocalVidMap(etable)
     new GraphImpl(vtable, vid2pid, localVidMap, etable)
   }
@@ -367,24 +373,9 @@ object GraphImpl {
     }, preservesPartitioning = true).cache()
   }
 
-  protected def createVid2Pid[ED: ClassManifest](
-      eTable: RDD[(Pid, EdgePartition[ED])],
-      vTableIndex: VertexSetIndex): VertexSetRDD[Array[Pid]] = {
-    val preAgg = eTable.mapPartitions { iter =>
-      val (pid, edgePartition) = iter.next()
-      val vSet = new VertexSet
-      edgePartition.foreach(e => {vSet.add(e.srcId); vSet.add(e.dstId)})
-      vSet.iterator.map { vid => (vid.toLong, pid) }
-    }.partitionBy(vTableIndex.rdd.partitioner.get)
-    VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
-      (p: Pid) => ArrayBuffer(p),
-      (ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
-      (a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
-      .mapValues(a => a.toArray).cache()
-  }
-
-  protected def createLocalVidMap[ED: ClassManifest](eTable: RDD[(Pid, EdgePartition[ED])]):
-    RDD[(Pid, VertexIdToIndexMap)] = {
+  private def createLocalVidMap(
+      eTable: RDD[(Pid, EdgePartition[ED])] forSome { type ED }
+    ): RDD[(Pid, VertexIdToIndexMap)] = {
     eTable.mapPartitions( _.map{ case (pid, epart) =>
       val vidToIndex = new VertexIdToIndexMap
       epart.foreach{ e =>
@@ -395,36 +386,6 @@ object GraphImpl {
     }, preservesPartitioning = true).cache()
   }
 
-  protected def createVTableReplicated[VD: ClassManifest](
-      vTable: VertexSetRDD[VD],
-      vid2pid: VertexSetRDD[Array[Pid]],
-      replicationMap: RDD[(Pid, VertexIdToIndexMap)]):
-    RDD[(Pid, Array[VD])] = {
-    // Join vid2pid and vTable, generate a shuffle dependency on the joined
-    // result, and get the shuffle id so we can use it on the slave.
-    val msgsByPartition = vTable.zipJoinFlatMap(vid2pid) { (vid, vdata, pids) =>
-      // TODO(rxin): reuse VertexBroadcastMessage
-      pids.iterator.map { pid =>
-        new VertexBroadcastMsg[VD](pid, vid, vdata)
-      }
-    }.partitionBy(replicationMap.partitioner.get).cache()
-
-    replicationMap.zipPartitions(msgsByPartition){
-      (mapIter, msgsIter) =>
-      val (pid, vidToIndex) = mapIter.next()
-      assert(!mapIter.hasNext)
-      // Populate the vertex array using the vidToIndex map
-      val vertexArray = new Array[VD](vidToIndex.capacity)
-      for (msg <- msgsIter) {
-        val ind = vidToIndex.getPos(msg.vid) & OpenHashSet.POSITION_MASK
-        vertexArray(ind) = msg.data
-      }
-      Iterator((pid, vertexArray))
-    }.cache()
-
-    // @todo assert edge table has partitioner
-  }
-
   def makeTriplets[VD: ClassManifest, ED: ClassManifest](
       localVidMap: RDD[(Pid, VertexIdToIndexMap)],
       vTableReplicatedValues: RDD[(Pid, Array[VD]) ],
@@ -441,7 +402,7 @@ object GraphImpl {
   def mapTriplets[VD: ClassManifest, ED: ClassManifest, ED2: ClassManifest](
       g: GraphImpl[VD, ED],
       f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
-    val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){
+    val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues.bothAttrs){
       (edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
       val (pid, edgePartition) = edgePartitionIter.next()
       val (_, vidToIndex) = vidToIndexIter.next()
@@ -467,8 +428,16 @@ object GraphImpl {
     ClosureCleaner.clean(mapFunc)
     ClosureCleaner.clean(reduceFunc)
 
+    // For each vertex, replicate its attribute only to partitions where it is
+    // in the relevant position in an edge.
+    val mapFuncUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
+    val mapFuncUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
+
     // Map and preaggregate
-    val preAgg = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){
+    val preAgg = g.eTable.zipPartitions(
+      g.localVidMap,
+      g.vTableReplicatedValues.get(mapFuncUsesSrcAttr, mapFuncUsesDstAttr)
+    ){
       (edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
       val (_, edgePartition) = edgePartitionIter.next()
       val (_, vidToIndex) = vidToIndexIter.next()
@@ -487,8 +456,12 @@ object GraphImpl {
 
       edgePartition.foreach { e =>
         et.set(e)
-        et.srcAttr = vmap(e.srcId)
-        et.dstAttr = vmap(e.dstId)
+        if (mapFuncUsesSrcAttr) {
+          et.srcAttr = vmap(e.srcId)
+        }
+        if (mapFuncUsesDstAttr) {
+          et.dstAttr = vmap(e.dstId)
+        }
         // TODO(rxin): rewrite the foreach using a simple while loop to speed things up.
         // Also given we are only allowing zero, one, or two messages, we can completely unroll
        // the for loop.
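With the two flags above, vertex attributes are shipped to edge partitions only when the map function can actually read them (the inspection itself is `accessesVertexAttr`, added in the next hunk). A hedged sketch of the user-visible effect, mirroring the `mapReduceTriplets` shape used in GraphSuite below; `graph` is a hypothetical graph with Int vertex attributes:

```scala
// Reads no vertex attributes: neither srcAttr nor dstAttr needs to be
// replicated to the edge partitions (the noAttrs view suffices).
val outDegrees = graph.mapReduceTriplets(
  edge => Array((edge.srcId, 1)),
  (a: Int, b: Int) => a + b)

// Reads only dstAttr, so only the dstAttrOnly view is replicated.
val neighborSums = graph.mapReduceTriplets(
  edge => Array((edge.srcId, edge.dstAttr)),
  (a: Int, b: Int) => a + b)
```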
@@ -591,4 +564,13 @@
     math.abs((lower, higher).hashCode()) % numParts
   }
 
+  private def accessesVertexAttr[VD: ClassManifest, ED: ClassManifest](
+      closure: AnyRef, attrName: String): Boolean = {
+    try {
+      BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)
+    } catch {
+      case _: ClassNotFoundException => true // if we don't know, be conservative
+    }
+  }
+
 } // end of object GraphImpl
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicatedValues.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicatedValues.scala
new file mode 100644
index 0000000000..fee2d40ee4
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicatedValues.scala
@@ -0,0 +1,84 @@
+package org.apache.spark.graph.impl
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.collection.{OpenHashSet, PrimitiveKeyOpenHashMap}
+
+import org.apache.spark.graph._
+import org.apache.spark.graph.impl.MsgRDDFunctions._
+
+/**
+ * Stores the vertex attribute values after they are replicated. See
+ * the description of localVidMap in [[GraphImpl]].
+ */
+class VTableReplicatedValues[VD: ClassManifest](
+    vTable: VertexSetRDD[VD],
+    vid2pid: Vid2Pid,
+    localVidMap: RDD[(Pid, VertexIdToIndexMap)]) {
+
+  val bothAttrs: RDD[(Pid, Array[VD])] =
+    VTableReplicatedValues.createVTableReplicated(vTable, vid2pid, localVidMap, true, true)
+  val srcAttrOnly: RDD[(Pid, Array[VD])] =
+    VTableReplicatedValues.createVTableReplicated(vTable, vid2pid, localVidMap, true, false)
+  val dstAttrOnly: RDD[(Pid, Array[VD])] =
+    VTableReplicatedValues.createVTableReplicated(vTable, vid2pid, localVidMap, false, true)
+  val noAttrs: RDD[(Pid, Array[VD])] =
+    VTableReplicatedValues.createVTableReplicated(vTable, vid2pid, localVidMap, false, false)
+
+  def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[(Pid, Array[VD])] =
+    (includeSrcAttr, includeDstAttr) match {
+      case (true, true) => bothAttrs
+      case (true, false) => srcAttrOnly
+      case (false, true) => dstAttrOnly
+      case (false, false) => noAttrs
+    }
+}
+
+class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD])
+
+object VTableReplicatedValues {
+  protected def createVTableReplicated[VD: ClassManifest](
+      vTable: VertexSetRDD[VD],
+      vid2pid: Vid2Pid,
+      localVidMap: RDD[(Pid, VertexIdToIndexMap)],
+      includeSrcAttr: Boolean,
+      includeDstAttr: Boolean): RDD[(Pid, Array[VD])] = {
+
+    val pid2vid = vid2pid.getPid2Vid(includeSrcAttr, includeDstAttr)
+
+    val msgsByPartition = pid2vid.zipPartitions(vTable.index.rdd, vTable.valuesRDD) {
+      (pid2vidIter, indexIter, valuesIter) =>
+      val pid2vid = pid2vidIter.next()
+      val index = indexIter.next()
+      val values = valuesIter.next()
+      val vmap = new PrimitiveKeyOpenHashMap(index, values._1)
+
+      // Send each partition the vertex attributes it wants
+      val output = new Array[(Pid, VertexAttributeBlock[VD])](pid2vid.size)
+      for (pid <- 0 until pid2vid.size) {
+        val block = new VertexAttributeBlock(pid2vid(pid), pid2vid(pid).map(vid => vmap(vid)))
+        output(pid) = (pid, block)
+      }
+      output.iterator
+    }.partitionBy(localVidMap.partitioner.get).cache()
+
+    localVidMap.zipPartitions(msgsByPartition){
+      (mapIter, msgsIter) =>
+      val (pid, vidToIndex) = mapIter.next()
+      assert(!mapIter.hasNext)
+      // Populate the vertex array using the vidToIndex map
+      val vertexArray = new Array[VD](vidToIndex.capacity)
+      for ((_, block) <- msgsIter) {
+        for (i <- 0 until block.vids.size) {
+          val vid = block.vids(i)
+          val attr = block.attrs(i)
+          val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
+          vertexArray(ind) = attr
+        }
+      }
+      Iterator((pid, vertexArray))
+    }.cache()
+  }
+
+}
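The four eagerly built views trade memory for the ability to pick the cheapest replication per query. Illustrative dispatch only; `vTable`, `vid2pid`, and `localVidMap` stand in for the structures GraphImpl already holds, with `VD = Int`:

```scala
// Sketch of selecting a replicated view, mirroring the
// (includeSrcAttr, includeDstAttr) match in `get` above.
val replicated = new VTableReplicatedValues[Int](vTable, vid2pid, localVidMap)

// Triplets need both endpoint attributes.
val forTriplets = replicated.get(includeSrcAttr = true, includeDstAttr = true)

// A map function found (by bytecode inspection) to read only dstAttr gets
// the cheaper view, roughly halving the attributes shipped.
val forDstOnly = replicated.get(includeSrcAttr = false, includeDstAttr = true)
```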
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/Vid2Pid.scala b/graph/src/main/scala/org/apache/spark/graph/impl/Vid2Pid.scala
new file mode 100644
index 0000000000..363adbbce9
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/Vid2Pid.scala
@@ -0,0 +1,87 @@
+package org.apache.spark.graph.impl
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.ArrayBuilder
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+import org.apache.spark.graph._
+
+/**
+ * Stores the layout of vertex attributes for GraphImpl.
+ */
+class Vid2Pid(
+    eTable: RDD[(Pid, EdgePartition[ED])] forSome { type ED },
+    vTableIndex: VertexSetIndex) {
+
+  val bothAttrs: VertexSetRDD[Array[Pid]] = createVid2Pid(true, true)
+  val srcAttrOnly: VertexSetRDD[Array[Pid]] = createVid2Pid(true, false)
+  val dstAttrOnly: VertexSetRDD[Array[Pid]] = createVid2Pid(false, true)
+  val noAttrs: VertexSetRDD[Array[Pid]] = createVid2Pid(false, false)
+
+  val pid2VidBothAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(bothAttrs)
+  val pid2VidSrcAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(srcAttrOnly)
+  val pid2VidDstAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(dstAttrOnly)
+  val pid2VidNoAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(noAttrs)
+
+  def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): VertexSetRDD[Array[Pid]] =
+    (includeSrcAttr, includeDstAttr) match {
+      case (true, true) => bothAttrs
+      case (true, false) => srcAttrOnly
+      case (false, true) => dstAttrOnly
+      case (false, false) => noAttrs
+    }
+
+  def getPid2Vid(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] =
+    (includeSrcAttr, includeDstAttr) match {
+      case (true, true) => pid2VidBothAttrs
+      case (true, false) => pid2VidSrcAttrOnly
+      case (false, true) => pid2VidDstAttrOnly
+      case (false, false) => pid2VidNoAttrs
+    }
+
+  def persist(newLevel: StorageLevel) {
+    bothAttrs.persist(newLevel)
+    srcAttrOnly.persist(newLevel)
+    dstAttrOnly.persist(newLevel)
+    noAttrs.persist(newLevel)
+  }
+
+  private def createVid2Pid(
+      includeSrcAttr: Boolean,
+      includeDstAttr: Boolean): VertexSetRDD[Array[Pid]] = {
+    val preAgg = eTable.mapPartitions { iter =>
+      val (pid, edgePartition) = iter.next()
+      val vSet = new VertexSet
+      if (includeSrcAttr || includeDstAttr) {
+        edgePartition.foreach(e => {
+          if (includeSrcAttr) vSet.add(e.srcId)
+          if (includeDstAttr) vSet.add(e.dstId)
+        })
+      }
+      vSet.iterator.map { vid => (vid.toLong, pid) }
+    }
+    VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
+      (p: Pid) => ArrayBuffer(p),
+      (ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
+      (a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
+      .mapValues(a => a.toArray).cache()
+  }
+
+  /**
+   * Creates an intermediate pid2vid structure that tells each partition of the
+   * vertex data where it should go.
+   */
+  private def createPid2Vid(vid2pid: VertexSetRDD[Array[Pid]]): RDD[Array[Array[Vid]]] = {
+    val numPartitions = vid2pid.partitions.size
+    vid2pid.mapPartitions { iter =>
+      val pid2vidLocal = Array.fill[ArrayBuilder[Vid]](numPartitions)(ArrayBuilder.make[Vid])
+      for ((vid, pids) <- iter) {
+        pids.foreach { pid => pid2vidLocal(pid) += vid }
+      }
+      Iterator(pid2vidLocal.map(_.result))
+    }
+  }
+}
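`createPid2Vid` inverts the vid-to-pids map into a per-partition routing table. The same inversion in plain local Scala, on hypothetical data:

```scala
import scala.collection.mutable.ArrayBuilder

// vid -> partitions that need it, as produced by createVid2Pid.
val numPartitions = 3
val vid2pidLocal: Seq[(Long, Array[Int])] =
  Seq((1L, Array(0, 2)), (2L, Array(1)), (3L, Array(0, 1)))

// Invert: for each partition, the vertex ids it should receive.
val pid2vid = Array.fill(numPartitions)(ArrayBuilder.make[Long])
for ((vid, pids) <- vid2pidLocal; pid <- pids)
  pid2vid(pid) += vid

// pid2vid(0).result() == Array(1L, 3L): partition 0 needs vertices 1 and 3.
```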
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index ec548bda16..9c22608554 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -33,6 +33,18 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("mapReduceTriplets") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val n = 3
+      val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))))
+
+      val neighborDegreeSums = star.mapReduceTriplets(
+        edge => Array((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
+        (a: Int, b: Int) => a + b)
+      assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
+    }
+  }
+
   test("aggregateNeighbors") {
     withSpark(new SparkContext("local", "test")) { sc =>
       val n = 3
@@ -87,6 +99,6 @@ class GraphSuite extends FunSuite with LocalSparkContext {
 
       assert(b.zipJoin(c)((id, b, c) => b + c).map(x => x._2).reduce(_+_) === 0)
     }
-  }
-
+  }
+
 }
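With the statistics change earlier in this patch, the single replication ratio becomes three per-view ratios. A hypothetical REPL check, assuming a `GraphImpl` instance `g` (the map keys are the ones added above):

```scala
// Inspect the new per-view replication ratios exposed by statistics.
val stats = g.statistics
println(stats("Replication (both)"))      // full cost: triplet-style queries
println(stats("Replication (src only)"))  // cheaper: src-attr-only queries
println(stats("Replication (dest only)")) // cheaper: dst-attr-only queries
```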