Update Java API for setJobGroup with interruptOnCancel

Also adds a unit test.

Author: Aaron Davidson <aaron@databricks.com>

Closes #522 from aarondav/cancel2 and squashes the following commits:

565c253 [Aaron Davidson] Update Java API for setJobGroup with interruptOnCancel
65b33d8 [Aaron Davidson] Add unit test for Thread interruption on cancellation
This commit is contained in:
parent
4b2bab1d08
commit
d485eecb72
|
@ -570,6 +570,21 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
|
|||
* // In a separate thread:
|
||||
* sc.cancelJobGroup("some_job_to_cancel");
|
||||
* }}}
|
||||
*
|
||||
* If interruptOnCancel is set to true for the job group, then job cancellation will result
|
||||
* in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
|
||||
* that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
|
||||
* where HDFS may respond to Thread.interrupt() by marking nodes as dead.
|
||||
*/
|
||||
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean): Unit =
|
||||
sc.setJobGroup(groupId, description, interruptOnCancel)
|
||||
|
||||
/**
|
||||
* Assigns a group ID to all the jobs started by this thread until the group ID is set to a
|
||||
* different value or cleared.
|
||||
*
|
||||
* @see `setJobGroup(groupId: String, description: String, interruptThread: Boolean)`.
|
||||
* This method sets interruptOnCancel to false.
|
||||
*/
|
||||
def setJobGroup(groupId: String, description: String): Unit = sc.setJobGroup(groupId, description)
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@ import java.util.concurrent.Semaphore
|
|||
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import scala.concurrent.duration.Duration
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.future
|
||||
|
||||
import org.scalatest.{BeforeAndAfter, FunSuite}
|
||||
|
@ -101,11 +101,11 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
|
|||
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
|
||||
}
|
||||
|
||||
sc.clearJobGroup()
|
||||
val jobB = sc.parallelize(1 to 100, 2).countAsync()
|
||||
|
||||
// Block until both tasks of job A have started and cancel job A.
|
||||
sem.acquire(2)
|
||||
|
||||
sc.clearJobGroup()
|
||||
val jobB = sc.parallelize(1 to 100, 2).countAsync()
|
||||
sc.cancelJobGroup("jobA")
|
||||
val e = intercept[SparkException] { Await.result(jobA, Duration.Inf) }
|
||||
assert(e.getMessage contains "cancel")
|
||||
|
@ -113,6 +113,38 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
|
|||
// Once A is cancelled, job B should finish fairly quickly.
|
||||
assert(jobB.get() === 100)
|
||||
}
|
||||
|
||||
|
||||
test("job group with interruption") {
|
||||
sc = new SparkContext("local[2]", "test")
|
||||
|
||||
// Add a listener to release the semaphore once any tasks are launched.
|
||||
val sem = new Semaphore(0)
|
||||
sc.addSparkListener(new SparkListener {
|
||||
override def onTaskStart(taskStart: SparkListenerTaskStart) {
|
||||
sem.release()
|
||||
}
|
||||
})
|
||||
|
||||
// jobA is the one to be cancelled.
|
||||
val jobA = future {
|
||||
sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
|
||||
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(100000); i }.count()
|
||||
}
|
||||
|
||||
// Block until both tasks of job A have started and cancel job A.
|
||||
sem.acquire(2)
|
||||
|
||||
sc.clearJobGroup()
|
||||
val jobB = sc.parallelize(1 to 100, 2).countAsync()
|
||||
sc.cancelJobGroup("jobA")
|
||||
val e = intercept[SparkException] { Await.result(jobA, 5.seconds) }
|
||||
assert(e.getMessage contains "cancel")
|
||||
|
||||
// Once A is cancelled, job B should finish fairly quickly.
|
||||
assert(jobB.get() === 100)
|
||||
}
|
||||
|
||||
/*
|
||||
test("two jobs sharing the same stage") {
|
||||
// sem1: make sure cancel is issued after some tasks are launched
|
||||
|
|
Loading…
Reference in a new issue