[SPARK-20333] HashPartitioner should be compatible with num of child RDD's partitions.

## What changes were proposed in this pull request?

Fix test
"don't submit stage until its dependencies map outputs are registered (SPARK-5259)" ,
"run trivial shuffle with out-of-band executor failure and retry",
"reduce tasks should be placed locally with map output"
in DAGSchedulerSuite.

Author: jinxing <jinxing6042@126.com>

Closes #17634 from jinxing64/SPARK-20333.
This commit is contained in:
jinxing 2017-05-30 14:02:33 -05:00 committed by Imran Rashid
parent 4d57981cfb
commit de953c214c

View file

@ -1277,10 +1277,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
*/
test("don't submit stage until its dependencies map outputs are registered (SPARK-5259)") {
val firstRDD = new MyRDD(sc, 3, Nil)
val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(2))
val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3))
val firstShuffleId = firstShuffleDep.shuffleId
val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))
@ -1583,7 +1583,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
*/
test("run trivial shuffle with out-of-band executor failure and retry") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))
@ -1791,7 +1791,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
test("reduce tasks should be placed locally with map output") {
// Create a shuffleMapRdd with 1 partition
val shuffleMapRdd = new MyRDD(sc, 1, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))