[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky

## What changes were proposed in this pull request?

DataFrameRangeSuite.test("Cancelling stage in a query with Range.") sometimes gets stuck in an infinite loop and times out the build.

There were multiple issues with the test:

1. The first valid stageId is zero when the test started alone and not in a suite and the following code waits until timeout:

```
eventually(timeout(10.seconds), interval(1.millis)) {
  assert(DataFrameRangeSuite.stageToKill > 0)
}
```

2. The `DataFrameRangeSuite.stageToKill` was overwritten by the task's thread after the reset, which ended up cancelling the same stage twice. This caused the infinite wait.

This PR fixes the flakiness by removing the shared `DataFrameRangeSuite.stageToKill` and instead using `wait` and `CountDownLatch` for synchronization.

## How was this patch tested?

Existing unit test.

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>

Closes #20888 from gaborgsomogyi/SPARK-23775.
This commit is contained in:
Gabor Somogyi 2018-04-18 16:37:41 -07:00 committed by Marcelo Vanzin
parent a9066478f6
commit 0c94e48bc5

View file

@ -17,14 +17,16 @@
package org.apache.spark.sql package org.apache.spark.sql
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.math.abs import scala.math.abs
import scala.util.Random import scala.util.Random
import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.Eventually
import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.test.SharedSQLContext
@ -152,39 +154,53 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
} }
test("Cancelling stage in a query with Range.") { test("Cancelling stage in a query with Range.") {
val listener = new SparkListener { // Save and restore the value because SparkContext is shared
override def onJobStart(jobStart: SparkListenerJobStart): Unit = { val savedInterruptOnCancel = sparkContext
eventually(timeout(10.seconds), interval(1.millis)) { .getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
assert(DataFrameRangeSuite.stageToKill > 0)
}
sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
}
}
sparkContext.addSparkListener(listener) try {
for (codegen <- Seq(true, false)) { sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
DataFrameRangeSuite.stageToKill = -1 for (codegen <- Seq(true, false)) {
val ex = intercept[SparkException] { // This countdown latch used to make sure with all the stages cancelStage called in listener
spark.range(0, 100000000000L, 1, 1).map { x => val latch = new CountDownLatch(2)
DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
x val listener = new SparkListener {
}.toDF("id").agg(sum("id")).collect() override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
sparkContext.cancelStage(taskStart.stageId)
latch.countDown()
}
} }
ex.getCause() match {
case null => sparkContext.addSparkListener(listener)
assert(ex.getMessage().contains("cancelled")) withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
case cause: SparkException => val ex = intercept[SparkException] {
assert(cause.getMessage().contains("cancelled")) sparkContext.range(0, 10000L, numSlices = 10).mapPartitions { x =>
case cause: Throwable => x.synchronized {
fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.") x.wait()
}
x
}.toDF("id").agg(sum("id")).collect()
}
ex.getCause() match {
case null =>
assert(ex.getMessage().contains("cancelled"))
case cause: SparkException =>
assert(cause.getMessage().contains("cancelled"))
case cause: Throwable =>
fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
}
} }
latch.await(20, TimeUnit.SECONDS)
eventually(timeout(20.seconds)) {
assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
}
sparkContext.removeSparkListener(listener)
} }
eventually(timeout(20.seconds)) { } finally {
assert(sparkContext.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0) sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,
} savedInterruptOnCancel)
} }
sparkContext.removeSparkListener(listener)
} }
test("SPARK-20430 Initialize Range parameters in a driver side") { test("SPARK-20430 Initialize Range parameters in a driver side") {
@ -204,7 +220,3 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
} }
} }
} }
object DataFrameRangeSuite {
@volatile var stageToKill = -1
}