Use specialized shuffler for aggregation.

Reynold Xin 2013-12-01 21:55:50 -08:00
parent 55edbb4209
commit 8701cb55e6
12 changed files with 55 additions and 104 deletions
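
At a glance, the change pieced together from the hunks below: aggregation messages are no longer wrapped in AggregationMsg[T] objects; they travel as plain (Vid, T) pairs and are shuffled through a new MsgRDDFunctions.partitionForAggregation helper that installs a type-specialized serializer for Int, Long, and Double payloads. A before/after sketch drawn from the GraphImpl hunk below:

    // Before this commit: each aggregation message was wrapped in an object
    msgBS.iterator.map { ind => new AggregationMsg[A](vidToIndex.getValue(ind), msgArray(ind)) }

    // After: plain (Vid, A) pairs, later partitioned via MsgRDDFunctions.partitionForAggregation,
    // which picks IntAggMsgSerializer / LongAggMsgSerializer / DoubleAggMsgSerializer when applicable
    msgBS.iterator.map { ind => (vidToIndex.getValue(ind), msgArray(ind)) }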

View file

@@ -92,19 +92,21 @@ object Analytics extends Logging {
minEdgePartitions = numEPart, partitionStrategy=partitionStrategy).cache()
val startTime = System.currentTimeMillis
logInfo("GRAPHX: starting tasks")
logInfo("GRAPHX: Number of vertices " + graph.vertices.count)
logInfo("GRAPHX: Number of edges " + graph.edges.count)
println("GRAPHX: starting tasks")
println("GRAPHX: Number of vertices " + graph.vertices.count)
println("GRAPHX: Number of edges " + graph.edges.count)
//val pr = Analytics.pagerank(graph, numIter)
val pr = if(isDynamic) PageRank.runUntillConvergence(graph, tol, numIter)
else PageRank.run(graph, numIter)
logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case (id,r) => r }.reduce(_+_) )
println("GRAPHX: Total rank: " + pr.vertices.map{ case (id,r) => r }.reduce(_+_) )
if (!outFname.isEmpty) {
println("Saving pageranks of pages to " + outFname)
pr.vertices.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
}
logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
println("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
Thread.sleep(100000)
sc.stop()
}

View file

@@ -13,7 +13,6 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[Edge[Object]])
kryo.register(classOf[MessageToPartition[Object]])
kryo.register(classOf[VertexBroadcastMsg[Object]])
kryo.register(classOf[AggregationMsg[Object]])
kryo.register(classOf[(Vid, Object)])
kryo.register(classOf[EdgePartition[Object]])
kryo.register(classOf[BitSet])

View file

@@ -1,9 +1,6 @@
package org.apache.spark.graph
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.graph.impl.GraphImpl
object GraphLoader {

View file

@@ -1,7 +1,5 @@
package org.apache.spark.graph
import org.apache.spark.rdd.RDD
/**
* This object implements a Pregel-like bulk-synchronous

View file

@@ -21,10 +21,9 @@ import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap}
import org.apache.spark.graph.impl.MsgRDDFunctions
import org.apache.spark.graph.impl.VertexPartition
import org.apache.spark.util.ClosureCleaner
/**
@@ -269,11 +268,10 @@ class VertexRDD[@specialized VD: ClassManifest](
}
}
def aggregateUsingIndex[VD2: ClassManifest, VidVDPair <: Product2[Vid, VD2] : ClassManifest](
messages: RDD[VidVDPair], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] =
def aggregateUsingIndex[VD2: ClassManifest](
messages: RDD[(Vid, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] =
{
// TODO: use specialized shuffle serializer.
val shuffled = new ShuffledRDD[Vid, VD2, VidVDPair](messages, this.partitioner.get)
val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
val vertextPartition: VertexPartition[VD] = thisIter.next()
Iterator(vertextPartition.aggregateUsingIndex(msgIter, reduceFunc))
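
A hypothetical caller-side sketch of the simplified signature (sc and vertices are assumed to exist; they are not part of this diff):

    // Messages are now plain (Vid, VD2) pairs rather than a Product2 subtype.
    val msgs: RDD[(Vid, Double)] = sc.parallelize(Seq((1L, 0.5), (2L, 1.0), (1L, 0.25)))
    // Combine messages per vertex id, reusing this VertexRDD's index.
    val combined: VertexRDD[Double] = vertices.aggregateUsingIndex(msgs, _ + _)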

View file

@@ -5,19 +5,16 @@ import org.apache.spark.graph._
object ConnectedComponents {
/**
* Compute the connected component membership of each vertex and
* return an RDD with the vertex value containing the lowest vertex
* id in the connected component containing that vertex.
* Compute the connected component membership of each vertex and return an RDD with the vertex
* value containing the lowest vertex id in the connected component containing that vertex.
*
* @tparam VD the vertex attribute type (discarded in the
* computation)
* @tparam VD the vertex attribute type (discarded in the computation)
* @tparam ED the edge attribute type (preserved in the computation)
*
* @param graph the graph for which to compute the connected
* components
* @param graph the graph for which to compute the connected components
*
* @return a graph with vertex attributes containing the smallest
* vertex in each connected component
* @return a graph with vertex attributes containing the smallest vertex in each
* connected component
*/
def run[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]): Graph[Vid, ED] = {
val ccGraph = graph.mapVertices { case (vid, _) => vid }
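
A hypothetical usage of the documented contract (the input graph is assumed, not shown in the diff):

    // Assuming an existing Graph[Int, Int] named graph.
    val cc: Graph[Vid, Int] = ConnectedComponents.run(graph)
    // Each vertex attribute is the smallest vertex id in its connected component.
    val membership: VertexRDD[Vid] = cc.vertices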

View file

@@ -1,7 +1,5 @@
package org.apache.spark.graph.impl
import scala.collection.mutable
import org.apache.spark.graph._
import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
@@ -40,21 +38,4 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
pos += 1
triplet
}
// TODO: Why do we need this?
override def toList: List[EdgeTriplet[VD, ED]] = {
val lb = new mutable.ListBuffer[EdgeTriplet[VD,ED]]
val currentEdge = new EdgeTriplet[VD, ED]
for (i <- 0 until edgePartition.size) {
currentEdge.srcId = edgePartition.srcIds(i)
// assert(vmap.containsKey(e.src.id))
currentEdge.srcAttr = vmap(currentEdge.srcId)
currentEdge.dstId = edgePartition.dstIds(i)
// assert(vmap.containsKey(e.dst.id))
currentEdge.dstAttr = vmap(currentEdge.dstId)
currentEdge.attr = edgePartition.data(i)
lb += currentEdge
}
lb.toList
}
}

View file

@@ -1,7 +1,7 @@
package org.apache.spark.graph.impl
import org.apache.spark.SparkContext._
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.SparkContext._
import org.apache.spark.graph._
import org.apache.spark.graph.impl.GraphImpl._
import org.apache.spark.graph.impl.MsgRDDFunctions._
@@ -213,7 +213,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// in the relevant position in an edge.
val mapUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
val mapUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
val vs: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] = vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr)
val vs = vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr)
// Map and combine.
val preAgg = edges.zipEdgePartitions(vs) { (edgePartition, vTableReplicatedIter) =>
@@ -254,9 +254,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
}
}
// construct an iterator of tuples. Iterator[(Vid, A)]
msgBS.iterator.map { ind =>
new AggregationMsg[A](vidToIndex.getValue(ind), msgArray(ind))
}
msgBS.iterator.map { ind => (vidToIndex.getValue(ind), msgArray(ind)) }
}
// do the final reduction reusing the index map
@@ -336,16 +334,16 @@ object GraphImpl {
*/
protected def createETable[ED: ClassManifest](
edges: RDD[Edge[ED]],
partitionStrategy: PartitionStrategy): EdgeRDD[ED] = {
// Get the number of partitions
val numPartitions = edges.partitions.size
partitionStrategy: PartitionStrategy): EdgeRDD[ED] = {
// Get the number of partitions
val numPartitions = edges.partitions.size
val eTable = edges.map { e =>
val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
val eTable = edges.map { e =>
val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
// Should we be using 3-tuple or an optimized class
new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
}
// Should we be using 3-tuple or an optimized class
new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
}
.partitionBy(new HashPartitioner(numPartitions))
.mapPartitionsWithIndex( { (pid, iter) =>
val builder = new EdgePartitionBuilder[ED]

View file

@@ -19,17 +19,6 @@ class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
}
class AggregationMsg[@specialized(Int, Long, Double, Boolean) T](var vid: Vid, var data: T)
extends Product2[Vid, T] {
override def _1 = vid
override def _2 = data
override def canEqual(that: Any): Boolean = that.isInstanceOf[AggregationMsg[_]]
}
/**
* A message used to send a specific value to a partition.
* @param partition index of the target partition.
@@ -65,23 +54,6 @@ class VertexBroadcastMsgRDDFunctions[T: ClassManifest](self: RDD[VertexBroadcast
}
class AggregationMessageRDDFunctions[T: ClassManifest](self: RDD[AggregationMsg[T]]) {
def partitionBy(partitioner: Partitioner): RDD[AggregationMsg[T]] = {
val rdd = new ShuffledRDD[Vid, T, AggregationMsg[T]](self, partitioner)
// Set a custom serializer if the data is of int or double type.
if (classManifest[T] == ClassManifest.Int) {
rdd.setSerializer(classOf[IntAggMsgSerializer].getName)
} else if (classManifest[T] == ClassManifest.Long) {
rdd.setSerializer(classOf[LongAggMsgSerializer].getName)
} else if (classManifest[T] == ClassManifest.Double) {
rdd.setSerializer(classOf[DoubleAggMsgSerializer].getName)
}
rdd
}
}
class MsgRDDFunctions[T: ClassManifest](self: RDD[MessageToPartition[T]]) {
/**
@@ -103,7 +75,17 @@ object MsgRDDFunctions {
new VertexBroadcastMsgRDDFunctions(rdd)
}
implicit def rdd2aggMessageRDDFunctions[T: ClassManifest](rdd: RDD[AggregationMsg[T]]) = {
new AggregationMessageRDDFunctions(rdd)
def partitionForAggregation[T: ClassManifest](msgs: RDD[(Vid, T)], partitioner: Partitioner) = {
val rdd = new ShuffledRDD[Vid, T, (Vid, T)](msgs, partitioner)
// Set a custom serializer if the data is of int or double type.
if (classManifest[T] == ClassManifest.Int) {
rdd.setSerializer(classOf[IntAggMsgSerializer].getName)
} else if (classManifest[T] == ClassManifest.Long) {
rdd.setSerializer(classOf[LongAggMsgSerializer].getName)
} else if (classManifest[T] == ClassManifest.Double) {
rdd.setSerializer(classOf[DoubleAggMsgSerializer].getName)
}
rdd
}
}
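
A hypothetical call site for the new helper (sc and the partition count are illustrative):

    // With an Int payload, the manifest check above should select IntAggMsgSerializer.
    val msgs: RDD[(Vid, Int)] = sc.parallelize(Seq((1L, 3), (2L, 7), (1L, 5)))
    val partitioned = MsgRDDFunctions.partitionForAggregation(msgs, new HashPartitioner(4))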

View file

@@ -101,9 +101,9 @@ class IntAggMsgSerializer extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[AggregationMsg[Int]]
writeLong(msg.vid)
writeUnsignedVarInt(msg.data)
val msg = t.asInstanceOf[(Vid, Int)]
writeLong(msg._1)
writeUnsignedVarInt(msg._2)
this
}
}
@@ -112,7 +112,7 @@ class IntAggMsgSerializer extends Serializer {
override def readObject[T](): T = {
val a = readLong()
val b = readUnsignedVarInt()
new AggregationMsg[Int](a, b).asInstanceOf[T]
(a, b).asInstanceOf[T]
}
}
}
@@ -124,9 +124,9 @@ class LongAggMsgSerializer extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[AggregationMsg[Long]]
writeVarLong(msg.vid, optimizePositive = false)
writeVarLong(msg.data, optimizePositive = true)
val msg = t.asInstanceOf[(Vid, Long)]
writeVarLong(msg._1, optimizePositive = false)
writeVarLong(msg._2, optimizePositive = true)
this
}
}
@@ -135,7 +135,7 @@ class LongAggMsgSerializer extends Serializer {
override def readObject[T](): T = {
val a = readVarLong(optimizePositive = false)
val b = readVarLong(optimizePositive = true)
new AggregationMsg[Long](a, b).asInstanceOf[T]
(a, b).asInstanceOf[T]
}
}
}
@@ -148,9 +148,9 @@ class DoubleAggMsgSerializer extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[AggregationMsg[Double]]
writeVarLong(msg.vid, optimizePositive = false)
writeDouble(msg.data)
val msg = t.asInstanceOf[(Vid, Double)]
writeVarLong(msg._1, optimizePositive = false)
writeDouble(msg._2)
this
}
}
@@ -159,7 +159,7 @@ class DoubleAggMsgSerializer extends Serializer {
def readObject[T](): T = {
val a = readVarLong(optimizePositive = false)
val b = readDouble()
new AggregationMsg[Double](a, b).asInstanceOf[T]
(a, b).asInstanceOf[T]
}
}
}
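
For background, a standalone sketch of the zig-zag variable-length encoding idea that writeVarLong(optimizePositive = false) is assumed to follow; this is an illustration, not the serializer code in this file:

    import java.io.ByteArrayOutputStream

    // Zig-zag maps longs with small magnitude (positive or negative) to small
    // unsigned values, then emits 7 payload bits per byte with a continuation bit.
    def zigZagVarLong(value: Long, out: ByteArrayOutputStream): Unit = {
      var v = (value << 1) ^ (value >> 63)
      while ((v & ~0x7FL) != 0) {
        out.write(((v & 0x7F) | 0x80).toInt)
        v >>>= 7
      }
      out.write(v.toInt)
    }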

View file

@@ -2,7 +2,7 @@ package org.apache.spark.graph.impl
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.collection.{OpenHashSet, PrimitiveKeyOpenHashMap}
import org.apache.spark.util.collection.OpenHashSet
import org.apache.spark.graph._

View file

@@ -1,10 +1,9 @@
package org.apache.spark.graph.impl
import org.apache.spark.SparkContext._
import org.apache.spark.graph._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graph._
import org.apache.spark.util.collection.PrimitiveVector
/**
@@ -41,7 +40,7 @@ class VertexPlacement(eTable: EdgeRDD[_], vTable: VertexRDD[_]) {
val numEdges = edgePartition.size
val vSet = new VertexSet
if (includeSrcAttr) { // Add src vertices to the set.
var i = 0
var i = 0
while (i < numEdges) {
vSet.add(edgePartition.srcIds(i))
i += 1