From bfedbee13a3c2355c307840d3f7548c2737d37bb Mon Sep 17 00:00:00 2001 From: Dan Crankshaw Date: Sat, 5 Oct 2013 16:04:57 -0700 Subject: [PATCH] Edge partitioner now partitions by canonical edge so all edges between two vertices (in either direction) will be sent to same machine. --- .../scala/org/apache/spark/graph/impl/GraphImpl.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index baf7291474..0ba39d8d80 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -381,12 +381,13 @@ object GraphImpl { } + /** + * @todo(crankshaw) how does this effect load balancing? + */ protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid, numParts: Pid, ceilSqrtNumParts: Pid): Pid = { val mixingPrime: Vid = 1125899906842597L - // Partitions by canonical edge direction - // @todo(crankshaw) evaluate the cases val src = math.min(srcOrig, dstOrig) val dst = math.max(srcOrig, dstOrig) val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt @@ -411,7 +412,8 @@ object GraphImpl { .map { e => // Random partitioning based on the source vertex id. // val part: Pid = edgePartitionFunction1D(e.src, e.dst, numPartitions) - val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt) + //val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt) + val part: Pid = canonicalEdgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt) // Should we be using 3-tuple or an optimized class (part, (e.src, e.dst, e.data))