Fixed issue with canonical edge partitioner.

This commit is contained in:
Dan Crankshaw 2013-10-30 15:03:21 -07:00 committed by Ankur Dave
parent 143c01dbd6
commit 8d8056da14
2 changed files with 10 additions and 14 deletions

View file

@ -1,6 +1,7 @@
package org.apache.spark.graph
import org.apache.spark.rdd.RDD
import org.apache.spark.util.ClosureCleaner
import org.apache.spark.storage.StorageLevel
/**

View file

@ -253,8 +253,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
.toList
// groups all ETs in this partition that have the same src and dst
// Because all ETs with the same src and dst will live on the same
// partition due to the EdgePartitioner, this guarantees that these
// ET groups will be complete.
// partition due to the canonicalRandomVertexCut partitioner, this
// guarantees that these ET groups will be complete.
.groupBy { t: EdgeTriplet[VD, ED] => (t.srcId, t.dstId) }
.mapValues { ts: List[EdgeTriplet[VD, ED]] => f(ts.toIterator) }
.toList
@ -357,7 +357,6 @@ object GraphImpl {
// val part: Pid = edgePartitionFunction1D(e.srcId, e.dstId, numPartitions)
// val part: Pid = edgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
val part: Pid = randomVertexCut(e.srcId, e.dstId, numPartitions)
//val part: Pid = canonicalEdgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
// Should we be using 3-tuple or an optimized class
new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
@ -555,18 +554,14 @@ object GraphImpl {
}
/**
* @todo This will only partition edges to the upper diagonal
* of the 2D processor space.
* Assign edges to an arbitrary machine corresponding to a random vertex cut. This
* function ensures that edges of opposite direction between the same two vertices
* will end up on the same partition.
*/
protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid,
numParts: Pid, ceilSqrtNumParts: Pid): Pid = {
val mixingPrime: Vid = 1125899906842597L
// Partitions by canonical edge direction
val src = math.min(srcOrig, dstOrig)
val dst = math.max(srcOrig, dstOrig)
val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
(col * ceilSqrtNumParts + row) % numParts
protected def canonicalRandomVertexCut(src: Vid, dst: Vid, numParts: Pid): Pid = {
val lower = math.min(src, dst)
val higher = math.max(src, dst)
math.abs((lower, higher).hashCode()) % numParts
}
private def accessesVertexAttr[VD: ClassManifest, ED: ClassManifest](