Job cancellation: addressed code review feedback from Kay.

Reynold Xin 2013-10-11 15:58:14 -07:00
parent e2047d3927
commit a61cf40ab9
12 changed files with 85 additions and 79 deletions

View file

@ -46,7 +46,7 @@ trait FutureAction[T] extends Future[T] {
override def ready(atMost: Duration)(implicit permit: CanAwait): FutureAction.this.type
/**
* Await and return the result (of type T) of this action.
* Awaits and returns the result (of type T) of this action.
* @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
* for unbounded waiting, or a finite positive duration
* @throws Exception exception during action execution
@ -76,7 +76,7 @@ trait FutureAction[T] extends Future[T] {
override def value: Option[Try[T]]
/**
* Block and return the result of this job.
* Blocks and returns the result of this job.
*/
@throws(classOf[Exception])
def get(): T = Await.result(this, Duration.Inf)
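
For orientation, a brief usage sketch that is not part of the diff: it assumes a live SparkContext sc and the implicit conversion from RDD to AsyncRDDActions in scope, and shows the two waiting styles the scaladoc above describes.

import scala.concurrent.Await
import scala.concurrent.duration._

// Sketch only: submit an async count and wait for its result.
val future = sc.parallelize(1 to 1000, 4).countAsync()   // a FutureAction[Long]

// Bounded wait through the standard Future interface...
val count1 = Await.result(future, 30.seconds)
// ...or the convenience get(), which waits unboundedly (Duration.Inf).
val count2 = future.get()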

View file

@ -881,9 +881,7 @@ class SparkContext(
* Cancel all jobs that have been scheduled or are running.
*/
def cancelAllJobs() {
dagScheduler.activeJobs.foreach { job =>
dagScheduler.cancelJob(job.jobId)
}
dagScheduler.cancelAllJobs()
}
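
A hedged sketch of how this might be used from a separate watchdog thread (sc is assumed to be a live SparkContext; the cancelled job is assumed to surface to its caller as an exception):

// Sketch: cancel everything scheduled or running after a timeout.
val watchdog = new Thread(new Runnable {
  def run() {
    Thread.sleep(5000)    // give the jobs some time to start
    sc.cancelAllJobs()    // now delegates to dagScheduler.cancelAllJobs()
  }
})
watchdog.start()

try {
  sc.parallelize(1 to 100000, 8).map { x => Thread.sleep(10); x }.count()
} catch {
  case e: Exception => println("count() did not finish: " + e.getMessage)
}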
/**

View file

@ -114,8 +114,8 @@ private[spark] class Executor(
}
}
// Akka's message frame size. This is only used to warn the user when the task result is greater
// than this value, in which case Akka will silently drop the task result message.
// Akka's message frame size. If task result is bigger than this, we use the block manager
// to send the result back.
private val akkaFrameSize = {
env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
}
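
A rough, self-contained illustration of the routing decision the new comment describes; directSend and sendViaBlockManager are hypothetical stand-ins, not the real Executor methods, and the frame size is a made-up constant.

import java.nio.ByteBuffer

// Illustrative sketch only: small results travel inline in the status update message,
// larger ones are handed to the block manager. Helper names are hypothetical.
object ResultRoutingSketch {
  val akkaFrameSize: Long = 10L * 1024 * 1024   // assume a 10 MB frame for the sketch

  def directSend(taskId: Long, result: ByteBuffer) {
    println("task " + taskId + ": sent inline (" + result.limit() + " bytes)")
  }

  def sendViaBlockManager(taskId: Long, result: ByteBuffer) {
    println("task " + taskId + ": stored in the block manager (" + result.limit() + " bytes)")
  }

  def route(taskId: Long, result: ByteBuffer) {
    if (result.limit() < akkaFrameSize) directSend(taskId, result)
    else sendViaBlockManager(taskId, result)
  }
}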
@ -198,6 +198,7 @@ private[spark] class Executor(
if (killed) {
logInfo("Executor killed task " + taskId)
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
return
}
attemptedTask = Some(task)
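
The early return added above matters because reporting TaskState.KILLED does not by itself stop the code that follows it. A standalone sketch of the same check-then-return pattern (illustrative names, not the real Executor):

// Standalone sketch of cooperative cancellation; not Spark's Executor code.
class TaskRunnerSketch(taskId: Long) {
  @volatile private var killed = false

  def kill() { killed = true }

  def run() {
    if (killed) {
      println("Task " + taskId + " killed before it started")
      return   // without this, the work below would still run after reporting the kill
    }
    println("Running task " + taskId)
    // ... the actual task body would go here ...
  }
}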

View file

@ -31,7 +31,7 @@ import org.apache.spark.{Logging, CancellablePromise, FutureAction}
class AsyncRDDActions[T: ClassManifest](self: RDD[T]) extends Serializable with Logging {
/**
* Return a future for counting the number of elements in the RDD.
* Returns a future for counting the number of elements in the RDD.
*/
def countAsync(): FutureAction[Long] = {
val totalCount = new AtomicLong
@ -51,7 +51,7 @@ class AsyncRDDActions[T: ClassManifest](self: RDD[T]) extends Serializable with
}
/**
* Return a future for retrieving all elements of this RDD.
* Returns a future for retrieving all elements of this RDD.
*/
def collectAsync(): FutureAction[Seq[T]] = {
val results = new Array[Array[T]](self.partitions.size)
@ -60,7 +60,7 @@ class AsyncRDDActions[T: ClassManifest](self: RDD[T]) extends Serializable with
}
/**
* The async version of take that returns a FutureAction.
* Returns a future for retrieving the first num elements of the RDD.
*/
def takeAsync(num: Int): FutureAction[Seq[T]] = {
val promise = new CancellablePromise[Seq[T]]
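
A usage sketch for these async actions, not part of the diff: it assumes the RDD-to-AsyncRDDActions implicit conversion is in scope and that FutureAction exposes a cancel() method, which is the point of this patch series.

// Sketch: start an async take, cancel it from another thread if it takes too long.
val future = sc.parallelize(1 to 1000000, 8)
  .map { x => Thread.sleep(1); x }
  .takeAsync(5000)

new Thread(new Runnable {
  def run() {
    Thread.sleep(2000)    // lose patience after two seconds
    future.cancel()       // assumed FutureAction API; see note above
  }
}).start()

try {
  println("Got " + future.get().size + " elements")
} catch {
  case e: Exception => println("take failed or was cancelled: " + e.getMessage)
}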

View file

@ -88,7 +88,8 @@ class DAGScheduler(
eventQueue.put(ExecutorGained(execId, host))
}
// Called by TaskScheduler to cancel an entire TaskSet due to repeated failures.
// Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or
// cancellation of the job itself.
override def taskSetFailed(taskSet: TaskSet, reason: String) {
eventQueue.put(TaskSetFailed(taskSet, reason))
}
@ -336,11 +337,18 @@ class DAGScheduler(
/**
* Cancel a job that is running or waiting in the queue.
*/
def cancelJob(jobId: Int): Unit = this.synchronized {
def cancelJob(jobId: Int) {
logInfo("Asked to cancel job " + jobId)
eventQueue.put(JobCancelled(jobId))
}
/**
* Cancel all jobs that are running or waiting in the queue.
*/
def cancelAllJobs() {
eventQueue.put(AllJobsCancelled)
}
/**
* Process one event retrieved from the event queue.
* Returns true if we should stop the event loop.
@ -373,6 +381,12 @@ class DAGScheduler(
taskSched.cancelTasks(stage.id)
}
case AllJobsCancelled =>
// Cancel all running jobs.
running.foreach { stage =>
taskSched.cancelTasks(stage.id)
}
case ExecutorGained(execId, host) =>
handleExecutorGained(execId, host)
@ -777,7 +791,7 @@ class DAGScheduler(
failedStage.completionTime = Some(System.currentTimeMillis())
for (resultStage <- dependentStages) {
val job = resultStageToJob(resultStage)
val error = new SparkException("Job failed: " + reason)
val error = new SparkException("Job aborted: " + reason)
job.listener.jobFailed(error)
listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))
idToActiveJob -= resultStage.jobId
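
With this change, a cancelled or otherwise aborted job surfaces to the blocked caller as a SparkException whose message starts with "Job aborted". A hedged driver-side sketch:

// Sketch: the blocking action rethrows the failure reported by the DAGScheduler.
try {
  sc.parallelize(1 to 1000, 4).map { x => Thread.sleep(100); x }.count()
} catch {
  case e: org.apache.spark.SparkException if e.getMessage.startsWith("Job aborted") =>
    println("Job did not complete: " + e.getMessage)
}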

View file

@ -46,6 +46,8 @@ private[scheduler] case class JobSubmitted(
private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent
private[scheduler] case object AllJobsCancelled extends DAGSchedulerEvent
private[scheduler]
case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent

View file

@ -43,7 +43,10 @@ private[spark] class Pool(
var runningTasks = 0
var priority = 0
// A pool's stage id is used to break the tie in scheduling.
var stageId = -1
var name = poolName
var parent: Pool = null

View file

@ -35,28 +35,6 @@ private[spark] trait SchedulableBuilder {
def buildPools()
def addTaskSetManager(manager: Schedulable, properties: Properties)
/**
* Find the TaskSetManager for the given stage. In fair scheduler, this function examines
* all the pools to find the TaskSetManager.
*/
def getTaskSetManagers(stageId: Int): Option[TaskSetManager] = {
def getTsm(pool: Pool): Option[TaskSetManager] = {
pool.schedulableQueue.foreach {
case tsm: TaskSetManager =>
if (tsm.stageId == stageId) {
return Some(tsm)
}
case pool: Pool =>
val found = getTsm(pool)
if (found.isDefined) {
return getTsm(pool)
}
}
None
}
getTsm(rootPool)
}
}
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)

View file

@ -166,21 +166,21 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
override def cancelTasks(stageId: Int): Unit = synchronized {
logInfo("Cancelling stage " + stageId)
schedulableBuilder.getTaskSetManagers(stageId).foreach { tsm =>
activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) =>
// There are two possible cases here:
// 1. The task set manager has been created and some tasks have been scheduled.
// In this case, send a kill signal to the executors to kill the task.
// In this case, send a kill signal to the executors to kill the task and then abort
// the stage.
// 2. The task set manager has been created but no tasks have been scheduled. In this case,
// simply abort the task set.
// simply abort the stage.
val taskIds = taskSetTaskIds(tsm.taskSet.id)
if (taskIds.size > 0) {
taskIds.foreach { tid =>
val execId = taskIdToExecutorId(tid)
backend.killTask(tid, execId)
}
} else {
tsm.error("Stage %d was cancelled before any tasks was launched".format(stageId))
}
tsm.error("Stage %d was cancelled".format(stageId))
}
}

View file

@ -17,8 +17,7 @@
package org.apache.spark.scheduler.cluster
import java.nio.ByteBuffer
import java.util.{Arrays, NoSuchElementException}
import java.util.Arrays
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
@ -27,7 +26,7 @@ import scala.math.max
import scala.math.min
import org.apache.spark.{ExceptionFailure, FetchFailed, Logging, Resubmitted, SparkEnv,
SparkException, Success, TaskEndReason, TaskResultLost, TaskState, TaskKilled}
Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler._
import org.apache.spark.util.{SystemClock, Clock}

View file

@ -57,8 +57,10 @@ class LocalActor(localScheduler: LocalScheduler, private var freeCores: Int)
launchTask(localScheduler.resourceOffer(freeCores))
case LocalStatusUpdate(taskId, state, serializeData) =>
if (TaskState.isFinished(state)) {
freeCores += 1
launchTask(localScheduler.resourceOffer(freeCores))
}
case KillTask(taskId) =>
executor.killTask(taskId)
@ -128,20 +130,21 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
override def cancelTasks(stageId: Int): Unit = synchronized {
logInfo("Cancelling stage " + stageId)
schedulableBuilder.getTaskSetManagers(stageId).foreach { tsm =>
logInfo("Cancelling stage " + activeTaskSets.map(_._2.stageId))
activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) =>
// There are two possible cases here:
// 1. The task set manager has been created and some tasks have been scheduled.
// In this case, send a kill signal to the executors to kill the task.
// In this case, send a kill signal to the executors to kill the task and then abort
// the stage.
// 2. The task set manager has been created but no tasks have been scheduled. In this case,
// simply abort the task set.
// simply abort the stage.
val taskIds = taskSetTaskIds(tsm.taskSet.id)
if (taskIds.size > 0) {
taskIds.foreach { tid =>
localActor ! KillTask(tid)
}
} else {
tsm.error("Stage %d was cancelled before any tasks was launched".format(stageId))
}
tsm.error("Stage %d was cancelled".format(stageId))
}
}
@ -185,7 +188,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
}
override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
if (TaskState.isFinished(state)) synchronized {
if (TaskState.isFinished(state)) {
synchronized {
taskIdToTaskSetId.get(taskId) match {
case Some(taskSetId) =>
val taskSetManager = activeTaskSets(taskSetId)
@ -200,12 +204,12 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
taskSetManager.error("Task %d was killed".format(taskId))
case _ => {}
}
localActor ! LocalStatusUpdate(taskId, state, serializedData)
case None =>
logInfo("Ignoring update from TID " + taskId + " because its task set is gone")
}
}
localActor ! LocalStatusUpdate(taskId, state, serializedData)
}
}
override def stop() {

View file

@ -31,9 +31,8 @@ import org.apache.spark.scheduler.{SparkListenerTaskStart, SparkListener}
/**
* Test suite for cancelling running jobs. We run the cancellation tasks for single job action
* (e.g. count) as well as multi-job action (e.g. take). We test in the combination of:
* - FIFO vs fair scheduler
* - local vs local cluster
* (e.g. count) as well as multi-job action (e.g. take). We test the local and cluster schedulers
* in both FIFO and fair scheduling modes.
*/
class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
with LocalSparkContext {
@ -48,14 +47,8 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
sc = new SparkContext("local[2]", "test")
testCount()
testTake()
resetSparkContext()
}
test("cluster mode, FIFO scheduler") {
System.setProperty("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local-cluster[2,1,512]", "test")
testCount()
testTake()
// Make sure we can still launch tasks.
assert(sc.parallelize(1 to 10, 2).count === 10)
resetSparkContext()
}
@ -66,6 +59,18 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
sc = new SparkContext("local[2]", "test")
testCount()
testTake()
// Make sure we can still launch tasks.
assert(sc.parallelize(1 to 10, 2).count === 10)
resetSparkContext()
}
test("cluster mode, FIFO scheduler") {
System.setProperty("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local-cluster[2,1,512]", "test")
testCount()
testTake()
// Make sure we can still launch tasks.
assert(sc.parallelize(1 to 10, 2).count === 10)
resetSparkContext()
}
@ -76,6 +81,8 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
sc = new SparkContext("local-cluster[2,1,512]", "test")
testCount()
testTake()
// Make sure we can still launch tasks.
assert(sc.parallelize(1 to 10, 2).count === 10)
resetSparkContext()
}
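
For reference, a hypothetical sketch of what a helper like testCount could look like; its real body is not shown in this diff, and the names, timings, and assertions here are assumptions.

// Hypothetical sketch only; the actual testCount body is not part of this diff.
def testCount() {
  // Run a slow count asynchronously, cancel it, and expect the future to fail.
  val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
  Thread.sleep(200)    // let some tasks start
  f.cancel()           // assumed FutureAction API
  val e = intercept[SparkException] { f.get() }
  assert(e.getMessage.toLowerCase.contains("cancel") ||
    e.getMessage.toLowerCase.contains("aborted"))
}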