[SPARK-26362][CORE] Remove 'spark.driver.allowMultipleContexts' to disallow multiple creation of SparkContexts
## What changes were proposed in this pull request? Multiple SparkContexts are discouraged, and Spark has been warning about them for the last 4 years; see SPARK-4180. They can cause arbitrary and mysterious error cases; see SPARK-2243. Honestly, I didn't even know Spark still allowed this, which appears never to have been officially supported; see SPARK-2243. I believe now is a good time to remove this configuration. ## How was this patch tested? Each doc was manually checked, and the change was manually tested: ``` $ ./bin/spark-shell --conf=spark.driver.allowMultipleContexts=true ... scala> new SparkContext() org.apache.spark.SparkException: Only one SparkContext should be running in this JVM (see SPARK-2243).The currently running SparkContext was created at: org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:939) ... org.apache.spark.SparkContext$.$anonfun$assertNoOtherContextIsRunning$2(SparkContext.scala:2435) at scala.Option.foreach(Option.scala:274) at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2432) at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2509) at org.apache.spark.SparkContext.<init>(SparkContext.scala:80) at org.apache.spark.SparkContext.<init>(SparkContext.scala:112) ... 49 elided ``` Closes #23311 from HyukjinKwon/SPARK-26362. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
This commit is contained in:
parent
1b604c1fd0
commit
9ccae0c9e7
|
@ -64,9 +64,8 @@ import org.apache.spark.util.logging.DriverLogger
|
|||
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
|
||||
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
|
||||
*
|
||||
* Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
|
||||
* creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
|
||||
*
|
||||
* @note Only one `SparkContext` should be active per JVM. You must `stop()` the
|
||||
* active `SparkContext` before creating a new one.
|
||||
* @param config a Spark Config object describing the application configuration. Any settings in
|
||||
* this config override the default configs as well as system properties.
|
||||
*/
|
||||
|
@ -75,14 +74,10 @@ class SparkContext(config: SparkConf) extends Logging {
|
|||
// The call site where this SparkContext was constructed.
|
||||
private val creationSite: CallSite = Utils.getCallSite()
|
||||
|
||||
// If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
|
||||
private val allowMultipleContexts: Boolean =
|
||||
config.getBoolean("spark.driver.allowMultipleContexts", false)
|
||||
|
||||
// In order to prevent multiple SparkContexts from being active at the same time, mark this
|
||||
// context as having started construction.
|
||||
// NOTE: this must be placed at the beginning of the SparkContext constructor.
|
||||
SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
|
||||
SparkContext.markPartiallyConstructed(this)
|
||||
|
||||
val startTime = System.currentTimeMillis()
|
||||
|
||||
|
@ -2392,7 +2387,7 @@ class SparkContext(config: SparkConf) extends Logging {
|
|||
// In order to prevent multiple SparkContexts from being active at the same time, mark this
|
||||
// context as having finished construction.
|
||||
// NOTE: this must be placed at the end of the SparkContext constructor.
|
||||
SparkContext.setActiveContext(this, allowMultipleContexts)
|
||||
SparkContext.setActiveContext(this)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2409,18 +2404,18 @@ object SparkContext extends Logging {
|
|||
private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()
|
||||
|
||||
/**
|
||||
* The active, fully-constructed SparkContext. If no SparkContext is active, then this is `null`.
|
||||
* The active, fully-constructed SparkContext. If no SparkContext is active, then this is `null`.
|
||||
*
|
||||
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
|
||||
* Access to this field is guarded by `SPARK_CONTEXT_CONSTRUCTOR_LOCK`.
|
||||
*/
|
||||
private val activeContext: AtomicReference[SparkContext] =
|
||||
new AtomicReference[SparkContext](null)
|
||||
|
||||
/**
|
||||
* Points to a partially-constructed SparkContext if some thread is in the SparkContext
|
||||
* Points to a partially-constructed SparkContext if another thread is in the SparkContext
|
||||
* constructor, or `None` if no SparkContext is being constructed.
|
||||
*
|
||||
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
|
||||
* Access to this field is guarded by `SPARK_CONTEXT_CONSTRUCTOR_LOCK`.
|
||||
*/
|
||||
private var contextBeingConstructed: Option[SparkContext] = None
|
||||
|
||||
|
@ -2428,24 +2423,16 @@ object SparkContext extends Logging {
|
|||
* Called to ensure that no other SparkContext is running in this JVM.
|
||||
*
|
||||
* Throws an exception if a running context is detected and logs a warning if another thread is
|
||||
* constructing a SparkContext. This warning is necessary because the current locking scheme
|
||||
* constructing a SparkContext. This warning is necessary because the current locking scheme
|
||||
* prevents us from reliably distinguishing between cases where another context is being
|
||||
* constructed and cases where another constructor threw an exception.
|
||||
*/
|
||||
private def assertNoOtherContextIsRunning(
|
||||
sc: SparkContext,
|
||||
allowMultipleContexts: Boolean): Unit = {
|
||||
private def assertNoOtherContextIsRunning(sc: SparkContext): Unit = {
|
||||
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
|
||||
Option(activeContext.get()).filter(_ ne sc).foreach { ctx =>
|
||||
val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
|
||||
" To ignore this error, set spark.driver.allowMultipleContexts = true. " +
|
||||
val errMsg = "Only one SparkContext should be running in this JVM (see SPARK-2243)." +
|
||||
s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
|
||||
val exception = new SparkException(errMsg)
|
||||
if (allowMultipleContexts) {
|
||||
logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
|
||||
} else {
|
||||
throw exception
|
||||
}
|
||||
throw new SparkException(errMsg)
|
||||
}
|
||||
|
||||
contextBeingConstructed.filter(_ ne sc).foreach { otherContext =>
|
||||
|
@ -2454,7 +2441,7 @@ object SparkContext extends Logging {
|
|||
val otherContextCreationSite =
|
||||
Option(otherContext.creationSite).map(_.longForm).getOrElse("unknown location")
|
||||
val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" +
|
||||
" constructor). This may indicate an error, since only one SparkContext may be" +
|
||||
" constructor). This may indicate an error, since only one SparkContext should be" +
|
||||
" running in this JVM (see SPARK-2243)." +
|
||||
s" The other SparkContext was created at:\n$otherContextCreationSite"
|
||||
logWarning(warnMsg)
|
||||
|
@ -2467,8 +2454,6 @@ object SparkContext extends Logging {
|
|||
* singleton object. Because we can only have one active SparkContext per JVM,
|
||||
* this is useful when applications may wish to share a SparkContext.
|
||||
*
|
||||
* @note This function cannot be used to create multiple SparkContext instances
|
||||
* even if multiple contexts are allowed.
|
||||
* @param config `SparkConf` that will be used for initialisation of the `SparkContext`
|
||||
* @return current `SparkContext` (or a new one if it wasn't created before the function call)
|
||||
*/
|
||||
|
@ -2477,7 +2462,7 @@ object SparkContext extends Logging {
|
|||
// from assertNoOtherContextIsRunning within setActiveContext
|
||||
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
|
||||
if (activeContext.get() == null) {
|
||||
setActiveContext(new SparkContext(config), allowMultipleContexts = false)
|
||||
setActiveContext(new SparkContext(config))
|
||||
} else {
|
||||
if (config.getAll.nonEmpty) {
|
||||
logWarning("Using an existing SparkContext; some configuration may not take effect.")
|
||||
|
@ -2494,14 +2479,12 @@ object SparkContext extends Logging {
|
|||
*
|
||||
* This method allows not passing a SparkConf (useful if just retrieving).
|
||||
*
|
||||
* @note This function cannot be used to create multiple SparkContext instances
|
||||
* even if multiple contexts are allowed.
|
||||
* @return current `SparkContext` (or a new one if it wasn't created before the function call)
|
||||
*/
|
||||
def getOrCreate(): SparkContext = {
|
||||
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
|
||||
if (activeContext.get() == null) {
|
||||
setActiveContext(new SparkContext(), allowMultipleContexts = false)
|
||||
setActiveContext(new SparkContext())
|
||||
}
|
||||
activeContext.get()
|
||||
}
|
||||
|
@ -2516,16 +2499,14 @@ object SparkContext extends Logging {
|
|||
|
||||
/**
|
||||
* Called at the beginning of the SparkContext constructor to ensure that no SparkContext is
|
||||
* running. Throws an exception if a running context is detected and logs a warning if another
|
||||
* thread is constructing a SparkContext. This warning is necessary because the current locking
|
||||
* running. Throws an exception if a running context is detected and logs a warning if another
|
||||
* thread is constructing a SparkContext. This warning is necessary because the current locking
|
||||
* scheme prevents us from reliably distinguishing between cases where another context is being
|
||||
* constructed and cases where another constructor threw an exception.
|
||||
*/
|
||||
private[spark] def markPartiallyConstructed(
|
||||
sc: SparkContext,
|
||||
allowMultipleContexts: Boolean): Unit = {
|
||||
private[spark] def markPartiallyConstructed(sc: SparkContext): Unit = {
|
||||
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
|
||||
assertNoOtherContextIsRunning(sc, allowMultipleContexts)
|
||||
assertNoOtherContextIsRunning(sc)
|
||||
contextBeingConstructed = Some(sc)
|
||||
}
|
||||
}
|
||||
|
@ -2534,18 +2515,16 @@ object SparkContext extends Logging {
|
|||
* Called at the end of the SparkContext constructor to ensure that no other SparkContext has
|
||||
* raced with this constructor and started.
|
||||
*/
|
||||
private[spark] def setActiveContext(
|
||||
sc: SparkContext,
|
||||
allowMultipleContexts: Boolean): Unit = {
|
||||
private[spark] def setActiveContext(sc: SparkContext): Unit = {
|
||||
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
|
||||
assertNoOtherContextIsRunning(sc, allowMultipleContexts)
|
||||
assertNoOtherContextIsRunning(sc)
|
||||
contextBeingConstructed = None
|
||||
activeContext.set(sc)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the active SparkContext metadata. This is called by `SparkContext#stop()`. It's
|
||||
* Clears the active SparkContext metadata. This is called by `SparkContext#stop()`. It's
|
||||
* also called in unit tests to prevent a flood of warnings from test suites that don't / can't
|
||||
* properly clean up their SparkContexts.
|
||||
*/
|
||||
|
|
|
@ -40,8 +40,8 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD}
|
|||
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
|
||||
* [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
|
||||
*
|
||||
* Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
|
||||
* creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
|
||||
* @note Only one `SparkContext` should be active per JVM. You must `stop()` the
|
||||
* active `SparkContext` before creating a new one.
|
||||
*/
|
||||
class JavaSparkContext(val sc: SparkContext) extends Closeable {
|
||||
|
||||
|
|
|
@ -44,7 +44,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
|
|||
test("Only one SparkContext may be active at a time") {
|
||||
// Regression test for SPARK-4180
|
||||
val conf = new SparkConf().setAppName("test").setMaster("local")
|
||||
.set("spark.driver.allowMultipleContexts", "false")
|
||||
sc = new SparkContext(conf)
|
||||
val envBefore = SparkEnv.get
|
||||
// A SparkContext is already running, so we shouldn't be able to create a second one
|
||||
|
@ -58,7 +57,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
|
|||
}
|
||||
|
||||
test("Can still construct a new SparkContext after failing to construct a previous one") {
|
||||
val conf = new SparkConf().set("spark.driver.allowMultipleContexts", "false")
|
||||
val conf = new SparkConf()
|
||||
// This is an invalid configuration (no app name or master URL)
|
||||
intercept[SparkException] {
|
||||
new SparkContext(conf)
|
||||
|
@ -67,18 +66,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
|
|||
sc = new SparkContext(conf.setMaster("local").setAppName("test"))
|
||||
}
|
||||
|
||||
test("Check for multiple SparkContexts can be disabled via undocumented debug option") {
|
||||
var secondSparkContext: SparkContext = null
|
||||
try {
|
||||
val conf = new SparkConf().setAppName("test").setMaster("local")
|
||||
.set("spark.driver.allowMultipleContexts", "true")
|
||||
sc = new SparkContext(conf)
|
||||
secondSparkContext = new SparkContext(conf)
|
||||
} finally {
|
||||
Option(secondSparkContext).foreach(_.stop())
|
||||
}
|
||||
}
|
||||
|
||||
test("Test getOrCreate") {
|
||||
var sc2: SparkContext = null
|
||||
SparkContext.clearActiveContext()
|
||||
|
@ -92,10 +79,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
|
|||
assert(sc === sc2)
|
||||
assert(sc eq sc2)
|
||||
|
||||
// Try creating second context to confirm that it's still possible, if desired
|
||||
sc2 = new SparkContext(new SparkConf().setAppName("test3").setMaster("local")
|
||||
.set("spark.driver.allowMultipleContexts", "true"))
|
||||
|
||||
sc2.stop()
|
||||
}
|
||||
|
||||
|
|
|
@ -25,8 +25,7 @@ import org.apache.spark.util.AccumulatorV2
|
|||
|
||||
class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext {
|
||||
test("launch of backend and scheduler") {
|
||||
val conf = new SparkConf().setMaster("myclusterManager").
|
||||
setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
|
||||
val conf = new SparkConf().setMaster("myclusterManager").setAppName("testcm")
|
||||
sc = new SparkContext(conf)
|
||||
// check if the scheduler components are created and initialized
|
||||
sc.schedulerBackend match {
|
||||
|
|
|
@ -138,7 +138,7 @@ The first thing a Spark program must do is to create a [SparkContext](api/scala/
|
|||
how to access a cluster. To create a `SparkContext` you first need to build a [SparkConf](api/scala/index.html#org.apache.spark.SparkConf) object
|
||||
that contains information about your application.
|
||||
|
||||
Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before creating a new one.
|
||||
Only one SparkContext should be active per JVM. You must `stop()` the active SparkContext before creating a new one.
|
||||
|
||||
{% highlight scala %}
|
||||
val conf = new SparkConf().setAppName(appName).setMaster(master)
|
||||
|
|
|
@ -220,6 +220,10 @@ object MimaExcludes {
|
|||
// [SPARK-26139] Implement shuffle write metrics in SQL
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ShuffleDependency.this"),
|
||||
|
||||
// [SPARK-26362][CORE] Remove 'spark.driver.allowMultipleContexts' to disallow multiple creation of SparkContexts
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkContext.setActiveContext"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkContext.markPartiallyConstructed"),
|
||||
|
||||
// Data Source V2 API changes
|
||||
(problem: Problem) => problem match {
|
||||
case MissingClassProblem(cls) =>
|
||||
|
|
|
@ -63,6 +63,9 @@ class SparkContext(object):
|
|||
Main entry point for Spark functionality. A SparkContext represents the
|
||||
connection to a Spark cluster, and can be used to create L{RDD} and
|
||||
broadcast variables on that cluster.
|
||||
|
||||
.. note:: Only one :class:`SparkContext` should be active per JVM. You must `stop()`
|
||||
the active :class:`SparkContext` before creating a new one.
|
||||
"""
|
||||
|
||||
_gateway = None
|
||||
|
|
|
@ -263,7 +263,6 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
.setMaster("local[*]")
|
||||
.setAppName("test")
|
||||
.set("spark.ui.enabled", "false")
|
||||
.set("spark.driver.allowMultipleContexts", "true")
|
||||
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
|
||||
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
|
||||
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
|
||||
|
|
Loading…
Reference in a new issue