[SPARK-19481] [REPL] [MAVEN] Avoid to leak SparkContext in Signaling.cancelOnInterrupt

## What changes were proposed in this pull request?

`Signaling.cancelOnInterrupt` leaks a SparkContext per call and it makes ReplSuite unstable.

This PR adds `SparkContext.getActive` to allow `Signaling.cancelOnInterrupt` to get the active `SparkContext` to avoid the leak.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16825 from zsxwing/SPARK-19481.
This commit is contained in:
Shixiong Zhu 2017-02-09 11:16:51 -08:00 committed by Davies Liu
parent 6287c94f08
commit 303f00a4bf
5 changed files with 20 additions and 11 deletions

View file

@@ -2489,6 +2489,13 @@ object SparkContext extends Logging {
}
}
/**
 * Returns the currently active [[SparkContext]], or `None` if no context is
 * active (never started, or already stopped).
 */
private[spark] def getActive: Option[SparkContext] =
  // Read under the constructor lock to stay consistent with context
  // construction/stop, which update `activeContext` under the same lock.
  SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized(Option(activeContext.get()))
/**
* Called at the beginning of the SparkContext constructor to ensure that no SparkContext is
* running. Throws an exception if a running context is detected and logs a warning if another

View file

@@ -22,6 +22,7 @@ import org.apache.spark.internal.Logging
object Main extends Logging {
initializeLogIfNecessary(true)
Signaling.cancelOnInterrupt()
private var _interp: SparkILoop = _

View file

@@ -1027,7 +1027,6 @@ class SparkILoop(
builder.getOrCreate()
}
sparkContext = sparkSession.sparkContext
Signaling.cancelOnInterrupt(sparkContext)
sparkSession
}

View file

@@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
object Main extends Logging {
initializeLogIfNecessary(true)
Signaling.cancelOnInterrupt()
val conf = new SparkConf()
val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
@@ -108,7 +109,6 @@ object Main extends Logging {
logInfo("Created Spark session")
}
sparkContext = sparkSession.sparkContext
Signaling.cancelOnInterrupt(sparkContext)
sparkSession
}

View file

@@ -28,15 +28,17 @@ private[repl] object Signaling extends Logging {
* when no jobs are currently running.
* This makes it possible to interrupt a running shell job by pressing Ctrl+C.
*/
def cancelOnInterrupt(ctx: SparkContext): Unit = SignalUtils.register("INT") {
if (!ctx.statusTracker.getActiveJobIds().isEmpty) {
logWarning("Cancelling all active jobs, this can take a while. " +
"Press Ctrl+C again to exit now.")
ctx.cancelAllJobs()
true
} else {
false
}
def cancelOnInterrupt(): Unit = SignalUtils.register("INT") {
  // Look up the active SparkContext at signal-delivery time instead of
  // capturing one at registration time — this is what prevents the handler
  // from pinning (leaking) a stopped SparkContext.
  //
  // `exists` replaces `map { ... }.getOrElse(false)`: it yields false when
  // no context is active, so the signal falls through to the registered
  // default behavior (presumably exiting the shell — see the log message).
  SparkContext.getActive.exists { ctx =>
    if (ctx.statusTracker.getActiveJobIds().nonEmpty) {
      // Jobs are running: cancel them and consume this Ctrl+C.
      logWarning("Cancelling all active jobs, this can take a while. " +
        "Press Ctrl+C again to exit now.")
      ctx.cancelAllJobs()
      true
    } else {
      // No active jobs: do not consume the signal.
      false
    }
  }
}
}