[SPARK-23222][SQL] Make DataFrameRangeSuite not flaky
## What changes were proposed in this pull request? It is reported that the test `Cancelling stage in a query with Range` in `DataFrameRangeSuite` fails a few times in unrelated PRs. I have personally seen it in my own PR as well. This test is not actually very flaky; it only fails occasionally. Based on how the test works, I suspect that is because `range` finishes before the listener calls `cancelStage`. I increased the range upper bound from `1000000000L` to `100000000000L` and made the range run in a single partition. I also reduced the `interval` used when polling for the stage id. Hopefully this makes the test no longer flaky. ## How was this patch tested? The modified tests. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #20431 from viirya/SPARK-23222.
This commit is contained in:
parent
7a2ada223e
commit
84bcf9dc88
|
@ -154,7 +154,7 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
|
||||||
test("Cancelling stage in a query with Range.") {
|
test("Cancelling stage in a query with Range.") {
|
||||||
val listener = new SparkListener {
|
val listener = new SparkListener {
|
||||||
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
|
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
|
||||||
eventually(timeout(10.seconds)) {
|
eventually(timeout(10.seconds), interval(1.millis)) {
|
||||||
assert(DataFrameRangeSuite.stageToKill > 0)
|
assert(DataFrameRangeSuite.stageToKill > 0)
|
||||||
}
|
}
|
||||||
sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
|
sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
|
||||||
|
@ -166,7 +166,7 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
|
||||||
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
|
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
|
||||||
DataFrameRangeSuite.stageToKill = -1
|
DataFrameRangeSuite.stageToKill = -1
|
||||||
val ex = intercept[SparkException] {
|
val ex = intercept[SparkException] {
|
||||||
spark.range(1000000000L).map { x =>
|
spark.range(0, 100000000000L, 1, 1).map { x =>
|
||||||
DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
|
DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
|
||||||
x
|
x
|
||||||
}.toDF("id").agg(sum("id")).collect()
|
}.toDF("id").agg(sum("id")).collect()
|
||||||
|
@ -184,6 +184,7 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
|
||||||
assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
|
assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
sparkContext.removeSparkListener(listener)
|
||||||
}
|
}
|
||||||
|
|
||||||
test("SPARK-20430 Initialize Range parameters in a driver side") {
|
test("SPARK-20430 Initialize Range parameters in a driver side") {
|
||||||
|
|
Loading…
Reference in a new issue