More extensive cleanup for better encapsulation of VertexSetRDD and VertexPartition. This is a work in progress; the code does not fully run yet.

Reynold Xin 2013-11-27 00:30:26 -08:00
parent caba162861
commit 95e83af209
14 changed files with 203 additions and 363 deletions


@ -71,7 +71,7 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
/** Set the value for a key */
def setMerge(k: K, v: V, mergeF: (V,V) => V) {
def setMerge(k: K, v: V, mergeF: (V, V) => V) {
val pos = keySet.addWithoutResize(k)
val ind = pos & OpenHashSet.POSITION_MASK
if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) { // if first add
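For context, `setMerge` inserts the key on first add and combines values on subsequent adds. A minimal usage sketch (assuming the `PrimitiveKeyOpenHashMap` API above, with implicit manifests in scope):

```scala
// Count duplicate keys by merging with addition: the first add stores 1,
// every later add for the same key sums into the stored value.
val counts = new PrimitiveKeyOpenHashMap[Long, Int]
Seq(1L, 2L, 1L, 1L).foreach { k =>
  counts.setMerge(k, 1, (a, b) => a + b)
}
// counts(1L) == 3, counts(2L) == 1
```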


@ -301,10 +301,10 @@ object Analytics extends Logging {
def pickPartitioner(v: String): PartitionStrategy = {
v match {
case "RandomVertexCut" => RandomVertexCut()
case "EdgePartition1D" => EdgePartition1D()
case "EdgePartition2D" => EdgePartition2D()
case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut()
case "RandomVertexCut" => RandomVertexCut
case "EdgePartition1D" => EdgePartition1D
case "EdgePartition2D" => EdgePartition2D
case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
case _ => throw new IllegalArgumentException("Invalid Partition Strategy: " + v)
}
}
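Since the strategies are now case objects rather than case classes, `pickPartitioner` hands back shared singletons instead of allocating a fresh instance per call. A small sketch of the observable difference (hypothetical usage):

```scala
// Case objects are singletons: repeated lookups return the same instance.
val p1 = pickPartitioner("EdgePartition2D")
val p2 = pickPartitioner("EdgePartition2D")
assert(p1 eq p2)  // reference equality holds for case objects
```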
@ -324,7 +324,7 @@ object Analytics extends Logging {
var outFname = ""
var numVPart = 4
var numEPart = 4
var partitionStrategy: PartitionStrategy = RandomVertexCut()
var partitionStrategy: PartitionStrategy = RandomVertexCut
options.foreach{
case ("numIter", v) => numIter = v.toInt
@ -379,7 +379,7 @@ object Analytics extends Logging {
var numVPart = 4
var numEPart = 4
var isDynamic = false
var partitionStrategy: PartitionStrategy = RandomVertexCut()
var partitionStrategy: PartitionStrategy = RandomVertexCut
options.foreach{
case ("numIter", v) => numIter = v.toInt
@ -413,7 +413,7 @@ object Analytics extends Logging {
case "triangles" => {
var numVPart = 4
var numEPart = 4
var partitionStrategy: PartitionStrategy = RandomVertexCut()
var partitionStrategy: PartitionStrategy = RandomVertexCut
options.foreach{
case ("numEPart", v) => numEPart = v.toInt


@ -287,23 +287,10 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
/**
* The Graph object contains a collection of routines used to
* construct graphs from RDDs.
*
* The Graph object contains a collection of routines used to construct graphs from RDDs.
*/
object Graph {
/**
* Construct a graph from a collection of edges encoded as vertex id pairs.
*
* @param rawEdges the RDD containing the set of edges in the graph
*
* @return a graph with edge attributes containing the count of duplicate edges.
*/
def apply[VD: ClassManifest](rawEdges: RDD[(Vid, Vid)], defaultValue: VD): Graph[VD, Int] = {
Graph(rawEdges, defaultValue, false, RandomVertexCut())
}
/**
* Construct a graph from a collection of edges encoded as vertex id
* pairs.
@ -316,13 +303,12 @@ object Graph {
* @return a graph with edge attributes containing either the count
* of duplicate edges or 1 (if `uniqueEdges=false`) and vertex
* attributes containing the total degree of each vertex.
*
*/
def apply[VD: ClassManifest](
def fromEdgeTuples[VD: ClassManifest](
rawEdges: RDD[(Vid, Vid)],
defaultValue: VD,
uniqueEdges: Boolean,
partitionStrategy: PartitionStrategy): Graph[VD, Int] = {
uniqueEdges: Boolean = false,
partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[VD, Int] = {
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
val graph = GraphImpl(edges, defaultValue, partitionStrategy)
if (uniqueEdges) graph.groupEdges((a, b) => a + b) else graph
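A usage sketch of the renamed constructor with its new defaults (assuming a SparkContext `sc`; `Vid` is the Long alias used throughout this package):

```scala
// Defaults: edges are kept as-is (uniqueEdges = false) and partitioned
// with RandomVertexCut.
val rawEdges: RDD[(Vid, Vid)] = sc.parallelize(Seq((1L, 2L), (2L, 3L), (1L, 2L)))
val g = Graph.fromEdgeTuples(rawEdges, defaultValue = 1)

// Opting in to deduplication merges parallel edges and counts them.
val deduped = Graph.fromEdgeTuples(rawEdges, 1, uniqueEdges = true)
```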
@ -337,106 +323,42 @@ object Graph {
* @return a graph with edge attributes described by `edges` and vertices
* given by all vertices in `edges` with value `defaultValue`
*/
def apply[VD: ClassManifest, ED: ClassManifest](edges: RDD[Edge[ED]], defaultValue: VD)
: Graph[VD, ED] = {
Graph(edges, defaultValue, RandomVertexCut())
}
/**
* Construct a graph from a collection of edges.
*
* @param edges the RDD containing the set of edges in the graph
* @param defaultValue the default vertex attribute to use for each vertex
*
* @return a graph with edge attributes described by `edges` and vertices
* given by all vertices in `edges` with value `defaultValue`
*/
def apply[VD: ClassManifest, ED: ClassManifest](
def fromEdges[VD: ClassManifest, ED: ClassManifest](
edges: RDD[Edge[ED]],
defaultValue: VD,
partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[VD, ED] = {
GraphImpl(edges, defaultValue, partitionStrategy)
}
/**
* Construct a graph from a collection of attributed vertices and
* edges.
*
* @note Duplicate vertices are removed arbitrarily and missing
* vertices (vertices in the edge collection that are not in the
* vertex collection) are replaced by null vertex attributes.
*
* @tparam VD the vertex attribute type
* @tparam ED the edge attribute type
* @param vertices the "set" of vertices and their attributes
* @param edges the collection of edges in the graph
*
*/
def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid,VD)],
edges: RDD[Edge[ED]]): Graph[VD, ED] = {
val defaultAttr: VD = null.asInstanceOf[VD]
Graph(vertices, edges, defaultAttr, (a:VD,b:VD) => a, RandomVertexCut())
}
/**
* Construct a graph from a collection of attributed vertices and
* edges. Duplicate vertices are combined using the `mergeFunc` and
* edges. Duplicate vertices are picked arbitrarily and
* vertices found in the edge collection but not in the input
* vertices are given the default attribute `defaultVertexAttr`.
*
* @note Duplicate vertices are removed arbitrarily.
* vertices are given the default attribute.
*
* @tparam VD the vertex attribute type
* @tparam ED the edge attribute type
* @param vertices the "set" of vertices and their attributes
* @param edges the collection of edges in the graph
* @param defaultVertexAttr the default vertex attribute to use for
* vertices that are mentioned in `edges` but not in `vertices`
*
*/
def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid,VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD): Graph[VD, ED] = {
Graph(vertices, edges, defaultVertexAttr, (a,b) => a, RandomVertexCut())
}
/**
* Construct a graph from a collection of attributed vertices and
* edges. Duplicate vertices are combined using the `mergeFunc` and
* vertices found in the edge collection but not in the input
* vertices are given the default attribute `defaultVertexAttr`.
*
* @tparam VD the vertex attribute type
* @tparam ED the edge attribute type
* @param vertices the "set" of vertices and their attributes
* @param edges the collection of edges in the graph
* @param defaultVertexAttr the default vertex attribute to use for
* vertices that are mentioned in `edges` but not in `vertices`
* @param mergeFunc the function used to merge duplicate vertices
* in the `vertices` collection.
* vertices that are mentioned in edges but not in vertices
* @param partitionStrategy the partition strategy to use when
* partitioning the edges.
*
*/
def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid,VD)],
vertices: RDD[(Vid, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD,
mergeFunc: (VD, VD) => VD,
partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
GraphImpl(vertices, edges, defaultVertexAttr, mergeFunc, partitionStrategy)
defaultVertexAttr: VD = null.asInstanceOf[VD],
partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[VD, ED] = {
GraphImpl(vertices, edges, defaultVertexAttr, partitionStrategy)
}
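The three removed overloads collapse into this single `apply` with default arguments; duplicate vertices are now picked arbitrarily rather than merged with a user function. A usage sketch (assuming a SparkContext `sc`):

```scala
// Vertex 3L appears only in the edges, so it receives defaultVertexAttr.
val vertices: RDD[(Vid, String)] = sc.parallelize(Seq((1L, "a"), (2L, "b")))
val edges: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(1L, 2L, 7), Edge(2L, 3L, 8)))
val g = Graph(vertices, edges, defaultVertexAttr = "missing")
```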
/**
* The implicit graphToGraphOPs function extracts the GraphOps
* member from a graph.
* The implicit graphToGraphOPs function extracts the GraphOps member from a graph.
*
* To improve modularity the Graph type only contains a small set of
* basic operations. All the convenience operations are defined in
* the GraphOps class which may be shared across multiple graph
* implementations.
* To improve modularity the Graph type only contains a small set of basic operations. All the
* convenience operations are defined in the GraphOps class which may be shared across multiple
* graph implementations.
*/
implicit def graphToGraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) = g.ops
} // end of Graph object
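For illustration, the implicit conversion is what lets callers treat GraphOps methods as if they were defined on Graph itself (a sketch, assuming `sc`):

```scala
// degrees lives on GraphOps, not Graph, but the compiler inserts
// graphToGraphOps(...) automatically at the call site.
val g = Graph.fromEdgeTuples(sc.parallelize(Seq((1L, 2L), (2L, 3L))), 0)
val degrees = g.degrees  // expands to graphToGraphOps(g).degrees
```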


@ -26,7 +26,7 @@ object GraphLoader {
path: String,
edgeParser: Array[String] => ED,
minEdgePartitions: Int = 1,
partitionStrategy: PartitionStrategy = RandomVertexCut()):
partitionStrategy: PartitionStrategy = RandomVertexCut):
Graph[Int, ED] = {
// Parse the edge data table
val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter =>
@ -43,7 +43,7 @@ object GraphLoader {
Edge(source, target, edata)
})
val defaultVertexAttr = 1
Graph(edges, defaultVertexAttr, partitionStrategy)
Graph.fromEdges(edges, defaultVertexAttr, partitionStrategy)
}
/**
@ -78,7 +78,7 @@ object GraphLoader {
path: String,
canonicalOrientation: Boolean = false,
minEdgePartitions: Int = 1,
partitionStrategy: PartitionStrategy = RandomVertexCut()):
partitionStrategy: PartitionStrategy = RandomVertexCut):
Graph[Int, Int] = {
// Parse the edge data table
val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter =>
@ -97,7 +97,7 @@ object GraphLoader {
}
})
val defaultVertexAttr = 1
Graph(edges, defaultVertexAttr, partitionStrategy)
Graph.fromEdges(edges, defaultVertexAttr, partitionStrategy)
} // end of edgeListFile
}
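A usage sketch of the loader after the change (the path is a placeholder; `sc` is a SparkContext, which precedes `path` in the full signature):

```scala
// Parse an edge list of "srcId dstId" lines into a graph whose vertices
// all carry the default attribute 1.
val graph = GraphLoader.edgeListFile(
  sc, "data/followers.txt", canonicalOrientation = true, minEdgePartitions = 4)
```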


@ -240,7 +240,6 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
case None => data
}
}
ClosureCleaner.clean(uf)
graph.outerJoinVertices(table)(uf)
}


@ -50,7 +50,7 @@ sealed trait PartitionStrategy extends Serializable {
*
*
*/
case class EdgePartition2D() extends PartitionStrategy {
case object EdgePartition2D extends PartitionStrategy {
override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
val ceilSqrtNumParts: Pid = math.ceil(math.sqrt(numParts)).toInt
val mixingPrime: Vid = 1125899906842597L
@ -61,7 +61,7 @@ case class EdgePartition2D() extends PartitionStrategy {
}
case class EdgePartition1D() extends PartitionStrategy {
case object EdgePartition1D extends PartitionStrategy {
override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
val mixingPrime: Vid = 1125899906842597L
(math.abs(src) * mixingPrime).toInt % numParts
@ -73,7 +73,7 @@ case class EdgePartition1D() extends PartitionStrategy {
* Assign edges to an arbitrary machine corresponding to a
* random vertex cut.
*/
case class RandomVertexCut() extends PartitionStrategy {
case object RandomVertexCut extends PartitionStrategy {
override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
math.abs((src, dst).hashCode()) % numParts
}
@ -85,7 +85,7 @@ case class RandomVertexCut() extends PartitionStrategy {
* function ensures that edges of opposite direction between the same two vertices
* will end up on the same partition.
*/
case class CanonicalRandomVertexCut() extends PartitionStrategy {
case object CanonicalRandomVertexCut extends PartitionStrategy {
override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
val lower = math.min(src, dst)
val higher = math.max(src, dst)
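With the strategies as objects, a partition id can be computed without instantiating anything. A quick sketch:

```scala
// Route an edge to one of 8 partitions.
val pid = EdgePartition2D.getPartition(5L, 9L, 8)

// CanonicalRandomVertexCut is symmetric in its endpoints, so both
// directions of an edge land on the same partition.
assert(CanonicalRandomVertexCut.getPartition(5L, 9L, 8) ==
       CanonicalRandomVertexCut.getPartition(9L, 5L, 8))
```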


@ -27,28 +27,6 @@ import org.apache.spark.graph.impl.VertexPartition
import org.apache.spark.util.ClosureCleaner
/**
* Maintains the per-partition mapping from vertex id to the corresponding
* location in the per-partition values array. This class is meant to be an
* opaque type.
*/
private[graph]
class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
/**
* The persist function behaves like the standard RDD persist
*/
def persist(newLevel: StorageLevel): VertexSetIndex = {
rdd.persist(newLevel)
this
}
/**
* Returns the partitioner object of the underlying RDD. This is
* used by the VertexSetRDD to partition the values RDD.
*/
def partitioner: Partitioner = rdd.partitioner.get
} // end of VertexSetIndex
/**
* A `VertexSetRDD[VD]` extends the `RDD[(Vid, VD)]` by ensuring that there is
* only one entry for each vertex and by pre-indexing the entries for fast,
@ -77,26 +55,20 @@ class VertexSetRDD[@specialized VD: ClassManifest](
@transient val partitionsRDD: RDD[VertexPartition[VD]])
extends RDD[(Vid, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
/**
* The `VertexSetIndex` representing the layout of this `VertexSetRDD`.
*/
// TODO: Consider removing the exposure of the index to the outside, and implement methods in this
// class to handle any operations that would require indexing.
def index = new VertexSetIndex(partitionsRDD.mapPartitions(_.map(_.index),
preservesPartitioning = true))
require(partitionsRDD.partitioner.isDefined)
/**
* Construct a new VertexSetRDD that is indexed by only the keys in the RDD.
* The resulting VertexSet will be based on a different index and can
* no longer be quickly joined with this RDD.
*/
def reindex(): VertexSetRDD[VD] = VertexSetRDD(this)
def reindex(): VertexSetRDD[VD] = new VertexSetRDD(partitionsRDD.map(_.reindex()))
/**
* An internal representation which joins the block indices with the values
* This is used by the compute function to emulate `RDD[(Vid, VD)]`
*/
protected[spark] val tuples = partitionsRDD.flatMap(_.iterator)
protected[spark] val tuples: RDD[(Vid, VD)] = partitionsRDD.flatMap(_.iterator)
/**
* The partitioner is defined by the index.
@ -305,10 +277,18 @@ class VertexSetRDD[@specialized VD: ClassManifest](
// If the other set is a VertexSetRDD then we use the much more efficient leftZipJoin
other match {
case other: VertexSetRDD[VD2] =>
println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
leftZipJoin(other)(f)
case _ =>
val indexedOther: VertexSetRDD[VD2] = VertexSetRDD(other, this.index, (a, b) => a)
leftZipJoin(indexedOther)(f)
println("------------------------------------------------------")
new VertexSetRDD[VD3](
partitionsRDD.zipPartitions(
other.partitionBy(this.partitioner.get), preservesPartitioning = true)
{ (part, msgs) =>
val vertexPartition: VertexPartition[VD] = part.next()
Iterator(vertexPartition.leftJoin(msgs)(f))
}
)
}
}
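The fallback path above replaces the old index-based join with the zipPartitions pattern: shuffle the other RDD into this RDD's partitioning, then join each pair of co-located partitions locally. The same pattern on plain pair RDDs, as an illustrative sketch (assuming `sc`):

```scala
import org.apache.spark.HashPartitioner

// Left outer join two co-partitioned RDDs partition-by-partition,
// avoiding a second shuffle.
val part = new HashPartitioner(4)
val left = sc.parallelize(Seq((1L, 1.0), (2L, 2.0))).partitionBy(part)
val right = sc.parallelize(Seq((1L, "x"))).partitionBy(part)
val joined = left.zipPartitions(right, preservesPartitioning = true) {
  (leftIter, rightIter) =>
    val rightMap = rightIter.toMap
    leftIter.map { case (k, v) => (k, (v, rightMap.get(k))) }
}
```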
@ -316,7 +296,7 @@ class VertexSetRDD[@specialized VD: ClassManifest](
messages: RDD[VidVDPair], reduceFunc: (VD2, VD2) => VD2): VertexSetRDD[VD2] =
{
// TODO: use specialized shuffle serializer.
val shuffled = new ShuffledRDD[Vid, VD2, VidVDPair](messages, partitionsRDD.partitioner.get)
val shuffled = new ShuffledRDD[Vid, VD2, VidVDPair](messages, this.partitioner.get)
val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
val vertexPartition: VertexPartition[VD] = thisIter.next()
Iterator(vertexPartition.aggregateUsingIndex(msgIter, reduceFunc))
@ -352,166 +332,31 @@ object VertexSetRDD {
}
/**
* Construct a vertex set from an RDD using an existing index.
*
* @note duplicate vertices are discarded arbitrarily
* Construct a vertex set from an RDD of vertex-attribute pairs.
* Duplicate entries are merged using mergeFunc.
*
* @tparam VD the vertex attribute type
* @param rdd the rdd containing vertices
* @param index a VertexSetRDD whose indexes will be reused. The
* indexes must be a superset of the vertices in the RDD
*/
// TODO: only used in testing. Consider removing.
def apply[VD: ClassManifest](rdd: RDD[(Vid, VD)], index: VertexSetIndex): VertexSetRDD[VD] =
apply(rdd, index, (a: VD, b: VD) => a)
/**
* Construct a vertex set from an RDD using an existing index and a
* user defined `combiner` to merge duplicate vertices.
*
* @tparam VD the vertex attribute type
* @param rdd the rdd containing vertices
* @param index a VertexSetRDD whose indexes will be reused. The
* indexes must be a superset of the vertices in rdd
* @param reduceFunc the user defined reduce function used to merge
* duplicate vertex attributes.
* @param rdd the collection of vertex-attribute pairs
* @param mergeFunc the associative, commutative merge function.
*/
def apply[VD: ClassManifest](
rdd: RDD[(Vid, VD)],
index: VertexSetIndex,
reduceFunc: (VD, VD) => VD): VertexSetRDD[VD] =
// TODO: Consider removing the following apply.
apply(rdd, index, (v: VD) => v, reduceFunc, reduceFunc)
/**
* Construct a vertex set from an RDD of Product2[Vid, VD]
*
* @tparam VD the vertex attribute type
* @param rdd the rdd containing vertices
* @param index a VertexSetRDD whose indexes will be reused. The
* indexes must be a superset of the vertices in rdd
* @param reduceFunc the user defined reduce function used to merge
* duplicate vertex attributes.
*/
private[spark] def aggregate[VD: ClassManifest, VidVDPair <: Product2[Vid, VD] : ClassManifest](
rdd: RDD[VidVDPair],
index: VertexSetIndex,
reduceFunc: (VD, VD) => VD): VertexSetRDD[VD] = {
val cReduceFunc = rdd.context.clean(reduceFunc)
assert(rdd.partitioner == Some(index.partitioner))
// Use the index to build the new values table
val partitionsRDD = index.rdd.zipPartitions(rdd, preservesPartitioning = true) {
(indexIter, tblIter) =>
// There is only one map
val index = indexIter.next()
val mask = new BitSet(index.capacity)
val values = new Array[VD](index.capacity)
for (vertexPair <- tblIter) {
// Get the location of the key in the index
val pos = index.getPos(vertexPair._1)
if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
throw new SparkException("Error: Trying to bind an external index " +
"to an RDD which contains keys that are not in the index.")
} else {
// Get the actual index
val ind = pos & OpenHashSet.POSITION_MASK
// If this value has already been seen then merge
if (mask.get(ind)) {
values(ind) = cReduceFunc(values(ind), vertexPair._2)
} else { // otherwise just store the new value
mask.set(ind)
values(ind) = vertexPair._2
}
}
}
Iterator(new VertexPartition(index, values, mask))
def apply[VD: ClassManifest](rdd: RDD[(Vid, VD)], mergeFunc: (VD, VD) => VD): VertexSetRDD[VD] =
{
val partitioned: RDD[(Vid, VD)] = rdd.partitioner match {
case Some(p) => rdd
case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
}
new VertexSetRDD(partitionsRDD)
val vertexPartitions = partitioned.mapPartitions(
iter => Iterator(VertexPartition(iter)),
preservesPartitioning = true)
new VertexSetRDD(vertexPartitions)
}
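A usage sketch of the simplified constructor (per the signature above; duplicates are combined with `mergeFunc`):

```scala
// Deduplicate vertex pairs, keeping the larger attribute for repeated ids.
val pairs: RDD[(Vid, Int)] = sc.parallelize(Seq((1L, 3), (1L, 5), (2L, 1)))
val vset = VertexSetRDD(pairs, (a: Int, b: Int) => math.max(a, b))
// vset contains (1L, 5) and (2L, 1), pre-indexed for fast joins.
```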
/**
* Construct a vertex set from an RDD using an existing index and a
* user defined `combiner` to merge duplicate vertices.
*
* @tparam VD the vertex attribute type
* @param rdd the rdd containing vertices
* @param index the index which must be a superset of the vertices
* in RDD
* @param createCombiner a user defined function to create a combiner
* from a vertex attribute
* @param mergeValue a user defined function to merge a vertex
* attribute into an existing combiner
* @param mergeCombiners a user defined function to merge combiners
*
*/
def apply[VD: ClassManifest, C: ClassManifest](
rdd: RDD[(Vid, VD)],
index: VertexSetIndex,
createCombiner: VD => C,
mergeValue: (C, VD) => C,
mergeCombiners: (C, C) => C): VertexSetRDD[C] = {
val cCreateCombiner = rdd.context.clean(createCombiner)
val cMergeValue = rdd.context.clean(mergeValue)
val cMergeCombiners = rdd.context.clean(mergeCombiners)
val partitioner = index.partitioner
// Preaggregate and shuffle if necessary
val partitioned =
if (rdd.partitioner != Some(partitioner)) {
// Preaggregation.
val aggregator = new Aggregator[Vid, VD, C](cCreateCombiner, cMergeValue, cMergeCombiners)
rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
} else {
rdd.mapValues(x => createCombiner(x))
}
aggregate(partitioned, index, mergeCombiners)
} // end of apply
/**
* Construct an index of the unique vertices. The resulting index
* can be used to build VertexSets over subsets of the vertices in
* the input.
*/
def makeIndex(
keys: RDD[Vid], partitionerOpt: Option[Partitioner] = None): VertexSetIndex = {
val partitioner = partitionerOpt match {
case Some(p) => p
case None => Partitioner.defaultPartitioner(keys)
def apply[VD: ClassManifest](vids: RDD[Vid], rdd: RDD[(Vid, VD)], defaultVal: VD)
: VertexSetRDD[VD] =
{
VertexSetRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) =>
value.getOrElse(default)
}
val preAgg: RDD[(Vid, Unit)] = keys.mapPartitions(iter => {
val keys = new VertexIdToIndexMap
while (iter.hasNext) { keys.add(iter.next) }
keys.iterator.map(k => (k, ()))
}, preservesPartitioning = true).partitionBy(partitioner)
val index = preAgg.mapPartitions(iter => {
val index = new VertexIdToIndexMap
while (iter.hasNext) { index.add(iter.next._1) }
Iterator(index)
}, preservesPartitioning = true).cache
new VertexSetIndex(index)
}
/**
* Create a VertexSetRDD with all vertices initialized to the default value.
*
* @param index an index over the set of vertices
* @param defaultValue the default value to use when initializing the vertices
* @tparam VD the type of the vertex attribute
* @return
*/
def apply[VD: ClassManifest](index: VertexSetIndex, defaultValue: VD): VertexSetRDD[VD] = {
// Use the index to build the new values tables
val partitionsRDD = index.rdd.mapPartitions(_.map { index =>
val values = Array.fill(index.capacity)(defaultValue)
val mask = index.getBitSet
new VertexPartition(index, values, mask)
}, preservesPartitioning = true)
new VertexSetRDD(partitionsRDD)
} // end of apply
} // end of object VertexSetRDD
}
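The new three-argument `apply` builds a default-valued set over `vids` and then left-joins the supplied attributes, so every id in `vids` is present in the result. A usage sketch:

```scala
// Ids missing from attrs fall back to defaultVal via the leftJoin above.
val vids: RDD[Vid] = sc.parallelize(Seq(1L, 2L, 3L))
val attrs: RDD[(Vid, String)] = sc.parallelize(Seq((1L, "a")))
val vset = VertexSetRDD(vids, attrs, defaultVal = "none")
// => (1L, "a"), (2L, "none"), (3L, "none")
```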


@ -6,7 +6,7 @@ import org.apache.spark.graph._
import org.apache.spark.graph.impl.GraphImpl._
import org.apache.spark.graph.impl.MsgRDDFunctions._
import org.apache.spark.graph.util.BytecodeUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.{ShuffledRDD, RDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.ClosureCleaner
import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap}
@ -252,7 +252,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
}
}
}
// construct an iterator of tuples Iterator[(Vid, A)]
// construct an iterator of tuples. Iterator[(Vid, A)]
msgBS.iterator.map { ind =>
new AggregationMsg[A](vidToIndex.getValue(ind), msgArray(ind))
}
@ -265,6 +265,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest]
(updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2): Graph[VD2, ED] = {
ClosureCleaner.clean(updateF)
println("type of --------------------------- " + updates)
val newVTable = vTable.leftJoin(updates)(updateF)
new GraphImpl(newVTable, eTable, vertexPlacement, partitioner)
}
@ -276,18 +277,24 @@ object GraphImpl {
def apply[VD: ClassManifest, ED: ClassManifest](
edges: RDD[Edge[ED]],
defaultValue: VD,
partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = {
val etable = createETable(edges, partitionStrategy).cache
partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] =
{
val etable = createETable(edges, partitionStrategy).cache()
// Get the set of all vids
val vids = etable.mapPartitions(iter => {
val (pid, epart) = iter.next()
val vids = etable.mapPartitions { iter =>
val (_, epart) = iter.next()
assert(!iter.hasNext)
epart.iterator.flatMap(e => Iterator(e.srcId, e.dstId))
}, preservesPartitioning = true)
// Index the set of all vids
val index = VertexSetRDD.makeIndex(vids)
// Index the vertices and fill in missing attributes with the default
val vtable = VertexSetRDD(index, defaultValue)
epart.iterator.flatMap(e => Iterator((e.srcId, 0), (e.dstId, 0)))
}
// Shuffle the vids and create the VertexSetRDD.
// TODO: Consider doing a map-side distinct before the shuffle.
val shuffled = new ShuffledRDD[Vid, Int, (Vid, Int)](
vids, new HashPartitioner(edges.partitions.size))
shuffled.setSerializer(classOf[VidMsgSerializer].getName)
val vtable = VertexSetRDD(shuffled.mapValues(x => defaultValue))
val vertexPlacement = new VertexPlacement(etable, vtable)
new GraphImpl(vtable, etable, vertexPlacement, partitionStrategy)
}
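The vertex table is now derived directly from the edge partitions: each endpoint is emitted as a (vid, 0) pair, shuffled compactly with VidMsgSerializer, and folded into a VertexSetRDD. The equivalent logic on plain RDDs, as a sketch (the helper name is hypothetical):

```scala
// Collect the distinct vertex ids referenced by an edge RDD and attach a
// default attribute; mirrors what GraphImpl.apply computes above.
def vertexIdsWithDefault[VD: ClassManifest, ED](
    edges: RDD[Edge[ED]], default: VD): RDD[(Vid, VD)] = {
  edges
    .flatMap(e => Iterator(e.srcId, e.dstId))
    .distinct()  // the TODO above suggests doing this map-side instead
    .map(vid => (vid, default))
}
```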
@ -296,23 +303,24 @@ object GraphImpl {
vertices: RDD[(Vid, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD,
mergeFunc: (VD, VD) => VD,
partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = {
vertices.cache
val etable = createETable(edges, partitionStrategy).cache
// Get the set of all vids, preserving partitions
partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] =
{
vertices.cache()
val etable = createETable(edges, partitionStrategy).cache()
// Get the set of all vids
val partitioner = Partitioner.defaultPartitioner(vertices)
val implicitVids = etable.flatMap {
case (pid, partition) => Array.concat(partition.srcIds, partition.dstIds)
}.map(vid => (vid, ())).partitionBy(partitioner)
val allVids = vertices.zipPartitions(implicitVids, preservesPartitioning = true) {
(a, b) => a.map(_._1) ++ b.map(_._1)
val vPartitioned = vertices.partitionBy(partitioner)
val vidsFromEdges = etable.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
.map(vid => (vid, 0))
.partitionBy(partitioner)
val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) =>
vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1)
}
// Index the set of all vids
val index = VertexSetRDD.makeIndex(allVids, Some(partitioner))
// Index the vertices and fill in missing attributes with the default
val vtable = VertexSetRDD(vertices, index, mergeFunc).fillMissing(defaultVertexAttr)
val vtable = VertexSetRDD(vids, vPartitioned, defaultVertexAttr)
val vertexPlacement = new VertexPlacement(etable, vtable)
new GraphImpl(vtable, etable, vertexPlacement, partitionStrategy)


@ -3,9 +3,30 @@ package org.apache.spark.graph.impl
import java.io.{EOFException, InputStream, OutputStream}
import java.nio.ByteBuffer
import org.apache.spark.graph._
import org.apache.spark.serializer._
class VidMsgSerializer extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[(Vid, _)]
writeVarLong(msg._1, optimizePositive = false)
this
}
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
override def readObject[T](): T = {
(readVarLong(optimizePositive = false), null).asInstanceOf[T]
}
}
}
}
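The serializer writes only the vertex id and drops the payload entirely (it is reconstructed as null on read). `writeVarLong` with `optimizePositive = false` presumably zigzag-encodes the id so that small-magnitude ids of either sign stay short on the wire; a sketch of that encoding (an assumption about the stream internals, not code from this commit):

```scala
import java.io.OutputStream

// Zigzag-map the sign into the low bit, then emit 7 bits per byte with
// the high bit as a continuation flag.
def writeVarLong(out: OutputStream, value: Long): Unit = {
  var v = (value << 1) ^ (value >> 63)  // -1 -> 1, 1 -> 2, -2 -> 3, ...
  while ((v & ~0x7FL) != 0L) {
    out.write(((v & 0x7FL) | 0x80L).toInt)
    v >>>= 7
  }
  out.write(v.toInt)
}
```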
/** A special shuffle serializer for VertexBroadcastMessage[Int]. */
class IntVertexBroadcastMsgSerializer extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {


@ -15,6 +15,16 @@ private[graph] object VertexPartition {
}
new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
}
def apply[VD: ClassManifest](iter: Iterator[(Vid, VD)], mergeFunc: (VD, VD) => VD)
: VertexPartition[VD] =
{
val map = new PrimitiveKeyOpenHashMap[Vid, VD]
iter.foreach { case (k, v) =>
map.setMerge(k, v, mergeFunc)
}
new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
}
}
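A usage sketch of the new merging constructor:

```scala
// Build a partition from local pairs, summing duplicate ids.
val part = VertexPartition(
  Iterator((1L, 10), (2L, 20), (1L, 1)),
  (a: Int, b: Int) => a + b)
// part holds (1L, 11) and (2L, 20).
```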
@ -114,12 +124,36 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
new VertexPartition(index, newValues, mask)
}
/** Left outer join another iterator of messages. */
def leftJoin[VD2: ClassManifest, VD3: ClassManifest]
(other: Iterator[(Vid, VD2)])
(f: (Vid, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
leftJoin(createUsingIndex(other))(f)
}
/**
* Similar effect as aggregateUsingIndex((a, b) => a)
*/
def createUsingIndex[VD2: ClassManifest](iter: Iterator[Product2[Vid, VD2]])
: VertexPartition[VD2] = {
val newMask = new BitSet(capacity)
val newValues = new Array[VD2](capacity)
iter.foreach { case (vid, vdata) =>
val pos = index.getPos(vid)
newMask.set(pos)
newValues(pos) = vdata
}
new VertexPartition[VD2](index, newValues, newMask)
}
def aggregateUsingIndex[VD2: ClassManifest](
iter: Iterator[Product2[Vid, VD2]], reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] =
{
val newMask = new BitSet(capacity)
val newValues = new Array[VD2](capacity)
iter.foreach { case (vid, vdata) =>
iter.foreach { product =>
val vid = product._1
val vdata = product._2
val pos = index.getPos(vid)
if (newMask.get(pos)) {
newValues(pos) = reduceFunc(newValues(pos), vdata)


@ -1,12 +1,11 @@
package org.apache.spark.graph.impl
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuilder
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graph._
import org.apache.spark.util.collection.PrimitiveVector
/**
* Stores the layout of replicated vertex attributes for GraphImpl. Tells each
@ -39,7 +38,7 @@ class VertexPlacement(
private def createPid2Vid(
includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = {
// Determine which vertices each edge partition needs by creating a mapping from vid to pid.
val preAgg = eTable.mapPartitions { iter =>
val vid2pid: RDD[(Vid, Pid)] = eTable.mapPartitions { iter =>
val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next()
val numEdges = edgePartition.size
val vSet = new VertexSet
@ -59,21 +58,15 @@ class VertexPlacement(
}
vSet.iterator.map { vid => (vid, pid) }
}
// Aggregate the mappings to determine where each vertex should go
val vid2pid = VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTable.index,
(p: Pid) => ArrayBuffer(p),
(ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
(a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
.mapValues(a => a.toArray)
// Within each vertex partition, reorganize the placement information into
// columnar format keyed on the destination partition
val numPartitions = vid2pid.partitions.size
vid2pid.mapPartitions { iter =>
val pid2vid = Array.fill[ArrayBuilder[Vid]](numPartitions)(ArrayBuilder.make[Vid])
for ((vid, pids) <- iter) {
pids.foreach { pid => pid2vid(pid) += vid }
val numPartitions = vTable.partitions.size
vid2pid.partitionBy(vTable.partitioner.get).mapPartitions { iter =>
val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[Vid])
for ((vid, pid) <- iter) {
pid2vid(pid) += vid
}
Iterator(pid2vid.map(_.result))
Iterator(pid2vid.map(_.trim().array))
}
}
}
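The ArrayBuffer aggregation is replaced by a single pass over the shuffled (vid, pid) pairs into per-partition PrimitiveVectors, avoiding boxing and intermediate buffers. The grouping step in isolation, as a sketch (assuming the package's Vid and Pid aliases):

```scala
import org.apache.spark.util.collection.PrimitiveVector

// Group vertex ids by destination partition: one growable primitive
// buffer per pid, trimmed to exact arrays at the end.
def groupByPid(pairs: Iterator[(Vid, Pid)], numPartitions: Int): Array[Array[Vid]] = {
  val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[Vid])
  for ((vid, pid) <- pairs) pid2vid(pid) += vid
  pid2vid.map(_.trim().array)
}
```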


@ -268,14 +268,14 @@ object GraphGenerators {
* Create a star graph with vertex 0 being the center.
*
* @param sc the spark context in which to construct the graph
* @param the number of vertices in the star
* @param nverts the number of vertices in the star
*
* @return A star graph containing `nverts` vertices with vertex 0
* being the center vertex.
*/
def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = {
val edges: RDD[(Vid, Vid)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
Graph(edges, 1)
Graph.fromEdgeTuples(edges, 1)
} // end of starGraph


@ -132,9 +132,9 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
val chain1 = (0 until 9).map(x => (x, x+1) )
val chain2 = (10 until 20).map(x => (x, x+1) )
val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
val twoChains = Graph(rawEdges, 1.0)
val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0)
val ccGraph = Analytics.connectedComponents(twoChains).cache()
val vertices = ccGraph.vertices.collect
val vertices = ccGraph.vertices.collect()
for ( (id, cc) <- vertices ) {
if(id < 10) { assert(cc === 0) }
else { assert(cc === 10) }
@ -156,7 +156,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
val chain1 = (0 until 9).map(x => (x, x+1) )
val chain2 = (10 until 20).map(x => (x, x+1) )
val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
val twoChains = Graph(rawEdges, true).reverse
val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse
val ccGraph = Analytics.connectedComponents(twoChains).cache()
val vertices = ccGraph.vertices.collect
for ( (id, cc) <- vertices ) {
@ -181,7 +181,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
test("Count a single triangle") {
withSpark(new SparkContext("local", "test")) { sc =>
val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->0L ), 2)
val graph = Graph(rawEdges, true).cache
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
val triangleCount = Analytics.triangleCount(graph)
val verts = triangleCount.vertices
verts.collect.foreach { case (vid, count) => assert(count === 1) }
@ -193,10 +193,10 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
val rawEdges = sc.parallelize(triangles, 2)
val graph = Graph(rawEdges, true).cache
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
val triangleCount = Analytics.triangleCount(graph)
val verts = triangleCount.vertices
verts.collect.foreach { case (vid, count) =>
verts.collect().foreach { case (vid, count) =>
if (vid == 0) {
assert(count === 2)
} else {
@ -213,10 +213,10 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
val revTriangles = triangles.map { case (a,b) => (b,a) }
val rawEdges = sc.parallelize(triangles ++ revTriangles, 2)
val graph = Graph(rawEdges, true).cache
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
val triangleCount = Analytics.triangleCount(graph)
val verts = triangleCount.vertices
verts.collect.foreach { case (vid, count) =>
verts.collect().foreach { case (vid, count) =>
if (vid == 0) {
assert(count === 4)
} else {
@ -230,7 +230,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
withSpark(new SparkContext("local", "test")) { sc =>
val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
val graph = Graph(rawEdges, true).cache
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
val triangleCount = Analytics.triangleCount(graph)
val verts = triangleCount.vertices
verts.collect.foreach { case (vid, count) => assert(count === 1) }


@ -15,11 +15,20 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark(new SparkContext("local", "test")) { sc =>
val rawEdges = (0L to 100L).zip((1L to 99L) :+ 0L)
val edges = sc.parallelize(rawEdges)
val graph = Graph(edges, 1.0F)
val graph = Graph.fromEdgeTuples(edges, 1.0F)
assert(graph.edges.count() === rawEdges.size)
}
}
test("mapReduceTriplets") {
withSpark(new SparkContext("local", "test")) { sc =>
val edges = sc.parallelize((0L to 100L).zip((1L to 99L) :+ 0L))
val graph = Graph.fromEdgeTuples(edges, 1.0F)
val d = graph.mapReduceTriplets[Int](et => Iterator((et.srcId, 0)), (a, b) => a + b)
}
}
test("Graph Creation with invalid vertices") {
withSpark(new SparkContext("local", "test")) { sc =>
val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L)
@ -38,7 +47,9 @@ class GraphSuite extends FunSuite with LocalSparkContext {
test("mapEdges") {
withSpark(new SparkContext("local", "test")) { sc =>
val n = 3
val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), "defaultValue")
val star = Graph.fromEdgeTuples(
sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))),
"defaultValue")
val starWithEdgeAttrs = star.mapEdges(e => e.dstId)
// map(_.copy()) is a workaround for https://github.com/amplab/graphx/issues/25
@ -51,7 +62,14 @@ class GraphSuite extends FunSuite with LocalSparkContext {
test("mapReduceTriplets") {
withSpark(new SparkContext("local", "test")) { sc =>
val n = 3
val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 0)
val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 0)
println("--------------------------------------- star vertices")
println(star.vertices.partitionsRDD.map { v => v.index.toString }.collect().toSeq)
println("--------------------------------------- starDeg")
println(star.degrees.partitionsRDD.map { v => v.index.toString }.collect().toSeq)
val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg }
val neighborDegreeSums = starDeg.mapReduceTriplets(
edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
@ -63,7 +81,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
test("aggregateNeighbors") {
withSpark(new SparkContext("local", "test")) { sc =>
val n = 3
val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 1)
val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 1)
val indegrees = star.aggregateNeighbors(
(vid, edge) => Some(1),
@ -103,7 +121,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark(new SparkContext("local", "test")) { sc =>
val chain = (0 until 100).map(x => (x, (x+1)%100) )
val rawEdges = sc.parallelize(chain, 3).map { case (s,d) => (s.toLong, d.toLong) }
val graph = Graph(rawEdges, 1.0)
val graph = Graph.fromEdgeTuples(rawEdges, 1.0)
val nbrs = graph.collectNeighborIds(EdgeDirection.Both)
assert(nbrs.count === chain.size)
assert(graph.numVertices === nbrs.count)
@ -122,7 +140,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val b = VertexSetRDD(a).mapValues(x => -x)
assert(b.count === 101)
assert(b.leftJoin(a){ (id, a, bOpt) => a + bOpt.get }.map(x=> x._2).reduce(_+_) === 0)
val c = VertexSetRDD(a, b.index)
val c = b.aggregateUsingIndex[Long, (Long, Long)](a, (x, y) => x)
assert(b.leftJoin(c){ (id, b, cOpt) => b + cOpt.get }.map(x=> x._2).reduce(_+_) === 0)
val d = c.filter(q => ((q._2 % 2) == 0))
val e = a.filter(q => ((q._2 % 2) == 0))