[SPARK-28901][SQL] Show SparkThriftServer's canceled SQL operations in the JDBC tab UI

### What changes were proposed in this pull request?
Currently, Spark Thrift Server does not support canceling a SQL job. When we query through Spark Thrift Server from Hue, run a SQL statement, and then click the cancel button, the cancellation has no effect in the backend: in the Spark JDBC/ODBC UI tab the statement's status stays COMPILED forever and its duration keeps increasing, which is confusing.

![image](https://user-images.githubusercontent.com/46485123/63869830-60338f00-c9eb-11e9-8776-cee965adcb0a.png)

### Why are the changes needed?

If the status shown in the UI does not reflect a statement's true state, it confuses users.

### Does this PR introduce any user-facing change?

Yes. The Spark Thrift Server's JDBC/ODBC UI tab now shows a statement's status as CANCELED when the statement is canceled.
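
For context, this is roughly how a JDBC client exercises the cancel path that this PR fixes. This is a hedged sketch, not test code from the PR: the connection URL, table name, and credentials are placeholders, and it assumes the Hive JDBC driver is on the classpath.

```scala
import java.sql.{DriverManager, SQLException}

object CancelFromJdbc {
  def main(args: Array[String]): Unit = {
    // Placeholder URL/credentials; point these at your Thrift Server.
    val conn = DriverManager.getConnection(
      "jdbc:hive2://localhost:10000/default", "user", "")
    val stmt = conn.createStatement()

    // Cancel from a second thread while the query is still running,
    // which is what Hue's cancel button does under the hood.
    new Thread(() => {
      Thread.sleep(2000) // give the query time to start
      stmt.cancel()      // sends a CancelOperation request to the server
    }).start()

    try {
      stmt.executeQuery("SELECT count(*) FROM some_large_table")
    } catch {
      case e: SQLException =>
        println(s"Query canceled or failed: ${e.getMessage}")
    } finally {
      stmt.close()
      conn.close()
    }
  }
}
```

With this patch, a statement canceled this way shows up as CANCELED in the JDBC tab instead of sitting at COMPILED forever.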

### How was this patch tested?
Manually tested.

UI tab status:
![image](https://user-images.githubusercontent.com/46485123/63915010-80a12f00-ca67-11e9-9342-830dfa9c719f.png)

![image](https://user-images.githubusercontent.com/46485123/63915084-a9292900-ca67-11e9-8e26-375bf8ce0963.png)

Backend log:
![image](https://user-images.githubusercontent.com/46485123/63914864-1092a900-ca67-11e9-93f2-08690ed9abf4.png)

Closes #25611 from AngersZhuuuu/SPARK-28901.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>

**`HiveThriftServer2.scala`**
```diff
@@ -138,7 +138,7 @@ object HiveThriftServer2 extends Logging {
   }

   private[thriftserver] object ExecutionState extends Enumeration {
-    val STARTED, COMPILED, FAILED, FINISHED, CLOSED = Value
+    val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value
     type ExecutionState = Value
   }
@@ -174,16 +174,31 @@ object HiveThriftServer2 extends Logging {
     override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
       server.stop()
     }

-    private var onlineSessionNum: Int = 0
     private val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
     private val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
     private val retainedStatements = conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT)
     private val retainedSessions = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)
-    private var totalRunning = 0

-    def getOnlineSessionNum: Int = synchronized { onlineSessionNum }
+    def getOnlineSessionNum: Int = synchronized {
+      sessionList.count(_._2.finishTimestamp == 0)
+    }

-    def getTotalRunning: Int = synchronized { totalRunning }
+    def isExecutionActive(execInfo: ExecutionInfo): Boolean = {
+      !(execInfo.state == ExecutionState.FAILED ||
+        execInfo.state == ExecutionState.CANCELED ||
+        execInfo.state == ExecutionState.CLOSED)
+    }
+
+    /**
+     * When an error or a cancellation occurs, we set the finishTimestamp of the statement.
+     * Therefore, when we count the number of running statements, we need to exclude errors and
+     * cancellations and count all statements that have not been closed so far.
+     */
+    def getTotalRunning: Int = synchronized {
+      executionList.count {
+        case (_, v) => isExecutionActive(v)
+      }
+    }

     def getSessionList: Seq[SessionInfo] = synchronized { sessionList.values.toSeq }
```
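
The design point in this hunk: the mutable counters (`onlineSessionNum`, `totalRunning`) are replaced by counts derived on demand from `sessionList` and `executionList`. With explicit counters, every new terminal path (such as the CANCELED state this PR adds) must remember to decrement, or the count drifts forever; deriving the value from the recorded state makes that class of bug impossible. A minimal standalone sketch of the idea, using hypothetical simplified types rather than the real `ExecutionInfo`:

```scala
import scala.collection.mutable

// Hypothetical stand-in for the listener's execution record.
case class ExecInfo(var state: String, var finishTimestamp: Long = 0L)

object DerivedCounter {
  private val executions = new mutable.LinkedHashMap[String, ExecInfo]
  private val terminal = Set("FAILED", "CANCELED", "CLOSED")

  // Derived on demand: there is no counter to keep in sync.
  def totalRunning: Int = synchronized {
    executions.count { case (_, e) => !terminal.contains(e.state) }
  }

  def main(args: Array[String]): Unit = {
    executions.put("q1", ExecInfo("STARTED"))
    executions.put("q2", ExecInfo("COMPILED"))
    println(totalRunning)               // 2
    executions("q2").state = "CANCELED" // no decrement needed anywhere
    println(totalRunning)               // 1
  }
}
```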
```diff
@@ -208,14 +223,12 @@ object HiveThriftServer2 extends Logging {
       synchronized {
         val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName)
         sessionList.put(sessionId, info)
-        onlineSessionNum += 1
         trimSessionIfNecessary()
       }
     }

     def onSessionClosed(sessionId: String): Unit = synchronized {
       sessionList(sessionId).finishTimestamp = System.currentTimeMillis
-      onlineSessionNum -= 1
       trimSessionIfNecessary()
     }
@@ -231,7 +244,6 @@
       trimExecutionIfNecessary()
       sessionList(sessionId).totalExecution += 1
       executionList(id).groupId = groupId
-      totalRunning += 1
     }

     def onStatementParsed(id: String, executionPlan: String): Unit = synchronized {
@@ -239,20 +251,22 @@
       executionList(id).state = ExecutionState.COMPILED
     }

-    def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
-      synchronized {
-        executionList(id).finishTimestamp = System.currentTimeMillis
-        executionList(id).detail = errorMessage
-        executionList(id).state = ExecutionState.FAILED
-        totalRunning -= 1
-        trimExecutionIfNecessary()
-      }
-    }
+    def onStatementCanceled(id: String): Unit = synchronized {
+      executionList(id).finishTimestamp = System.currentTimeMillis
+      executionList(id).state = ExecutionState.CANCELED
+      trimExecutionIfNecessary()
+    }
+
+    def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = synchronized {
+      executionList(id).finishTimestamp = System.currentTimeMillis
+      executionList(id).detail = errorMsg
+      executionList(id).state = ExecutionState.FAILED
+      trimExecutionIfNecessary()
+    }

     def onStatementFinish(id: String): Unit = synchronized {
       executionList(id).finishTimestamp = System.currentTimeMillis
       executionList(id).state = ExecutionState.FINISHED
-      totalRunning -= 1
       trimExecutionIfNecessary()
     }
```

**`SparkExecuteStatementOperation.scala`**
```diff
@@ -72,9 +72,8 @@ private[hive] class SparkExecuteStatementOperation(
   override def close(): Unit = {
     // RDDs will be cleaned automatically upon garbage collection.
-    logDebug(s"CLOSING $statementId")
+    logInfo(s"Close statement with $statementId")
     cleanup(OperationState.CLOSED)
-    sqlContext.sparkContext.clearJobGroup()
     HiveThriftServer2.listener.onOperationClosed(statementId)
   }
@@ -159,6 +158,14 @@ private[hive] class SparkExecuteStatementOperation(
   override def runInternal(): Unit = {
     setState(OperationState.PENDING)
+    statementId = UUID.randomUUID().toString
+    logInfo(s"Submitting query '$statement' with $statementId")
+    HiveThriftServer2.listener.onStatementStart(
+      statementId,
+      parentSession.getSessionHandle.getSessionId.toString,
+      statement,
+      statementId,
+      parentSession.getUsername)
     setHasResultSet(true) // avoid no resultset for async run

     if (!runInBackground) {
@@ -201,33 +208,38 @@ private[hive] class SparkExecuteStatementOperation(
         setBackgroundHandle(backgroundHandle)
       } catch {
         case rejected: RejectedExecutionException =>
+          logError("Error submitting query in background, query rejected", rejected)
           setState(OperationState.ERROR)
+          HiveThriftServer2.listener.onStatementError(
+            statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
           throw new HiveSQLException("The background threadpool cannot accept" +
             " new task for execution, please retry the operation", rejected)
         case NonFatal(e) =>
           logError(s"Error executing query in background", e)
           setState(OperationState.ERROR)
+          HiveThriftServer2.listener.onStatementError(
+            statementId, e.getMessage, SparkUtils.exceptionString(e))
           throw new HiveSQLException(e)
       }
     }
   }

   private def execute(): Unit = withSchedulerPool {
-    statementId = UUID.randomUUID().toString
-    logInfo(s"Running query '$statement' with $statementId")
-    setState(OperationState.RUNNING)
-    // Always use the latest class loader provided by executionHive's state.
-    val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
-    Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
-    HiveThriftServer2.listener.onStatementStart(
-      statementId,
-      parentSession.getSessionHandle.getSessionId.toString,
-      statement,
-      statementId,
-      parentSession.getUsername)
-    sqlContext.sparkContext.setJobGroup(statementId, statement)
     try {
+      synchronized {
+        if (getStatus.getState.isTerminal) {
+          logInfo(s"Query with $statementId in terminal state before it started running")
+          return
+        } else {
+          logInfo(s"Running query with $statementId")
+          setState(OperationState.RUNNING)
+        }
+      }
+      // Always use the latest class loader provided by executionHive's state.
+      val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
+      Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
+      sqlContext.sparkContext.setJobGroup(statementId, statement)
       result = sqlContext.sql(statement)
       logDebug(result.queryExecution.toString())
       result.queryExecution.logical match {
```
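
The `synchronized` block at the top of `execute()` above closes a check-then-act race: `cancel()` can move the operation to a terminal state between job submission and the moment the background thread actually starts, and without the lock `execute()` would overwrite CANCELED with RUNNING. A minimal sketch of the pattern, with hypothetical names rather than the real operation class:

```scala
// Hypothetical reduction of the locking pattern used in execute()/cancel().
class Op {
  private var state: String = "PENDING"

  def cancel(): Unit = synchronized {
    if (state != "FINISHED" && state != "CANCELED") state = "CANCELED"
  }

  def run(): Unit = {
    // Re-check under the same lock cancel() takes; without it, a cancel
    // arriving between submission and startup would be silently overwritten.
    val shouldRun = synchronized {
      if (state == "CANCELED") false
      else { state = "RUNNING"; true }
    }
    if (shouldRun) {
      // ... run the query outside the lock ...
    }
  }
}
```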
```diff
@@ -249,32 +261,43 @@ private[hive] class SparkExecuteStatementOperation(
       }
       dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
     } catch {
-      case e: HiveSQLException =>
-        if (getStatus().getState() == OperationState.CANCELED) {
-          return
-        } else {
-          setState(OperationState.ERROR)
-          HiveThriftServer2.listener.onStatementError(
-            statementId, e.getMessage, SparkUtils.exceptionString(e))
-          throw e
-        }
       // Actually do need to catch Throwable as some failures don't inherit from Exception and
       // HiveServer will silently swallow them.
       case e: Throwable =>
         val currentState = getStatus().getState()
-        logError(s"Error executing query, currentState $currentState, ", e)
-        setState(OperationState.ERROR)
-        HiveThriftServer2.listener.onStatementError(
-          statementId, e.getMessage, SparkUtils.exceptionString(e))
-        throw new HiveSQLException(e.toString)
+        if (currentState.isTerminal) {
+          // This may happen if the execution was cancelled, and then closed from another thread.
+          logWarning(s"Ignore exception in terminal state with $statementId: $e")
+        } else {
+          logError(s"Error executing query with $statementId, currentState $currentState, ", e)
+          setState(OperationState.ERROR)
+          HiveThriftServer2.listener.onStatementError(
+            statementId, e.getMessage, SparkUtils.exceptionString(e))
+          if (e.isInstanceOf[HiveSQLException]) {
+            throw e.asInstanceOf[HiveSQLException]
+          } else {
+            throw new HiveSQLException("Error running query: " + e.toString, e)
+          }
+        }
+    } finally {
+      synchronized {
+        if (!getStatus.getState.isTerminal) {
+          setState(OperationState.FINISHED)
+          HiveThriftServer2.listener.onStatementFinish(statementId)
+        }
+      }
+      sqlContext.sparkContext.clearJobGroup()
     }
-    setState(OperationState.FINISHED)
-    HiveThriftServer2.listener.onStatementFinish(statementId)
   }

   override def cancel(): Unit = {
-    logInfo(s"Cancel '$statement' with $statementId")
-    cleanup(OperationState.CANCELED)
+    synchronized {
+      if (!getStatus.getState.isTerminal) {
+        logInfo(s"Cancel query with $statementId")
+        cleanup(OperationState.CANCELED)
+        HiveThriftServer2.listener.onStatementCanceled(statementId)
+      }
+    }
   }

   private def cleanup(state: OperationState) {
```
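
Putting the two files together: `cancel()` now reports the CANCELED state to the listener, and `cleanup(OperationState.CANCELED)` (body not shown in this diff) is what actually kills the running Spark jobs, by cancelling the job group that `execute()` registered under `statementId`. The standalone demo below sketches that underlying mechanism using the real `setJobGroup`/`cancelJobGroup` SparkContext APIs; it is illustrative, not code from this patch:

```scala
import org.apache.spark.sql.SparkSession

// Demonstrates the job-group mechanism the Thrift Server relies on:
// jobs tagged via setJobGroup can be killed as a unit with cancelJobGroup.
object JobGroupCancelDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("job-group-cancel-demo")
      .getOrCreate()
    val sc = spark.sparkContext
    val statementId = java.util.UUID.randomUUID().toString

    val worker = new Thread(() => {
      // setJobGroup is thread-local, so tag the group on the running thread,
      // just as execute() does before running the statement.
      sc.setJobGroup(statementId, "long-running demo query")
      try {
        // A deliberately slow job standing in for sqlContext.sql(statement).
        sc.parallelize(1 to 1000, 4).map { i => Thread.sleep(100); i }.count()
      } catch {
        case e: Exception => println(s"Job ended early: ${e.getMessage}")
      }
    })
    worker.start()

    Thread.sleep(1000)             // let a few tasks start
    sc.cancelJobGroup(statementId) // what cleanup(CANCELED) triggers
    worker.join()
    spark.stop()
  }
}
```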