[SPARK-6650] [core] Stop ExecutorAllocationManager when context stops.

This fixes the thread leak: the ExecutorAllocationManager's polling
thread kept running after its SparkContext was stopped. I also changed
the unit test to keep track of allocated contexts and make sure they're
closed after tests run; this is needed since some tests use this pattern:

    val sc = createContext()
    doSomethingThatMayThrow()
    sc.stop()
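
If doSomethingThatMayThrow() throws, the sc.stop() line never runs and
the context leaks. The exception-safe equivalent would need a try/finally
in every such test:

    val sc = createContext()
    try {
      doSomethingThatMayThrow()
    } finally {
      sc.stop()
    }

Instead of repeating that boilerplate, the suite now registers every
created context in a list and stops them all in an after {} hook, which
runs whether or not the test body throws.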

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #5311 from vanzin/SPARK-6650 and squashes the following commits:

652c73b [Marcelo Vanzin] Nits.
5711512 [Marcelo Vanzin] More exception safety.
cc5a744 [Marcelo Vanzin] Stop alloc manager before scheduler.
9886f69 [Marcelo Vanzin] [SPARK-6650] [core] Stop ExecutorAllocationManager when context stops.
Authored by Marcelo Vanzin on 2015-04-02 19:48:55 -07:00; committed by Andrew Or.
parent 052dee0707
commit 45134ec920
3 changed files with 49 additions and 36 deletions

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

@@ -17,10 +17,12 @@
 package org.apache.spark
 
+import java.util.concurrent.{Executors, TimeUnit}
+
 import scala.collection.mutable
 
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{SystemClock, Clock}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
@@ -129,6 +131,10 @@ private[spark] class ExecutorAllocationManager(
   // Listener for Spark events that impact the allocation policy
   private val listener = new ExecutorAllocationListener
 
+  // Executor that handles the scheduling task.
+  private val executor = Executors.newSingleThreadScheduledExecutor(
+    Utils.namedThreadFactory("spark-dynamic-executor-allocation"))
+
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@ -173,32 +179,24 @@ private[spark] class ExecutorAllocationManager(
} }
/** /**
* Register for scheduler callbacks to decide when to add and remove executors. * Register for scheduler callbacks to decide when to add and remove executors, and start
* the scheduling task.
*/ */
def start(): Unit = { def start(): Unit = {
listenerBus.addListener(listener) listenerBus.addListener(listener)
startPolling()
val scheduleTask = new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions(schedule())
}
executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
} }
/** /**
* Start the main polling thread that keeps track of when to add and remove executors. * Stop the allocation manager.
*/ */
private def startPolling(): Unit = { def stop(): Unit = {
val t = new Thread { executor.shutdown()
override def run(): Unit = { executor.awaitTermination(10, TimeUnit.SECONDS)
while (true) {
try {
schedule()
} catch {
case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
}
Thread.sleep(intervalMillis)
}
}
}
t.setName("spark-dynamic-executor-allocation")
t.setDaemon(true)
t.start()
} }
/** /**
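
The key change above is replacing a hand-rolled, never-terminating daemon
thread with a single-threaded ScheduledExecutorService that stop() can shut
down deterministically. A minimal standalone sketch of the same pattern
(schedule() and intervalMillis stand in for the manager's private members,
and the inline thread factory approximates Spark's Utils.namedThreadFactory):

    import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

    class PollingSketch(intervalMillis: Long) {
      // Single scheduling thread, named and marked daemon so it is easy to
      // find in thread dumps and never blocks JVM exit on its own.
      private val executor = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactory {
          override def newThread(r: Runnable): Thread = {
            val t = new Thread(r, "spark-dynamic-executor-allocation")
            t.setDaemon(true)
            t
          }
        })

      private def schedule(): Unit = { /* allocation policy runs here */ }

      def start(): Unit = {
        val task = new Runnable {
          // Catch exceptions: a throwable escaping a scheduleAtFixedRate
          // task silently cancels all of its future runs.
          override def run(): Unit =
            try schedule() catch { case e: Exception => e.printStackTrace() }
        }
        executor.scheduleAtFixedRate(task, 0, intervalMillis, TimeUnit.MILLISECONDS)
      }

      def stop(): Unit = {
        executor.shutdown()                             // reject further runs
        executor.awaitTermination(10, TimeUnit.SECONDS) // drain in-flight run
      }
    }

This is also why the real start() wraps schedule() in
Utils.logUncaughtExceptions: an exception that escaped the task would
otherwise suppress every subsequent scheduled run.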

core/src/main/scala/org/apache/spark/SparkContext.scala

@@ -1400,6 +1400,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     env.metricsSystem.report()
     metadataCleaner.cancel()
     cleaner.foreach(_.stop())
+    executorAllocationManager.foreach(_.stop())
     dagScheduler.stop()
     dagScheduler = null
     listenerBus.stop()
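
Note the ordering: the allocation manager is stopped before
dagScheduler.stop() (the "Stop alloc manager before scheduler" commit
above), and since stop() awaits termination of the scheduling task, no
final schedule() run can race against an already-stopped scheduler.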

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

@@ -19,7 +19,7 @@ package org.apache.spark
 import scala.collection.mutable
 
-import org.scalatest.{FunSuite, PrivateMethodTester}
+import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
 
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@ -28,10 +28,20 @@ import org.apache.spark.util.ManualClock
/** /**
* Test add and remove behavior of ExecutorAllocationManager. * Test add and remove behavior of ExecutorAllocationManager.
*/ */
class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
import ExecutorAllocationManager._ import ExecutorAllocationManager._
import ExecutorAllocationManagerSuite._ import ExecutorAllocationManagerSuite._
private val contexts = new mutable.ListBuffer[SparkContext]()
before {
contexts.clear()
}
after {
contexts.foreach(_.stop())
}
test("verify min/max executors") { test("verify min/max executors") {
val conf = new SparkConf() val conf = new SparkConf()
.setMaster("local") .setMaster("local")
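
ScalaTest's BeforeAndAfter runs the after {} block whether the test body
passes or throws, which is what makes this bookkeeping reliable. A tiny
self-contained illustration (the suite and resource names are invented
for the example):

    import scala.collection.mutable

    import org.scalatest.{BeforeAndAfter, FunSuite}

    class CleanupIllustration extends FunSuite with BeforeAndAfter {
      private val opened = mutable.ListBuffer[String]()

      after {
        // Runs even when the test fails, like a per-test finally block.
        opened.foreach(name => println(s"closing $name"))
        opened.clear()
      }

      test("resources registered during a test get cleaned up") {
        opened += "resource-1"
        assert(opened.nonEmpty)
      }
    }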
@@ -39,18 +49,19 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
       .set("spark.dynamicAllocation.enabled", "true")
       .set("spark.dynamicAllocation.testing", "true")
     val sc0 = new SparkContext(conf)
+    contexts += sc0
     assert(sc0.executorAllocationManager.isDefined)
     sc0.stop()
 
     // Min < 0
     val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
-    intercept[SparkException] { new SparkContext(conf1) }
+    intercept[SparkException] { contexts += new SparkContext(conf1) }
     SparkEnv.get.stop()
     SparkContext.clearActiveContext()
 
     // Max < 0
     val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
-    intercept[SparkException] { new SparkContext(conf2) }
+    intercept[SparkException] { contexts += new SparkContext(conf2) }
     SparkEnv.get.stop()
     SparkContext.clearActiveContext()
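
A subtlety in the intercept blocks: writing contexts += new
SparkContext(conf1) inside the closure means nothing is registered when
the constructor throws as expected, while an unexpected success is still
tracked and stopped by the after {} hook (the "More exception safety"
commit above).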
@@ -665,16 +676,6 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     assert(removeTimes(manager).contains("executor-2"))
     assert(!removeTimes(manager).contains("executor-1"))
   }
-}
-
-/**
- * Helper methods for testing ExecutorAllocationManager.
- * This includes methods to access private methods and fields in ExecutorAllocationManager.
- */
-private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
-  private val schedulerBacklogTimeout = 1L
-  private val sustainedSchedulerBacklogTimeout = 2L
-  private val executorIdleTimeout = 3L
 
   private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
     val conf = new SparkConf()
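
createSparkContext moves from the companion object into the suite class
so it can append each new context to the instance-level contexts buffer;
the timeout constants and the purely static helpers remain in the
companion object, which the next hunk reintroduces below the method.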
@@ -688,9 +689,22 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
         sustainedSchedulerBacklogTimeout.toString)
       .set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString)
       .set("spark.dynamicAllocation.testing", "true")
-    new SparkContext(conf)
+    val sc = new SparkContext(conf)
+    contexts += sc
+    sc
   }
+}
+
+/**
+ * Helper methods for testing ExecutorAllocationManager.
+ * This includes methods to access private methods and fields in ExecutorAllocationManager.
+ */
+private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
+  private val schedulerBacklogTimeout = 1L
+  private val sustainedSchedulerBacklogTimeout = 2L
+  private val executorIdleTimeout = 3L
 
   private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = {
     new StageInfo(stageId, 0, "name", numTasks, Seq.empty, "no details")
   }