From 8014b0b5d61237dc4851d4ae9927778302d692da Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Fri, 31 Jul 2020 17:28:35 +0900
Subject: [PATCH] [SPARK-32160][CORE][PYSPARK] Add a config to switch allow/disallow to create SparkContext in executors

### What changes were proposed in this pull request?

This is a follow-up of #28986.
This PR adds a config to allow or disallow creating `SparkContext` in executors:

- `spark.driver.allowSparkContextInExecutors`

### Why are the changes needed?

Some users or libraries actually create `SparkContext` in executors.
We shouldn't break their workloads.

### Does this PR introduce _any_ user-facing change?

Yes, users will be able to create `SparkContext` in executors with the config enabled.

### How was this patch tested?

More tests are added.

Closes #29278 from ueshin/issues/SPARK-32160/add_configs.

Authored-by: Takuya UESHIN
Signed-off-by: HyukjinKwon
---
 .../scala/org/apache/spark/SparkContext.scala |  6 +++--
 .../spark/internal/config/package.scala       |  7 +++++
 .../org/apache/spark/SparkContextSuite.scala  |  9 +++++++
 docs/core-migration-guide.md                  |  4 +++
 python/pyspark/context.py                     |  6 +++--
 python/pyspark/tests/test_context.py          | 11 ++++++++
 .../org/apache/spark/sql/SparkSession.scala   | 12 ++++++---
 .../spark/sql/SparkSessionBuilderSuite.scala  | 26 ++++++++++++++++++-
 8 files changed, 72 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 06abc0541a..9ecf316bee 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -83,8 +83,10 @@ class SparkContext(config: SparkConf) extends Logging {
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
 
-  // In order to prevent SparkContext from being created in executors.
-  SparkContext.assertOnDriver()
+  if (!config.get(ALLOW_SPARK_CONTEXT_IN_EXECUTORS)) {
+    // In order to prevent SparkContext from being created in executors.
+    SparkContext.assertOnDriver()
+  }
 
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having started construction.
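With the guard above made conditional, user code running inside a task can opt back in by setting the new flag (defined in core's config package below). A minimal sketch of that usage, not part of the patch: the object name and the `local-cluster` master are illustrative assumptions, and the public string key is used because the `ALLOW_SPARK_CONTEXT_IN_EXECUTORS` constant is `private[spark]`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object NestedContextExample {
  def main(args: Array[String]): Unit = {
    // Driver-side application context, unaffected by this patch.
    val sc = new SparkContext(
      new SparkConf().setAppName("driver-app").setMaster("local-cluster[2, 1, 1024]"))

    // Inside a task: with the flag set on the nested conf, the SparkContext
    // constructor skips assertOnDriver(), so a short-lived local context can
    // be created on the executor.
    sc.range(0, 2).foreach { _ =>
      val nested = new SparkContext(
        new SparkConf()
          .setAppName("nested-in-executor")
          .setMaster("local")
          .set("spark.driver.allowSparkContextInExecutors", "true"))
      nested.stop()
    }

    sc.stop()
  }
}
```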
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e1b598e670..fdc9253ce9 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1908,4 +1908,11 @@ package object config {
     .version("3.1.0")
     .booleanConf
     .createWithDefault(false)
+
+  private[spark] val ALLOW_SPARK_CONTEXT_IN_EXECUTORS =
+    ConfigBuilder("spark.driver.allowSparkContextInExecutors")
+      .doc("If set to true, SparkContext can be created in executors.")
+      .version("3.0.1")
+      .booleanConf
+      .createWithDefault(false)
 }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 132e994c37..1f7aa8eec8 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -946,6 +946,15 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
 
     assert(error.contains("SparkContext should only be created and accessed on the driver."))
   }
+
+  test("SPARK-32160: Allow to create SparkContext in executors if the config is set") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+
+    sc.range(0, 1).foreach { _ =>
+      new SparkContext(new SparkConf().setAppName("test").setMaster("local")
+        .set(ALLOW_SPARK_CONTEXT_IN_EXECUTORS, true)).stop()
+    }
+  }
 }
 
 object SparkContextSuite {
diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 63baef145f..b2a08502d0 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -22,6 +22,10 @@ license: |
 * Table of contents
 {:toc}
 
+## Upgrading from Core 3.0 to 3.1
+
+- In Spark 3.0 and below, `SparkContext` can be created in executors. Since Spark 3.1, an exception will be thrown when creating `SparkContext` in executors. You can allow it by setting the configuration `spark.driver.allowSparkContextInExecutors` when creating `SparkContext` in executors.
+
 ## Upgrading from Core 2.4 to 3.0
 
 - The `org.apache.spark.ExecutorPlugin` interface and related configuration has been replaced with
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5ddce9f458..0816657692 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -117,8 +117,10 @@ class SparkContext(object):
         ...
         ValueError:...
         """
-        # In order to prevent SparkContext from being created in executors.
-        SparkContext._assert_on_driver()
+        if (conf is None or
+                conf.get("spark.driver.allowSparkContextInExecutors", "false").lower() != "true"):
+            # In order to prevent SparkContext from being created in executors.
+            SparkContext._assert_on_driver()
 
         self._callsite = first_spark_call() or CallSite(None, None, None)
         if gateway is not None and gateway.gateway_parameters.auth_token is None:
diff --git a/python/pyspark/tests/test_context.py b/python/pyspark/tests/test_context.py
index 168299e385..64fe3837e7 100644
--- a/python/pyspark/tests/test_context.py
+++ b/python/pyspark/tests/test_context.py
@@ -275,6 +275,17 @@ class ContextTests(unittest.TestCase):
         self.assertIn("SparkContext should only be created and accessed on the driver.",
                       str(context.exception))
 
+    def test_allow_to_create_spark_context_in_executors(self):
+        # SPARK-32160: SparkContext can be created in executors if the config is set.
+
+        def create_spark_context():
+            conf = SparkConf().set("spark.driver.allowSparkContextInExecutors", "true")
+            with SparkContext(conf=conf):
+                pass
+
+        with SparkContext("local-cluster[3, 1, 1024]") as sc:
+            sc.range(2).foreach(lambda _: create_spark_context())
 
 
 class ContextTestsWithResources(unittest.TestCase):
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 08b0a1c6a6..306c3235b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, TaskContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ALLOW_SPARK_CONTEXT_IN_EXECUTORS
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalog.Catalog
@@ -900,7 +901,13 @@ object SparkSession extends Logging {
      * @since 2.0.0
      */
     def getOrCreate(): SparkSession = synchronized {
-      assertOnDriver()
+      val sparkConf = new SparkConf()
+      options.foreach { case (k, v) => sparkConf.set(k, v) }
+
+      if (!sparkConf.get(ALLOW_SPARK_CONTEXT_IN_EXECUTORS)) {
+        assertOnDriver()
+      }
+
       // Get the session from current thread's active session.
       var session = activeThreadSession.get()
       if ((session ne null) && !session.sparkContext.isStopped) {
@@ -919,9 +926,6 @@ object SparkSession extends Logging {
 
         // No active nor global default session. Create a new one.
         val sparkContext = userSuppliedContext.getOrElse {
-          val sparkConf = new SparkConf()
-          options.foreach { case (k, v) => sparkConf.set(k, v) }
-
           // set a random app name if not given.
           if (!sparkConf.contains("spark.app.name")) {
             sparkConf.setAppName(java.util.UUID.randomUUID().toString)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index e914d8398e..cc261a9ed3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql
 
 import org.scalatest.BeforeAndAfterEach
 
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark.internal.config.ALLOW_SPARK_CONTEXT_IN_EXECUTORS
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -257,4 +258,27 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
       context.stop()
     }
   }
+
+  test("SPARK-32160: Disallow to create SparkSession in executors") {
+    val session = SparkSession.builder().master("local-cluster[3, 1, 1024]").getOrCreate()
+
+    val error = intercept[SparkException] {
+      session.range(1).foreach { v =>
+        SparkSession.builder.master("local").getOrCreate()
+        ()
+      }
+    }.getMessage()
+
+    assert(error.contains("SparkSession should only be created and accessed on the driver."))
+  }
+
+  test("SPARK-32160: Allow to create SparkSession in executors if the config is set") {
+    val session = SparkSession.builder().master("local-cluster[3, 1, 1024]").getOrCreate()
+
+    session.range(1).foreach { v =>
+      SparkSession.builder.master("local")
+        .config(ALLOW_SPARK_CONTEXT_IN_EXECUTORS.key, true).getOrCreate().stop()
+      ()
+    }
+  }
 }
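The builder change exercised by the suite above mirrors the SparkContext one: `getOrCreate()` now copies the builder options into a `SparkConf` up front and only calls `assertOnDriver()` when the flag is off. A minimal user-facing sketch of that path, not part of the patch: the object name and the `local-cluster` master are illustrative assumptions, and user code would pass the flag via the string key rather than the `private[spark]` constant.

```scala
import org.apache.spark.sql.SparkSession

object NestedSessionExample {
  def main(args: Array[String]): Unit = {
    // Driver-side session.
    val spark = SparkSession.builder()
      .appName("driver-session")
      .master("local-cluster[2, 1, 1024]")
      .getOrCreate()

    // Inside a task: getOrCreate() sees the flag among the builder options
    // and skips assertOnDriver(), so the nested session can be created and
    // stopped on the executor.
    spark.range(1).foreach { _ =>
      SparkSession.builder()
        .master("local")
        .config("spark.driver.allowSparkContextInExecutors", "true")
        .getOrCreate()
        .stop()
    }

    spark.stop()
  }
}
```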