Switched to use daemon thread in executor and fixed a bug in job cancellation for fair scheduler.

Reynold Xin 2013-10-10 22:40:48 -07:00
parent 058508b625
commit 80cdbf4f49
6 changed files with 19 additions and 28 deletions


@@ -122,7 +122,7 @@ private[spark] class Executor(
 
   // Start worker thread pool
   val threadPool = new ThreadPoolExecutor(
-    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
+    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable], Utils.daemonThreadFactory)
 
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
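The only functional change in this hunk is the extra thread-factory argument: worker threads are now created as daemon threads, and daemon threads do not keep the JVM alive, so the executor process can exit without waiting on idle pool threads. A minimal sketch of what such a factory looks like, assuming (not asserting) that Utils.daemonThreadFactory has roughly this shape:

    import java.util.concurrent.ThreadFactory

    // Minimal sketch of a daemon thread factory. Spark's actual
    // Utils.daemonThreadFactory may differ in naming and details.
    object DaemonThreadFactorySketch extends ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val thread = new Thread(r)
        thread.setDaemon(true) // daemon threads never block JVM shutdown
        thread
      }
    }

Passing a factory like this to ThreadPoolExecutor, as the changed line above does, marks every thread the pool spawns as a daemon before it starts.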


@@ -48,9 +48,12 @@ private[spark] trait SchedulableBuilder {
            return Some(tsm)
          }
        case pool: Pool =>
-         getTsm(pool)
+         val found = getTsm(pool)
+         if (found.isDefined) {
+           return getTsm(pool)
+         }
      }
-     return None
+     None
    }
    getTsm(rootPool)
  }
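This is the job-cancellation bug named in the commit message: the old code recursed into a nested pool with getTsm(pool) but threw the result away, so a TaskSetManager sitting inside a sub-pool, which is exactly how the fair scheduler arranges things, was never found and the lookup always fell through to None. The self-contained sketch below uses stand-in types rather than Spark's Pool and TaskSetManager, but shows the same pattern of propagating an Option out of a recursive search:

    // Stand-in types, not Spark's classes; the point is propagating the
    // result of the recursive call instead of discarding it.
    sealed trait Node
    case class Leaf(id: Int) extends Node
    case class Branch(children: Seq[Node]) extends Node

    def find(node: Node, target: Int): Option[Int] = node match {
      case Leaf(id) if id == target => Some(id)
      case Leaf(_) => None
      case Branch(children) =>
        children.foreach { child =>
          val found = find(child, target)
          // Dropping `found` here (the old behaviour) makes every Branch
          // search return None; checking it propagates the match upward.
          if (found.isDefined) {
            return found
          }
        }
        None
    }

Returning the already-computed value also avoids walking the sub-pool twice; the hunk above recomputes getTsm(pool) inside the return, which yields the same result at the cost of one extra traversal.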


@@ -78,12 +78,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   private val executorIdToHost = new HashMap[String, String]
 
-  // JAR server, if any JARs were added by the user to the SparkContext
-  var jarServer: HttpServer = null
-
-  // URIs of JARs to pass to executor
-  var jarUris: String = ""
-
   // Listener object to pass upcalls into
   var listener: TaskSchedulerListener = null
@@ -356,9 +350,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     if (backend != null) {
       backend.stop()
     }
-    if (jarServer != null) {
-      jarServer.stop()
-    }
     if (taskResultGetter != null) {
       taskResultGetter.stop()
     }


@@ -30,12 +30,8 @@ private[spark] trait SchedulerBackend {
   def reviveOffers(): Unit
   def defaultParallelism(): Int
 
-  def killTask(taskId: Long, executorId: String) {
-    throw new UnsupportedOperationException
-  }
+  def killTask(taskId: Long, executorId: String): Unit = throw new UnsupportedOperationException
 
   // Memory used by each executor (in megabytes)
   protected val executorMemory: Int = SparkContext.executorMemoryRequested
-
-  // TODO: Probably want to add a killTask too
 }
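After this change killTask is an ordinary trait method with a default body that throws, so backends that cannot kill tasks need no code at all, while backends that can simply override it. A small sketch of the pattern using illustrative names (Backend and KillingBackend are not Spark classes):

    // Illustrative names only; a real SchedulerBackend implementation
    // would override killTask in the same way.
    trait Backend {
      // Default: backends without kill support inherit this and throw.
      def killTask(taskId: Long, executorId: String): Unit =
        throw new UnsupportedOperationException
    }

    class KillingBackend extends Backend {
      override def killTask(taskId: Long, executorId: String): Unit = {
        // A real backend would send a kill message to the target executor here.
        println(s"kill task $taskId on executor $executorId")
      }
    }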


@@ -209,7 +209,6 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
   }
 
   override def stop() {
-    //threadPool.shutdownNow()
   }
 
   override def defaultParallelism() = threads


@@ -17,17 +17,15 @@
 package org.apache.spark.scheduler.local
 
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster._
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.{ConcurrentMap, HashMap}
-
 import java.util.concurrent.Semaphore
 import java.util.concurrent.CountDownLatch
-import java.util.Properties
+
+import scala.collection.mutable.HashMap
+
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
+
+import org.apache.spark._
 
 class Lock() {
   var finished = false
@@ -63,7 +61,11 @@ object TaskThreadInfo {
  * 5. each task(pending) must use "sleep" to make sure it has been added to taskSetManager queue,
  *    thus it will be scheduled later when cluster has free cpu cores.
  */
-class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
+class LocalSchedulerSuite extends FunSuite with LocalSparkContext with BeforeAndAfterEach {
+
+  override def afterEach() {
+    System.clearProperty("spark.scheduler.mode")
+  }
 
   def createThread(threadIndex: Int, poolName: String, sc: SparkContext, sem: Semaphore) {