[SPARK-30556][SQL] Copy sparkContext.localProperties to child thread in SubqueryExec.executionContext

### What changes were proposed in this pull request?
In `org.apache.spark.sql.execution.SubqueryExec#relationFuture`, make a copy of `org.apache.spark.SparkContext#localProperties` and pass it to the sub-execution thread in `org.apache.spark.sql.execution.SubqueryExec#executionContext`.
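
In effect, the fix wraps the subquery body in a helper that snapshots the caller's local properties and re-applies them on the pooled execution thread. A condensed sketch of that helper follows (the full version, `SQLExecution.withThreadLocalCaptured`, is in the diff below; `getLocalProperties`/`setLocalProperties` are `private[spark]`, so this is not standalone code):

```scala
// Condensed sketch of the helper this PR adds (see the SQLExecution diff below).
// Local properties are cloned on the submitting thread and restored on the pool thread.
def withThreadLocalCaptured[T](
    sparkSession: SparkSession,
    exec: ExecutionContext)(body: => T): Future[T] = {
  val sc = sparkSession.sparkContext
  val localProps = Utils.cloneProperties(sc.getLocalProperties) // snapshot the caller's properties
  Future {
    SparkSession.setActiveSession(sparkSession)                 // forward the active session
    sc.setLocalProperties(localProps)                           // restore properties on the pool thread
    body
  }(exec)
}
```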

### Why are the changes needed?
Local properties set via `sparkContext` are not available as `TaskContext` properties when jobs are executed on thread pools that reuse idle threads.

Explanation:
In `SubqueryExec`, the `relationFuture` is evaluated on a separate thread. Those threads inherit `localProperties` from `sparkContext` because they are child threads.
These threads are created by the `executionContext` (a thread pool). Each thread pool keeps idle threads alive for a default `keepAliveSeconds` of 60 seconds.
When an idle thread is reused for a subsequent query, it does not re-inherit the current thread-local properties from the Spark context (properties are inherited only at thread creation), so it ends up with stale or missing properties. As a result, task set properties go missing when the child thread submits jobs via `sparkContext.runJob`/`submitJob`. The sketch below illustrates the underlying behavior.
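
A minimal, self-contained illustration of the underlying JVM behavior (plain Scala, not Spark code; `StaleThreadLocalDemo` and `prop` are made-up names for the demo). `SparkContext.localProperties` is an `InheritableThreadLocal`, and inheritable thread-locals are copied only when a thread is constructed, so a reused pool thread keeps whatever value its parent had when the thread was created:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Demo-only object; mimics how an InheritableThreadLocal behaves
// with a thread pool that reuses idle threads.
object StaleThreadLocalDemo {
  private val prop = new InheritableThreadLocal[String] {
    override def initialValue(): String = "unset"
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(1)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

    prop.set("v1") // "query 1" sets a local property on the parent thread
    println(Await.result(Future(prop.get()), 5.seconds)) // v1: worker thread created now, inherits it

    prop.set("v2") // "query 2" updates the property on the parent thread
    println(Await.result(Future(prop.get()), 5.seconds)) // still v1: the idle worker was reused

    pool.shutdown()
  }
}
```

Running this prints `v1` twice: the second "query" never sees `v2`, which is exactly the staleness the PR fixes for subquery execution threads.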

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Added a unit test.

Closes #27267 from ajithme/subquerylocalprop.

Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Ajith authored on 2020-01-22 18:21:11 -08:00, committed by Dongjoon Hyun
parent eccae13a5f
commit bbab2bb961
4 changed files with 69 additions and 7 deletions


@@ -145,9 +145,17 @@ object StaticSQLConf {
"cause longer waiting for other broadcasting. Also, increasing parallelism may " +
"cause memory problem.")
.intConf
.checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in [0,128].")
.checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
.createWithDefault(128)
val SUBQUERY_MAX_THREAD_THRESHOLD =
buildStaticConf("spark.sql.subquery.maxThreadThreshold")
.internal()
.doc("The maximum degree of parallelism to execute the subquery.")
.intConf
.checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
.createWithDefault(16)
val SQL_EVENT_TRUNCATE_LENGTH = buildStaticConf("spark.sql.event.truncate.length")
.doc("Threshold of SQL length beyond which it will be truncated before adding to " +
"event. Defaults to no truncation. If set to 0, callsite will be logged instead.")


@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.SparkContext
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.sql.SparkSession
@@ -164,4 +166,20 @@ object SQLExecution {
}
}
}
/**
* Wrap passed function to ensure necessary thread-local variables like
* SparkContext local properties are forwarded to execution thread
*/
def withThreadLocalCaptured[T](
sparkSession: SparkSession, exec: ExecutionContext)(body: => T): Future[T] = {
val activeSession = sparkSession
val sc = sparkSession.sparkContext
val localProps = Utils.cloneProperties(sc.getLocalProperties)
Future {
SparkSession.setActiveSession(activeSession)
sc.setLocalProperties(localProps)
body
}(exec)
}
}


@@ -31,8 +31,9 @@ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.{LongType, StructType}
import org.apache.spark.util.ThreadUtils
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
/** Physical plan for Project. */
@@ -749,7 +750,9 @@ case class SubqueryExec(name: String, child: SparkPlan)
private lazy val relationFuture: Future[Array[InternalRow]] = {
// relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
Future {
SQLExecution.withThreadLocalCaptured[Array[InternalRow]](
sqlContext.sparkSession,
SubqueryExec.executionContext) {
// This will run in another thread. Set the execution id so that we can connect these jobs
// with the correct execution.
SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
@@ -764,7 +767,7 @@ case class SubqueryExec(name: String, child: SparkPlan)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
rows
}
}(SubqueryExec.executionContext)
}
}
protected override def doCanonicalize(): SparkPlan = {
@@ -788,7 +791,8 @@ case class SubqueryExec(name: String, child: SparkPlan)
object SubqueryExec {
private[execution] val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
ThreadUtils.newDaemonCachedThreadPool("subquery",
SQLConf.get.getConf(StaticSQLConf.SUBQUERY_MAX_THREAD_THRESHOLD)))
}
/**


@@ -19,9 +19,9 @@ package org.apache.spark.sql.internal
import org.scalatest.Assertions._
import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.{SparkException, SparkFunSuite, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -125,6 +125,38 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
val e = intercept[SparkException](dummyQueryExecution1.toRdd.collect())
assert(e.getCause.isInstanceOf[NoSuchElementException])
}
test("SPARK-30556 propagate local properties to subquery execution thread") {
withSQLConf(StaticSQLConf.SUBQUERY_MAX_THREAD_THRESHOLD.key -> "1") {
withTempView("l", "m", "n") {
Seq(true).toDF().createOrReplaceTempView("l")
val confKey = "spark.sql.y"
def createDataframe(confKey: String, confValue: String): Dataset[Boolean] = {
Seq(true)
.toDF()
.mapPartitions { _ =>
TaskContext.get.getLocalProperty(confKey) == confValue match {
case true => Iterator(true)
case false => Iterator.empty
}
}
}
// set local configuration and assert
val confValue1 = "e"
createDataframe(confKey, confValue1).createOrReplaceTempView("m")
spark.sparkContext.setLocalProperty(confKey, confValue1)
assert(sql("SELECT * FROM l WHERE EXISTS (SELECT * FROM m)").collect.size == 1)
// change the conf value and assert again
val confValue2 = "f"
createDataframe(confKey, confValue2).createOrReplaceTempView("n")
spark.sparkContext.setLocalProperty(confKey, confValue2)
assert(sql("SELECT * FROM l WHERE EXISTS (SELECT * FROM n)").collect().size == 1)
}
}
}
}
case class SQLConfAssertPlan(confToCheck: Seq[(String, String)]) extends LeafExecNode {