[SPARK-2823][GraphX] Fix GraphX EdgeRDD zipPartitions

If the user sets "spark.default.parallelism" to a value that differs from the number of EdgeRDD partitions, GraphX jobs throw:
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
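
The root cause is how EdgeRDD chose its partitioner: it fell back to Partitioner.defaultPartitioner, which consults spark.default.parallelism, so the partitioner could report a partition count different from the actual number of partitions in partitionsRDD, and the subsequent zipPartitions call requires equal counts. A minimal reproduction, mirroring the regression test added below (a sketch that assumes the Spark 1.x GraphX API targeted by this patch):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx._

    // Default parallelism (3) deliberately differs from the edge partition count (4).
    val conf = new SparkConf().set("spark.default.parallelism", "3")
    val sc = new SparkContext("local", "repro", conf)

    val edges = sc.parallelize((1 to 10).map(x => (x.toLong, 0L)), 4)
    val graph = Graph.fromEdgeTuples(edges, defaultValue = 1)

    // Internally zips the vertex and edge RDDs; before this fix it throws
    // "Can't zip RDDs with unequal numbers of partitions".
    graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _).collect()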

Author: luluorta <luluorta@gmail.com>

Closes #1763 from luluorta/fix-graph-zip and squashes the following commits:

8338961 [luluorta] fix GraphX EdgeRDD zipPartitions
Authored by luluorta on 2014-09-02 19:25:52 -07:00; committed by Ankur Dave
parent e9bb12bea9
commit 9b225ac307
2 changed files with 18 additions and 2 deletions

EdgeRDD.scala:

@@ -19,7 +19,7 @@ package org.apache.spark.graphx
 import scala.reflect.{classTag, ClassTag}
 
-import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
+import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
@@ -55,7 +55,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
   * partitioner that allows co-partitioning with `partitionsRDD`.
   */
   override val partitioner =
-    partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+    partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitionsRDD.partitions.size)))
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
     val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
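
Why the one-line change works: zipPartitions requires both RDDs to have the same number of partitions, and Partitioner.defaultPartitioner sizes its partitioner from spark.default.parallelism when that property is set, whereas new HashPartitioner(partitionsRDD.partitions.size) is pinned to the partition count the EdgeRDD actually has. A standalone illustration of the zip constraint (hypothetical demo code, not part of the patch):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("zip-demo"))

    val a = sc.parallelize(1 to 10, 4)  // 4 partitions
    val b = sc.parallelize(1 to 10, 3)  // 3 partitions

    // Throws java.lang.IllegalArgumentException:
    // Can't zip RDDs with unequal numbers of partitions
    a.zipPartitions(b) { (x, y) => x ++ y }.collect()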

GraphSuite.scala:

@@ -19,6 +19,7 @@ package org.apache.spark.graphx
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph._
 import org.apache.spark.graphx.PartitionStrategy._
@@ -350,4 +351,19 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("non-default number of edge partitions") {
+    val n = 10
+    val defaultParallelism = 3
+    val numEdgePartitions = 4
+    assert(defaultParallelism != numEdgePartitions)
+    val conf = new SparkConf()
+      .set("spark.default.parallelism", defaultParallelism.toString)
+    val sc = new SparkContext("local", "test", conf)
+    val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
+      numEdgePartitions)
+    val graph = Graph.fromEdgeTuples(edges, 1)
+    val neighborAttrSums = graph.mapReduceTriplets[Int](
+      et => Iterator((et.dstId, et.srcAttr)), _ + _)
+    assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
+  }
 }