From 84bcf9dc88ffeae6fba4cfad9455ad75bed6e6f6 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 30 Jan 2018 21:00:29 +0800
Subject: [PATCH] [SPARK-23222][SQL] Make DataFrameRangeSuite not flaky

## What changes were proposed in this pull request?

It is reported that the test `Cancelling stage in a query with Range` in
`DataFrameRangeSuite` occasionally fails in unrelated PRs. I have seen it
fail in my own PR as well.

The test is not highly flaky; it only fails once in a while. Based on how
the test works, my guess is that `range` sometimes finishes before the
listener calls `cancelStage`.

This patch increases the range size from `1000000000L` to `100000000000L`
and computes the range in a single partition, so the job runs long enough
to be cancelled. It also reduces the `interval` at which the stage id is
checked. Hopefully this makes the test no longer flaky.

## How was this patch tested?

The modified tests.

Author: Liang-Chi Hsieh

Closes #20431 from viirya/SPARK-23222.
---
 .../scala/org/apache/spark/sql/DataFrameRangeSuite.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index 45afbd29d1..57a930dfaf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -154,7 +154,7 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
   test("Cancelling stage in a query with Range.") {
     val listener = new SparkListener {
       override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-        eventually(timeout(10.seconds)) {
+        eventually(timeout(10.seconds), interval(1.millis)) {
           assert(DataFrameRangeSuite.stageToKill > 0)
         }
         sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
@@ -166,7 +166,7 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
         DataFrameRangeSuite.stageToKill = -1
         val ex = intercept[SparkException] {
-          spark.range(1000000000L).map { x =>
+          spark.range(0, 100000000000L, 1, 1).map { x =>
             DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
             x
           }.toDF("id").agg(sum("id")).collect()
@@ -184,6 +184,7 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
         assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
       }
     }
+    sparkContext.removeSparkListener(listener)
   }

   test("SPARK-20430 Initialize Range parameters in a driver side") {
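
Not part of the patch: a minimal sketch of the retry semantics behind ScalaTest's `eventually(timeout(...), interval(...))`, which the listener above relies on. The object `EventuallySketch` and the helper `pollUntil` are illustrative names, not Spark or ScalaTest APIs; the point is that a short polling interval lets the listener pick up the published stage id almost immediately, while the timeout bounds the total wait.

```scala
import java.util.concurrent.atomic.AtomicInteger

object EventuallySketch {
  // Hand-rolled stand-in for eventually(timeout(...), interval(...)) { check }:
  // retry the check until it passes or the deadline expires, sleeping
  // `intervalMs` between attempts.
  def pollUntil(timeoutMs: Long, intervalMs: Long)(check: => Boolean): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (!check) {
      if (System.currentTimeMillis() > deadline) {
        throw new RuntimeException(s"condition not met within ${timeoutMs}ms")
      }
      Thread.sleep(intervalMs) // a shorter interval reacts to the condition sooner
    }
  }

  def main(args: Array[String]): Unit = {
    // Stands in for the shared DataFrameRangeSuite.stageToKill variable.
    val stageToKill = new AtomicInteger(-1)
    // Simulate a task publishing its stage id shortly after the job starts.
    new Thread(() => { Thread.sleep(50); stageToKill.set(3) }).start()
    // timeout(10.seconds) / interval(1.millis) in the patched test map to these values.
    pollUntil(timeoutMs = 10000L, intervalMs = 1L) { stageToKill.get() > 0 }
    println(s"would cancel stage ${stageToKill.get()}")
  }
}
```

In the real suite, the `Eventually` trait performs this loop internally; shrinking the polling interval narrows the window in which `range` can finish before `cancelStage` is called.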
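
Also not part of the patch: a hedged sketch of the four-argument `range` overload the fix switches to, `range(start, end, step, numPartitions)`. Pinning `numPartitions` to 1 yields a single long-running task, so the stage is still alive when the listener issues `cancelStage`. `RangeSketch` is an illustrative name, and a much smaller `end` is used here so the example actually completes.

```scala
import org.apache.spark.sql.SparkSession

object RangeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("range-sketch")
      .getOrCreate()
    // range(start, end, step, numPartitions): with numPartitions = 1 the
    // whole range is produced by one task, unlike range(end), which splits
    // the work across the default parallelism.
    val df = spark.range(0, 1000000L, 1, 1).toDF("id")
    assert(df.rdd.getNumPartitions == 1)
    spark.stop()
  }
}
```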