[SPARK-2645] [CORE] Allow SparkEnv.stop() to be called multiple times without side effects.

Fix for SparkContext stop behavior - Allow sc.stop() to be called multiple times without side effects.

Author: Joshi <rekhajoshm@gmail.com>
Author: Rekha Joshi <rekhajoshm@gmail.com>

Closes #6973 from rekhajoshm/SPARK-2645 and squashes the following commits:

277043e [Joshi] Fix for SparkContext stop behavior
446b0a4 [Joshi] Fix for SparkContext stop behavior
2ce5760 [Joshi] Fix for SparkContext stop behavior
c97839a [Joshi] Fix for SparkContext stop behavior
1aff39c [Joshi] Fix for SparkContext stop behavior
12f66b5 [Joshi] Fix for SparkContext stop behavior
72bb484 [Joshi] Fix for SparkContext stop behavior
a5a7d7f [Joshi] Fix for SparkContext stop behavior
9193a0c [Joshi] Fix for SparkContext stop behavior
58dba70 [Joshi] SPARK-2645: Fix for SparkContext stop behavior
380c5b0 [Joshi] SPARK-2645: Fix for SparkContext stop behavior
b566b66 [Joshi] SPARK-2645: Fix for SparkContext stop behavior
0be142d [Rekha Joshi] Merge pull request #3 from apache/master
106fd8e [Rekha Joshi] Merge pull request #2 from apache/master
e3677c9 [Rekha Joshi] Merge pull request #1 from apache/master
This commit is contained in:
Joshi 2015-06-30 14:00:35 -07:00 committed by Andrew Or
parent 79f0b371a3
commit 7dda0844e1
2 changed files with 44 additions and 29 deletions

View file

@ -22,7 +22,6 @@ import java.net.Socket
import akka.actor.ActorSystem
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.util.Properties
@ -90,39 +89,42 @@ class SparkEnv (
private var driverTmpDirToDelete: Option[String] = None
private[spark] def stop() {
isStopped = true
pythonWorkers.foreach { case(key, worker) => worker.stop() }
Option(httpFileServer).foreach(_.stop())
mapOutputTracker.stop()
shuffleManager.stop()
broadcastManager.stop()
blockManager.stop()
blockManager.master.stop()
metricsSystem.stop()
outputCommitCoordinator.stop()
rpcEnv.shutdown()
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
// UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
// actorSystem.awaitTermination()
if (!isStopped) {
isStopped = true
pythonWorkers.values.foreach(_.stop())
Option(httpFileServer).foreach(_.stop())
mapOutputTracker.stop()
shuffleManager.stop()
broadcastManager.stop()
blockManager.stop()
blockManager.master.stop()
metricsSystem.stop()
outputCommitCoordinator.stop()
rpcEnv.shutdown()
// Note that blockTransferService is stopped by BlockManager since it is started by it.
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
// UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
// actorSystem.awaitTermination()
// If we only stop sc, but the driver process still run as a services then we need to delete
// the tmp dir, if not, it will create too many tmp dirs.
// We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
// current working dir in executor which we do not need to delete.
driverTmpDirToDelete match {
case Some(path) => {
try {
Utils.deleteRecursively(new File(path))
} catch {
case e: Exception =>
logWarning(s"Exception while deleting Spark temp dir: $path", e)
// Note that blockTransferService is stopped by BlockManager since it is started by it.
// If we only stop sc, but the driver process still run as a services then we need to delete
// the tmp dir, if not, it will create too many tmp dirs.
// We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
// current working dir in executor which we do not need to delete.
driverTmpDirToDelete match {
case Some(path) => {
try {
Utils.deleteRecursively(new File(path))
} catch {
case e: Exception =>
logWarning(s"Exception while deleting Spark temp dir: $path", e)
}
}
case None => // We just need to delete tmp dir created by driver, so do nothing on executor
}
case None => // We just need to delete tmp dir created by driver, so do nothing on executor
}
}

View file

@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import org.scalatest.Matchers._
class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
@ -272,4 +273,16 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
sc.stop()
}
}
test("calling multiple sc.stop() must not throw any exception") {
noException should be thrownBy {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
val cnt = sc.parallelize(1 to 4).count()
sc.cancelAllJobs()
sc.stop()
// call stop second time
sc.stop()
}
}
}