[SPARK-9446] Clear Active SparkContext in stop() method

In the thread 'stopped SparkContext remaining active' on the mailing list, Andres observed the following in the driver log:
```
15/07/29 15:17:09 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster has disassociated: <address removed>
15/07/29 15:17:09 INFO YarnClientSchedulerBackend: Shutting down all executors
Exception in thread "Yarn application state monitor" org.apache.spark.SparkException: Error asking standalone scheduler to shut down executors
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:261)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:266)
        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.stop(YarnClientSchedulerBackend.scala:158)
        at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:416)
        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1411)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1644)
        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:139)
Caused by: java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
        at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:190)
15/07/29 15:17:09 INFO YarnClientSchedulerBackend: Asking each executor to shut down
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:257)
        ... 6 more
```
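The effect of the above exception is that a stopped SparkContext is returned to the user: the exception propagates out of stop() before SparkContext.clearActiveContext() is called, so the stopped context is still registered as the active one.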
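
The failure mode can be reproduced in isolation. The following is a minimal sketch, not Spark's code: `StopSketch`, `active`, `failingStep`, and `tryLogNonFatal` are hypothetical names, and `tryLogNonFatal` only approximates what a helper such as `Utils.tryLogNonFatalError` is assumed to do (run a block, log any non-fatal exception, and continue).
```scala
import scala.util.control.NonFatal

object StopSketch {
  // Hypothetical stand-in for the "active context" state that
  // SparkContext.clearActiveContext() resets.
  @volatile var active: Boolean = true

  // Hypothetical helper: run the block, log and swallow non-fatal
  // exceptions so the caller can carry on with the remaining steps.
  def tryLogNonFatal(block: => Unit): Unit =
    try block
    catch { case NonFatal(e) => println(s"ignored during stop: $e") }

  // A shutdown step that fails, standing in for the scheduler stop
  // that threw in the log above.
  def failingStep(): Unit =
    throw new RuntimeException("error asking executors to shut down")

  // Unguarded: the exception escapes, so the flag is never cleared.
  def unguardedStop(): Unit = {
    failingStep()
    active = false
  }

  // Guarded: the failure is logged and the flag is still cleared.
  def guardedStop(): Unit = {
    tryLogNonFatal { failingStep() }
    active = false
  }

  def main(args: Array[String]): Unit = {
    active = true
    try unguardedStop() catch { case NonFatal(_) => () }
    println(s"active after unguarded stop: $active")

    active = true
    guardedStop()
    println(s"active after guarded stop: $active")
  }
}
```
Guarding each step separately, rather than wrapping the whole method in one try block, also means that a failure in one step does not skip the steps that follow it.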

Author: tedyu <yuzhihong@gmail.com>

Closes #7756 from tedyu/master and squashes the following commits:

7339ff2 [tedyu] Move null assignment out of tryLogNonFatalError block
6e02cd9 [tedyu] Use Utils.tryLogNonFatalError to guard resource release
f5fb519 [tedyu] Clear Active SparkContext in stop() method using finally
tedyu 2015-07-31 18:16:55 +01:00 committed by Sean Owen
parent 04a49edfdb
commit 27ae851ce1

@@ -1689,33 +1689,57 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       Utils.removeShutdownHook(_shutdownHookRef)
     }

-    postApplicationEnd()
-    _ui.foreach(_.stop())
+    Utils.tryLogNonFatalError {
+      postApplicationEnd()
+    }
+    Utils.tryLogNonFatalError {
+      _ui.foreach(_.stop())
+    }
     if (env != null) {
-      env.metricsSystem.report()
+      Utils.tryLogNonFatalError {
+        env.metricsSystem.report()
+      }
     }
     if (metadataCleaner != null) {
-      metadataCleaner.cancel()
+      Utils.tryLogNonFatalError {
+        metadataCleaner.cancel()
+      }
+    }
+    Utils.tryLogNonFatalError {
+      _cleaner.foreach(_.stop())
+    }
+    Utils.tryLogNonFatalError {
+      _executorAllocationManager.foreach(_.stop())
     }
-    _cleaner.foreach(_.stop())
-    _executorAllocationManager.foreach(_.stop())
     if (_dagScheduler != null) {
-      _dagScheduler.stop()
+      Utils.tryLogNonFatalError {
+        _dagScheduler.stop()
+      }
       _dagScheduler = null
     }
     if (_listenerBusStarted) {
-      listenerBus.stop()
-      _listenerBusStarted = false
+      Utils.tryLogNonFatalError {
+        listenerBus.stop()
+        _listenerBusStarted = false
+      }
+    }
+    Utils.tryLogNonFatalError {
+      _eventLogger.foreach(_.stop())
     }
-    _eventLogger.foreach(_.stop())
     if (env != null && _heartbeatReceiver != null) {
-      env.rpcEnv.stop(_heartbeatReceiver)
+      Utils.tryLogNonFatalError {
+        env.rpcEnv.stop(_heartbeatReceiver)
+      }
+    }
+    Utils.tryLogNonFatalError {
+      _progressBar.foreach(_.stop())
     }
-    _progressBar.foreach(_.stop())
     _taskScheduler = null
     // TODO: Cache.stop()?
     if (_env != null) {
-      _env.stop()
+      Utils.tryLogNonFatalError {
+        _env.stop()
+      }
       SparkEnv.set(null)
     }
     SparkContext.clearActiveContext()