[SPARK-26992][STS] Ensure the STS scheduler pool is applied only to the session that set it

## What changes were proposed in this pull request?

The user sets the value of spark.sql.thriftserver.scheduler.pool.
Spark Thrift Server stores this value in a thread-local LocalProperty, but does not clean it up after the statement finishes running, causing statements from other sessions to run in the previously set pool.

## How was this patch tested?

manual tests

Closes #23895 from cxzl25/thrift_server_scheduler_pool_pollute.

Lead-authored-by: cxzl25 <cxzl25@users.noreply.github.com>
Co-authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
This commit is contained in:
cxzl25 2019-04-06 17:14:29 -05:00 committed by Sean Owen
parent 4a5768b2a2
commit 6450c5948a

View file

@ -109,7 +109,7 @@ private[hive] class SparkExecuteStatementOperation(
}
}
def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withSchedulerPool {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
@ -210,7 +210,7 @@ private[hive] class SparkExecuteStatementOperation(
}
}
private def execute(): Unit = {
private def execute(): Unit = withSchedulerPool {
statementId = UUID.randomUUID().toString
logInfo(s"Running query '$statement' with $statementId")
setState(OperationState.RUNNING)
@ -225,10 +225,6 @@ private[hive] class SparkExecuteStatementOperation(
statementId,
parentSession.getUsername)
sqlContext.sparkContext.setJobGroup(statementId, statement)
val pool = sessionToActivePool.get(parentSession.getSessionHandle)
// It may have unpredictable behavior since we use thread pools to execute queries and
// the 'spark.scheduler.pool' may not be 'default' when we did not set its value. (SPARK-26914)
sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool)
try {
result = sqlContext.sql(statement)
logDebug(result.queryExecution.toString())
@ -291,6 +287,20 @@ private[hive] class SparkExecuteStatementOperation(
sqlContext.sparkContext.cancelJobGroup(statementId)
}
}
// Runs `body` with this session's scheduler pool (if one was configured via
// spark.sql.thriftserver.scheduler.pool) applied as a SparkContext local property
// on the current thread, and clears the property afterwards so the thread-local
// value does not leak into statements from other sessions that later reuse the
// same executor thread (SPARK-26992).
private def withSchedulerPool[T](body: => T): T = {
// `pool` is null when the session never set a scheduler pool
// (sessionToActivePool is presumably a Java map whose get returns null on a
// miss — TODO confirm against the declaration elsewhere in this file).
val pool = sessionToActivePool.get(parentSession.getSessionHandle)
if (pool != null) {
sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool)
}
try {
body
} finally {
// Only clear when we actually set the property above; per
// SparkContext.setLocalProperty semantics, a null value removes the key
// for this thread. Note this resets to the default pool rather than
// restoring any previous value.
if (pool != null) {
sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, null)
}
}
}
}
object SparkExecuteStatementOperation {