[SPARK-22764][CORE] Fix flakiness in SparkContextSuite.

Use a semaphore to synchronize the tasks with the listener code
that is trying to cancel the job or stage, so that the listener
won't try to cancel a job or stage that has already finished.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #19956 from vanzin/SPARK-22764.
commit a83e8e6c22 (parent ba0e79f57c)
Authored by Marcelo Vanzin on 2017-12-13 16:06:16 -06:00; committed by Imran Rashid
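
The gist of the fix, before the diff: each task flags that it has started and then parks on a shared semaphore; the listener waits until the tasks are running, cancels the stage or job, and only then releases one permit per partition so every task can unwind. Below is a minimal, standalone sketch of that handshake using plain JVM threads. It is not part of the commit: the object name, the worker threads, and the CountDownLatch (standing in for the test's eventually-polling of isTaskStarted) are all illustrative; only the Semaphore protocol mirrors the test.

import java.util.concurrent.{CountDownLatch, Semaphore}

object SemaphoreHandshake {
  val slices = 10                           // one worker per "partition"
  val semaphore = new Semaphore(0)          // no permits until the "listener" acts
  val started = new CountDownLatch(slices)  // stand-in for the isTaskStarted flag

  def main(args: Array[String]): Unit = {
    val workers = (1 to slices).map { i =>
      new Thread(() => {
        started.countDown()   // "task started"
        semaphore.acquire()   // block until the listener has done its work
        println(s"task $i unblocked")
      })
    }
    workers.foreach(_.start())

    started.await()           // wait until every task is really running
    // ... this is where sc.cancelStage(...) / sc.cancelJob(...) would run ...
    semaphore.release(slices) // one permit per task: let them all finish
    workers.foreach(_.join())
  }
}

Because the listener releases permits only after cancelling, no task can run to completion before the cancellation is issued; that ordering is exactly what the original test lacked, letting the listener sometimes try to cancel a job or stage that had already finished.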


@@ -20,7 +20,7 @@ package org.apache.spark
 import java.io.File
 import java.net.{MalformedURLException, URI}
 import java.nio.charset.StandardCharsets
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{Semaphore, TimeUnit}
 
 import scala.concurrent.duration._
@@ -499,6 +499,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
   test("Cancelling stages/jobs with custom reasons.") {
     sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
     val REASON = "You shall not pass"
+    val slices = 10
 
     val listener = new SparkListener {
       override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
@@ -508,6 +509,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
           }
           sc.cancelStage(taskStart.stageId, REASON)
           SparkContextSuite.cancelStage = false
+          SparkContextSuite.semaphore.release(slices)
         }
       }
@@ -518,21 +520,25 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
           }
           sc.cancelJob(jobStart.jobId, REASON)
           SparkContextSuite.cancelJob = false
+          SparkContextSuite.semaphore.release(slices)
         }
       }
     }
     sc.addSparkListener(listener)
 
     for (cancelWhat <- Seq("stage", "job")) {
+      SparkContextSuite.semaphore.drainPermits()
       SparkContextSuite.isTaskStarted = false
       SparkContextSuite.cancelStage = (cancelWhat == "stage")
       SparkContextSuite.cancelJob = (cancelWhat == "job")
 
       val ex = intercept[SparkException] {
-        sc.range(0, 10000L).mapPartitions { x =>
-          org.apache.spark.SparkContextSuite.isTaskStarted = true
+        sc.range(0, 10000L, numSlices = slices).mapPartitions { x =>
+          SparkContextSuite.isTaskStarted = true
+          // Block waiting for the listener to cancel the stage or job.
+          SparkContextSuite.semaphore.acquire()
           x
-        }.cartesian(sc.range(0, 10L)).count()
+        }.count()
       }
 
       ex.getCause() match {
@@ -636,4 +642,5 @@ object SparkContextSuite {
   @volatile var isTaskStarted = false
   @volatile var taskKilled = false
   @volatile var taskSucceeded = false
+  val semaphore = new Semaphore(0)
 }
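
Two details of the diff are worth calling out. The drainPermits() at the top of each loop iteration resets the semaphore between the "stage" and "job" rounds, so leftover permits from the first round cannot let the second round's tasks slip past acquire() before the listener has cancelled anything. And release(slices) hands out one permit per partition of the range RDD (numSlices = slices), which is enough for every task that could possibly be blocked at cancellation time, regardless of how many the local master actually has running.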