[SPARK-28939][SQL] Propagate SQLConf for plans executed by toRdd

### What changes were proposed in this pull request?

The PR proposes to create a custom `RDD` which propagates the `SQLConf` also in cases not tracked by SQL execution, such as when a `Dataset` is converted to an RDD, either using `.rdd` or `.queryExecution.toRdd`, and actions are then invoked on the returned RDD.

In this way, SQL configs take effect in these cases too, whereas previously they were ignored.
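
To make "cases not tracked by SQL execution" concrete, a brief sketch (names are illustrative only):

```scala
val df = spark.range(10).toDF("id")

// Tracked by SQL execution: the action runs under an execution id and SQL configs are
// propagated to executors through job-local properties (the pre-existing mechanism).
df.collect()

// Not tracked: a plain RDD action on the plan's output. Before this patch, SQL configs set on
// the session were ignored here; with this change, toRdd is a SQLExecutionRDD that captures
// the session's SQLConf and re-applies it in compute().
df.queryExecution.toRdd.count()
```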

### Why are the changes needed?

Without this patch, whenever `.rdd` or `.queryExecution.toRdd` is used, all the SQL configs that have been set are ignored. An example of a reproducer is:
```scala
  withSQLConf(SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key -> "false") {
    val df = spark.range(2).selectExpr((0 to 5000).map(i => s"id as field_$i"): _*)
    df.createOrReplaceTempView("spark64kb")
    val data = spark.sql("select * from spark64kb limit 10")
    // Subexpression elimination is used here, even though it should have been disabled
    data.describe()
  }
```

### Does this PR introduce any user-facing change?

When a user calls `.queryExecution.toRdd`, a `SQLExecutionRDD` is returned, wrapping the `RDD` produced by the executed plan. When `.rdd` is used, an additional `SQLExecutionRDD` is present in the RDD lineage.
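
A quick way to see this is to print the RDD lineage; this is only a sketch, and the exact output depends on the plan:

```scala
val df = spark.range(10)

// toRdd is now a SQLExecutionRDD wrapping the RDD produced by the executed plan.
println(df.queryExecution.toRdd.toDebugString)

// df.rdd adds the usual row-conversion layers on top of the SQLExecutionRDD.
println(df.rdd.toDebugString)
```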

### How was this patch tested?

Added a unit test in `ExecutorSideSQLConfSuite`.

Closes #25643 from mgaido91/SPARK-28939.

Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit 3d6b33a49a (parent abec6d7763), committed by Wenchen Fan on 2019-09-09 21:20:34 +08:00
4 changed files with 119 additions and 5 deletions


```diff
@@ -115,7 +115,9 @@ object SQLConf {
    * Returns the active config object within the current scope. If there is an active SparkSession,
    * the proper SQLConf associated with the thread's active session is used. If it's called from
    * tasks in the executor side, a SQLConf will be created from job local properties, which are set
-   * and propagated from the driver side.
+   * and propagated from the driver side, unless a `SQLConf` has been set in the scope by
+   * `withExistingConf` as done for propagating SQLConf for operations performed on RDDs created
+   * from DataFrames.
    *
    * The way this works is a little bit convoluted, due to the fact that config was added initially
    * only for physical plans (and as a result not in sql/catalyst module).
@@ -129,7 +131,12 @@ object SQLConf {
    */
   def get: SQLConf = {
     if (TaskContext.get != null) {
-      new ReadOnlySQLConf(TaskContext.get())
+      val conf = existingConf.get()
+      if (conf != null) {
+        conf
+      } else {
+        new ReadOnlySQLConf(TaskContext.get())
+      }
     } else {
       val isSchedulerEventLoopThread = SparkContext.getActive
         .map(_.dagScheduler.eventProcessLoop.eventThread)
```
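
The hunks above read `existingConf` and rely on `withExistingConf`, whose definitions are not part of this excerpt. A sketch of the presumed shape, a plain thread-local that is set around a block and cleared afterwards:

```scala
// Presumed shape (sketch only): these definitions are not shown in the hunks above and
// would live inside object SQLConf, next to `get`.
private lazy val existingConf = new ThreadLocal[SQLConf] {
  override def initialValue: SQLConf = null
}

def withExistingConf[T](conf: SQLConf)(f: => T): T = {
  existingConf.set(conf)
  try {
    f
  } finally {
    existingConf.remove()
  }
}
```

This is the hook `SQLExecutionRDD.compute` (below) uses to make its captured configuration the one returned by `SQLConf.get` while the wrapped plan's iterator chain is being set up on executors.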


```diff
@@ -105,7 +105,8 @@ class QueryExecution(
    * Given QueryExecution is not a public class, end users are discouraged to use this: please
    * use `Dataset.rdd` instead where conversion will be applied.
    */
-  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
+  lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(
+    executedPlan.execute(), sparkSession.sessionState.conf)
 
   /**
    * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
```


```diff
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * It is just a wrapper over `sqlRDD`, which sets and makes effective all the configs from the
+ * captured `SQLConf`.
+ * Please notice that this means we may miss configurations set after the creation of this RDD and
+ * before its execution.
+ *
+ * @param sqlRDD the `RDD` generated by the SQL plan
+ * @param conf the `SQLConf` to apply to the execution of the SQL plan
+ */
+class SQLExecutionRDD(
+    var sqlRDD: RDD[InternalRow], @transient conf: SQLConf) extends RDD[InternalRow](sqlRDD) {
+  private val sqlConfigs = conf.getAllConfs
+  private lazy val sqlConfExecutorSide = {
+    val props = new Properties()
+    props.putAll(sqlConfigs.asJava)
+    val newConf = new SQLConf()
+    newConf.setConf(props)
+    newConf
+  }
+
+  override val partitioner = firstParent[InternalRow].partitioner
+
+  override def getPartitions: Array[Partition] = firstParent[InternalRow].partitions
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    // If we are in the context of a tracked SQL operation, `SQLExecution.EXECUTION_ID_KEY` is set
+    // and we have nothing to do here. Otherwise, we use the `SQLConf` captured at the creation of
+    // this RDD.
+    if (context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) == null) {
+      SQLConf.withExistingConf(sqlConfExecutorSide) {
+        firstParent[InternalRow].iterator(split, context)
+      }
+    } else {
+      firstParent[InternalRow].iterator(split, context)
+    }
+  }
+}
```
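
As the class comment notes, the configuration snapshot is taken when the `SQLExecutionRDD` is constructed, i.e. when `toRdd` is first evaluated, not when the job runs. A small driver-side sketch of that caveat, using a hypothetical config key:

```scala
// Hypothetical key, only to illustrate the capture timing.
spark.conf.set("spark.sql.myKey", "v1")
val rdd = spark.range(1).queryExecution.toRdd  // sqlConfigs snapshot is taken here, holding "v1"
spark.conf.set("spark.sql.myKey", "v2")        // set after the snapshot: not part of the captured map
// Any config the plan's operators read via SQLConf.get during an untracked action on `rdd`
// comes from the snapshot taken above, as described in the class comment.
```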


```diff
@@ -17,8 +17,13 @@
 
 package org.apache.spark.sql.internal
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.debug.codegenStringSeq
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.test.SQLTestUtils
@@ -102,4 +107,41 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-28939: propagate SQLConf also in conversions to RDD") {
+    val confs = Seq("spark.sql.a" -> "x", "spark.sql.b" -> "y")
+    val physicalPlan = SQLConfAssertPlan(confs)
+    val dummyQueryExecution = FakeQueryExecution(spark, physicalPlan)
+    withSQLConf(confs: _*) {
+      // Force RDD evaluation to trigger asserts
+      dummyQueryExecution.toRdd.collect()
+    }
+    val dummyQueryExecution1 = FakeQueryExecution(spark, physicalPlan)
+    // Without setting the configs assertions fail
+    val e = intercept[SparkException](dummyQueryExecution1.toRdd.collect())
+    assert(e.getCause.isInstanceOf[NoSuchElementException])
+  }
+}
+
+case class SQLConfAssertPlan(confToCheck: Seq[(String, String)]) extends LeafExecNode {
+  override protected def doExecute(): RDD[InternalRow] = {
+    sqlContext
+      .sparkContext
+      .parallelize(0 until 2, 2)
+      .mapPartitions { it =>
+        val confs = SQLConf.get
+        confToCheck.foreach { case (key, expectedValue) =>
+          assert(confs.getConfString(key) == expectedValue)
+        }
+        it.map(i => InternalRow.fromSeq(Seq(i)))
+      }
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
+
+case class FakeQueryExecution(spark: SparkSession, physicalPlan: SparkPlan)
+  extends QueryExecution(spark, LocalRelation()) {
+  override lazy val sparkPlan: SparkPlan = physicalPlan
+
+  override lazy val executedPlan: SparkPlan = physicalPlan
 }
```