Added comprehensive tests for job cancellation in a variety of environments (local vs cluster, fifo vs fair).

Reynold Xin 2013-10-10 22:57:43 -07:00
parent 80cdbf4f49
commit 37397b73ba
2 changed files with 138 additions and 50 deletions
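
For reference, a minimal sketch of the cancellation pattern these tests exercise, written against the async-action API used in the diff below (the object name and the local[2] master are illustrative only):

import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.SparkContext._

object CancellationSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "cancellation-sketch")
    // Start an asynchronous count; the returned handle supports cancel().
    val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
    // Cancel from another thread, possibly before any task has launched.
    future { f.cancel() }
    try {
      f.get()
    } catch {
      // A cancelled job surfaces as a SparkException whose message
      // mentions "cancelled" or "killed".
      case e: SparkException => println(e.getMessage)
    }
    sc.stop()
  }
}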

@@ -0,0 +1,138 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.util.concurrent.Semaphore

import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global

import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.matchers.ShouldMatchers

import org.apache.spark.SparkContext._
import org.apache.spark.scheduler.{SparkListenerTaskStart, SparkListener}

/**
 * Test suite for cancelling running jobs. We cover cancellation of a single-job action
 * (e.g. count) as well as a multi-job action (e.g. take), in each combination of:
 *  - FIFO vs fair scheduler
 *  - local mode vs local cluster
 */
class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
  with LocalSparkContext {

  override def afterEach() {
    System.clearProperty("spark.scheduler.mode")
  }
test("local mode, FIFO scheduler") {
System.setProperty("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local[2]", "test")
testCount()
testTake()
resetSparkContext()
}
test("cluster mode, FIFO scheduler") {
System.setProperty("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local-cluster[2,1,512]", "test")
testCount()
testTake()
resetSparkContext()
}
test("local mode, fair scheduler") {
System.setProperty("spark.scheduler.mode", "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
System.setProperty("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local[2]", "test")
testCount()
testTake()
resetSparkContext()
}
test("cluster mode, fair scheduler") {
System.setProperty("spark.scheduler.mode", "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
System.setProperty("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local-cluster[2,1,512]", "test")
testCount()
testTake()
resetSparkContext()
}

  def testCount() {
    // Cancel before launching any tasks
    {
      val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
      future { f.cancel() }
      val e = intercept[SparkException] { f.get() }
      assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
    }

    // Cancel after some tasks have been launched
    {
      // Add a listener to release the semaphore once any tasks are launched.
      val sem = new Semaphore(0)
      sc.dagScheduler.addSparkListener(new SparkListener {
        override def onTaskStart(taskStart: SparkListenerTaskStart) {
          sem.release()
        }
      })
      val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
      future {
        // Wait until some tasks were launched before we cancel the job.
        sem.acquire()
        f.cancel()
      }
      val e = intercept[SparkException] { f.get() }
      assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
    }
  }

  def testTake() {
    // Cancel before launching any tasks
    {
      val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
      future { f.cancel() }
      val e = intercept[SparkException] { f.get() }
      assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
    }

    // Cancel after some tasks have been launched
    {
      // Add a listener to release the semaphore once any tasks are launched.
      val sem = new Semaphore(0)
      sc.dagScheduler.addSparkListener(new SparkListener {
        override def onTaskStart(taskStart: SparkListenerTaskStart) {
          sem.release()
        }
      })
      val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
      future {
        sem.acquire()
        f.cancel()
      }
      val e = intercept[SparkException] { f.get() }
      assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
    }
  }
}
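
The fair-scheduler tests above point spark.scheduler.allocation.file at a fairscheduler.xml found on the test classpath. A minimal sketch of the same setup in a standalone driver (the object name and file path are placeholders; both properties must be set before the SparkContext is created):

import org.apache.spark.SparkContext

object FairSchedulerSketch {
  def main(args: Array[String]) {
    // The scheduler mode and allocation file are read when the SparkContext
    // (and its scheduler) is constructed, so set them first.
    System.setProperty("spark.scheduler.mode", "FAIR")
    System.setProperty("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
    val sc = new SparkContext("local[2]", "fair-scheduler-sketch")
    // ... submit jobs here; pools are taken from the allocation file ...
    sc.stop()
  }
}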

@@ -45,56 +45,6 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll {
  lazy val zeroPartRdd = new EmptyRDD[Int](sc)

  test("job cancellation before any tasks is launched") {
    val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
    future { f.cancel() }
    val e = intercept[SparkException] { f.get() }
    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
  }

  test("job cancellation after some tasks have been launched") {
    // Add a listener to release the semaphore once any tasks are launched.
    val sem = new Semaphore(0)
    sc.dagScheduler.addSparkListener(new SparkListener {
      override def onTaskStart(taskStart: SparkListenerTaskStart) {
        sem.release()
      }
    })
    val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
    future {
      // Wait until some tasks were launched before we cancel the job.
      sem.acquire()
      f.cancel()
    }
    val e = intercept[SparkException] { f.get() }
    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
  }

  test("cancelling take action") {
    val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
    future { f.cancel() }
    val e = intercept[SparkException] { f.get() }
    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
  }

  test("cancelling take action after some tasks have been launched") {
    // Add a listener to release the semaphore once any tasks are launched.
    val sem = new Semaphore(0)
    sc.dagScheduler.addSparkListener(new SparkListener {
      override def onTaskStart(taskStart: SparkListenerTaskStart) {
        sem.release()
      }
    })
    val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
    future {
      sem.acquire()
      f.cancel()
    }
    val e = intercept[SparkException] { f.get() }
    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
  }

  test("countAsync") {
    assert(zeroPartRdd.countAsync().get() === 0)
    assert(sc.parallelize(1 to 10000, 5).countAsync().get() === 10000)