[SPARK-8718] [GRAPHX] Improve EdgePartition2D for non perfect square number of partitions
See https://github.com/aray/e2d/blob/master/EdgePartition2D.ipynb

Author: Andrew Ray <ray.andrew@gmail.com>

Closes #7104 from aray/edge-partition-2d-improvement and squashes the following commits:

3729f84 [Andrew Ray] correct bounds and remove unneeded comments
97f8464 [Andrew Ray] change less
5141ab4 [Andrew Ray] Merge branch 'master' into edge-partition-2d-improvement
925fd2c [Andrew Ray] use new interface for partitioning
001bfd0 [Andrew Ray] Refactor PartitionStrategy so that we can return a partition function for a given number of parts. To keep compatibility we define default methods that translate between the two implementation options. Made EdgePartition2D use old strategy when we have a perfect square and implement new interface.
5d42105 [Andrew Ray] % -> /
3560084 [Andrew Ray] Merge branch 'master' into edge-partition-2d-improvement
f006364 [Andrew Ray] remove unneeded comments
cfa2c5e [Andrew Ray] Modifications to EdgePartition2D so that it works for non perfect squares.
parent d267c2834a
commit 0a4071eab3
@@ -32,7 +32,7 @@ trait PartitionStrategy extends Serializable {
 object PartitionStrategy {
   /**
    * Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix,
-   * guaranteeing a `2 * sqrt(numParts) - 1` bound on vertex replication.
+   * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
    *
    * Suppose we have a graph with 12 vertices that we want to partition
    * over 9 machines. We can use the following sparse matrix representation:
@@ -61,26 +61,36 @@ object PartitionStrategy {
    * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3,
    * P6)` or the last
    * row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be
-   * replicated to at most `2 * sqrt(numParts) - 1` machines.
+   * replicated to at most `2 * sqrt(numParts)` machines.
    *
    * Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work
    * balance. To improve balance we first multiply each vertex id by a large prime to shuffle the
    * vertex locations.
    *
-   * One of the limitations of this approach is that the number of machines must either be a
-   * perfect square. We partially address this limitation by computing the machine assignment to
-   * the next
-   * largest perfect square and then mapping back down to the actual number of machines.
-   * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect
-   * square is used.
+   * When the number of partitions requested is not a perfect square we use a slightly different
+   * method where the last column can have a different number of rows than the others while still
+   * maintaining the same size per block.
    */
   case object EdgePartition2D extends PartitionStrategy {
     override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
       val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
       val mixingPrime: VertexId = 1125899906842597L
-      val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
-      val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
-      (col * ceilSqrtNumParts + row) % numParts
+      if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
+        // Use old method for perfect squared to ensure we get same results
+        val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
+        val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
+        (col * ceilSqrtNumParts + row) % numParts
+
+      } else {
+        // Otherwise use new method
+        val cols = ceilSqrtNumParts
+        val rows = (numParts + cols - 1) / cols
+        val lastColRows = numParts - rows * (cols - 1)
+        val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
+        val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
+        col * rows + row
+
+      }
     }
   }
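The non-perfect-square branch in the diff above can be tried outside Spark with a small standalone sketch. The object name `EdgePartition2DSketch`, the helpers `gridShape` and `partitionsHit`, and the local `VertexId` / `PartitionID` aliases are hypothetical stand-ins introduced here for illustration. Only the arithmetic is copied from the `else` branch, so treat this as a rough check of the grid layout, not as the GraphX implementation.

```scala
// Standalone sketch: no Spark dependency. The type aliases below are local
// stand-ins assumed to match the GraphX aliases (Long and Int).
object EdgePartition2DSketch {
  type VertexId = Long
  type PartitionID = Int

  val mixingPrime: VertexId = 1125899906842597L

  /** (cols, rows, lastColRows) as computed in the non-perfect-square branch. */
  def gridShape(numParts: PartitionID): (Int, Int, Int) = {
    val cols = math.ceil(math.sqrt(numParts)).toInt
    val rows = (numParts + cols - 1) / cols        // integer ceil(numParts / cols)
    val lastColRows = numParts - rows * (cols - 1) // blocks left for the last column
    (cols, rows, lastColRows)
  }

  /** Same arithmetic as the `else` branch of the diff. */
  def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    val (cols, rows, lastColRows) = gridShape(numParts)
    val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
    val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
    col * rows + row
  }

  /** Partition ids produced by every (src, dst) pair with ids in 0 until n. */
  def partitionsHit(n: Int, numParts: PartitionID): Set[PartitionID] =
    (for (s <- 0L until n.toLong; d <- 0L until n.toLong)
      yield getPartition(s, d, numParts)).toSet
}
```

For `numParts = 10` this gives a grid of 4 columns where the first three columns hold 3 blocks each and the last column holds 1, so partition ids run from 0 to 9 with no wrap-around `% numParts` step. The column index is drawn from `% numParts / rows`, so the short last column is chosen less often than the full ones, which is how the block sizes stay roughly equal.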