diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala index 987077dd8a..ee1b168028 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala @@ -71,7 +71,7 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, /** Set the value for a key */ - def setMerge(k: K, v: V, mergeF: (V,V) => V) { + def setMerge(k: K, v: V, mergeF: (V, V) => V) { val pos = keySet.addWithoutResize(k) val ind = pos & OpenHashSet.POSITION_MASK if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) { // if first add diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala index 62de470c96..ea501c7435 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala @@ -301,10 +301,10 @@ object Analytics extends Logging { def pickPartitioner(v: String): PartitionStrategy = { v match { - case "RandomVertexCut" => RandomVertexCut() - case "EdgePartition1D" => EdgePartition1D() - case "EdgePartition2D" => EdgePartition2D() - case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut() + case "RandomVertexCut" => RandomVertexCut + case "EdgePartition1D" => EdgePartition1D + case "EdgePartition2D" => EdgePartition2D + case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut case _ => throw new IllegalArgumentException("Invalid Partition Strategy: " + v) } } @@ -324,7 +324,7 @@ object Analytics extends Logging { var outFname = "" var numVPart = 4 var numEPart = 4 - var partitionStrategy: PartitionStrategy = RandomVertexCut() + var partitionStrategy: PartitionStrategy = RandomVertexCut options.foreach{ case ("numIter", v) => numIter = v.toInt @@ -379,7 +379,7 @@ object Analytics extends Logging { var numVPart = 4 var numEPart = 4 var isDynamic = false - var partitionStrategy: PartitionStrategy = RandomVertexCut() + var partitionStrategy: PartitionStrategy = RandomVertexCut options.foreach{ case ("numIter", v) => numIter = v.toInt @@ -413,7 +413,7 @@ object Analytics extends Logging { case "triangles" => { var numVPart = 4 var numEPart = 4 - var partitionStrategy: PartitionStrategy = RandomVertexCut() + var partitionStrategy: PartitionStrategy = RandomVertexCut options.foreach{ case ("numEPart", v) => numEPart = v.toInt diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala index 2ebe5cf083..425c9edefe 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala @@ -287,23 +287,10 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { /** - * The Graph object contains a collection of routines used to - * construct graphs from RDDs. - * + * The Graph object contains a collection of routines used to construct graphs from RDDs. */ object Graph { - /** - * Construct a graph from a collection of edges encoded as vertex id pairs. - * - * @param rawEdges the RDD containing the set of edges in the graph - * - * @return a graph with edge attributes containing the count of duplicate edges. 
- */ - def apply[VD: ClassManifest](rawEdges: RDD[(Vid, Vid)], defaultValue: VD): Graph[VD, Int] = { - Graph(rawEdges, defaultValue, false, RandomVertexCut()) - } - /** * Construct a graph from a collection of edges encoded as vertex id * pairs. @@ -316,13 +303,12 @@ object Graph { * @return a graph with edge attributes containing either the count * of duplicate edges or 1 (if `uniqueEdges=false`) and vertex * attributes containing the total degree of each vertex. - * */ - def apply[VD: ClassManifest]( + def fromEdgeTuples[VD: ClassManifest]( rawEdges: RDD[(Vid, Vid)], defaultValue: VD, - uniqueEdges: Boolean, - partitionStrategy: PartitionStrategy): Graph[VD, Int] = { + uniqueEdges: Boolean = false, + partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[VD, Int] = { val edges = rawEdges.map(p => Edge(p._1, p._2, 1)) val graph = GraphImpl(edges, defaultValue, partitionStrategy) if (uniqueEdges) graph.groupEdges((a, b) => a + b) else graph @@ -337,106 +323,42 @@ object Graph { * @return a graph with edge attributes described by `edges` and vertices * given by all vertices in `edges` with value `defaultValue` */ - def apply[VD: ClassManifest, ED: ClassManifest](edges: RDD[Edge[ED]], defaultValue: VD) - : Graph[VD, ED] = { - Graph(edges, defaultValue, RandomVertexCut()) - } - - /** - * Construct a graph from a collection of edges. - * - * @param edges the RDD containing the set of edges in the graph - * @param defaultValue the default vertex attribute to use for each vertex - * - * @return a graph with edge attributes described by `edges` and vertices - * given by all vertices in `edges` with value `defaultValue` - */ - def apply[VD: ClassManifest, ED: ClassManifest]( + def fromEdges[VD: ClassManifest, ED: ClassManifest]( edges: RDD[Edge[ED]], defaultValue: VD, - partitionStrategy: PartitionStrategy): Graph[VD, ED] = { + partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[VD, ED] = { GraphImpl(edges, defaultValue, partitionStrategy) } /** * Construct a graph from a collection attributed vertices and - * edges. - * - * @note Duplicate vertices are removed arbitrarily and missing - * vertices (vertices in the edge collection that are not in the - * vertex collection) are replaced by null vertex attributes. - * - * @tparam VD the vertex attribute type - * @tparam ED the edge attribute type - * @param vertices the "set" of vertices and their attributes - * @param edges the collection of edges in the graph - * - */ - def apply[VD: ClassManifest, ED: ClassManifest]( - vertices: RDD[(Vid,VD)], - edges: RDD[Edge[ED]]): Graph[VD, ED] = { - val defaultAttr: VD = null.asInstanceOf[VD] - Graph(vertices, edges, defaultAttr, (a:VD,b:VD) => a, RandomVertexCut()) - } - - /** - * Construct a graph from a collection attributed vertices and - * edges. Duplicate vertices are combined using the `mergeFunc` and + * edges. Duplicate vertices are picked arbitrarily and * vertices found in the edge collection but not in the input - * vertices are the default attribute `defautVertexAttr`. - * - * @note Duplicate vertices are removed arbitrarily . + * vertices are the default attribute. 
* * @tparam VD the vertex attribute type * @tparam ED the edge attribute type * @param vertices the "set" of vertices and their attributes * @param edges the collection of edges in the graph * @param defaultVertexAttr the default vertex attribute to use for - * vertices that are mentioned in `edges` but not in `vertices` - * - */ - def apply[VD: ClassManifest, ED: ClassManifest]( - vertices: RDD[(Vid,VD)], - edges: RDD[Edge[ED]], - defaultVertexAttr: VD): Graph[VD, ED] = { - Graph(vertices, edges, defaultVertexAttr, (a,b) => a, RandomVertexCut()) - } - - /** - * Construct a graph from a collection attributed vertices and - * edges. Duplicate vertices are combined using the `mergeFunc` and - * vertices found in the edge collection but not in the input - * vertices are the default attribute `defautVertexAttr`. - * - * @tparam VD the vertex attribute type - * @tparam ED the edge attribute type - * @param vertices the "set" of vertices and their attributes - * @param edges the collection of edges in the graph - * @param defaultVertexAttr the default vertex attribute to use for - * vertices that are mentioned in `edges` but not in `vertices - * @param mergeFunc the function used to merge duplicate vertices - * in the `vertices` collection. + * vertices that are mentioned in edges but not in vertices * @param partitionStrategy the partition strategy to use when * partitioning the edges. - * */ def apply[VD: ClassManifest, ED: ClassManifest]( - vertices: RDD[(Vid,VD)], + vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]], - defaultVertexAttr: VD, - mergeFunc: (VD, VD) => VD, - partitionStrategy: PartitionStrategy): Graph[VD, ED] = { - GraphImpl(vertices, edges, defaultVertexAttr, mergeFunc, partitionStrategy) + defaultVertexAttr: VD = null.asInstanceOf[VD], + partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[VD, ED] = { + GraphImpl(vertices, edges, defaultVertexAttr, partitionStrategy) } /** - * The implicit graphToGraphOPs function extracts the GraphOps - * member from a graph. + * The implicit graphToGraphOPs function extracts the GraphOps member from a graph. * - * To improve modularity the Graph type only contains a small set of - * basic operations. All the convenience operations are defined in - * the GraphOps class which may be shared across multiple graph - * implementations. + * To improve modularity the Graph type only contains a small set of basic operations. All the + * convenience operations are defined in the GraphOps class which may be shared across multiple + * graph implementations. 
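Note: the consolidated constructors above (fromEdgeTuples, fromEdges, and the single apply with default arguments) can be exercised as in the following sketch. This is illustrative only and not part of the patch; it assumes an existing SparkContext `sc`, and the object name, sample data, and attribute values are made up.

    import org.apache.spark.SparkContext
    import org.apache.spark.graph._
    import org.apache.spark.rdd.RDD

    object GraphConstructionExample {
      def run(sc: SparkContext) {
        // Edges given only as (srcId, dstId) pairs; fromEdgeTuples assigns each edge the Int attribute 1.
        val rawEdges: RDD[(Vid, Vid)] = sc.parallelize(Seq((1L, 2L), (2L, 3L), (1L, 2L)))
        val g1: Graph[Int, Int] = Graph.fromEdgeTuples(rawEdges, defaultValue = 1)
        // With uniqueEdges = true, duplicate edges are merged so the (1L, 2L) edge carries the count 2.
        val g2 = Graph.fromEdgeTuples(rawEdges, 1, uniqueEdges = true, partitionStrategy = EdgePartition2D)

        // Edges with explicit attributes; fromEdges derives the vertex set from the edge endpoints.
        val edges: RDD[Edge[String]] = rawEdges.map { case (s, d) => Edge(s, d, "follows") }
        val g3: Graph[Int, String] = Graph.fromEdges(edges, defaultValue = 1)

        // Vertices and edges supplied separately; the partition strategy now defaults to RandomVertexCut.
        val vertices: RDD[(Vid, String)] = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
        val g4: Graph[String, String] = Graph(vertices, edges, defaultVertexAttr = "missing")
      }
    }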
*/ implicit def graphToGraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) = g.ops } // end of Graph object diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala index d97c028faa..7623b2b596 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala @@ -26,7 +26,7 @@ object GraphLoader { path: String, edgeParser: Array[String] => ED, minEdgePartitions: Int = 1, - partitionStrategy: PartitionStrategy = RandomVertexCut()): + partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[Int, ED] = { // Parse the edge data table val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter => @@ -43,7 +43,7 @@ object GraphLoader { Edge(source, target, edata) }) val defaultVertexAttr = 1 - Graph(edges, defaultVertexAttr, partitionStrategy) + Graph.fromEdges(edges, defaultVertexAttr, partitionStrategy) } /** @@ -78,7 +78,7 @@ object GraphLoader { path: String, canonicalOrientation: Boolean = false, minEdgePartitions: Int = 1, - partitionStrategy: PartitionStrategy = RandomVertexCut()): + partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[Int, Int] = { // Parse the edge data table val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter => @@ -97,7 +97,7 @@ object GraphLoader { } }) val defaultVertexAttr = 1 - Graph(edges, defaultVertexAttr, partitionStrategy) + Graph.fromEdges(edges, defaultVertexAttr, partitionStrategy) } // end of edgeListFile } diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala index b7e28186c6..3bfad2131e 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala @@ -240,7 +240,6 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { case None => data } } - ClosureCleaner.clean(uf) graph.outerJoinVertices(table)(uf) } diff --git a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala index cf65f50657..293a9d588a 100644 --- a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala +++ b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala @@ -50,7 +50,7 @@ sealed trait PartitionStrategy extends Serializable { * * */ -case class EdgePartition2D() extends PartitionStrategy { +case object EdgePartition2D extends PartitionStrategy { override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = { val ceilSqrtNumParts: Pid = math.ceil(math.sqrt(numParts)).toInt val mixingPrime: Vid = 1125899906842597L @@ -61,7 +61,7 @@ case class EdgePartition2D() extends PartitionStrategy { } -case class EdgePartition1D() extends PartitionStrategy { +case object EdgePartition1D extends PartitionStrategy { override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = { val mixingPrime: Vid = 1125899906842597L (math.abs(src) * mixingPrime).toInt % numParts @@ -73,7 +73,7 @@ case class EdgePartition1D() extends PartitionStrategy { * Assign edges to an aribtrary machine corresponding to a * random vertex cut. 
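Note: because the strategies in this file become case objects rather than case classes, callers now reference the shared singleton values (RandomVertexCut, not RandomVertexCut()). Below is a small standalone sketch of the getPartition contract; it is not part of the patch, and the vertex ids and partition count are arbitrary.

    import org.apache.spark.graph._

    object PartitionStrategyExample extends App {
      val numParts: Pid = 8
      // Each strategy is a serializable singleton mapping an edge (src, dst) to an edge partition.
      val strategies = Seq(RandomVertexCut, CanonicalRandomVertexCut, EdgePartition1D, EdgePartition2D)
      for (s <- strategies) {
        println(s.getClass.getSimpleName + " -> " + s.getPartition(3L, 7L, numParts))
      }
      // CanonicalRandomVertexCut sends both directions of the same edge to the same partition.
      assert(CanonicalRandomVertexCut.getPartition(3L, 7L, numParts) ==
        CanonicalRandomVertexCut.getPartition(7L, 3L, numParts))
    }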
*/ -case class RandomVertexCut() extends PartitionStrategy { +case object RandomVertexCut extends PartitionStrategy { override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = { math.abs((src, dst).hashCode()) % numParts } @@ -85,7 +85,7 @@ case class RandomVertexCut() extends PartitionStrategy { * function ensures that edges of opposite direction between the same two vertices * will end up on the same partition. */ -case class CanonicalRandomVertexCut() extends PartitionStrategy { +case object CanonicalRandomVertexCut extends PartitionStrategy { override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = { val lower = math.min(src, dst) val higher = math.max(src, dst) diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala index ed4584bd1e..c3b15bed67 100644 --- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala @@ -27,28 +27,6 @@ import org.apache.spark.graph.impl.VertexPartition import org.apache.spark.util.ClosureCleaner -/** - * Maintains the per-partition mapping from vertex id to the corresponding - * location in the per-partition values array. This class is meant to be an - * opaque type. - */ -private[graph] -class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) { - /** - * The persist function behaves like the standard RDD persist - */ - def persist(newLevel: StorageLevel): VertexSetIndex = { - rdd.persist(newLevel) - this - } - - /** - * Returns the partitioner object of the underlying RDD. This is - * used by the VertexSetRDD to partition the values RDD. - */ - def partitioner: Partitioner = rdd.partitioner.get -} // end of VertexSetIndex - /** * A `VertexSetRDD[VD]` extends the `RDD[(Vid, VD)]` by ensuring that there is * only one entry for each vertex and by pre-indexing the entries for fast, @@ -77,26 +55,20 @@ class VertexSetRDD[@specialized VD: ClassManifest]( @transient val partitionsRDD: RDD[VertexPartition[VD]]) extends RDD[(Vid, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { - /** - * The `VertexSetIndex` representing the layout of this `VertexSetRDD`. - */ - // TOOD: Consider removing the exposure of index to outside, and implement methods in this - // class to handle any operations that would require indexing. - def index = new VertexSetIndex(partitionsRDD.mapPartitions(_.map(_.index), - preservesPartitioning = true)) + require(partitionsRDD.partitioner.isDefined) /** * Construct a new VertexSetRDD that is indexed by only the keys in the RDD. * The resulting VertexSet will be based on a different index and can * no longer be quickly joined with this RDD. */ - def reindex(): VertexSetRDD[VD] = VertexSetRDD(this) + def reindex(): VertexSetRDD[VD] = new VertexSetRDD(partitionsRDD.map(_.reindex())) /** * An internal representation which joins the block indices with the values * This is used by the compute function to emulate `RDD[(Vid, VD)]` */ - protected[spark] val tuples = partitionsRDD.flatMap(_.iterator) + protected[spark] val tuples: RDD[(Vid, VD)] = partitionsRDD.flatMap(_.iterator) /** * The partitioner is defined by the index. 
@@ -305,10 +277,18 @@ class VertexSetRDD[@specialized VD: ClassManifest]( // If the other set is a VertexSetRDD then we use the much more efficient leftZipJoin other match { case other: VertexSetRDD[VD2] => + println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") leftZipJoin(other)(f) case _ => - val indexedOther: VertexSetRDD[VD2] = VertexSetRDD(other, this.index, (a, b) => a) - leftZipJoin(indexedOther)(f) + println("------------------------------------------------------") + new VertexSetRDD[VD3]( + partitionsRDD.zipPartitions( + other.partitionBy(this.partitioner.get), preservesPartitioning = true) + { (part, msgs) => + val vertexPartition: VertexPartition[VD] = part.next() + Iterator(vertexPartition.leftJoin(msgs)(f)) + } + ) } } @@ -316,7 +296,7 @@ class VertexSetRDD[@specialized VD: ClassManifest]( messages: RDD[VidVDPair], reduceFunc: (VD2, VD2) => VD2): VertexSetRDD[VD2] = { // TODO: use specialized shuffle serializer. - val shuffled = new ShuffledRDD[Vid, VD2, VidVDPair](messages, partitionsRDD.partitioner.get) + val shuffled = new ShuffledRDD[Vid, VD2, VidVDPair](messages, this.partitioner.get) val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) => val vertextPartition: VertexPartition[VD] = thisIter.next() Iterator(vertextPartition.aggregateUsingIndex(msgIter, reduceFunc)) @@ -352,166 +332,31 @@ object VertexSetRDD { } /** - * Construct a vertex set from an RDD using an existing index. - * - * @note duplicate vertices are discarded arbitrarily + * Construct a vertex set from an RDD of vertex-attribute pairs. + * Duplicate entries are merged using mergeFunc. * * @tparam VD the vertex attribute type - * @param rdd the rdd containing vertices - * @param index a VertexSetRDD whose indexes will be reused. The - * indexes must be a superset of the vertices in rdd - * in RDD - */ - // TODO: only used in testing. Consider removing. - def apply[VD: ClassManifest](rdd: RDD[(Vid, VD)], index: VertexSetIndex): VertexSetRDD[VD] = - apply(rdd, index, (a: VD, b: VD) => a) - - /** - * Construct a vertex set from an RDD using an existing index and a - * user defined `combiner` to merge duplicate vertices. * - * @tparam VD the vertex attribute type - * @param rdd the rdd containing vertices - * @param index a VertexSetRDD whose indexes will be reused. The - * indexes must be a superset of the vertices in rdd - * @param reduceFunc the user defined reduce function used to merge - * duplicate vertex attributes. + * @param rdd the collection of vertex-attribute pairs + * @param mergeFunc the associative, commutative merge function. */ - def apply[VD: ClassManifest]( - rdd: RDD[(Vid, VD)], - index: VertexSetIndex, - reduceFunc: (VD, VD) => VD): VertexSetRDD[VD] = - // TODO: Considering removing the following apply. - apply(rdd, index, (v: VD) => v, reduceFunc, reduceFunc) - - /** - * Construct a vertex set from an RDD of Product2[Vid, VD] - * - * @tparam VD the vertex attribute type - * @param rdd the rdd containing vertices - * @param index a VertexSetRDD whose indexes will be reused. The - * indexes must be a superset of the vertices in rdd - * @param reduceFunc the user defined reduce function used to merge - * duplicate vertex attributes. 
- */ - private[spark] def aggregate[VD: ClassManifest, VidVDPair <: Product2[Vid, VD] : ClassManifest]( - rdd: RDD[VidVDPair], - index: VertexSetIndex, - reduceFunc: (VD, VD) => VD): VertexSetRDD[VD] = { - - val cReduceFunc = rdd.context.clean(reduceFunc) - assert(rdd.partitioner == Some(index.partitioner)) - // Use the index to build the new values table - val partitionsRDD = index.rdd.zipPartitions(rdd, preservesPartitioning = true) { - (indexIter, tblIter) => - // There is only one map - val index = indexIter.next() - val mask = new BitSet(index.capacity) - val values = new Array[VD](index.capacity) - for (vertexPair <- tblIter) { - // Get the location of the key in the index - val pos = index.getPos(vertexPair._1) - if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) { - throw new SparkException("Error: Trying to bind an external index " + - "to an RDD which contains keys that are not in the index.") - } else { - // Get the actual index - val ind = pos & OpenHashSet.POSITION_MASK - // If this value has already been seen then merge - if (mask.get(ind)) { - values(ind) = cReduceFunc(values(ind), vertexPair._2) - } else { // otherwise just store the new value - mask.set(ind) - values(ind) = vertexPair._2 - } - } - } - Iterator(new VertexPartition(index, values, mask)) + def apply[VD: ClassManifest](rdd: RDD[(Vid, VD)], mergeFunc: (VD, VD) => VD): VertexSetRDD[VD] = + { + val partitioned: RDD[(Vid, VD)] = rdd.partitioner match { + case Some(p) => rdd + case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size)) } - - new VertexSetRDD(partitionsRDD) + val vertexPartitions = partitioned.mapPartitions( + iter => Iterator(VertexPartition(iter)), + preservesPartitioning = true) + new VertexSetRDD(vertexPartitions) } - /** - * Construct a vertex set from an RDD using an existing index and a - * user defined `combiner` to merge duplicate vertices. - * - * @tparam VD the vertex attribute type - * @param rdd the rdd containing vertices - * @param index the index which must be a superset of the vertices - * in RDD - * @param createCombiner a user defined function to create a combiner - * from a vertex attribute - * @param mergeValue a user defined function to merge a vertex - * attribute into an existing combiner - * @param mergeCombiners a user defined function to merge combiners - * - */ - def apply[VD: ClassManifest, C: ClassManifest]( - rdd: RDD[(Vid, VD)], - index: VertexSetIndex, - createCombiner: VD => C, - mergeValue: (C, VD) => C, - mergeCombiners: (C, C) => C): VertexSetRDD[C] = { - val cCreateCombiner = rdd.context.clean(createCombiner) - val cMergeValue = rdd.context.clean(mergeValue) - val cMergeCombiners = rdd.context.clean(mergeCombiners) - val partitioner = index.partitioner - // Preaggregate and shuffle if necessary - val partitioned = - if (rdd.partitioner != Some(partitioner)) { - // Preaggregation. - val aggregator = new Aggregator[Vid, VD, C](cCreateCombiner, cMergeValue, cMergeCombiners) - rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner) - } else { - rdd.mapValues(x => createCombiner(x)) - } - - aggregate(partitioned, index, mergeCombiners) - } // end of apply - - /** - * Construct an index of the unique vertices. The resulting index - * can be used to build VertexSets over subsets of the vertices in - * the input. 
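Note: a hedged usage sketch of the simplified VertexSetRDD API above (the pair-RDD constructor plus the leftJoin that accepts a plain RDD[(Vid, VD2)]). Not part of the patch; the local master, object name, and sample data are assumptions.

    import org.apache.spark.SparkContext
    import org.apache.spark.graph._

    object VertexSetRDDExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "vertexset-example")
        // Build an indexed vertex set from plain (Vid, attribute) pairs.
        val users = VertexSetRDD(sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol"))))
        // Messages keyed by vertex id; they need not cover every vertex.
        val msgs = sc.parallelize(Seq((1L, 10), (3L, 30)))
        // leftJoin keeps every vertex in `users` and attaches the optional message value.
        val joined = users.leftJoin(msgs) { (vid, name, msg) => (name, msg.getOrElse(0)) }
        joined.collect().foreach(println)
        sc.stop()
      }
    }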
- */ - def makeIndex( - keys: RDD[Vid], partitionerOpt: Option[Partitioner] = None): VertexSetIndex = { - val partitioner = partitionerOpt match { - case Some(p) => p - case None => Partitioner.defaultPartitioner(keys) + def apply[VD: ClassManifest](vids: RDD[Vid], rdd: RDD[(Vid, VD)], defaultVal: VD) + : VertexSetRDD[VD] = + { + VertexSetRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) => + value.getOrElse(default) } - - val preAgg: RDD[(Vid, Unit)] = keys.mapPartitions(iter => { - val keys = new VertexIdToIndexMap - while (iter.hasNext) { keys.add(iter.next) } - keys.iterator.map(k => (k, ())) - }, preservesPartitioning = true).partitionBy(partitioner) - - val index = preAgg.mapPartitions(iter => { - val index = new VertexIdToIndexMap - while (iter.hasNext) { index.add(iter.next._1) } - Iterator(index) - }, preservesPartitioning = true).cache - - new VertexSetIndex(index) } - - /** - * Create a VertexSetRDD with all vertices initialized to the default value. - * - * @param index an index over the set of vertices - * @param defaultValue the default value to use when initializing the vertices - * @tparam VD the type of the vertex attribute - * @return - */ - def apply[VD: ClassManifest](index: VertexSetIndex, defaultValue: VD): VertexSetRDD[VD] = { - // Use the index to build the new values tables - val partitionsRDD = index.rdd.mapPartitions(_.map { index => - val values = Array.fill(index.capacity)(defaultValue) - val mask = index.getBitSet - new VertexPartition(index, values, mask) - }, preservesPartitioning = true) - new VertexSetRDD(partitionsRDD) - } // end of apply -} // end of object VertexSetRDD +} diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 154466d7a6..b445c8ad2b 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -6,7 +6,7 @@ import org.apache.spark.graph._ import org.apache.spark.graph.impl.GraphImpl._ import org.apache.spark.graph.impl.MsgRDDFunctions._ import org.apache.spark.graph.util.BytecodeUtils -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{ShuffledRDD, RDD} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.ClosureCleaner import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap} @@ -252,7 +252,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( } } } - // construct an iterator of tuples Iterator[(Vid, A)] + // construct an iterator of tuples. 
Iterator[(Vid, A)] msgBS.iterator.map { ind => new AggregationMsg[A](vidToIndex.getValue(ind), msgArray(ind)) } @@ -265,6 +265,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest] (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2): Graph[VD2, ED] = { ClosureCleaner.clean(updateF) + println("type of --------------------------- " + updates) val newVTable = vTable.leftJoin(updates)(updateF) new GraphImpl(newVTable, eTable, vertexPlacement, partitioner) } @@ -276,18 +277,24 @@ object GraphImpl { def apply[VD: ClassManifest, ED: ClassManifest]( edges: RDD[Edge[ED]], defaultValue: VD, - partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = { - val etable = createETable(edges, partitionStrategy).cache + partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = + { + val etable = createETable(edges, partitionStrategy).cache() + // Get the set of all vids - val vids = etable.mapPartitions(iter => { - val (pid, epart) = iter.next() + val vids = etable.mapPartitions { iter => + val (_, epart) = iter.next() assert(!iter.hasNext) - epart.iterator.flatMap(e => Iterator(e.srcId, e.dstId)) - }, preservesPartitioning = true) - // Index the set of all vids - val index = VertexSetRDD.makeIndex(vids) - // Index the vertices and fill in missing attributes with the default - val vtable = VertexSetRDD(index, defaultValue) + epart.iterator.flatMap(e => Iterator((e.srcId, 0), (e.dstId, 0))) + } + + // Shuffle the vids and create the VertexSetRDD. + // TODO: Consider doing map side distinct before shuffle. + val shuffled = new ShuffledRDD[Vid, Int, (Vid, Int)]( + vids, new HashPartitioner(edges.partitions.size)) + shuffled.setSerializer(classOf[VidMsgSerializer].getName) + val vtable = VertexSetRDD(shuffled.mapValues(x => defaultValue)) + val vertexPlacement = new VertexPlacement(etable, vtable) new GraphImpl(vtable, etable, vertexPlacement, partitionStrategy) } @@ -296,23 +303,24 @@ object GraphImpl { vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]], defaultVertexAttr: VD, - mergeFunc: (VD, VD) => VD, - partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = { - - vertices.cache - val etable = createETable(edges, partitionStrategy).cache - // Get the set of all vids, preserving partitions + partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = + { + vertices.cache() + val etable = createETable(edges, partitionStrategy).cache() + // Get the set of all vids val partitioner = Partitioner.defaultPartitioner(vertices) - val implicitVids = etable.flatMap { - case (pid, partition) => Array.concat(partition.srcIds, partition.dstIds) - }.map(vid => (vid, ())).partitionBy(partitioner) - val allVids = vertices.zipPartitions(implicitVids, preservesPartitioning = true) { - (a, b) => a.map(_._1) ++ b.map(_._1) + + val vPartitioned = vertices.partitionBy(partitioner) + + val vidsFromEdges = etable.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) } + .map(vid => (vid, 0)) + .partitionBy(partitioner) + + val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) => + vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1) } - // Index the set of all vids - val index = VertexSetRDD.makeIndex(allVids, Some(partitioner)) - // Index the vertices and fill in missing attributes with the default - val vtable = VertexSetRDD(vertices, index, mergeFunc).fillMissing(defaultVertexAttr) + + val vtable = VertexSetRDD(vids, vPartitioned, defaultVertexAttr) val vertexPlacement = new 
VertexPlacement(etable, vtable) new GraphImpl(vtable, etable, vertexPlacement, partitionStrategy) diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala index 2e768e85cf..9143820e13 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala @@ -3,9 +3,30 @@ package org.apache.spark.graph.impl import java.io.{EOFException, InputStream, OutputStream} import java.nio.ByteBuffer +import org.apache.spark.graph._ import org.apache.spark.serializer._ +class VidMsgSerializer extends Serializer { + override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { + + override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { + def writeObject[T](t: T) = { + val msg = t.asInstanceOf[(Vid, _)] + writeVarLong(msg._1, optimizePositive = false) + this + } + } + + override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) { + override def readObject[T](): T = { + (readVarLong(optimizePositive = false), null).asInstanceOf[T] + } + } + } +} + + /** A special shuffle serializer for VertexBroadcastMessage[Int]. */ class IntVertexBroadcastMsgSerializer extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala index c6147b89a9..7007427e0d 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala @@ -15,6 +15,16 @@ private[graph] object VertexPartition { } new VertexPartition(map.keySet, map._values, map.keySet.getBitSet) } + + def apply[VD: ClassManifest](iter: Iterator[(Vid, VD)], mergeFunc: (VD, VD) => VD) + : VertexPartition[VD] = + { + val map = new PrimitiveKeyOpenHashMap[Vid, VD] + iter.foreach { case (k, v) => + map.setMerge(k, v, mergeFunc) + } + new VertexPartition(map.keySet, map._values, map.keySet.getBitSet) + } } @@ -114,12 +124,36 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( new VertexPartition(index, newValues, mask) } + /** Left outer join another iterator of messages. 
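Note: the new VidMsgSerializer above writes only the vertex id of each (Vid, _) pair and reads the value back as null, which is all the vid-only shuffle in GraphImpl.apply needs. The sketch below mirrors how GraphImpl wires it into a shuffle; the helper object and method names are hypothetical and not part of the patch.

    import org.apache.spark.HashPartitioner
    import org.apache.spark.graph._
    import org.apache.spark.graph.impl.VidMsgSerializer
    import org.apache.spark.rdd.{RDD, ShuffledRDD}

    object VidShuffleSketch {
      // Shuffle (vid, marker) pairs with the compact vid-only serializer.
      def shuffleVids(vids: RDD[(Vid, Int)], numParts: Int): RDD[(Vid, Int)] = {
        val shuffled = new ShuffledRDD[Vid, Int, (Vid, Int)](vids, new HashPartitioner(numParts))
        // Only the key is serialized; the Int marker is dropped on write and comes back as null.
        shuffled.setSerializer(classOf[VidMsgSerializer].getName)
        shuffled
      }
    }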
*/ + def leftJoin[VD2: ClassManifest, VD3: ClassManifest] + (other: Iterator[(Vid, VD2)]) + (f: (Vid, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { + leftJoin(createUsingIndex(other))(f) + } + + /** + * Similar effect as aggregateUsingIndex((a, b) => a) + */ + def createUsingIndex[VD2: ClassManifest](iter: Iterator[Product2[Vid, VD2]]) + : VertexPartition[VD2] = { + val newMask = new BitSet(capacity) + val newValues = new Array[VD2](capacity) + iter.foreach { case (vid, vdata) => + val pos = index.getPos(vid) + newMask.set(pos) + newValues(pos) = vdata + } + new VertexPartition[VD2](index, newValues, newMask) + } + def aggregateUsingIndex[VD2: ClassManifest]( iter: Iterator[Product2[Vid, VD2]], reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = { val newMask = new BitSet(capacity) val newValues = new Array[VD2](capacity) - iter.foreach { case (vid, vdata) => + iter.foreach { product => + val vid = product._1 + val vdata = product._2 val pos = index.getPos(vid) if (newMask.get(pos)) { newValues(pos) = reduceFunc(newValues(pos), vdata) diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala index 5b90fc0cf1..f47b2cb587 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala @@ -1,12 +1,11 @@ package org.apache.spark.graph.impl -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.ArrayBuilder - +import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.graph._ +import org.apache.spark.util.collection.PrimitiveVector /** * Stores the layout of replicated vertex attributes for GraphImpl. Tells each @@ -39,7 +38,7 @@ class VertexPlacement( private def createPid2Vid( includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = { // Determine which vertices each edge partition needs by creating a mapping from vid to pid. 
- val preAgg = eTable.mapPartitions { iter => + val vid2pid: RDD[(Vid, Pid)] = eTable.mapPartitions { iter => val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next() val numEdges = edgePartition.size val vSet = new VertexSet @@ -59,21 +58,15 @@ class VertexPlacement( } vSet.iterator.map { vid => (vid, pid) } } - // Aggregate the mappings to determine where each vertex should go - val vid2pid = VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTable.index, - (p: Pid) => ArrayBuffer(p), - (ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab}, - (a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b) - .mapValues(a => a.toArray) - // Within each vertex partition, reorganize the placement information into - // columnar format keyed on the destination partition - val numPartitions = vid2pid.partitions.size - vid2pid.mapPartitions { iter => - val pid2vid = Array.fill[ArrayBuilder[Vid]](numPartitions)(ArrayBuilder.make[Vid]) - for ((vid, pids) <- iter) { - pids.foreach { pid => pid2vid(pid) += vid } + + val numPartitions = vTable.partitions.size + vid2pid.partitionBy(vTable.partitioner.get).mapPartitions { iter => + val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[Vid]) + for ((vid, pid) <- iter) { + pid2vid(pid) += vid } - Iterator(pid2vid.map(_.result)) + + Iterator(pid2vid.map(_.trim().array)) } } } diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala index 4c17bab0c4..a1e285816b 100644 --- a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala +++ b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala @@ -268,14 +268,14 @@ object GraphGenerators { * Create a star graph with vertex 0 being the center. * * @param sc the spark context in which to construct the graph - * @param the number of vertices in the star + * @param nverts the number of vertices in the star * * @return A star graph containing `nverts` vertices with vertex 0 * being the center vertex. 
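Note: the createPid2Vid rewrite above replaces the old VertexSetRDD-based aggregation with a shuffle of (vid, pid) pairs followed by an in-partition regrouping into one vertex-id array per edge partition. The standalone sketch below mirrors that regrouping with ordinary Scala collections (the patch itself uses Spark's internal PrimitiveVector to avoid boxing); the sample pairs are made up.

    import scala.collection.mutable.ArrayBuilder

    object Pid2VidSketch extends App {
      val numEdgePartitions = 4
      // (vid, pid) pairs recording which edge partitions reference which vertex ids.
      val vid2pid: Iterator[(Long, Int)] = Iterator((1L, 0), (2L, 3), (1L, 3), (5L, 0))
      // Regroup into columnar form: one array of vertex ids per destination edge partition.
      val pid2vid = Array.fill(numEdgePartitions)(ArrayBuilder.make[Long])
      for ((vid, pid) <- vid2pid) {
        pid2vid(pid) += vid
      }
      val result: Array[Array[Long]] = pid2vid.map(_.result())
      // result(0) contains Array(1L, 5L); result(3) contains Array(2L, 1L)
    }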
*/ def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = { val edges: RDD[(Vid, Vid)] = sc.parallelize(1 until nverts).map(vid => (vid, 0)) - Graph(edges, 1) + Graph.fromEdgeTuples(edges, 1) } // end of starGraph diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala index e1ff8df4ea..d098c17c74 100644 --- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala @@ -132,9 +132,9 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext { val chain1 = (0 until 9).map(x => (x, x+1) ) val chain2 = (10 until 20).map(x => (x, x+1) ) val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) } - val twoChains = Graph(rawEdges, 1.0) + val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0) val ccGraph = Analytics.connectedComponents(twoChains).cache() - val vertices = ccGraph.vertices.collect + val vertices = ccGraph.vertices.collect() for ( (id, cc) <- vertices ) { if(id < 10) { assert(cc === 0) } else { assert(cc === 10) } @@ -156,7 +156,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext { val chain1 = (0 until 9).map(x => (x, x+1) ) val chain2 = (10 until 20).map(x => (x, x+1) ) val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) } - val twoChains = Graph(rawEdges, true).reverse + val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse val ccGraph = Analytics.connectedComponents(twoChains).cache() val vertices = ccGraph.vertices.collect for ( (id, cc) <- vertices ) { @@ -181,7 +181,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext { test("Count a single triangle") { withSpark(new SparkContext("local", "test")) { sc => val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->0L ), 2) - val graph = Graph(rawEdges, true).cache + val graph = Graph.fromEdgeTuples(rawEdges, true).cache() val triangleCount = Analytics.triangleCount(graph) val verts = triangleCount.vertices verts.collect.foreach { case (vid, count) => assert(count === 1) } @@ -193,10 +193,10 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext { val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++ Array(0L -> -1L, -1L -> -2L, -2L -> 0L) val rawEdges = sc.parallelize(triangles, 2) - val graph = Graph(rawEdges, true).cache + val graph = Graph.fromEdgeTuples(rawEdges, true).cache() val triangleCount = Analytics.triangleCount(graph) val verts = triangleCount.vertices - verts.collect.foreach { case (vid, count) => + verts.collect().foreach { case (vid, count) => if (vid == 0) { assert(count === 2) } else { @@ -213,10 +213,10 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext { Array(0L -> -1L, -1L -> -2L, -2L -> 0L) val revTriangles = triangles.map { case (a,b) => (b,a) } val rawEdges = sc.parallelize(triangles ++ revTriangles, 2) - val graph = Graph(rawEdges, true).cache + val graph = Graph.fromEdgeTuples(rawEdges, true).cache() val triangleCount = Analytics.triangleCount(graph) val verts = triangleCount.vertices - verts.collect.foreach { case (vid, count) => + verts.collect().foreach { case (vid, count) => if (vid == 0) { assert(count === 4) } else { @@ -230,7 +230,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext { withSpark(new SparkContext("local", "test")) { sc => val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++ Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2) - val graph = Graph(rawEdges, 
true).cache + val graph = Graph.fromEdgeTuples(rawEdges, true).cache() val triangleCount = Analytics.triangleCount(graph) val verts = triangleCount.vertices verts.collect.foreach { case (vid, count) => assert(count === 1) } diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala index da7b2bdd99..8f4f926d71 100644 --- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala @@ -15,11 +15,20 @@ class GraphSuite extends FunSuite with LocalSparkContext { withSpark(new SparkContext("local", "test")) { sc => val rawEdges = (0L to 100L).zip((1L to 99L) :+ 0L) val edges = sc.parallelize(rawEdges) - val graph = Graph(edges, 1.0F) + val graph = Graph.fromEdgeTuples(edges, 1.0F) assert(graph.edges.count() === rawEdges.size) } } + test("mapReduceTriplets") { + withSpark(new SparkContext("local", "test")) { sc => + val edges = sc.parallelize((0L to 100L).zip((1L to 99L) :+ 0L)) + val graph = Graph.fromEdgeTuples(edges, 1.0F) + + val d = graph.mapReduceTriplets[Int](et => Iterator((et.srcId, 0)), (a, b) => a + b) + } + } + test("Graph Creation with invalid vertices") { withSpark(new SparkContext("local", "test")) { sc => val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L) @@ -38,7 +47,9 @@ class GraphSuite extends FunSuite with LocalSparkContext { test("mapEdges") { withSpark(new SparkContext("local", "test")) { sc => val n = 3 - val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), "defaultValue") + val star = Graph.fromEdgeTuples( + sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), + "defaultValue") val starWithEdgeAttrs = star.mapEdges(e => e.dstId) // map(_.copy()) is a workaround for https://github.com/amplab/graphx/issues/25 @@ -51,7 +62,14 @@ class GraphSuite extends FunSuite with LocalSparkContext { test("mapReduceTriplets") { withSpark(new SparkContext("local", "test")) { sc => val n = 3 - val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 0) + val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 0) + + println("--------------------------------------- star vertices") + println(star.vertices.partitionsRDD.map { v => v.index.toString }.collect().toSeq) + + println("--------------------------------------- starDeg") + println(star.degrees.partitionsRDD.map { v => v.index.toString }.collect().toSeq) + val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg } val neighborDegreeSums = starDeg.mapReduceTriplets( edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)), @@ -63,7 +81,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { test("aggregateNeighbors") { withSpark(new SparkContext("local", "test")) { sc => val n = 3 - val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 1) + val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 1) val indegrees = star.aggregateNeighbors( (vid, edge) => Some(1), @@ -103,7 +121,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { withSpark(new SparkContext("local", "test")) { sc => val chain = (0 until 100).map(x => (x, (x+1)%100) ) val rawEdges = sc.parallelize(chain, 3).map { case (s,d) => (s.toLong, d.toLong) } - val graph = Graph(rawEdges, 1.0) + val graph = Graph.fromEdgeTuples(rawEdges, 1.0) val nbrs = graph.collectNeighborIds(EdgeDirection.Both) assert(nbrs.count === chain.size) assert(graph.numVertices === nbrs.count) @@ -122,7 +140,7 @@ 
class GraphSuite extends FunSuite with LocalSparkContext {
       val b = VertexSetRDD(a).mapValues(x => -x)
       assert(b.count === 101)
       assert(b.leftJoin(a){ (id, a, bOpt) => a + bOpt.get }.map(x=> x._2).reduce(_+_) === 0)
-      val c = VertexSetRDD(a, b.index)
+      val c = b.aggregateUsingIndex[Long, (Long, Long)](a, (x, y) => x)
       assert(b.leftJoin(c){ (id, b, cOpt) => b + cOpt.get }.map(x=> x._2).reduce(_+_) === 0)
       val d = c.filter(q => ((q._2 % 2) == 0))
       val e = a.filter(q => ((q._2 % 2) == 0))
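Note: for context on the updated VertexSetRDD test above, here is a hedged, standalone sketch of what the new aggregateUsingIndex call does; the object name, local master, and assertion are illustrative and not part of the patch.

    import org.apache.spark.SparkContext
    import org.apache.spark.graph._

    object AggregateUsingIndexSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "aggregate-using-index")
        val a = sc.parallelize((0 to 100).map(x => (x.toLong, x.toLong)))
        val b = VertexSetRDD(a).mapValues(x => -x)
        // Re-key the raw pairs in `a` onto b's existing index, resolving duplicate keys with the reduce function.
        val c = b.aggregateUsingIndex[Long, (Long, Long)](a, (x, y) => x)
        // Every key of `a` is in b's index, so the join always finds a value and the sums cancel to zero.
        val sums = b.leftJoin(c) { (vid, negated, orig) => negated + orig.getOrElse(0L) }
        assert(sums.map(_._2).reduce(_ + _) == 0L)
        sc.stop()
      }
    }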