diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index e397293a3d..baf7291474 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -361,7 +361,7 @@ object GraphImpl { * Notice that P0 has many edges and as a consequence this * partitioning would lead to poor work balance. To improve * balance we first multiply each vertex id by a large prime - * to effectively suffle the vertex locations. + * to effectively shuffle the vertex locations. * * One of the limitations of this approach is that the number of * machines must either be a perfect square. We partially address @@ -381,6 +381,20 @@ object GraphImpl { } + protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid, + numParts: Pid, ceilSqrtNumParts: Pid): Pid = { + val mixingPrime: Vid = 1125899906842597L + + // Partitions by canonical edge direction + // @todo(crankshaw) evaluate the cases + val src = math.min(srcOrig, dstOrig) + val dst = math.max(srcOrig, dstOrig) + val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt + val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt + (col * ceilSqrtNumParts + row) % numParts + } + + /** * Create the edge table RDD, which is much more efficient for Java heap storage than the * normal edges data structure (RDD[(Vid, Vid, ED)]).