From 37397b73ba949d7e0f0bb90d49459281c7afeadf Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Thu, 10 Oct 2013 22:57:43 -0700
Subject: [PATCH] Added comprehensive tests for job cancellation in a variety of environments (local vs cluster, fifo vs fair).

---
 .../apache/spark/JobCancellationSuite.scala   | 157 ++++++++++++++++++
 .../spark/rdd/AsyncRDDActionsSuite.scala      |  50 ------
 2 files changed, 157 insertions(+), 50 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/JobCancellationSuite.scala

diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
new file mode 100644
index 0000000000..35d88e4b72
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.Semaphore
+
+import scala.concurrent.future
+import scala.concurrent.ExecutionContext.Implicits.global
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.matchers.ShouldMatchers
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.scheduler.{SparkListenerTaskStart, SparkListener}
+
+
+/**
+ * Test suite for cancelling running jobs. We run the cancellation tasks for single job action
+ * (e.g. count) as well as multi-job action (e.g. take). We test in the combination of:
+ * - FIFO vs fair scheduler
+ * - local vs local cluster
+ */
+class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
+  with LocalSparkContext {
+
+  /**
+   * Per-test cleanup. Clears both system properties that the tests in this suite set, then
+   * chains to the inherited afterEach so the mixed-in traits' cleanup also runs when a test
+   * fails before reaching its own resetSparkContext() call.
+   */
+  override def afterEach() {
+    try {
+      System.clearProperty("spark.scheduler.mode")
+      System.clearProperty("spark.scheduler.allocation.file")
+    } finally {
+      // Always chain, even if clearing a property throws; stackable traits rely on it.
+      super.afterEach()
+    }
+  }
+
+  test("local mode, FIFO scheduler") {
+    System.setProperty("spark.scheduler.mode", "FIFO")
+    sc = new SparkContext("local[2]", "test")
+    testCount()
+    testTake()
+    resetSparkContext()
+  }
+
+  test("cluster mode, FIFO scheduler") {
+    System.setProperty("spark.scheduler.mode", "FIFO")
+    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    testCount()
+    testTake()
+    resetSparkContext()
+  }
+
+  test("local mode, fair scheduler") {
+    System.setProperty("spark.scheduler.mode", "FAIR")
+    val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
+    System.setProperty("spark.scheduler.allocation.file", xmlPath)
+    sc = new SparkContext("local[2]", "test")
+    testCount()
+    testTake()
+    resetSparkContext()
+  }
+
+  test("cluster mode, fair scheduler") {
+    System.setProperty("spark.scheduler.mode", "FAIR")
+    val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
+    System.setProperty("spark.scheduler.allocation.file", xmlPath)
+    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    testCount()
+    testTake()
+    resetSparkContext()
+  }
+
+  /**
+   * Cancels a countAsync() job right after submission, and again only after a task-start event
+   * has been seen; both times get() must fail with a "cancelled" or "killed" SparkException.
+   */
+  def testCount() {
+    // Cancel before launching any tasks
+    {
+      val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
+      future { f.cancel() }
+      val e = intercept[SparkException] { f.get() }
+      assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
+    }
+
+    // Cancel after some tasks have been launched
+    {
+      // Add a listener to release the semaphore once any tasks are launched.
+      val sem = new Semaphore(0)
+      sc.dagScheduler.addSparkListener(new SparkListener {
+        override def onTaskStart(taskStart: SparkListenerTaskStart) {
+          sem.release()
+        }
+      })
+
+      val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
+      future {
+        // Wait until some tasks were launched before we cancel the job.
+        sem.acquire()
+        f.cancel()
+      }
+      val e = intercept[SparkException] { f.get() }
+      assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
+    }
+  }
+
+  /**
+   * Runs the same two cancellation scenarios as testCount(), but against takeAsync(5000)
+   * instead of countAsync().
+   */
+  def testTake() {
+    // Cancel before launching any tasks
+    {
+      val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
+      future { f.cancel() }
+      val e = intercept[SparkException] { f.get() }
+      assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
+    }
+
+    // Cancel after some tasks have been launched
+    {
+      // Add a listener to release the semaphore once any tasks are launched.
+      val sem = new Semaphore(0)
+      sc.dagScheduler.addSparkListener(new SparkListener {
+        override def onTaskStart(taskStart: SparkListenerTaskStart) {
+          sem.release()
+        }
+      })
+      val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
+      future {
+        sem.acquire()
+        f.cancel()
+      }
+      val e = intercept[SparkException] { f.get() }
+      assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
+    }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index ac84640751..131e2466ac 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -45,56 +45,6 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll {
 
   lazy val zeroPartRdd = new EmptyRDD[Int](sc)
 
-  test("job cancellation before any tasks is launched") {
-    val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
-    future { f.cancel() }
-    val e = intercept[SparkException] { f.get() }
-    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
-  }
-
-  test("job cancellation after some tasks have been launched") {
-    // Add a listener to release the semaphore once any tasks are launched.
-    val sem = new Semaphore(0)
-    sc.dagScheduler.addSparkListener(new SparkListener {
-      override def onTaskStart(taskStart: SparkListenerTaskStart) {
-        sem.release()
-      }
-    })
-
-    val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
-    future {
-      // Wait until some tasks were launched before we cancel the job.
-      sem.acquire()
-      f.cancel()
-    }
-    val e = intercept[SparkException] { f.get() }
-    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
-  }
-
-  test("cancelling take action") {
-    val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
-    future { f.cancel() }
-    val e = intercept[SparkException] { f.get() }
-    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
-  }
-
-  test("cancelling take action after some tasks have been launched") {
-    // Add a listener to release the semaphore once any tasks are launched.
-    val sem = new Semaphore(0)
-    sc.dagScheduler.addSparkListener(new SparkListener {
-      override def onTaskStart(taskStart: SparkListenerTaskStart) {
-        sem.release()
-      }
-    })
-    val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
-    future {
-      sem.acquire()
-      f.cancel()
-    }
-    val e = intercept[SparkException] { f.get() }
-    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
-  }
-
   test("countAsync") {
     assert(zeroPartRdd.countAsync().get() === 0)
     assert(sc.parallelize(1 to 10000, 5).countAsync().get() === 10000)