diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index baf7291474..0ba39d8d80 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -381,12 +381,13 @@ object GraphImpl { } + /** + * @todo(crankshaw) how does this affect load balancing? + */ protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid, numParts: Pid, ceilSqrtNumParts: Pid): Pid = { val mixingPrime: Vid = 1125899906842597L - // Partitions by canonical edge direction - // @todo(crankshaw) evaluate the cases val src = math.min(srcOrig, dstOrig) val dst = math.max(srcOrig, dstOrig) val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt @@ -411,7 +412,8 @@ object GraphImpl { .map { e => // Random partitioning based on the source vertex id. // val part: Pid = edgePartitionFunction1D(e.src, e.dst, numPartitions) - val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt) + //val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt) + val part: Pid = canonicalEdgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt) // Should we be using 3-tuple or an optimized class (part, (e.src, e.dst, e.data))