[SPARK-16941] Use ConcurrentHashMap instead of Scala Map in SparkSQLOperationManager.
## What changes were proposed in this pull request? The ThriftServer has thread-safety problems in **SparkSQLOperationManager**: its `sessionToActivePool` and `sessionToContexts` maps are plain Scala mutable maps. Replace them with `java.util.concurrent.ConcurrentHashMap` to avoid this problem. Details in [SPARK-16941](https://issues.apache.org/jira/browse/SPARK-16941) ## How was this patch tested? NA Author: huangzhaowei <carlmartinmax@gmail.com> Closes #14534 from SaintBacchus/SPARK-16941.
This commit is contained in:
parent
8a6b7037bb
commit
a45fefd17e
|
@ -23,7 +23,7 @@ import java.util.{Arrays, Map => JMap, UUID}
|
|||
import java.util.concurrent.RejectedExecutionException
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.mutable.{ArrayBuffer, Map => SMap}
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import org.apache.hadoop.hive.metastore.api.FieldSchema
|
||||
|
@ -45,7 +45,7 @@ private[hive] class SparkExecuteStatementOperation(
|
|||
statement: String,
|
||||
confOverlay: JMap[String, String],
|
||||
runInBackground: Boolean = true)
|
||||
(sqlContext: SQLContext, sessionToActivePool: SMap[SessionHandle, String])
|
||||
(sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String])
|
||||
extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
|
||||
with Logging {
|
||||
|
||||
|
@ -215,7 +215,8 @@ private[hive] class SparkExecuteStatementOperation(
|
|||
statementId,
|
||||
parentSession.getUsername)
|
||||
sqlContext.sparkContext.setJobGroup(statementId, statement)
|
||||
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
|
||||
val pool = sessionToActivePool.get(parentSession.getSessionHandle)
|
||||
if (pool != null) {
|
||||
sqlContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
|
||||
}
|
||||
try {
|
||||
|
@ -223,7 +224,7 @@ private[hive] class SparkExecuteStatementOperation(
|
|||
logDebug(result.queryExecution.toString())
|
||||
result.queryExecution.logical match {
|
||||
case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
|
||||
sessionToActivePool(parentSession.getSessionHandle) = value
|
||||
sessionToActivePool.put(parentSession.getSessionHandle, value)
|
||||
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
|
||||
case _ =>
|
||||
}
|
||||
|
|
|
@ -79,14 +79,14 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
|
|||
sqlContext.newSession()
|
||||
}
|
||||
ctx.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
|
||||
sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
|
||||
sparkSqlOperationManager.sessionToContexts.put(sessionHandle, ctx)
|
||||
sessionHandle
|
||||
}
|
||||
|
||||
override def closeSession(sessionHandle: SessionHandle) {
|
||||
HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
|
||||
super.closeSession(sessionHandle)
|
||||
sparkSqlOperationManager.sessionToActivePool -= sessionHandle
|
||||
sparkSqlOperationManager.sessionToActivePool.remove(sessionHandle)
|
||||
sparkSqlOperationManager.sessionToContexts.remove(sessionHandle)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,8 +18,7 @@
|
|||
package org.apache.spark.sql.hive.thriftserver.server
|
||||
|
||||
import java.util.{Map => JMap}
|
||||
|
||||
import scala.collection.mutable.Map
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
import org.apache.hive.service.cli._
|
||||
import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager}
|
||||
|
@ -39,15 +38,17 @@ private[thriftserver] class SparkSQLOperationManager()
|
|||
val handleToOperation = ReflectionUtils
|
||||
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
|
||||
|
||||
val sessionToActivePool = Map[SessionHandle, String]()
|
||||
val sessionToContexts = Map[SessionHandle, SQLContext]()
|
||||
val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
|
||||
val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()
|
||||
|
||||
override def newExecuteStatementOperation(
|
||||
parentSession: HiveSession,
|
||||
statement: String,
|
||||
confOverlay: JMap[String, String],
|
||||
async: Boolean): ExecuteStatementOperation = synchronized {
|
||||
val sqlContext = sessionToContexts(parentSession.getSessionHandle)
|
||||
val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
|
||||
require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
|
||||
s" initialized or had already closed.")
|
||||
val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
|
||||
val runInBackground = async && sessionState.hiveThriftServerAsync
|
||||
val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
|
||||
|
|
Loading…
Reference in a new issue