commit
4b46d519db
|
@ -2,23 +2,19 @@ package org.apache.spark.graph
|
||||||
|
|
||||||
import com.esotericsoftware.kryo.Kryo
|
import com.esotericsoftware.kryo.Kryo
|
||||||
|
|
||||||
|
import org.apache.spark.graph.impl.MessageToPartition
|
||||||
import org.apache.spark.serializer.KryoRegistrator
|
import org.apache.spark.serializer.KryoRegistrator
|
||||||
|
|
||||||
|
|
||||||
/**
 * Registers GraphX-specific classes with Kryo so graph RDDs serialize
 * efficiently. Registration is by erased class, so registering with
 * `Object` type parameters covers every instantiation of these generics.
 */
class GraphKryoRegistrator extends KryoRegistrator {

  def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Vertex[Object]])
    kryo.register(classOf[Edge[Object]])
    kryo.register(classOf[MutableTuple2[Object, Object]])
    kryo.register(classOf[MessageToPartition[Object]])

    // This avoids a large number of hash table lookups.
    kryo.setReferences(false)
  }
}
|
||||||
|
|
|
@ -11,9 +11,7 @@ import org.apache.spark.rdd.RDD
|
||||||
|
|
||||||
import org.apache.spark.graph._
|
import org.apache.spark.graph._
|
||||||
import org.apache.spark.graph.impl.GraphImpl._
|
import org.apache.spark.graph.impl.GraphImpl._
|
||||||
|
import org.apache.spark.graph.impl.MessageToPartitionRDDFunctions._
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -309,11 +307,13 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
|
||||||
// Join vid2pid and vTable, generate a shuffle dependency on the joined result, and get
|
// Join vid2pid and vTable, generate a shuffle dependency on the joined result, and get
|
||||||
// the shuffle id so we can use it on the slave.
|
// the shuffle id so we can use it on the slave.
|
||||||
vTable
|
vTable
|
||||||
.flatMap { case (vid, (vdata, pids)) => pids.iterator.map { pid => (pid, (vid, vdata)) } }
|
.flatMap { case (vid, (vdata, pids)) =>
|
||||||
|
pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) }
|
||||||
|
}
|
||||||
.partitionBy(edgePartitioner)
|
.partitionBy(edgePartitioner)
|
||||||
.mapPartitions(
|
.mapPartitions({ part =>
|
||||||
{ part => part.map { case(pid, (vid, vdata)) => (vid, vdata) } },
|
part.map { message => (message.data._1, message.data._2) }
|
||||||
preservesPartitioning = true)
|
}, preservesPartitioning = true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -400,14 +400,16 @@ object GraphImpl {
|
||||||
val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
|
val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
|
||||||
|
|
||||||
// Should we be using 3-tuple or an optimized class
|
// Should we be using 3-tuple or an optimized class
|
||||||
(part, (e.src, e.dst, e.data))
|
MessageToPartition(part, (e.src, e.dst, e.data))
|
||||||
// (math.abs(e.src) % numPartitions, (e.src, e.dst, e.data))
|
// (math.abs(e.src) % numPartitions, (e.src, e.dst, e.data))
|
||||||
|
|
||||||
}
|
}
|
||||||
.partitionBy(new HashPartitioner(numPartitions))
|
.partitionBy(new HashPartitioner(numPartitions))
|
||||||
.mapPartitionsWithIndex({ (pid, iter) =>
|
.mapPartitionsWithIndex({ (pid, iter) =>
|
||||||
val edgePartition = new EdgePartition[ED]
|
val edgePartition = new EdgePartition[ED]
|
||||||
iter.foreach { case (_, (src, dst, data)) => edgePartition.add(src, dst, data) }
|
iter.foreach { message =>
|
||||||
|
val data = message.data
|
||||||
|
edgePartition.add(data._1, data._2, data._3)
|
||||||
|
}
|
||||||
edgePartition.trim()
|
edgePartition.trim()
|
||||||
Iterator((pid, edgePartition))
|
Iterator((pid, edgePartition))
|
||||||
}, preservesPartitioning = true)
|
}, preservesPartitioning = true)
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
package org.apache.spark.graph.impl
|
||||||
|
|
||||||
|
import org.apache.spark.Partitioner
|
||||||
|
import org.apache.spark.graph.Pid
|
||||||
|
import org.apache.spark.rdd.{ShuffledRDD, RDD}
|
||||||
|
|
||||||
|
|
||||||
|
/**
 * A message used to send a specific value to a partition.
 *
 * Implementing Product2 lets this class flow through the (key, value)
 * shuffle machinery with the partition id as the key.
 *
 * @param partition index of the target partition; @transient because it is
 *                  only needed for routing and need not survive serialization
 * @param data value to send
 */
class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T](
    @transient var partition: Pid,
    var data: T)
  extends Product2[Pid, T] {

  // Key of the (key, value) pair seen by the partitioner.
  override def _1 = partition

  // Payload delivered to the target partition.
  override def _2 = data

  override def canEqual(that: Any): Boolean = that.isInstanceOf[MessageToPartition[_]]
}
||||||
|
|
||||||
|
/**
 * Companion object for MessageToPartition.
 */
object MessageToPartition {
  /** Construct a message carrying `value` addressed to `partition`. */
  def apply[T](partition: Pid, value: T) = new MessageToPartition(partition, value)
}
||||||
|
|
||||||
|
|
||||||
|
/**
 * Extra operations available on RDDs of [[MessageToPartition]], pulled in via
 * the implicit conversion in the companion object.
 */
class MessageToPartitionRDDFunctions[T: ClassManifest](self: RDD[MessageToPartition[T]]) {

  /**
   * Return a copy of the RDD partitioned using the specified partitioner.
   *
   * Shuffles the messages directly (keyed by their target Pid) instead of
   * going through tuple-based pair-RDD functions, avoiding an extra
   * per-element tuple allocation.
   */
  def partitionBy(partitioner: Partitioner): RDD[MessageToPartition[T]] = {
    new ShuffledRDD[Pid, T, MessageToPartition[T]](self, partitioner)
  }
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
 * Implicit conversion enriching RDD[MessageToPartition[T]] with
 * [[MessageToPartitionRDDFunctions]] (e.g. the specialized `partitionBy`).
 */
object MessageToPartitionRDDFunctions {
  implicit def rdd2PartitionRDDFunctions[T: ClassManifest](rdd: RDD[MessageToPartition[T]]) = {
    new MessageToPartitionRDDFunctions(rdd)
  }
}
|
Loading…
Reference in a new issue