From 80cdbf4f49cdb07bfd765d3fdd1d16d5aec2e60a Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 10 Oct 2013 22:40:48 -0700 Subject: [PATCH] Switched to use daemon thread in executor and fixed a bug in job cancellation for fair scheduler. --- .../org/apache/spark/executor/Executor.scala | 2 +- .../spark/scheduler/SchedulableBuilder.scala | 7 ++++-- .../scheduler/cluster/ClusterScheduler.scala | 9 -------- .../scheduler/cluster/SchedulerBackend.scala | 6 +---- .../scheduler/local/LocalScheduler.scala | 1 - .../scheduler/local/LocalSchedulerSuite.scala | 22 ++++++++++--------- 6 files changed, 19 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 4c544275c2..16258f3521 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -122,7 +122,7 @@ private[spark] class Executor( // Start worker thread pool val threadPool = new ThreadPoolExecutor( - 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable]) + 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable], Utils.daemonThreadFactory) // Maintains the list of running tasks. private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index a4e86538f9..873801e867 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -48,9 +48,12 @@ private[spark] trait SchedulableBuilder { return Some(tsm) } case pool: Pool => - getTsm(pool) + val found = getTsm(pool) + if (found.isDefined) { + return getTsm(pool) + } } - return None + None } getTsm(rootPool) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index 250dec5126..6c12ff7370 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -78,12 +78,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) private val executorIdToHost = new HashMap[String, String] - // JAR server, if any JARs were added by the user to the SparkContext - var jarServer: HttpServer = null - - // URIs of JARs to pass to executor - var jarUris: String = "" - // Listener object to pass upcalls into var listener: TaskSchedulerListener = null @@ -356,9 +350,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) if (backend != null) { backend.stop() } - if (jarServer != null) { - jarServer.stop() - } if (taskResultGetter != null) { taskResultGetter.stop() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala index c0578dcaa1..5367218faa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala @@ -30,12 +30,8 @@ private[spark] trait SchedulerBackend { def reviveOffers(): Unit def defaultParallelism(): Int - def killTask(taskId: Long, executorId: String) { - throw new UnsupportedOperationException - } + def killTask(taskId: Long, executorId: String): Unit = throw new UnsupportedOperationException // Memory used by each executor (in megabytes) protected val executorMemory: Int = SparkContext.executorMemoryRequested - - // TODO: Probably want to add a killTask too } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala index 0a6f4df902..dc6509d195 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala @@ -209,7 +209,6 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: } override def stop() { - //threadPool.shutdownNow() } override def defaultParallelism() = threads diff --git a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala index af76c843e8..d46a7469c7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/local/LocalSchedulerSuite.scala @@ -17,17 +17,15 @@ package org.apache.spark.scheduler.local -import org.scalatest.FunSuite -import org.scalatest.BeforeAndAfter - -import org.apache.spark._ -import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster._ -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.{ConcurrentMap, HashMap} import java.util.concurrent.Semaphore import java.util.concurrent.CountDownLatch -import java.util.Properties + +import scala.collection.mutable.HashMap + +import org.scalatest.{BeforeAndAfterEach, FunSuite} + +import org.apache.spark._ + class Lock() { var finished = false @@ -63,7 +61,11 @@ object TaskThreadInfo { * 5. each task(pending) must use "sleep" to make sure it has been added to taskSetManager queue, * thus it will be scheduled later when cluster has free cpu cores. */ -class LocalSchedulerSuite extends FunSuite with LocalSparkContext { +class LocalSchedulerSuite extends FunSuite with LocalSparkContext with BeforeAndAfterEach { + + override def afterEach() { + System.clearProperty("spark.scheduler.mode") + } def createThread(threadIndex: Int, poolName: String, sc: SparkContext, sem: Semaphore) {