Merge branch 'simplify' into clean

Reynold Xin 2013-11-26 13:55:26 -08:00
commit 2d19d0381b
12 changed files with 314 additions and 384 deletions

View file

@@ -158,7 +158,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
/** Return the value at the specified position. */
def getValue(pos: Int): T = _data(pos)
def iterator() = new Iterator[T] {
def iterator = new Iterator[T] {
var pos = nextPos(0)
override def hasNext: Boolean = pos != INVALID_POS
override def next(): T = {
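
For reference, a minimal sketch of the parameterless iterator in use, assuming Spark's org.apache.spark.util.collection.OpenHashSet as modified above:

    import org.apache.spark.util.collection.OpenHashSet

    val set = new OpenHashSet[Long](4)
    set.add(1L); set.add(2L); set.add(2L)  // duplicate adds are no-ops
    val it = set.iterator                  // no parens, per this change
    while (it.hasNext) println(it.next())  // prints 1 and 2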

View file

@@ -229,7 +229,7 @@ object Analytics extends Logging {
// Construct set representations of the neighborhoods
val nbrSets: VertexSetRDD[VertexSet] =
graph.collectNeighborIds(EdgeDirection.Both).mapValuesWithKeys { (vid, nbrs) =>
graph.collectNeighborIds(EdgeDirection.Both).mapValues { (vid, nbrs) =>
val set = new VertexSet(4)
var i = 0
while (i < nbrs.size) {
@@ -254,7 +254,7 @@ object Analytics extends Logging {
} else {
(et.dstAttr, et.srcAttr)
}
val iter = smallSet.iterator()
val iter = smallSet.iterator
var counter: Int = 0
while (iter.hasNext) {
val vid = iter.next
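
The loop above walks the smaller neighbor set and counts members of the larger one; a standalone sketch of the same pattern (countCommon is a hypothetical helper, not part of this commit):

    def countCommon(small: VertexSet, large: VertexSet): Int = {
      var count = 0
      val iter = small.iterator  // parameterless, per the OpenHashSet change
      while (iter.hasNext) {
        if (large.contains(iter.next())) count += 1
      }
      count
    }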

View file

@@ -1,8 +1,10 @@
package org.apache.spark.graph
import org.apache.spark.graph.impl._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
/**
* The Graph abstractly represents a graph with arbitrary objects
* associated with vertices and edges. The graph provides basic
@@ -162,7 +164,6 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* Construct a new graph with all the edges reversed. If this graph
* contains an edge from a to b then the returned graph contains an
* edge from b to a.
*
*/
def reverse: Graph[VD, ED]
@@ -292,9 +293,6 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
*/
object Graph {
import org.apache.spark.graph.impl._
import org.apache.spark.SparkContext._
/**
* Construct a graph from a collection of edges encoded as vertex id pairs.
*
@@ -324,15 +322,10 @@ object Graph {
rawEdges: RDD[(Vid, Vid)],
defaultValue: VD,
uniqueEdges: Boolean,
partitionStrategy: PartitionStrategy):
Graph[VD, Int] = {
partitionStrategy: PartitionStrategy): Graph[VD, Int] = {
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
val graph = GraphImpl(edges, defaultValue, partitionStrategy)
if (uniqueEdges) {
graph.groupEdges((a,b) => a+b)
} else {
graph
}
if (uniqueEdges) graph.groupEdges((a, b) => a + b) else graph
}
/**
@@ -344,9 +337,8 @@ object Graph {
* @return a graph with edge attributes described by `edges` and vertices
* given by all vertices in `edges` with value `defaultValue`
*/
def apply[VD: ClassManifest, ED: ClassManifest](
edges: RDD[Edge[ED]],
defaultValue: VD): Graph[VD, ED] = {
def apply[VD: ClassManifest, ED: ClassManifest](edges: RDD[Edge[ED]], defaultValue: VD)
: Graph[VD, ED] = {
Graph(edges, defaultValue, RandomVertexCut())
}
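
A usage sketch of the constructors above, assuming a SparkContext named sc:

    val rawEdges: RDD[(Vid, Vid)] = sc.parallelize(Seq((1L, 2L), (2L, 3L), (1L, 2L)))
    // Collapse the duplicate (1,2) edge by summing per-edge counts (uniqueEdges = true).
    val g: Graph[Int, Int] = Graph(rawEdges, 0, true, RandomVertexCut())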

View file

@@ -5,13 +5,12 @@ import com.esotericsoftware.kryo.Kryo
import org.apache.spark.graph.impl._
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.util.collection.BitSet
import org.apache.spark.graph._
class GraphKryoRegistrator extends KryoRegistrator {
def registerClasses(kryo: Kryo) {
kryo.register(classOf[Edge[Object]])
kryo.register(classOf[MutableTuple2[Object, Object]])
kryo.register(classOf[MessageToPartition[Object]])
kryo.register(classOf[VertexBroadcastMsg[Object]])
kryo.register(classOf[AggregationMsg[Object]])
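
A sketch of wiring the registrator in, assuming the Spark 0.8-era system-property configuration:

    // Set before creating the SparkContext.
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")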

View file

@@ -23,23 +23,22 @@ import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap}
import org.apache.spark.graph.impl.AggregationMsg
import org.apache.spark.graph.impl.MsgRDDFunctions._
import org.apache.spark.graph.impl.VertexPartition
/**
* Maintains the per-partition mapping from vertex id to the corresponding
* location in the per-partition values array. This class is meant to be an
* opaque type.
*
*/
private[graph]
class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
/**
* The persist function behaves like the standard RDD persist
*/
def persist(newLevel: StorageLevel): VertexSetIndex = {
rdd.persist(newLevel)
return this
this
}
/**
@@ -80,6 +79,8 @@ class VertexSetRDD[@specialized VD: ClassManifest](
/**
* The `VertexSetIndex` representing the layout of this `VertexSetRDD`.
*/
// TODO: Consider removing the exposure of the index to the outside, and implement methods in this
// class to handle any operations that would require indexing.
def index = new VertexSetIndex(partitionsRDD.mapPartitions(_.map(_.index),
preservesPartitioning = true))
@@ -134,13 +135,12 @@ class VertexSetRDD[@specialized VD: ClassManifest](
tuples.compute(part, context)
/**
* Return a new VertexSetRDD by applying a function to each VertexPartition of
* this RDD.
* Return a new VertexSetRDD by applying a function to each VertexPartition of this RDD.
*/
def mapVertexPartitions[VD2: ClassManifest](
f: VertexPartition[VD] => VertexPartition[VD2]): VertexSetRDD[VD2] = {
def mapVertexPartitions[VD2: ClassManifest](f: VertexPartition[VD] => VertexPartition[VD2])
: VertexSetRDD[VD2] = {
val cleanF = sparkContext.clean(f)
val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(cleanF), preservesPartitioning = true)
new VertexSetRDD(newPartitionsRDD)
}
@@ -175,6 +175,7 @@ class VertexSetRDD[@specialized VD: ClassManifest](
* the original vertex-set. Furthermore, the filter only
* modifies the bitmap index and so no new values are allocated.
*/
// TODO: Should we consider making pred take two arguments instead of a tuple?
override def filter(pred: Tuple2[Vid, VD] => Boolean): VertexSetRDD[VD] =
this.mapVertexPartitions(_.filter(Function.untupled(pred)))
@@ -190,13 +191,26 @@ class VertexSetRDD[@specialized VD: ClassManifest](
* VertexSetRDD retains the same index.
*/
def mapValues[VD2: ClassManifest](f: VD => VD2): VertexSetRDD[VD2] =
this.mapVertexPartitions(_.map { case (vid, attr) => f(attr) })
this.mapVertexPartitions(_.map((vid, attr) => f(attr)))
/**
* Pass each vertex attribute through a map function and retain the
* original RDD's partitioning and index.
*
* @tparam VD2 the type returned by the map function
*
* @param f the function applied to each value in the RDD
* @return a new VertexSetRDD with values obtained by applying `f` to
* each of the entries in the original VertexSet. The resulting
* VertexSetRDD retains the same index.
*/
def mapValues[VD2: ClassManifest](f: (Vid, VD) => VD2): VertexSetRDD[VD2] =
this.mapVertexPartitions(_.map(f))
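
The two overloads side by side, sketched against a hypothetical vertices: VertexSetRDD[Double]; both retain the index:

    val doubled: VertexSetRDD[Double] = vertices.mapValues(attr => attr * 2.0)
    val tagged: VertexSetRDD[(Vid, Double)] = vertices.mapValues((vid, attr) => (vid, attr))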
/**
* Fill in missing values for all vertices in the index.
*
* @param missingValue the value to be used for vertices in the
* index that don't currently have values.
* @param missingValue the value to use for vertices that don't currently have values.
* @return A VertexSetRDD with a value for all vertices.
*/
def fillMissing(missingValue: VD): VertexSetRDD[VD] = {
@@ -213,29 +227,6 @@ class VertexSetRDD[@specialized VD: ClassManifest](
}
}
/**
* Pass each vertex attribute along with the vertex id through a map
* function and retain the original RDD's partitioning and index.
*
* @tparam VD2 the type returned by the map function
*
* @param f the function applied to each vertex id and vertex
* attribute in the RDD
* @return a new VertexSet with values obtained by applying `f` to
* each of the entries in the original VertexSet. The resulting
* VertexSetRDD retains the same index.
*/
def mapValuesWithKeys[VD2: ClassManifest](f: (Vid, VD) => VD2): VertexSetRDD[VD2] = {
this.mapVertexPartitions { part =>
// Construct a view of the map transformation
val newValues = new Array[VD2](part.index.capacity)
part.mask.iterator.foreach { ind =>
newValues(ind) = f(part.index.getValueSafe(ind), part.values(ind))
}
new VertexPartition(part.index, newValues, part.mask)
}
} // end of mapValuesWithKeys
/**
* Inner join this VertexSet with another VertexSet which has the
* same Index. This function will fail if both VertexSets do not
@@ -269,35 +260,6 @@ class VertexSetRDD[@specialized VD: ClassManifest](
}
}
/**
* Inner join this VertexSet with another VertexSet which has the
* same Index. This function will fail if both VertexSets do not
* share the same index.
*
* @param other the vertex set to join with this vertex set
* @param f the function mapping a vertex id and its attributes in
* this and the other vertex set to a collection of tuples.
* @tparam VD2 the type of the other vertex set attributes
* @tparam VD3 the type of the tuples emitted by `f`
* @return an RDD containing the tuples emitted by `f`
*/
def zipJoinFlatMap[VD2: ClassManifest, VD3: ClassManifest]
(other: VertexSetRDD[VD2])
(f: (Vid, VD, VD2) => Iterator[VD3]): RDD[VD3] = {
val cleanF = sparkContext.clean(f)
partitionsRDD.zipPartitions(other.partitionsRDD) {
(thisPartIter, otherPartIter) =>
val thisPart = thisPartIter.next()
val otherPart = otherPartIter.next()
if (thisPart.index != otherPart.index) {
throw new SparkException("can't zip join VertexSetRDDs with different indexes")
}
(thisPart.mask & otherPart.mask).iterator.flatMap { ind =>
cleanF(thisPart.index.getValueSafe(ind), thisPart.values(ind), otherPart.values(ind))
}
}
}
/**
* Left join this VertexSet with another VertexSet which has the
* same Index. This function will fail if both VertexSets do not
@@ -333,7 +295,6 @@ class VertexSetRDD[@specialized VD: ClassManifest](
}
} // end of leftZipJoin
/**
* Left join this VertexSet with an RDD containing vertex attribute
* pairs. If the other RDD is backed by a VertexSet with the same
@@ -343,7 +304,7 @@ class VertexSetRDD[@specialized VD: ClassManifest](
* VertexSet then a `None` attribute is generated
*
* @tparam VD2 the attribute type of the other VertexSet
* @tparam VD2 the attribute type of the resulting VertexSet
* @tparam VD3 the attribute type of the resulting VertexSet
*
* @param other the other VertexSet with which to join.
* @param f the function mapping a vertex id and its attributes in
@@ -358,18 +319,14 @@ class VertexSetRDD[@specialized VD: ClassManifest](
(other: RDD[(Vid, VD2)])
(f: (Vid, VD, Option[VD2]) => VD3, merge: (VD2, VD2) => VD2 = (a: VD2, b: VD2) => a)
: VertexSetRDD[VD3] = {
// Test if the other vertex is a VertexSetRDD to choose the optimal
// join strategy
// Test if the other vertex is a VertexSetRDD to choose the optimal join strategy.
// If the other set is a VertexSetRDD then we use the much more efficient leftZipJoin
other match {
// If the other set is a VertexSetRDD then we use the much more efficient
// leftZipJoin
case other: VertexSetRDD[_] => {
case other: VertexSetRDD[VD2] =>
leftZipJoin(other)(f)
}
case _ => {
case _ =>
val indexedOther: VertexSetRDD[VD2] = VertexSetRDD(other, this.index, merge)
leftZipJoin(indexedOther)(f)
}
}
} // end of leftJoin
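
A sketch of leftJoin in use, folding out-degrees into the vertex attributes, where degrees is a hypothetical RDD[(Vid, Int)]:

    val withDegree: VertexSetRDD[Int] =
      vertices.leftJoin(degrees)((vid, attr, degOpt) => degOpt.getOrElse(0))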
@@ -433,12 +390,11 @@ object VertexSetRDD {
*
* @tparam VD the vertex attribute type
* @param rdd the rdd containing vertices
* @param indexPrototype a VertexSetRDD whose indexes will be reused. The
* @param index a VertexSetRDD whose indexes will be reused. The
* indexes must be a superset of the vertices in rdd
* in RDD
*/
def apply[VD: ClassManifest](
rdd: RDD[(Vid, VD)], index: VertexSetIndex): VertexSetRDD[VD] =
def apply[VD: ClassManifest](rdd: RDD[(Vid, VD)], index: VertexSetIndex): VertexSetRDD[VD] =
apply(rdd, index, (a: VD, b: VD) => a)
/**
@@ -447,8 +403,8 @@ object VertexSetRDD {
*
* @tparam VD the vertex attribute type
* @param rdd the rdd containing vertices
* @param indexPrototype a VertexSetRDD whose indexes will be reused. The
* indexes must be a superset of the vertices in rdd
* @param index a VertexSetRDD whose indexes will be reused. The
* indexes must be a superset of the vertices in rdd
* @param reduceFunc the user defined reduce function used to merge
* duplicate vertex attributes.
*/
@@ -456,6 +412,7 @@ object VertexSetRDD {
rdd: RDD[(Vid, VD)],
index: VertexSetIndex,
reduceFunc: (VD, VD) => VD): VertexSetRDD[VD] =
// TODO: Consider removing the following apply.
apply(rdd, index, (v: VD) => v, reduceFunc, reduceFunc)
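
A sketch of the reducing constructor, merging duplicate vertex ids by taking the max, where pairs is a hypothetical RDD[(Vid, Int)] and existingIndex a VertexSetIndex covering its ids:

    val vset: VertexSetRDD[Int] =
      VertexSetRDD(pairs, existingIndex, (a: Int, b: Int) => math.max(a, b))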
/**
@@ -463,7 +420,7 @@ object VertexSetRDD {
*
* @tparam VD the vertex attribute type
* @param rdd the rdd containing vertices
* @param indexPrototype a VertexSetRDD whose indexes will be reused. The
* @param index a VertexSetRDD whose indexes will be reused. The
* indexes must be a superset of the vertices in rdd
* @param reduceFunc the user defined reduce function used to merge
* duplicate vertex attributes.
@@ -476,9 +433,7 @@ object VertexSetRDD {
val cReduceFunc = rdd.context.clean(reduceFunc)
assert(rdd.partitioner == Some(index.partitioner))
// Use the index to build the new values table
val partitionsRDD = index.rdd.zipPartitions(
rdd, preservesPartitioning = true
) {
val partitionsRDD = index.rdd.zipPartitions(rdd, preservesPartitioning = true) {
(indexIter, tblIter) =>
// There is only one map
val index = indexIter.next()

View file

@@ -1,27 +1,24 @@
package org.apache.spark.graph.impl
import scala.collection.mutable.ArrayBuilder
import org.apache.spark.graph._
import org.apache.spark.util.collection.PrimitiveVector
//private[graph]
class EdgePartitionBuilder[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
ED: ClassManifest]{
val srcIds = new VertexArrayList
val dstIds = new VertexArrayList
var dataBuilder = ArrayBuilder.make[ED]
class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassManifest] {
val srcIds = new PrimitiveVector[Vid]
val dstIds = new PrimitiveVector[Vid]
var dataBuilder = new PrimitiveVector[ED]
/** Add a new edge to the partition. */
def add(src: Vid, dst: Vid, d: ED) {
srcIds.add(src)
dstIds.add(dst)
srcIds += src
dstIds += dst
dataBuilder += d
}
def toEdgePartition: EdgePartition[ED] = {
new EdgePartition(srcIds.toLongArray(), dstIds.toLongArray(), dataBuilder.result())
new EdgePartition(srcIds.trim().array, dstIds.trim().array, dataBuilder.trim().array)
}
}
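
A usage sketch of the builder, assuming the EdgePartition.size accessor used elsewhere in this commit:

    val builder = new EdgePartitionBuilder[Double]
    builder.add(1L, 2L, 0.5)
    builder.add(2L, 3L, 1.5)
    val edgePart = builder.toEdgePartition  // trim() keeps the arrays exactly sized
    assert(edgePart.size == 2)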

View file

@@ -0,0 +1,60 @@
package org.apache.spark.graph.impl
import scala.collection.mutable
import org.apache.spark.graph._
import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
/**
* The Iterator type returned when constructing edge triplets. This class technically could be
* an anonymous class in GraphImpl.triplets, but we name it here explicitly so it is easier to
* debug / profile.
*/
private[impl]
class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
val vidToIndex: VertexIdToIndexMap,
val vertexArray: Array[VD],
val edgePartition: EdgePartition[ED])
extends Iterator[EdgeTriplet[VD, ED]] {
// Current position in the array.
private var pos = 0
// A triplet object that this iterator.next() call returns. We reuse this object to avoid
// allocating too many temporary Java objects.
private val triplet = new EdgeTriplet[VD, ED]
private val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
override def hasNext: Boolean = pos < edgePartition.size
override def next() = {
triplet.srcId = edgePartition.srcIds(pos)
// assert(vmap.containsKey(e.src.id))
triplet.srcAttr = vmap(triplet.srcId)
triplet.dstId = edgePartition.dstIds(pos)
// assert(vmap.containsKey(e.dst.id))
triplet.dstAttr = vmap(triplet.dstId)
triplet.attr = edgePartition.data(pos)
pos += 1
triplet
}
// TODO: Why do we need this?
override def toList: List[EdgeTriplet[VD, ED]] = {
val lb = new mutable.ListBuffer[EdgeTriplet[VD,ED]]
val currentEdge = new EdgeTriplet[VD, ED]
for (i <- 0 until edgePartition.size) {
currentEdge.srcId = edgePartition.srcIds(i)
// assert(vmap.containsKey(e.src.id))
currentEdge.srcAttr = vmap(currentEdge.srcId)
currentEdge.dstId = edgePartition.dstIds(i)
// assert(vmap.containsKey(e.dst.id))
currentEdge.dstAttr = vmap(currentEdge.dstId)
currentEdge.attr = edgePartition.data(i)
lb += currentEdge
}
lb.toList
}
}
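
Because next() reuses the single triplet instance, callers must copy out any fields they need before advancing; a sketch:

    // Safe: extract fields into fresh tuples while iterating.
    def edgeIds(iter: Iterator[EdgeTriplet[_, _]]): List[(Vid, Vid)] =
      iter.map(et => (et.srcId, et.dstId)).toList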

View file

@@ -1,68 +1,17 @@
package org.apache.spark.graph.impl
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext._
import org.apache.spark.HashPartitioner
import org.apache.spark.util.ClosureCleaner
import org.apache.spark.SparkException
import org.apache.spark.Partitioner
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.graph._
import org.apache.spark.graph.impl.GraphImpl._
import org.apache.spark.graph.impl.MsgRDDFunctions._
import org.apache.spark.graph.util.BytecodeUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.ClosureCleaner
import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap}
/**
* The Iterator type returned when constructing edge triplets
*/
class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
val vidToIndex: VertexIdToIndexMap,
val vertexArray: Array[VD],
val edgePartition: EdgePartition[ED]) extends Iterator[EdgeTriplet[VD, ED]] {
private var pos = 0
private val et = new EdgeTriplet[VD, ED]
private val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
override def hasNext: Boolean = pos < edgePartition.size
override def next() = {
et.srcId = edgePartition.srcIds(pos)
// assert(vmap.containsKey(e.src.id))
et.srcAttr = vmap(et.srcId)
et.dstId = edgePartition.dstIds(pos)
// assert(vmap.containsKey(e.dst.id))
et.dstAttr = vmap(et.dstId)
et.attr = edgePartition.data(pos)
pos += 1
et
}
override def toList: List[EdgeTriplet[VD, ED]] = {
val lb = new mutable.ListBuffer[EdgeTriplet[VD,ED]]
val currentEdge = new EdgeTriplet[VD, ED]
for (i <- (0 until edgePartition.size)) {
currentEdge.srcId = edgePartition.srcIds(i)
// assert(vmap.containsKey(e.src.id))
currentEdge.srcAttr = vmap(currentEdge.srcId)
currentEdge.dstId = edgePartition.dstIds(i)
// assert(vmap.containsKey(e.dst.id))
currentEdge.dstAttr = vmap(currentEdge.dstId)
currentEdge.attr = edgePartition.data(i)
lb += currentEdge
}
lb.toList
}
} // end of Edge Triplet Iterator
/**
* A Graph RDD that supports computation on graphs.
*
@@ -95,8 +44,13 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
eTable.mapPartitions(_.next()._2.iterator, true)
/** Return a RDD that brings edges with its source and destination vertices together. */
@transient override val triplets: RDD[EdgeTriplet[VD, ED]] =
makeTriplets(vTableReplicated.bothAttrs, eTable)
@transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
eTable.zipPartitions(vTableReplicated.bothAttrs) { (eTableIter, vTableReplicatedIter) =>
val (_, edgePartition) = eTableIter.next()
val (_, (vidToIndex, vertexArray)) = vTableReplicatedIter.next()
new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition)
}
}
override def persist(newLevel: StorageLevel): Graph[VD, ED] = {
vTable.persist(newLevel)
@@ -108,25 +62,29 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY)
override def statistics: Map[String, Any] = {
val numVertices = this.numVertices
val numEdges = this.numEdges
val replicationRatioBothAttrs =
vertexPlacement.bothAttrs.map(_.map(_.size).sum).sum / numVertices
val replicationRatioSrcAttrOnly =
vertexPlacement.srcAttrOnly.map(_.map(_.size).sum).sum / numVertices
val replicationRatioDstAttrOnly =
vertexPlacement.dstAttrOnly.map(_.map(_.size).sum).sum / numVertices
val loadArray =
eTable.map{ case (pid, epart) => epart.data.size }.collect.map(x => x.toDouble / numEdges)
// Get the total number of vertices after replication, used to compute the replication ratio.
def numReplicatedVertices(vid2pids: RDD[Array[Array[Vid]]]): Double = {
vid2pids.map(_.map(_.size).sum.toLong).reduce(_ + _).toDouble
}
val numVertices = this.ops.numVertices
val numEdges = this.ops.numEdges
val replicationRatioBoth = numReplicatedVertices(vertexPlacement.bothAttrs) / numVertices
val replicationRatioSrcOnly = numReplicatedVertices(vertexPlacement.srcAttrOnly) / numVertices
val replicationRatioDstOnly = numReplicatedVertices(vertexPlacement.dstAttrOnly) / numVertices
// One entry for each partition, indicating the total number of edges on that partition.
val loadArray = eTable.map { case (_, e) => e.size }.collect().map(_.toDouble / numEdges)
val minLoad = loadArray.min
val maxLoad = loadArray.max
Map(
"Num Vertices" -> numVertices, "Num Edges" -> numEdges,
"Replication (both)" -> replicationRatioBothAttrs,
"Replication (src only)" -> replicationRatioSrcAttrOnly,
"Replication (dest only)" -> replicationRatioDstAttrOnly,
"Num Vertices" -> numVertices,
"Num Edges" -> numEdges,
"Replication (both)" -> replicationRatioBoth,
"Replication (src only)" -> replicationRatioSrcOnly,
"Replication (dest only)" -> replicationRatioDstOnly,
"Load Array" -> loadArray,
"Min Load" -> minLoad, "Max Load" -> maxLoad)
"Min Load" -> minLoad,
"Max Load" -> maxLoad)
}
/**
@@ -137,7 +95,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
rdd: RDD[_],
indent: String = "",
visited: Map[Int, String] = Map.empty[Int, String]) {
if(visited.contains(rdd.id)) {
if (visited.contains(rdd.id)) {
println(indent + visited(rdd.id))
println(indent)
} else {
@@ -169,7 +127,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
println("\n\ntriplets ----------------------------------------")
traverseLineage(triplets, " ", visited)
println(visited)
} // end of print lineage
} // end of printLineage
override def reverse: Graph[VD, ED] = {
val newETable = eTable.mapPartitions(_.map { case (pid, epart) => (pid, epart.reverse) },
@@ -186,12 +144,31 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
new GraphImpl(vTable, newETable, vertexPlacement, partitioner)
}
override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] =
GraphImpl.mapTriplets(this, f)
override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
// Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit
// manifest from GraphImpl (which would require serializing GraphImpl).
val vdManifest = classManifest[VD]
val newETable = eTable.zipPartitions(vTableReplicated.bothAttrs, preservesPartitioning = true) {
(eTableIter, vTableReplicatedIter) =>
val (pid, edgePartition) = eTableIter.next()
val (_, (vidToIndex, vertexArray)) = vTableReplicatedIter.next()
val et = new EdgeTriplet[VD, ED]
val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](
vidToIndex, vertexArray)(classManifest[Vid], vdManifest)
val newEdgePartition = edgePartition.map { e =>
et.set(e)
et.srcAttr = vmap(e.srcId)
et.dstAttr = vmap(e.dstId)
f(et)
}
Iterator((pid, newEdgePartition))
}
new GraphImpl(vTable, newETable, vertexPlacement, partitioner)
}
override def subgraph(
epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
vpred: (Vid, VD) => Boolean = ((a,b) => true)): Graph[VD, ED] = {
epred: EdgeTriplet[VD, ED] => Boolean = x => true,
vpred: (Vid, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
// Filter the vertices, reusing the partitioner (but not the index) from
// this graph
@@ -222,8 +199,68 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def mapReduceTriplets[A: ClassManifest](
mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)],
reduceFunc: (A, A) => A): VertexSetRDD[A] =
GraphImpl.mapReduceTriplets(this, mapFunc, reduceFunc)
reduceFunc: (A, A) => A): VertexSetRDD[A] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
// Use explicit manifest in PrimitiveKeyOpenHashMap so we don't have to serialize GraphImpl.
val vdManifest = classManifest[VD]
// For each vertex, replicate its attribute only to partitions where it is
// in the relevant position in an edge.
val mapUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
val mapUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
val vs = vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr)
// Map and combine.
val preAgg = eTable.zipPartitions(vs) { (edgePartitionIter, vTableReplicatedIter) =>
val (_, edgePartition) = edgePartitionIter.next()
val (_, (vidToIndex, vertexArray)) = vTableReplicatedIter.next()
assert(vidToIndex.capacity == vertexArray.size)
val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)(
classManifest[Vid], vdManifest)
// Note: This doesn't allow users to send messages to arbitrary vertices.
val msgArray = new Array[A](vertexArray.size)
val msgBS = new BitSet(vertexArray.size)
// Iterate over the partition
val et = new EdgeTriplet[VD, ED]
edgePartition.foreach { e =>
et.set(e)
if (mapUsesSrcAttr) {
et.srcAttr = vmap(e.srcId)
}
if (mapUsesDstAttr) {
et.dstAttr = vmap(e.dstId)
}
// TODO(rxin): rewrite the foreach using a simple while loop to speed things up.
// Also given we are only allowing zero, one, or two messages, we can completely unroll
// the for loop.
mapFunc(et).foreach { case (vid, msg) =>
// verify that the vid is valid
assert(vid == et.srcId || vid == et.dstId)
// Get the index of the key
val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
// Populate the aggregator map
if (msgBS.get(ind)) {
msgArray(ind) = reduceFunc(msgArray(ind), msg)
} else {
msgArray(ind) = msg
msgBS.set(ind)
}
}
}
// construct an iterator of tuples Iterator[(Vid, A)]
msgBS.iterator.map { ind =>
new AggregationMsg[A](vidToIndex.getValue(ind), msgArray(ind))
}
}.partitionBy(vTable.partitioner.get)
// do the final reduction reusing the index map
VertexSetRDD.aggregate(preAgg, vTable.index, reduceFunc)
} // end of mapReduceTriplets
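
The canonical use of this method, sketched as in-degree counting, where every edge sends one message to its destination and messages are summed:

    val inDegrees: VertexSetRDD[Int] =
      graph.mapReduceTriplets[Int](et => Iterator((et.dstId, 1)), _ + _)
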
override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest]
(updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2): Graph[VD2, ED] = {
@@ -255,29 +292,6 @@ object GraphImpl {
new GraphImpl(vtable, etable, vertexPlacement, partitionStrategy)
}
// def apply[VD: ClassManifest, ED: ClassManifest](
// vertices: RDD[(Vid, VD)],
// edges: RDD[Edge[ED]],
// defaultVertexAttr: VD): GraphImpl[VD,ED] = {
// apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, RandomVertexCut())
// }
// def apply[VD: ClassManifest, ED: ClassManifest](
// vertices: RDD[(Vid, VD)],
// edges: RDD[Edge[ED]],
// defaultVertexAttr: VD,
// partitionStrategy: PartitionStrategy): GraphImpl[VD,ED] = {
// apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, partitionStrategy)
// }
// def apply[VD: ClassManifest, ED: ClassManifest](
// vertices: RDD[(Vid, VD)],
// edges: RDD[Edge[ED]],
// defaultVertexAttr: VD,
// mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = {
// apply(vertices, edges, defaultVertexAttr, mergeFunc, RandomVertexCut())
// }
def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid, VD)],
edges: RDD[Edge[ED]],
@@ -336,100 +350,7 @@ object GraphImpl {
}, preservesPartitioning = true).cache()
}
protected def makeTriplets[VD: ClassManifest, ED: ClassManifest](
vTableReplicated: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))],
eTable: RDD[(Pid, EdgePartition[ED])]): RDD[EdgeTriplet[VD, ED]] = {
eTable.zipPartitions(vTableReplicated) {
(eTableIter, vTableReplicatedIter) =>
val (_, edgePartition) = eTableIter.next()
val (_, (vidToIndex, vertexArray)) = vTableReplicatedIter.next()
new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition)
}
}
protected def mapTriplets[VD: ClassManifest, ED: ClassManifest, ED2: ClassManifest](
g: GraphImpl[VD, ED],
f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
val newETable = g.eTable.zipPartitions(
g.vTableReplicated.bothAttrs, preservesPartitioning = true
) {
(eTableIter, vTableReplicatedIter) =>
val (pid, edgePartition) = eTableIter.next()
val (_, (vidToIndex, vertexArray)) = vTableReplicatedIter.next()
val et = new EdgeTriplet[VD, ED]
val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
val newEdgePartition = edgePartition.map { e =>
et.set(e)
et.srcAttr = vmap(e.srcId)
et.dstAttr = vmap(e.dstId)
f(et)
}
Iterator((pid, newEdgePartition))
}
new GraphImpl(g.vTable, newETable, g.vertexPlacement, g.partitioner)
}
protected def mapReduceTriplets[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
g: GraphImpl[VD, ED],
mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)],
reduceFunc: (A, A) => A): VertexSetRDD[A] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
// For each vertex, replicate its attribute only to partitions where it is
// in the relevant position in an edge.
val mapFuncUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
val mapFuncUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
// Map and preaggregate
val preAgg = g.eTable.zipPartitions(
g.vTableReplicated.get(mapFuncUsesSrcAttr, mapFuncUsesDstAttr)
) {
(edgePartitionIter, vTableReplicatedIter) =>
val (_, edgePartition) = edgePartitionIter.next()
val (_, (vidToIndex, vertexArray)) = vTableReplicatedIter.next()
assert(vidToIndex.capacity == vertexArray.size)
val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
// TODO(jegonzal): This doesn't allow users to send messages to arbitrary vertices.
val msgArray = new Array[A](vertexArray.size)
val msgBS = new BitSet(vertexArray.size)
// Iterate over the partition
val et = new EdgeTriplet[VD, ED]
edgePartition.foreach { e =>
et.set(e)
if (mapFuncUsesSrcAttr) {
et.srcAttr = vmap(e.srcId)
}
if (mapFuncUsesDstAttr) {
et.dstAttr = vmap(e.dstId)
}
// TODO(rxin): rewrite the foreach using a simple while loop to speed things up.
// Also given we are only allowing zero, one, or two messages, we can completely unroll
// the for loop.
mapFunc(et).foreach { case (vid, msg) =>
// verify that the vid is valid
assert(vid == et.srcId || vid == et.dstId)
// Get the index of the key
val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
// Populate the aggregator map
if (msgBS.get(ind)) {
msgArray(ind) = reduceFunc(msgArray(ind), msg)
} else {
msgArray(ind) = msg
msgBS.set(ind)
}
}
}
// construct an iterator of tuples Iterator[(Vid, A)]
msgBS.iterator.map { ind =>
new AggregationMsg[A](vidToIndex.getValue(ind), msgArray(ind))
}
}.partitionBy(g.vTable.partitioner.get)
// do the final reduction reusing the index map
VertexSetRDD.aggregate(preAgg, g.vTable.index, reduceFunc)
}
private def accessesVertexAttr[VD: ClassManifest, ED: ClassManifest](
closure: AnyRef, attrName: String): Boolean = {
private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = {
try {
BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)
} catch {

View file

@@ -5,61 +5,60 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.util.collection.{OpenHashSet, PrimitiveKeyOpenHashMap}
import org.apache.spark.graph._
import org.apache.spark.graph.impl.MsgRDDFunctions._
/**
* Stores the vertex attribute values after they are replicated.
*/
private[impl]
class VTableReplicated[VD: ClassManifest](
vTable: VertexSetRDD[VD],
eTable: RDD[(Pid, EdgePartition[ED])] forSome { type ED },
vertexPlacement: VertexPlacement) {
val bothAttrs: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
VTableReplicated.createVTableReplicated(vTable, eTable, vertexPlacement, true, true)
createVTableReplicated(vTable, eTable, vertexPlacement, true, true)
val srcAttrOnly: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
VTableReplicated.createVTableReplicated(vTable, eTable, vertexPlacement, true, false)
createVTableReplicated(vTable, eTable, vertexPlacement, true, false)
val dstAttrOnly: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
VTableReplicated.createVTableReplicated(vTable, eTable, vertexPlacement, false, true)
createVTableReplicated(vTable, eTable, vertexPlacement, false, true)
val noAttrs: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
VTableReplicated.createVTableReplicated(vTable, eTable, vertexPlacement, false, false)
createVTableReplicated(vTable, eTable, vertexPlacement, false, false)
def get(includeSrcAttr: Boolean, includeDstAttr: Boolean)
: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
(includeSrcAttr, includeDstAttr) match {
def get(includeSrc: Boolean, includeDst: Boolean): RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] = {
(includeSrc, includeDst) match {
case (true, true) => bothAttrs
case (true, false) => srcAttrOnly
case (false, true) => dstAttrOnly
case (false, false) => noAttrs
}
}
}
class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD])
private def createVTableReplicated[VD: ClassManifest](
vTable: VertexSetRDD[VD],
eTable: RDD[(Pid, EdgePartition[ED])] forSome { type ED },
vertexPlacement: VertexPlacement,
includeSrcAttr: Boolean,
includeDstAttr: Boolean): RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] = {
object VTableReplicated {
protected def createVTableReplicated[VD: ClassManifest](
vTable: VertexSetRDD[VD],
eTable: RDD[(Pid, EdgePartition[ED])] forSome { type ED },
vertexPlacement: VertexPlacement,
includeSrcAttr: Boolean,
includeDstAttr: Boolean): RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] = {
val placement = vertexPlacement.get(includeSrcAttr, includeDstAttr)
// Send each edge partition the vertex attributes it wants, as specified in
// vertexPlacement
val msgsByPartition = placement.zipPartitions(vTable.partitionsRDD) {
(pid2vidIter, vertexPartIter) =>
val pid2vid = pid2vidIter.next()
val vertexPart = vertexPartIter.next()
val pid2vid = pid2vidIter.next()
val vertexPart = vertexPartIter.next()
val vmap = new PrimitiveKeyOpenHashMap(vertexPart.index, vertexPart.values)
val output = new Array[(Pid, VertexAttributeBlock[VD])](pid2vid.size)
for (pid <- 0 until pid2vid.size) {
val block = new VertexAttributeBlock(pid2vid(pid), pid2vid(pid).map(vid => vmap(vid)))
output(pid) = (pid, block)
}
output.iterator
val vmap = new PrimitiveKeyOpenHashMap(vertexPart.index, vertexPart.values)
val output = new Array[(Pid, VertexAttributeBlock[VD])](pid2vid.size)
for (pid <- 0 until pid2vid.size) {
val block = new VertexAttributeBlock(pid2vid(pid), pid2vid(pid).map(vid => vmap(vid)))
output(pid) = (pid, block)
}
output.iterator
}.partitionBy(eTable.partitioner.get).cache()
// Within each edge partition, create a local map from vid to an index into
@@ -81,20 +80,22 @@ object VTableReplicated {
// msgsByPartition into the correct locations specified in localVidMap
localVidMap.zipPartitions(msgsByPartition) {
(mapIter, msgsIter) =>
val (pid, vidToIndex) = mapIter.next()
assert(!mapIter.hasNext)
// Populate the vertex array using the vidToIndex map
val vertexArray = new Array[VD](vidToIndex.capacity)
for ((_, block) <- msgsIter) {
for (i <- 0 until block.vids.size) {
val vid = block.vids(i)
val attr = block.attrs(i)
val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
vertexArray(ind) = attr
val (pid, vidToIndex) = mapIter.next()
assert(!mapIter.hasNext)
// Populate the vertex array using the vidToIndex map
val vertexArray = new Array[VD](vidToIndex.capacity)
for ((_, block) <- msgsIter) {
for (i <- 0 until block.vids.size) {
val vid = block.vids(i)
val attr = block.attrs(i)
val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
vertexArray(ind) = attr
}
}
}
Iterator((pid, (vidToIndex, vertexArray)))
Iterator((pid, (vidToIndex, vertexArray)))
}.cache()
}
}
class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD])
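
A sketch of selecting a replicated view, written as if inside GraphImpl where vTableReplicated: VTableReplicated[VD] is in scope:

    // Only source attributes are read, so ship only those.
    val srcOnlyView = vTableReplicated.get(includeSrc = true, includeDst = false)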

View file

@@ -4,11 +4,17 @@ import org.apache.spark.util.collection.{BitSet, PrimitiveKeyOpenHashMap}
import org.apache.spark.graph._
class VertexPartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassManifest](
private[graph]
class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
val index: VertexIdToIndexMap,
val values: Array[VD],
val mask: BitSet) {
// TODO: Encapsulate the internal data structures in this class so callers don't need to
// understand the internal data structures. This can possibly be achieved by implementing
// the aggregate and join functions in this class, and VertexSetRDD can simply call into
// that.
/**
* Pass each vertex attribute along with the vertex id through a map
* function and retain the original RDD's partitioning and index.
@@ -25,42 +31,39 @@ class VertexPartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double
def map[VD2: ClassManifest](f: (Vid, VD) => VD2): VertexPartition[VD2] = {
// Construct a view of the map transformation
val newValues = new Array[VD2](index.capacity)
mask.iterator.foreach { ind =>
newValues(ind) = f(index.getValueSafe(ind), values(ind))
var i = mask.nextSetBit(0)
while (i >= 0) {
newValues(i) = f(index.getValue(i), values(i))
i = mask.nextSetBit(i + 1)
}
new VertexPartition[VD2](index, newValues, mask)
}
/**
* Restrict the vertex set to the set of vertices satisfying the
* given predicate.
* Restrict the vertex set to the set of vertices satisfying the given predicate.
*
* @param pred the user defined predicate
*
* @note The vertex set preserves the original index structure
* which means that the returned RDD can be easily joined with
* the original vertex-set. Furthermore, the filter only
* modifies the bitmap index and so no new values are allocated.
* @note The vertex set preserves the original index structure which means that the returned
* RDD can be easily joined with the original vertex-set. Furthermore, the filter only
* modifies the bitmap index and so no new values are allocated.
*/
def filter(pred: (Vid, VD) => Boolean): VertexPartition[VD] = {
// Allocate the array to store the results into
val newMask = new BitSet(index.capacity)
// Iterate over the active bits in the old bitset and
// evaluate the predicate
var ind = mask.nextSetBit(0)
while (ind >= 0) {
val k = index.getValueSafe(ind)
if (pred(k, values(ind))) {
newMask.set(ind)
// Iterate over the active bits in the old mask and evaluate the predicate
var i = mask.nextSetBit(0)
while (i >= 0) {
if (pred(index.getValue(i), values(i))) {
newMask.set(i)
}
ind = mask.nextSetBit(ind + 1)
i = mask.nextSetBit(i + 1)
}
new VertexPartition(index, values, newMask)
}
/**
* Construct a new VertexPartition whose index contains only the vertices in
* the mask.
* Construct a new VertexPartition whose index contains only the vertices in the mask.
*/
def reindex(): VertexPartition[VD] = {
val hashMap = new PrimitiveKeyOpenHashMap[Vid, VD]
@@ -68,8 +71,9 @@ class VertexPartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double
for ((k, v) <- this.iterator) {
hashMap.setMerge(k, v, arbitraryMerge)
}
// TODO: Is this a bug? Why are we using index.getBitSet here?
new VertexPartition(hashMap.keySet, hashMap._values, index.getBitSet)
}
def iterator = mask.iterator.map(ind => (index.getValueSafe(ind), values(ind)))
def iterator = mask.iterator.map(ind => (index.getValue(ind), values(ind)))
}
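
The nextSetBit walk used by map and filter above, sketched as a standalone count of active vertices (BitSet is org.apache.spark.util.collection.BitSet):

    def countActive(mask: BitSet): Int = {
      var n = 0
      var i = mask.nextSetBit(0)
      while (i >= 0) {
        n += 1
        i = mask.nextSetBit(i + 1)
      }
      n
    }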

View file

@ -1,6 +1,5 @@
package org.apache.spark.graph.impl
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuilder
@@ -39,18 +38,26 @@ class VertexPlacement(
private def createPid2Vid(
includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = {
// Determine which vertices each edge partition needs by creating a mapping
// from vid to pid
// Determine which vertices each edge partition needs by creating a mapping from vid to pid.
val preAgg = eTable.mapPartitions { iter =>
val (pid, edgePartition) = iter.next()
val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next()
val numEdges = edgePartition.size
val vSet = new VertexSet
if (includeSrcAttr || includeDstAttr) {
edgePartition.foreach { e =>
if (includeSrcAttr) vSet.add(e.srcId)
if (includeDstAttr) vSet.add(e.dstId)
if (includeSrcAttr) { // Add src vertices to the set.
var i = 0
while (i < numEdges) {
vSet.add(edgePartition.srcIds(i))
i += 1
}
}
vSet.iterator.map { vid => (vid.toLong, pid) }
if (includeDstAttr) { // Add dst vertices to the set.
var i = 0
while (i < numEdges) {
vSet.add(edgePartition.dstIds(i))
i += 1
}
}
vSet.iterator.map { vid => (vid, pid) }
}
// Aggregate the mappings to determine where each vertex should go
val vid2pid = VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTable.index,

View file

@@ -6,10 +6,11 @@ import org.apache.spark.util.collection.OpenHashSet
package object graph {
type Vid = Long
// TODO: Consider using Char.
type Pid = Int
type VertexSet = OpenHashSet[Vid]
type VertexArrayList = it.unimi.dsi.fastutil.longs.LongArrayList
// type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap
type VertexIdToIndexMap = OpenHashSet[Vid]
@@ -18,11 +19,4 @@ package object graph {
* Return the default null-like value for a data type T.
*/
def nullValue[T] = null.asInstanceOf[T]
private[graph]
case class MutableTuple2[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) U,
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) V](
var _1: U, var _2: V)
}
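
The aliases above in use, as a minimal sketch:

    val vid: Vid = 42L                      // Vid is Long
    val pid: Pid = 0                        // Pid is Int
    val nbrs: VertexSet = new VertexSet(4)  // OpenHashSet[Vid]
    nbrs.add(vid)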