Enable repartitioning of graph over different number of partitions

It is currently very difficult to repartition a graph over a different number of partitions. This PR adds a `partitionBy` overload that additionally takes `numPartitions`, the number of edge partitions in the resulting graph.

Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>

Closes #719 from jegonzal/graph_partitioning_options and squashes the following commits:

730b405 [Joseph E. Gonzalez] adding an additional number of partitions option to partitionBy
This commit is contained in:
Joseph E. Gonzalez 2014-06-03 20:49:14 -07:00 committed by Ankur Dave
parent e8d93ee528
commit 5284ca78d1
3 changed files with 20 additions and 4 deletions

View file

@ -106,9 +106,19 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
/**
* Repartitions the edges in the graph according to `partitionStrategy`.
*
* @param partitionStrategy the partitioning strategy to use when partitioning the edges
* in the graph.
* @return a graph of the same vertex and edge attribute types (`Graph[VD, ED]`).
*/
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
/**
* Repartitions the edges in the graph according to `partitionStrategy`, using the given
* number of edge partitions.
*
* @param partitionStrategy the partitioning strategy to use when partitioning the edges
* in the graph.
* @param numPartitions the number of edge partitions in the new graph.
* @return a graph of the same vertex and edge attribute types (`Graph[VD, ED]`).
*/
def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]
/**
* Transforms each vertex attribute in the graph using the map function.
*

View file

@ -114,9 +114,11 @@ object PartitionStrategy {
*/
case object CanonicalRandomVertexCut extends PartitionStrategy {
  /**
   * Picks the partition for the edge between `src` and `dst`, ignoring edge direction.
   *
   * The endpoints are ordered smaller-id-first before hashing, so `(a, b)` and `(b, a)`
   * always map to the same partition.
   *
   * @param src source vertex id of the edge
   * @param dst destination vertex id of the edge
   * @param numParts number of partitions to spread the edges over
   * @return `math.abs` of the ordered pair's hash code, reduced modulo `numParts`
   */
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    // Build the direction-independent key exactly once.
    val canonicalPair = if (src < dst) (src, dst) else (dst, src)
    // NOTE(review): math.abs(Int.MinValue) stays negative, so a pair hashing to exactly
    // Int.MinValue could yield a negative result here -- confirm whether callers need a guard.
    math.abs(canonicalPair.hashCode()) % numParts
  }
}

View file

@ -74,7 +74,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
}
/**
 * Repartitions the edges according to `partitionStrategy`, keeping the current number of
 * edge partitions (`edges.partitions.size`).
 *
 * @param partitionStrategy the partitioning strategy to use when partitioning the edges
 * @return the result of the two-argument `partitionBy` called with the current partition count
 */
override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
  partitionBy(partitionStrategy, edges.partitions.size)
}
override def partitionBy(
partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] = {
val edTag = classTag[ED]
val vdTag = classTag[VD]
val newEdges = edges.withPartitionsRDD(edges.map { e =>