diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index bdf79bf9f0..d7cabce34f 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -270,8 +270,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( .toList // groups all ETs in this partition that have the same src and dst // Because all ETs with the same src and dst will live on the same - // partition due to the EdgePartitioner, this guarantees that these - // ET groups will be complete. + // partition due to the canonicalRandomVertexCut partitioner, this + // guarantees that these ET groups will be complete. .groupBy { t: EdgeTriplet[VD, ED] => (t.srcId, t.dstId) } .mapValues { ts: List[EdgeTriplet[VD, ED]] => f(ts.toIterator) } .toList @@ -447,7 +447,6 @@ object GraphImpl { // val part: Pid = edgePartitionFunction1D(e.srcId, e.dstId, numPartitions) // val part: Pid = edgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt) val part: Pid = randomVertexCut(e.srcId, e.dstId, numPartitions) - //val part: Pid = canonicalEdgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt) // Should we be using 3-tuple or an optimized class MessageToPartition(part, (e.srcId, e.dstId, e.attr)) @@ -622,20 +621,15 @@ object GraphImpl { math.abs((src, dst).hashCode()) % numParts } - /** - * @todo This will only partition edges to the upper diagonal - * of the 2D processor space. + * Assign edges to an arbitrary machine corresponding to a random vertex cut. This + * function ensures that edges of opposite direction between the same two vertices + * will end up on the same partition. 
*/
-  protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid,
-    numParts: Pid, ceilSqrtNumParts: Pid): Pid = {
-    val mixingPrime: Vid = 1125899906842597L
-    // Partitions by canonical edge direction
-    val src = math.min(srcOrig, dstOrig)
-    val dst = math.max(srcOrig, dstOrig)
-    val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
-    val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
-    (col * ceilSqrtNumParts + row) % numParts
+  protected def canonicalRandomVertexCut(src: Vid, dst: Vid, numParts: Pid): Pid = {
+    val lower = math.min(src, dst)
+    val higher = math.max(src, dst)
+    math.abs((lower, higher).hashCode() % numParts) // % before abs: math.abs(Int.MinValue) < 0
   }
 
 } // end of object GraphImpl