Edge partitioner now partitions by canonical edge so all edges between two vertices (in either direction) will be sent to the same machine.

This commit is contained in:
Dan Crankshaw 2013-10-05 16:04:57 -07:00
parent e096cbe90e
commit bfedbee13a

View file

@ -381,12 +381,13 @@ object GraphImpl {
} }
/**
* @todo(crankshaw) how does this affect load balancing?
*/
protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid, protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid,
numParts: Pid, ceilSqrtNumParts: Pid): Pid = { numParts: Pid, ceilSqrtNumParts: Pid): Pid = {
val mixingPrime: Vid = 1125899906842597L val mixingPrime: Vid = 1125899906842597L
// Partitions by canonical edge direction // Partitions by canonical edge direction
// @todo(crankshaw) evaluate the cases
val src = math.min(srcOrig, dstOrig) val src = math.min(srcOrig, dstOrig)
val dst = math.max(srcOrig, dstOrig) val dst = math.max(srcOrig, dstOrig)
val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
@ -411,7 +412,8 @@ object GraphImpl {
.map { e => .map { e =>
// Random partitioning based on the source vertex id. // Random partitioning based on the source vertex id.
// val part: Pid = edgePartitionFunction1D(e.src, e.dst, numPartitions) // val part: Pid = edgePartitionFunction1D(e.src, e.dst, numPartitions)
val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt) //val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
val part: Pid = canonicalEdgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
// Should we be using 3-tuple or an optimized class // Should we be using 3-tuple or an optimized class
(part, (e.src, e.dst, e.data)) (part, (e.src, e.dst, e.data))