[SPARK-32160][CORE][PYSPARK] Add a config to switch allow/disallow to create SparkContext in executors
### What changes were proposed in this pull request?

This is a follow-up of #28986. This PR adds a config to switch between allowing and disallowing the creation of `SparkContext` in executors:

- `spark.driver.allowSparkContextInExecutors`

### Why are the changes needed?

Some users or libraries actually create `SparkContext` in executors. We shouldn't break their workloads.

### Does this PR introduce _any_ user-facing change?

Yes, users will be able to create `SparkContext` in executors with the config enabled.

### How was this patch tested?

More tests are added.

Closes #29278 from ueshin/issues/SPARK-32160/add_configs.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
parent 813532d103
commit 8014b0b5d6
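For orientation, here is a minimal driver-side sketch of the behavior this config enables, mirroring the Scala test added below; the app names, master URLs, and cluster size are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AllowInExecutorsDemo {
  def main(args: Array[String]): Unit = {
    // Driver side: a small local cluster so the task runs on a separate executor.
    val sc = new SparkContext(
      new SparkConf().setAppName("demo").setMaster("local-cluster[2, 1, 1024]"))

    sc.range(0, 1).foreach { _ =>
      // Executor side: without the flag this would throw
      // "SparkContext should only be created and accessed on the driver."
      new SparkContext(
        new SparkConf()
          .setAppName("inner")
          .setMaster("local")
          .set("spark.driver.allowSparkContextInExecutors", "true")).stop()
    }

    sc.stop()
  }
}
```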
core/src/main/scala/org/apache/spark/SparkContext.scala

```diff
@@ -83,8 +83,10 @@ class SparkContext(config: SparkConf) extends Logging {
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
 
-  // In order to prevent SparkContext from being created in executors.
-  SparkContext.assertOnDriver()
+  if (!config.get(ALLOW_SPARK_CONTEXT_IN_EXECUTORS)) {
+    // In order to prevent SparkContext from being created in executors.
+    SparkContext.assertOnDriver()
+  }
 
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having started construction.
```
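The body of `SparkContext.assertOnDriver()` is not part of this hunk. Judging by the error message asserted in the tests below, it plausibly detects executor context via `TaskContext`; the following is a sketch under that assumption (the `DriverGuard` wrapper is purely illustrative):

```scala
import org.apache.spark.TaskContext

object DriverGuard {
  // TaskContext.get returns a non-null context only inside a running task,
  // i.e. on an executor; on the driver it returns null.
  def assertOnDriver(): Unit = {
    if (TaskContext.get != null) {
      throw new IllegalStateException(
        "SparkContext should only be created and accessed on the driver.")
    }
  }
}
```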
core/src/main/scala/org/apache/spark/internal/config/package.scala

```diff
@@ -1908,4 +1908,11 @@ package object config {
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val ALLOW_SPARK_CONTEXT_IN_EXECUTORS =
+    ConfigBuilder("spark.driver.allowSparkContextInExecutors")
+      .doc("If set to true, SparkContext can be created in executors.")
+      .version("3.0.1")
+      .booleanConf
+      .createWithDefault(false)
 }
```
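The `ConfigBuilder` entry gives Spark-internal code typed access with a built-in default, while user code addresses the same setting by its string key. A quick sketch of the two sides (the typed `get` on `SparkConf` is package-private, hence shown as a comment):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()

// User-facing: set by string key and string value.
conf.set("spark.driver.allowSparkContextInExecutors", "true")

// Spark-internal: read through the typed entry, which supplies the declared
// default (false) when the key is absent:
//   conf.get(ALLOW_SPARK_CONTEXT_IN_EXECUTORS)   // Boolean
```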
core/src/test/scala/org/apache/spark/SparkContextSuite.scala

```diff
@@ -946,6 +946,15 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
 
     assert(error.contains("SparkContext should only be created and accessed on the driver."))
   }
+
+  test("SPARK-32160: Allow to create SparkContext in executors if the config is set") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+
+    sc.range(0, 1).foreach { _ =>
+      new SparkContext(new SparkConf().setAppName("test").setMaster("local")
+        .set(ALLOW_SPARK_CONTEXT_IN_EXECUTORS, true)).stop()
+    }
+  }
 }
 
 object SparkContextSuite {
```
docs/core-migration-guide.md

```diff
@@ -22,6 +22,10 @@ license: |
 * Table of contents
 {:toc}
 
+## Upgrading from Core 3.0 to 3.1
+
+- In Spark 3.0 and below, `SparkContext` can be created in executors. Since Spark 3.1, an exception will be thrown when creating `SparkContext` in executors. You can allow it by setting the configuration `spark.driver.allowSparkContextInExecutors` when creating `SparkContext` in executors.
+
 ## Upgrading from Core 2.4 to 3.0
 
 - The `org.apache.spark.ExecutorPlugin` interface and related configuration has been replaced with
```
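To make the upgrade note concrete, a hedged sketch of what the new default looks like from user code (the task body and assertion are illustrative; the executor-side failure reaches the driver wrapped in a `SparkException`):

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

val sc = new SparkContext(
  new SparkConf().setAppName("upgrade-demo").setMaster("local-cluster[2, 1, 1024]"))

try {
  // Since Spark 3.1 this task fails unless
  // spark.driver.allowSparkContextInExecutors is set to true.
  sc.range(0, 1).foreach { _ =>
    new SparkContext(new SparkConf().setAppName("inner").setMaster("local")).stop()
  }
} catch {
  case e: SparkException =>
    assert(e.getMessage.contains(
      "SparkContext should only be created and accessed on the driver."))
} finally {
  sc.stop()
}
```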
python/pyspark/context.py

```diff
@@ -117,8 +117,10 @@ class SparkContext(object):
         ...
         ValueError:...
         """
-        # In order to prevent SparkContext from being created in executors.
-        SparkContext._assert_on_driver()
+        if (conf is None or
+                conf.get("spark.driver.allowSparkContextInExecutors", "false").lower() != "true"):
+            # In order to prevent SparkContext from being created in executors.
+            SparkContext._assert_on_driver()
 
         self._callsite = first_spark_call() or CallSite(None, None, None)
         if gateway is not None and gateway.gateway_parameters.auth_token is None:
```
python/pyspark/tests/test_context.py

```diff
@@ -275,6 +275,17 @@ class ContextTests(unittest.TestCase):
         self.assertIn("SparkContext should only be created and accessed on the driver.",
                       str(context.exception))
 
+    def test_allow_to_create_spark_context_in_executors(self):
+        # SPARK-32160: SparkContext can be created in executors if the config is set.
+
+        def create_spark_context():
+            conf = SparkConf().set("spark.driver.allowSparkContextInExecutors", "true")
+            with SparkContext(conf=conf):
+                pass
+
+        with SparkContext("local-cluster[3, 1, 1024]") as sc:
+            sc.range(2).foreach(lambda _: create_spark_context())
+
 
 class ContextTestsWithResources(unittest.TestCase):
```
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

```diff
@@ -29,6 +29,7 @@ import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, TaskContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ALLOW_SPARK_CONTEXT_IN_EXECUTORS
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalog.Catalog
@@ -900,7 +901,13 @@ object SparkSession extends Logging {
     * @since 2.0.0
     */
    def getOrCreate(): SparkSession = synchronized {
-      assertOnDriver()
+      val sparkConf = new SparkConf()
+      options.foreach { case (k, v) => sparkConf.set(k, v) }
+
+      if (!sparkConf.get(ALLOW_SPARK_CONTEXT_IN_EXECUTORS)) {
+        assertOnDriver()
+      }
+
       // Get the session from current thread's active session.
       var session = activeThreadSession.get()
       if ((session ne null) && !session.sparkContext.isStopped) {
@@ -919,9 +926,6 @@ object SparkSession extends Logging {
 
       // No active nor global default session. Create a new one.
      val sparkContext = userSuppliedContext.getOrElse {
-        val sparkConf = new SparkConf()
-        options.foreach { case (k, v) => sparkConf.set(k, v) }
-
         // set a random app name if not given.
         if (!sparkConf.contains("spark.app.name")) {
           sparkConf.setAppName(java.util.UUID.randomUUID().toString)
```
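Since `getOrCreate()` now folds the builder options into a `SparkConf` before the driver-only assertion, the flag can be supplied through the builder itself. A sketch of executor-side use under that assumption, mirroring the test below:

```scala
import org.apache.spark.sql.SparkSession

// Inside a task (i.e., on an executor): permitted only because the flag is
// passed as a builder option, which getOrCreate() reads before asserting.
val spark = SparkSession.builder()
  .master("local")
  .config("spark.driver.allowSparkContextInExecutors", "true")
  .getOrCreate()
spark.stop()
```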
sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala

```diff
@@ -19,7 +19,8 @@ package org.apache.spark.sql
 
 import org.scalatest.BeforeAndAfterEach
 
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark.internal.config.ALLOW_SPARK_CONTEXT_IN_EXECUTORS
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -257,4 +258,27 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
       context.stop()
     }
   }
+
+  test("SPARK-32160: Disallow to create SparkSession in executors") {
+    val session = SparkSession.builder().master("local-cluster[3, 1, 1024]").getOrCreate()
+
+    val error = intercept[SparkException] {
+      session.range(1).foreach { v =>
+        SparkSession.builder.master("local").getOrCreate()
+        ()
+      }
+    }.getMessage()
+
+    assert(error.contains("SparkSession should only be created and accessed on the driver."))
+  }
+
+  test("SPARK-32160: Allow to create SparkSession in executors if the config is set") {
+    val session = SparkSession.builder().master("local-cluster[3, 1, 1024]").getOrCreate()
+
+    session.range(1).foreach { v =>
+      SparkSession.builder.master("local")
+        .config(ALLOW_SPARK_CONTEXT_IN_EXECUTORS.key, true).getOrCreate().stop()
+      ()
+    }
+  }
 }
```