diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6c6cca84e1..d9b0a72618 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -115,7 +115,9 @@ object SQLConf {
    * Returns the active config object within the current scope. If there is an active SparkSession,
    * the proper SQLConf associated with the thread's active session is used. If it's called from
    * tasks in the executor side, a SQLConf will be created from job local properties, which are set
-   * and propagated from the driver side.
+   * and propagated from the driver side, unless a `SQLConf` has been set in the scope by
+   * `withExistingConf`, as is done when propagating the SQLConf to operations performed on RDDs
+   * created from DataFrames.
    *
    * The way this works is a little bit convoluted, due to the fact that config was added initially
    * only for physical plans (and as a result not in sql/catalyst module).
@@ -129,7 +131,12 @@ object SQLConf {
    */
   def get: SQLConf = {
     if (TaskContext.get != null) {
-      new ReadOnlySQLConf(TaskContext.get())
+      val conf = existingConf.get()
+      if (conf != null) {
+        conf
+      } else {
+        new ReadOnlySQLConf(TaskContext.get())
+      }
     } else {
       val isSchedulerEventLoopThread = SparkContext.getActive
         .map(_.dagScheduler.eventProcessLoop.eventThread)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index e5e86db29f..630d062d65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -105,7 +105,8 @@ class QueryExecution(
    * Given QueryExecution is not a public class, end users are discouraged to use this: please
    * use `Dataset.rdd` instead where conversion will be applied.
    */
-  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
+  lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(
+    executedPlan.execute(), sparkSession.sessionState.conf)
 
   /**
    * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala
new file mode 100644
index 0000000000..7373da33e1
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * A wrapper over `sqlRDD` that makes all the configs from the captured `SQLConf` effective
+ * when the wrapped RDD is computed.
+ * Note that this means configurations set after the creation of this RDD and before its
+ * execution are not picked up.
+ *
+ * @param sqlRDD the `RDD` generated by the SQL plan
+ * @param conf the `SQLConf` to apply to the execution of the SQL plan
+ */
+class SQLExecutionRDD(
+    var sqlRDD: RDD[InternalRow], @transient conf: SQLConf) extends RDD[InternalRow](sqlRDD) {
+  private val sqlConfigs = conf.getAllConfs
+  private lazy val sqlConfExecutorSide = {
+    val props = new Properties()
+    props.putAll(sqlConfigs.asJava)
+    val newConf = new SQLConf()
+    newConf.setConf(props)
+    newConf
+  }
+
+  override val partitioner = firstParent[InternalRow].partitioner
+
+  override def getPartitions: Array[Partition] = firstParent[InternalRow].partitions
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    // If we are in the context of a tracked SQL operation, `SQLExecution.EXECUTION_ID_KEY` is set
+    // and we have nothing to do here. Otherwise, we use the `SQLConf` captured at the creation of
+    // this RDD.
+    if (context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) == null) {
+      SQLConf.withExistingConf(sqlConfExecutorSide) {
+        firstParent[InternalRow].iterator(split, context)
+      }
+    } else {
+      firstParent[InternalRow].iterator(split, context)
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index d885348f37..94b73ec186 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -17,8 +17,13 @@
 
 package org.apache.spark.sql.internal
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.debug.codegenStringSeq
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.test.SQLTestUtils
@@ -102,4 +107,41 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-28939: propagate SQLConf also in conversions to RDD") {
+    val confs = Seq("spark.sql.a" -> "x", "spark.sql.b" -> "y")
+    val physicalPlan = SQLConfAssertPlan(confs)
+    val dummyQueryExecution = FakeQueryExecution(spark, physicalPlan)
+    withSQLConf(confs: _*) {
+      // Force RDD evaluation to trigger the asserts
+      dummyQueryExecution.toRdd.collect()
+    }
+    val dummyQueryExecution1 = FakeQueryExecution(spark, physicalPlan)
+    // Without setting the configs, the assertions fail
+    val e = intercept[SparkException](dummyQueryExecution1.toRdd.collect())
+    assert(e.getCause.isInstanceOf[NoSuchElementException])
+  }
+}
+
+case class SQLConfAssertPlan(confToCheck: Seq[(String, String)]) extends LeafExecNode {
+  override protected def doExecute(): RDD[InternalRow] = {
+    sqlContext
+      .sparkContext
+      .parallelize(0 until 2, 2)
+      .mapPartitions { it =>
+        val confs = SQLConf.get
+        confToCheck.foreach { case (key, expectedValue) =>
+          assert(confs.getConfString(key) == expectedValue)
+        }
+        it.map(i => InternalRow.fromSeq(Seq(i)))
+      }
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
+
+case class FakeQueryExecution(spark: SparkSession, physicalPlan: SparkPlan)
+  extends QueryExecution(spark, LocalRelation()) {
+  override lazy val sparkPlan: SparkPlan = physicalPlan
+  override lazy val executedPlan: SparkPlan = physicalPlan
 }
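Not part of the patch above: a minimal, self-contained sketch of the thread-local override pattern that the new branch in `SQLConf.get` and `SQLConf.withExistingConf` rely on. The names `ThreadLocalConfSketch`, `Config`, and `withExistingConfig` are illustrative stand-ins, not Spark APIs; the point is only to show how a conf scoped around a block is what `get` returns on the same thread, and how it is cleared afterwards.

object ThreadLocalConfSketch {
  // Illustrative stand-in for SQLConf: an immutable bag of string entries.
  final case class Config(entries: Map[String, String])

  private val default = Config(Map.empty)

  // Thread-local holding an explicitly scoped config, if any (null means "none").
  private val existing = new ThreadLocal[Config] {
    override def initialValue(): Config = null
  }

  // Mirrors the new branch in SQLConf.get: prefer the scoped config when one is set.
  def get: Config = Option(existing.get()).getOrElse(default)

  // Mirrors SQLConf.withExistingConf: make `conf` visible to `get` on this thread
  // while `body` runs, then clear it so nothing leaks to later work on the thread.
  def withExistingConfig[T](conf: Config)(body: => T): T = {
    existing.set(conf)
    try body finally existing.remove()
  }

  def main(args: Array[String]): Unit = {
    assert(get.entries.isEmpty)
    withExistingConfig(Config(Map("spark.sql.a" -> "x"))) {
      assert(get.entries("spark.sql.a") == "x")
    }
    assert(get.entries.isEmpty) // the override is gone once the block returns
  }
}

In `SQLExecutionRDD.compute` the same pattern wraps `firstParent.iterator(split, context)`, so operators that call `SQLConf.get` while their iterators are being set up observe the configs captured when `QueryExecution.toRdd` was created, even though the job runs without a SQL execution id.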