[SPARK-31730][CORE][TEST] Fix flaky tests in BarrierTaskContextSuite

### What changes were proposed in this pull request?

Wait until all the executors have started before submitting any job. This avoids the flakiness caused by a barrier job being submitted while executors are still coming up.
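For context, here is a minimal sketch of what such an executor wait can look like. It is an illustrative reimplementation of the idea behind `TestUtils.waitUntilExecutorsUp` (used in the diff below), not Spark's exact code; the polling interval and the exception type are assumptions:

```scala
import org.apache.spark.SparkContext

// Illustrative sketch: poll the status tracker until the expected number of
// executors has registered, or give up after the timeout. Spark's real helper
// is TestUtils.waitUntilExecutorsUp; the 100 ms poll interval is an assumption.
def waitUntilExecutorsUp(sc: SparkContext, numExecutors: Int, timeoutMs: Long): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (System.currentTimeMillis() < deadline) {
    // getExecutorInfos also reports the driver, hence the strict `>` comparison.
    if (sc.statusTracker.getExecutorInfos.length > numExecutors) {
      return
    }
    Thread.sleep(100)
  }
  throw new java.util.concurrent.TimeoutException(
    s"Unable to get $numExecutors executors within $timeoutMs ms")
}
```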

### How was this patch tested?

Existing tests.

Closes #28584 from jiangxb1987/barrierTest.

Authored-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Xingbo Jiang 2020-05-27 16:37:02 -07:00
parent d19b173b47
commit efe7fd2b6b


@@ -25,6 +25,7 @@ import org.scalatest.concurrent.Eventually
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
+import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 
 class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
@@ -37,10 +38,10 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
       .setAppName("test-cluster")
       .set(TEST_NO_STAGE_RETRY, true)
     sc = new SparkContext(conf)
+    TestUtils.waitUntilExecutorsUp(sc, numWorker, 60000)
   }
 
-  // TODO (SPARK-31730): re-enable it
-  ignore("global sync by barrier() call") {
+  test("global sync by barrier() call") {
     initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
@@ -57,10 +58,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
   }
 
   test("share messages with allGather() call") {
-    val conf = new SparkConf()
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
+    initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -78,10 +76,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
   }
 
   test("throw exception if we attempt to synchronize with different blocking calls") {
-    val conf = new SparkConf()
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
+    initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -100,10 +95,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
   }
 
   test("successively sync with allGather and barrier") {
-    val conf = new SparkConf()
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
+    initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -129,8 +121,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
     assert(times2.max - times2.min <= 1000)
   }
 
-  // TODO (SPARK-31730): re-enable it
-  ignore("support multiple barrier() call within a single task") {
+  test("support multiple barrier() call within a single task") {
     initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
@@ -285,6 +276,9 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
 
   test("SPARK-31485: barrier stage should fail if only partial tasks are launched") {
     initLocalClusterSparkContext(2)
+    // It's required to reset the delay timer when a task is scheduled, otherwise all the tasks
+    // could get scheduled at ANY level.
+    sc.conf.set(config.LEGACY_LOCALITY_WAIT_RESET, true)
     val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
     val dep = new OneToOneDependency[Int](rdd0)
     // set up a barrier stage with 2 tasks and both tasks prefer executor 0 (only 1 core) for
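Putting the pieces of the diff together, the shared setup helper after this change reads roughly as follows. The method signature and the `numWorker: Int = 4` default are reconstructed from the hunks above (the removed per-test configs all used `local-cluster[4, 1, 1024]`), so treat this as a sketch rather than the verbatim source:

```scala
def initLocalClusterSparkContext(numWorker: Int = 4): Unit = {
  val conf = new SparkConf()
    .setMaster(s"local-cluster[$numWorker, 1, 1024]")
    .setAppName("test-cluster")
    .set(TEST_NO_STAGE_RETRY, true)
  sc = new SparkContext(conf)
  // Block until every worker has registered an executor, so that no barrier
  // stage is submitted while executors are still coming up.
  TestUtils.waitUntilExecutorsUp(sc, numWorker, 60000)
}
```

Each test then calls `initLocalClusterSparkContext()` instead of building its own `SparkConf`, so the executor wait is applied uniformly across the suite.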