Load edges in columnar format

In GraphLoader.edgeListFile, load edges directly into EdgePartitions,
avoiding repartitioning.
Ankur Dave 2013-12-06 22:32:47 -08:00
parent 9bf192b01c
commit 1e98840128
3 changed files with 101 additions and 53 deletions
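For readers unfamiliar with the layout: "columnar" here means each edge partition holds parallel primitive arrays (source IDs, destination IDs, attributes) instead of one allocated Edge object per edge, which is what EdgePartition stores and what the loader below now builds directly. A minimal, hypothetical sketch of the idea in Scala (ColumnarEdges is illustrative only, not the EdgePartition class in this repository):

// Hypothetical sketch of a columnar edge store: parallel primitive arrays,
// one slot per edge, rather than a collection of per-edge Edge objects.
class ColumnarEdges(
    val srcIds: Array[Long], // source vertex ID of edge i
    val dstIds: Array[Long], // destination vertex ID of edge i
    val data: Array[Int]) {  // attribute of edge i (Int, as in edgeListFile)

  def size: Int = srcIds.length

  // Materialize edges lazily; no per-edge objects are kept on the heap.
  def iterator: Iterator[(Long, Long, Int)] =
    Iterator.tabulate(size)(i => (srcIds(i), dstIds(i), data(i)))
}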

EdgeRDD.scala

@@ -1,6 +1,7 @@
 package org.apache.spark.graph
 
+import org.apache.spark.Partitioner
 import org.apache.spark.{TaskContext, Partition, OneToOneDependency}
 import org.apache.spark.graph.impl.EdgePartition
 import org.apache.spark.rdd.RDD
@@ -13,10 +14,16 @@ class EdgeRDD[@specialized ED: ClassManifest](
 
   partitionsRDD.setName("EdgeRDD")
 
-  override val partitioner = partitionsRDD.partitioner
-
   override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
 
+  /**
+   * If partitionsRDD already has a partitioner, use it. Otherwise assume that the Pids in
+   * partitionsRDD correspond to the actual partitions and create a new partitioner that allows
+   * co-partitioning with partitionsRDD.
+   */
+  override val partitioner =
+    partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+
   override def compute(split: Partition, context: TaskContext): Iterator[Edge[ED]] = {
     val edgePartition = partitionsRDD.compute(split, context).next()._2
     edgePartition.iterator
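The new partitioner override matters because, as the doc comment above says, it lets other RDDs keyed by partition ID be co-partitioned with partitionsRDD and joined without a shuffle. As a small illustration of the orElse fallback pattern only (effectivePartitioner is a made-up helper, not GraphX code):

import org.apache.spark.{HashPartitioner, Partitioner}

// Illustrative only: keep an existing partitioner when one is defined,
// otherwise fall back to a default, so the result is always Some(...)
// and downstream co-partitioned joins can skip the shuffle.
def effectivePartitioner(existing: Option[Partitioner], numPartitions: Int): Option[Partitioner] =
  existing.orElse(Some(new HashPartitioner(numPartitions)))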

GraphLoader.scala

@@ -1,9 +1,12 @@
 package org.apache.spark.graph
 
-import org.apache.spark.SparkContext
+import java.util.{Arrays => JArrays}
+
+import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.graph.impl.{EdgePartition, GraphImpl}
+import org.apache.spark.util.collection.PrimitiveVector
 
-object GraphLoader {
+object GraphLoader extends Logging {
 
   /**
    * Load an edge list from file initializing the Graph
@@ -77,24 +80,42 @@ object GraphLoader {
       minEdgePartitions: Int = 1,
       partitionStrategy: PartitionStrategy = RandomVertexCut):
     Graph[Int, Int] = {
+    val startTime = System.currentTimeMillis
+
     // Parse the edge data table
-    val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter =>
-      iter.filter(line => !line.isEmpty && line(0) != '#').map { line =>
+    val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (index, iter) =>
+      val srcIds = new PrimitiveVector[Long]
+      val dstIds = new PrimitiveVector[Long]
+      iter.foreach { line =>
+        if (!line.isEmpty && line(0) != '#') {
           val lineArray = line.split("\\s+")
           if (lineArray.length < 2) {
-            println("Invalid line: " + line)
-            assert(false)
+            logWarning("Invalid line: " + line)
           }
-          val source = lineArray(0).trim.toLong
-          val target = lineArray(1).trim.toLong
-          if (canonicalOrientation && target > source) {
-            Edge(target, source, 1)
+          val srcId = lineArray(0).toLong
+          val dstId = lineArray(1).toLong
+          if (canonicalOrientation && dstId > srcId) {
+            srcIds += dstId
+            dstIds += srcId
           } else {
-            Edge(source, target, 1)
+            srcIds += srcId
+            dstIds += dstId
           }
-        })
+        }
+      }
+      val srcIdArray = srcIds.trim().array
+      val dstIdArray = dstIds.trim().array
+      val data = new Array[Int](srcIdArray.length)
+      JArrays.fill(data, 1)
+      Iterator((index, new EdgePartition[Int](srcIdArray, dstIdArray, data)))
+    }.cache()
+    edges.count()
+
+    logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
 
     val defaultVertexAttr = 1
-    Graph.fromEdges(edges, defaultVertexAttr, partitionStrategy)
+    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr, partitionStrategy)
   } // end of edgeListFile
 }
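As a usage sketch of the loader changed above (assuming the full edgeListFile signature takes the SparkContext and path before the optional arguments, as in the GraphX loader of this era; the master URL and input path are placeholders):

import org.apache.spark.SparkContext
import org.apache.spark.graph.GraphLoader

object EdgeListExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[4]", "edge-list-example") // placeholder setup
    // Input: whitespace-separated "srcId dstId" pairs; lines starting with '#' are skipped.
    val graph = GraphLoader.edgeListFile(sc, "/tmp/followers.txt", minEdgePartitions = 8)
    println("edges: " + graph.edges.count + ", vertices: " + graph.vertices.count)
    sc.stop()
  }
}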

GraphImpl.scala

@@ -253,25 +253,11 @@ object GraphImpl {
   def apply[VD: ClassManifest, ED: ClassManifest](
       edges: RDD[Edge[ED]],
-      defaultValue: VD,
+      defaultVertexAttr: VD,
       partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] =
   {
     val etable = createETable(edges, partitionStrategy).cache()
-
-    // Get the set of all vids
-    val vids = etable.flatMap { e =>
-      Iterator((e.srcId, 0), (e.dstId, 0))
-    }
-
-    // Shuffle the vids and create the VertexRDD.
-    // TODO: Consider doing map side distinct before shuffle.
-    val shuffled = new ShuffledRDD[Vid, Int, (Vid, Int)](
-      vids, new HashPartitioner(edges.partitions.size))
-    shuffled.setSerializer(classOf[VidMsgSerializer].getName)
-    val vtable = VertexRDD(shuffled.mapValues(x => defaultValue))
-
-    val vertexPlacement = new VertexPlacement(etable, vtable)
-    new GraphImpl(vtable, etable, vertexPlacement)
+    fromEdgeRDD(etable, defaultVertexAttr)
   }
 
   def apply[VD: ClassManifest, ED: ClassManifest](
@@ -303,6 +289,14 @@ object GraphImpl {
     new GraphImpl(vtable, etable, vertexPlacement)
   }
 
+  def fromEdgePartitions[VD: ClassManifest, ED: ClassManifest](
+      edges: RDD[(Pid, EdgePartition[ED])],
+      defaultVertexAttr: VD,
+      partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = {
+    val etable = createETableFromEdgePartitions(edges, partitionStrategy)
+    fromEdgeRDD(etable, defaultVertexAttr)
+  }
+
   /**
    * Create the edge table RDD, which is much more efficient for Java heap storage than the
    * normal edges data structure (RDD[(Vid, Vid, ED)]).
@@ -336,6 +330,32 @@ object GraphImpl {
     new EdgeRDD(eTable)
   }
 
+  protected def createETableFromEdgePartitions[ED: ClassManifest](
+      edges: RDD[(Pid, EdgePartition[ED])],
+      partitionStrategy: PartitionStrategy): EdgeRDD[ED] = {
+    // TODO(ankurdave): provide option to repartition edges using partitionStrategy
+    new EdgeRDD(edges)
+  }
+
+  private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest](
+      edges: EdgeRDD[ED],
+      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+    // Get the set of all vids
+    val vids = edges.flatMap { e =>
+      Iterator((e.srcId, 0), (e.dstId, 0))
+    }
+
+    // Shuffle the vids and create the VertexRDD.
+    // TODO: Consider doing map side distinct before shuffle.
+    val shuffled = new ShuffledRDD[Vid, Int, (Vid, Int)](
+      vids, new HashPartitioner(edges.partitions.size))
+    shuffled.setSerializer(classOf[VidMsgSerializer].getName)
+    val vtable = VertexRDD(shuffled.mapValues(x => defaultVertexAttr))
+
+    val vertexPlacement = new VertexPlacement(edges, vtable)
+    new GraphImpl(vtable, edges, vertexPlacement)
+  }
+
   private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = {
     try {
       BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)