From 4f916f5302ef56cf0f53a8f214602623ccdae841 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Mon, 7 Oct 2013 11:31:00 -0700
Subject: [PATCH 1/2] Created a MessageToPartition class to send messages without saving the partition id.

---
 .../apache/spark/graph/impl/GraphImpl.scala   | 22 +++++----
 .../spark/graph/impl/MessageToPartition.scala | 49 +++++++++++++++++++
 2 files changed, 61 insertions(+), 10 deletions(-)
 create mode 100644 graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala

diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index e397293a3d..fb7698f915 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -11,9 +11,7 @@ import org.apache.spark.rdd.RDD
 
 import org.apache.spark.graph._
 import org.apache.spark.graph.impl.GraphImpl._
-
-
-
+import org.apache.spark.graph.impl.MessageToPartitionRDDFunctions._
 
 
 /**
@@ -309,11 +307,13 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     // Join vid2pid and vTable, generate a shuffle dependency on the joined result, and get
     // the shuffle id so we can use it on the slave.
     vTable
-      .flatMap { case (vid, (vdata, pids)) => pids.iterator.map { pid => (pid, (vid, vdata)) } }
+      .flatMap { case (vid, (vdata, pids)) =>
+        pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) }
+      }
       .partitionBy(edgePartitioner)
-      .mapPartitions(
-        { part => part.map { case(pid, (vid, vdata)) => (vid, vdata) } },
-        preservesPartitioning = true)
+      .mapPartitions({ part =>
+        part.map { message => (message.data._1, message.data._2) }
+      }, preservesPartitioning = true)
   }
 }
 
@@ -400,14 +400,16 @@ object GraphImpl {
       val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
 
       // Should we be using 3-tuple or an optimized class
-      (part, (e.src, e.dst, e.data))
+      MessageToPartition(part, (e.src, e.dst, e.data))
       // (math.abs(e.src) % numPartitions, (e.src, e.dst, e.data))
-
     }
     .partitionBy(new HashPartitioner(numPartitions))
     .mapPartitionsWithIndex({ (pid, iter) =>
       val edgePartition = new EdgePartition[ED]
-      iter.foreach { case (_, (src, dst, data)) => edgePartition.add(src, dst, data) }
+      iter.foreach { message =>
+        val data = message.data
+        edgePartition.add(data._1, data._2, data._3)
+      }
       edgePartition.trim()
       Iterator((pid, edgePartition))
     }, preservesPartitioning = true)
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
new file mode 100644
index 0000000000..b7bbf257a4
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
@@ -0,0 +1,49 @@
+package org.apache.spark.graph.impl
+
+import org.apache.spark.Partitioner
+import org.apache.spark.graph.Pid
+import org.apache.spark.rdd.{ShuffledRDD, RDD}
+
+
+/**
+ * A message used to send a specific value to a partition.
+ * @param partition index of the target partition (transient, so it is not serialized with the message).
+ * @param data the value to send.
+ */
+class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T](
+    @transient var partition: Pid,
+    var data: T)
+  extends Product2[Pid, T] {
+
+  override def _1 = partition
+
+  override def _2 = data
+
+  override def canEqual(that: Any): Boolean = that.isInstanceOf[MessageToPartition[_]]
+}
+
+/**
+ * Companion object for MessageToPartition.
+ */
+object MessageToPartition {
+  def apply[T](partition: Pid, value: T) = new MessageToPartition(partition, value)
+}
+
+
+class MessageToPartitionRDDFunctions[T: ClassManifest](self: RDD[MessageToPartition[T]]) {
+
+  /**
+   * Return a copy of the RDD partitioned using the specified partitioner.
+   */
+  def partitionBy(partitioner: Partitioner): RDD[MessageToPartition[T]] = {
+    new ShuffledRDD[Pid, T, MessageToPartition[T]](self, partitioner)
+  }
+
+}
+
+
+object MessageToPartitionRDDFunctions {
+  implicit def rdd2PartitionRDDFunctions[T: ClassManifest](rdd: RDD[MessageToPartition[T]]) = {
+    new MessageToPartitionRDDFunctions(rdd)
+  }
+}

From 5218e46178030c05306ea2dcecc1736f5f796d7c Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Mon, 7 Oct 2013 11:48:50 -0700
Subject: [PATCH 2/2] Updated Kryo registration.

---
 .../apache/spark/graph/GraphKryoRegistrator.scala | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
index 13a22f9051..297506e1e3 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
@@ -2,23 +2,19 @@ package org.apache.spark.graph
 
 import com.esotericsoftware.kryo.Kryo
 
+import org.apache.spark.graph.impl.MessageToPartition
 import org.apache.spark.serializer.KryoRegistrator
 
 
 class GraphKryoRegistrator extends KryoRegistrator {
 
   def registerClasses(kryo: Kryo) {
-    //kryo.register(classOf[(Int, Float, Float)])
-    registerClass[Int, Int, Int](kryo)
+    kryo.register(classOf[Vertex[Object]])
+    kryo.register(classOf[Edge[Object]])
+    kryo.register(classOf[MutableTuple2[Object, Object]])
+    kryo.register(classOf[MessageToPartition[Object]])
 
     // This avoids a large number of hash table lookups.
     kryo.setReferences(false)
   }
-
-  private def registerClass[VD: Manifest, ED: Manifest, VD2: Manifest](kryo: Kryo) {
-    kryo.register(classOf[Vertex[VD]])
-    kryo.register(classOf[Edge[ED]])
-    kryo.register(classOf[MutableTuple2[VD, VD2]])
-    kryo.register(classOf[(Vid, VD2)])
-  }
 }
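
Usage sketch for patch 1/2: a minimal, hypothetical driver showing how the MessageToPartition path is meant to be exercised. Only MessageToPartition, the implicit partitionBy from MessageToPartitionRDDFunctions, and the .data accessor come from the patch; the SparkContext setup, the RDD contents, and the partition-assignment rule below are made up for illustration.

    import org.apache.spark.{HashPartitioner, SparkContext}
    import org.apache.spark.graph.impl.MessageToPartition
    import org.apache.spark.graph.impl.MessageToPartitionRDDFunctions._

    object MessageToPartitionExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "MessageToPartitionExample")
        val numPartitions = 4

        // Address each (vertex id, attribute) payload to a target partition.
        // The partition id is used for routing only; since the field is
        // @transient, it is not shipped with the payload during the shuffle.
        val messages = sc.parallelize(1L to 100L).map { vid =>
          MessageToPartition((vid % numPartitions).toInt, (vid, vid * 2.0))
        }

        // The implicit partitionBy shuffles on message.partition (_1).
        val shuffled = messages.partitionBy(new HashPartitioner(numPartitions))

        // The transient partition field is not restored after the shuffle,
        // so downstream code reads only the payload.
        val payloads = shuffled.mapPartitions(_.map(_.data), preservesPartitioning = true)
        println(payloads.count())
        sc.stop()
      }
    }

Compared with shuffling plain (pid, payload) pairs, the wrapper saves the bytes that would otherwise be spent serializing a partition id per record, which is redundant once the record has been routed to its shuffle bucket.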
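
Usage sketch for patch 2/2: enabling the updated registrator in a job. The spark.serializer and spark.kryo.registrator properties are Spark's standard 0.8-era Kryo configuration; the surrounding driver code is hypothetical.

    import org.apache.spark.SparkContext

    object GraphKryoExample {
      def main(args: Array[String]) {
        // Select Kryo and point it at the graph registrator before the
        // SparkContext is created (Spark 0.8 configures via system properties).
        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")

        val sc = new SparkContext("local", "GraphKryoExample")
        // ... build and operate on a graph ...
        sc.stop()
      }
    }

Because the JVM erases type parameters, registering Vertex[Object], Edge[Object], MutableTuple2[Object, Object], and MessageToPartition[Object] registers the underlying classes for any type parameter, and kryo.setReferences(false) spares each record the reference-tracking hash table lookups noted in the diff.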