[SPARK-31532][SQL] Builder should not propagate static sql configs to the existing active or default SparkSession
### What changes were proposed in this pull request?

SparkSession.Builder should not propagate static SQL configurations to the existing active/default SparkSession. This appears to be a long-standing bug:

```scala
scala> spark.sql("set spark.sql.warehouse.dir").show
+--------------------+--------------------+
|                 key|               value|
+--------------------+--------------------+
|spark.sql.warehou...|file:/Users/kenty...|
+--------------------+--------------------+

scala> spark.sql("set spark.sql.warehouse.dir=2");
org.apache.spark.sql.AnalysisException: Cannot modify the value of a static config: spark.sql.warehouse.dir;
  at org.apache.spark.sql.RuntimeConfig.requireNonStaticConf(RuntimeConfig.scala:154)
  at org.apache.spark.sql.RuntimeConfig.set(RuntimeConfig.scala:42)
  at org.apache.spark.sql.execution.command.SetCommand.$anonfun$x$7$6(SetCommand.scala:100)
  at org.apache.spark.sql.execution.command.SetCommand.run(SetCommand.scala:156)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
  ... 47 elided

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").get
get   getClass   getOrCreate

scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").getOrCreate
20/04/23 23:49:13 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
res7: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@6403d574

scala> spark.sql("set spark.sql.warehouse.dir").show
+--------------------+-----+
|                 key|value|
+--------------------+-----+
|spark.sql.warehou...|  xyz|
+--------------------+-----+
```

### Why are the changes needed?

Bug fix, as shown in the previous section.

### Does this PR introduce any user-facing change?

Yes. Static SQL configurations set via SparkSession.builder.config no longer propagate to an existing active or default SparkSession; they only take effect when a new SparkSession is created.

### How was this patch tested?

New unit tests.

Closes #28316 from yaooqinn/SPARK-31532.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
This commit is contained in:
parent 6a576161ae
commit 8424f55229
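With this patch, the same spark-shell sequence leaves the static config untouched, and the warning calls out static configurations explicitly. A hypothetical transcript illustrating the expected behavior (the timestamp, `res0`, and warehouse path are illustrative; the warning text comes from the diff below):

```scala
scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").getOrCreate
20/04/23 23:55:01 WARN SparkSession$Builder: Using an existing SparkSession; the static sql configurations will not take effect.
res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@6403d574

scala> spark.sql("set spark.sql.warehouse.dir").show
+--------------------+--------------------+
|                 key|               value|
+--------------------+--------------------+
|spark.sql.warehou...|file:/Users/kenty...|
+--------------------+--------------------+
```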
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

```diff
@@ -895,7 +895,7 @@ object SparkSession extends Logging {
    * SparkSession exists, the method creates a new SparkSession and assigns the
    * newly created SparkSession as the global default.
    *
-   * In case an existing SparkSession is returned, the config options specified in
+   * In case an existing SparkSession is returned, the non-static config options specified in
    * this builder will be applied to the existing SparkSession.
    *
    * @since 2.0.0
@@ -905,10 +905,7 @@ object SparkSession extends Logging {
       // Get the session from current thread's active session.
       var session = activeThreadSession.get()
       if ((session ne null) && !session.sparkContext.isStopped) {
-        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
-        if (options.nonEmpty) {
-          logWarning("Using an existing SparkSession; some configuration may not take effect.")
-        }
+        applyModifiableSettings(session)
         return session
       }
 
@@ -917,10 +914,7 @@ object SparkSession extends Logging {
       // If the current thread does not have an active session, get it from the global session.
       session = defaultSession.get()
       if ((session ne null) && !session.sparkContext.isStopped) {
-        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
-        if (options.nonEmpty) {
-          logWarning("Using an existing SparkSession; some configuration may not take effect.")
-        }
+        applyModifiableSettings(session)
         return session
       }
 
@@ -959,6 +953,22 @@ object SparkSession extends Logging {
 
       return session
     }
+
+    private def applyModifiableSettings(session: SparkSession): Unit = {
+      val (staticConfs, otherConfs) =
+        options.partition(kv => SQLConf.staticConfKeys.contains(kv._1))
+
+      otherConfs.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
+
+      if (staticConfs.nonEmpty) {
+        logWarning("Using an existing SparkSession; the static sql configurations will not take" +
+          " effect.")
+      }
+      if (otherConfs.nonEmpty) {
+        logWarning("Using an existing SparkSession; some spark core configurations may not take" +
+          " effect.")
+      }
+    }
   }
 
   /**
```
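The heart of the fix is the new `applyModifiableSettings` helper above: it partitions the builder's options by `SQLConf.staticConfKeys` and applies only the modifiable ones to the existing session. A minimal standalone sketch of that split, where `staticKeys` and the option values are illustrative stand-ins for Spark's real registry:

```scala
object PartitionSketch {
  // Illustrative stand-in for SQLConf.staticConfKeys.
  val staticKeys: Set[String] = Set("spark.sql.warehouse.dir", "spark.sql.globalTempDatabase")

  def main(args: Array[String]): Unit = {
    val options = Map(
      "spark.sql.warehouse.dir" -> "xyz",       // static: must not touch an existing session
      "spark.sql.shuffle.partitions" -> "4")    // runtime: safe to apply

    // Same split as applyModifiableSettings: static confs vs. everything else.
    val (staticConfs, otherConfs) = options.partition { case (k, _) => staticKeys.contains(k) }

    otherConfs.foreach { case (k, v) => println(s"applied $k=$v") }
    if (staticConfs.nonEmpty) {
      println(s"warn: static confs ignored: ${staticConfs.keys.mkString(", ")}")
    }
  }
}
```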
sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala

```diff
@@ -22,7 +22,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
+import org.apache.spark.sql.internal.StaticSQLConf._
 
 /**
  * Test cases for the builder pattern of [[SparkSession]].
@@ -168,4 +168,51 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(session.sessionState.conf.getConfString("spark.app.name") === "test-app-SPARK-31234")
     assert(session.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
   }
+
+  test("SPARK-31532: should not propagate static sql configs to the existing" +
+    " active/default SparkSession") {
+    val session = SparkSession.builder()
+      .master("local")
+      .config(GLOBAL_TEMP_DATABASE.key, value = "globalTempDB-SPARK-31532")
+      .config("spark.app.name", "test-app-SPARK-31532")
+      .getOrCreate()
+    // do not propagate static sql configs to the existing active session
+    val session1 = SparkSession
+      .builder()
+      .config(GLOBAL_TEMP_DATABASE.key, "globalTempDB-SPARK-31532-1")
+      .getOrCreate()
+    assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532")
+    assert(session1.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532")
+
+    // do not propagate static sql configs to the existing default session
+    SparkSession.clearActiveSession()
+    val session2 = SparkSession
+      .builder()
+      .config(WAREHOUSE_PATH.key, "SPARK-31532-db")
+      .config(GLOBAL_TEMP_DATABASE.key, value = "globalTempDB-SPARK-31532-2")
+      .getOrCreate()
+
+    assert(!session.conf.get(WAREHOUSE_PATH).contains("SPARK-31532-db"))
+    assert(session.conf.get(WAREHOUSE_PATH) === session2.conf.get(WAREHOUSE_PATH))
+    assert(session2.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532")
+  }
+
+  test("SPARK-31532: propagate static sql configs if no existing SparkSession") {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test-app-SPARK-31532-2")
+      .set(GLOBAL_TEMP_DATABASE.key, "globaltempdb-spark-31532")
+      .set(WAREHOUSE_PATH.key, "SPARK-31532-db")
+    SparkContext.getOrCreate(conf)
+
+    // propagate static sql configs if no existing session
+    val session = SparkSession
+      .builder()
+      .config(GLOBAL_TEMP_DATABASE.key, "globalTempDB-SPARK-31532-2")
+      .config(WAREHOUSE_PATH.key, "SPARK-31532-db-2")
+      .getOrCreate()
+    assert(session.conf.get("spark.app.name") === "test-app-SPARK-31532-2")
+    assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532-2")
+    assert(session.conf.get(WAREHOUSE_PATH) === "SPARK-31532-db-2")
+  }
 }
```
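As the second test shows, static SQL configs still apply when no usable session exists. So after this change, the way to get a different static conf value is to build a brand-new session. A sketch of that pattern (assumes stopping the current SparkContext is acceptable; the path is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Stop the existing session (and its SparkContext) so that getOrCreate
// builds a fresh one instead of reusing the active/default session.
SparkSession.getActiveSession.foreach(_.stop())

val fresh = SparkSession.builder()
  .master("local")
  .config("spark.sql.warehouse.dir", "/tmp/new-warehouse")  // static conf honored now
  .getOrCreate()
```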