diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala index 24844262bc..a34113b1eb 100644 --- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala @@ -1,6 +1,7 @@ package org.apache.spark.graph +import org.apache.spark.Partitioner import org.apache.spark.{TaskContext, Partition, OneToOneDependency} import org.apache.spark.graph.impl.EdgePartition import org.apache.spark.rdd.RDD @@ -13,10 +14,16 @@ class EdgeRDD[@specialized ED: ClassManifest]( partitionsRDD.setName("EdgeRDD") - override val partitioner = partitionsRDD.partitioner - override protected def getPartitions: Array[Partition] = partitionsRDD.partitions + /** + * If partitionsRDD already has a partitioner, use it. Otherwise assume that the Pids in + * partitionsRDD correspond to the actual partitions and create a new partitioner that allows + * co-partitioning with partitionsRDD. + */ + override val partitioner = + partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD))) + override def compute(split: Partition, context: TaskContext): Iterator[Edge[ED]] = { val edgePartition = partitionsRDD.compute(split, context).next()._2 edgePartition.iterator diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala index 29d14452de..b00c7c4afe 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala @@ -1,9 +1,12 @@ package org.apache.spark.graph -import org.apache.spark.SparkContext +import java.util.{Arrays => JArrays} +import org.apache.spark.{Logging, SparkContext} +import org.apache.spark.graph.impl.{EdgePartition, GraphImpl} +import org.apache.spark.util.collection.PrimitiveVector -object GraphLoader { +object GraphLoader extends Logging { /** * Load an edge list from file 
initializing the Graph @@ -77,24 +80,42 @@ object GraphLoader { minEdgePartitions: Int = 1, partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[Int, Int] = { + val startTime = System.currentTimeMillis + // Parse the edge data table - val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter => - iter.filter(line => !line.isEmpty && line(0) != '#').map { line => - val lineArray = line.split("\\s+") - if(lineArray.length < 2) { - println("Invalid line: " + line) - assert(false) + val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (index, iter) => + val srcIds = new PrimitiveVector[Long] + val dstIds = new PrimitiveVector[Long] + iter.foreach { line => + if (!line.isEmpty && line(0) != '#') { + val lineArray = line.split("\\s+") + if (lineArray.length < 2) { + throw new IllegalArgumentException("Invalid line: " + line) + } + val srcId = lineArray(0).toLong + val dstId = lineArray(1).toLong + if (canonicalOrientation && dstId > srcId) { + srcIds += dstId + dstIds += srcId + } else { + srcIds += srcId + dstIds += dstId + } } - val source = lineArray(0).trim.toLong - val target = lineArray(1).trim.toLong - if (canonicalOrientation && target > source) { - Edge(target, source, 1) - } else { - Edge(source, target, 1) - } - }) + } + val srcIdArray = srcIds.trim().array + val dstIdArray = dstIds.trim().array + val data = new Array[Int](srcIdArray.length) + JArrays.fill(data, 1) + + Iterator((index, new EdgePartition[Int](srcIdArray, dstIdArray, data))) + }.cache() + edges.count() + + logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime)) + val defaultVertexAttr = 1 - Graph.fromEdges(edges, defaultVertexAttr, partitionStrategy) + GraphImpl.fromEdgePartitions(edges, defaultVertexAttr, partitionStrategy) } // end of edgeListFile } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 771c460345..1e17fd5a67 100644 --- 
a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -253,25 +253,11 @@ object GraphImpl { def apply[VD: ClassManifest, ED: ClassManifest]( edges: RDD[Edge[ED]], - defaultValue: VD, + defaultVertexAttr: VD, partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = { val etable = createETable(edges, partitionStrategy).cache() - - // Get the set of all vids - val vids = etable.flatMap { e => - Iterator((e.srcId, 0), (e.dstId, 0)) - } - - // Shuffle the vids and create the VertexRDD. - // TODO: Consider doing map side distinct before shuffle. - val shuffled = new ShuffledRDD[Vid, Int, (Vid, Int)]( - vids, new HashPartitioner(edges.partitions.size)) - shuffled.setSerializer(classOf[VidMsgSerializer].getName) - val vtable = VertexRDD(shuffled.mapValues(x => defaultValue)) - - val vertexPlacement = new VertexPlacement(etable, vtable) - new GraphImpl(vtable, etable, vertexPlacement) + fromEdgeRDD(etable, defaultVertexAttr) } def apply[VD: ClassManifest, ED: ClassManifest]( @@ -303,6 +289,14 @@ object GraphImpl { new GraphImpl(vtable, etable, vertexPlacement) } + def fromEdgePartitions[VD: ClassManifest, ED: ClassManifest]( + edges: RDD[(Pid, EdgePartition[ED])], + defaultVertexAttr: VD, + partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = { + val etable = createETableFromEdgePartitions(edges, partitionStrategy) + fromEdgeRDD(etable, defaultVertexAttr) + } + /** * Create the edge table RDD, which is much more efficient for Java heap storage than the * normal edges data structure (RDD[(Vid, Vid, ED)]). 
@@ -313,29 +307,55 @@ object GraphImpl { */ protected def createETable[ED: ClassManifest]( edges: RDD[Edge[ED]], - partitionStrategy: PartitionStrategy): EdgeRDD[ED] = { - // Get the number of partitions - val numPartitions = edges.partitions.size + partitionStrategy: PartitionStrategy): EdgeRDD[ED] = { + // Get the number of partitions + val numPartitions = edges.partitions.size - val eTable = edges.map { e => - val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions) + val eTable = edges.map { e => + val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions) - // Should we be using 3-tuple or an optimized class - new MessageToPartition(part, (e.srcId, e.dstId, e.attr)) - } - .partitionBy(new HashPartitioner(numPartitions)) - .mapPartitionsWithIndex( { (pid, iter) => - val builder = new EdgePartitionBuilder[ED] - iter.foreach { message => - val data = message.data - builder.add(data._1, data._2, data._3) - } - val edgePartition = builder.toEdgePartition - Iterator((pid, edgePartition)) - }, preservesPartitioning = true).cache() + // Should we be using 3-tuple or an optimized class + new MessageToPartition(part, (e.srcId, e.dstId, e.attr)) + } + .partitionBy(new HashPartitioner(numPartitions)) + .mapPartitionsWithIndex( { (pid, iter) => + val builder = new EdgePartitionBuilder[ED] + iter.foreach { message => + val data = message.data + builder.add(data._1, data._2, data._3) + } + val edgePartition = builder.toEdgePartition + Iterator((pid, edgePartition)) + }, preservesPartitioning = true).cache() new EdgeRDD(eTable) } + protected def createETableFromEdgePartitions[ED: ClassManifest]( + edges: RDD[(Pid, EdgePartition[ED])], + partitionStrategy: PartitionStrategy): EdgeRDD[ED] = { + // TODO(ankurdave): provide option to repartition edges using partitionStrategy + new EdgeRDD(edges) + } + + private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest]( + edges: EdgeRDD[ED], + defaultVertexAttr: VD): GraphImpl[VD, ED] = { + 
// Get the set of all vids + val vids = edges.flatMap { e => + Iterator((e.srcId, 0), (e.dstId, 0)) + } + + // Shuffle the vids and create the VertexRDD. + // TODO: Consider doing map side distinct before shuffle. + val shuffled = new ShuffledRDD[Vid, Int, (Vid, Int)]( + vids, new HashPartitioner(edges.partitions.size)) + shuffled.setSerializer(classOf[VidMsgSerializer].getName) + val vtable = VertexRDD(shuffled.mapValues(x => defaultVertexAttr)) + + val vertexPlacement = new VertexPlacement(edges, vtable) + new GraphImpl(vtable, edges, vertexPlacement) + } + private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = { try { BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)