From 9f478a6832eaad61bcf6e974845b968105e153f5 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 4 Sep 2019 09:20:51 -0700 Subject: [PATCH] [SPARK-28901][SQL] SparkThriftServer's Cancel SQL Operation show it in JDBC Tab UI ### What changes were proposed in this pull request? The current Spark Thrift Server doesn't support cancelling a SQL job. When we use Hue to query through the Spark Thrift Server, run a SQL statement, and then click the cancel button to cancel it, the cancellation doesn't take effect in the backend, and in the Spark JDBC UI tab we can see that the SQL's status is always COMPILED while the duration of the SQL keeps increasing, which may confuse people. ![image](https://user-images.githubusercontent.com/46485123/63869830-60338f00-c9eb-11e9-8776-cee965adcb0a.png) ### Why are the changes needed? If the SQL status doesn't reflect the SQL's true status, it will confuse users. ### Does this PR introduce any user-facing change? The Spark Thrift Server's UI tab will show the SQL's status as CANCELED when we cancel a SQL statement. ### How was this patch tested? Manually tested. UI TAB Status ![image](https://user-images.githubusercontent.com/46485123/63915010-80a12f00-ca67-11e9-9342-830dfa9c719f.png) ![image](https://user-images.githubusercontent.com/46485123/63915084-a9292900-ca67-11e9-8e26-375bf8ce0963.png) backend log ![image](https://user-images.githubusercontent.com/46485123/63914864-1092a900-ca67-11e9-93f2-08690ed9abf4.png) Closes #25611 from AngersZhuuuu/SPARK-28901. 
Authored-by: angerszhu Signed-off-by: Xiao Li --- .../hive/thriftserver/HiveThriftServer2.scala | 48 ++++++---- .../SparkExecuteStatementOperation.scala | 91 ++++++++++++------- 2 files changed, 88 insertions(+), 51 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index abb53cf342..36d4ac095e 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -138,7 +138,7 @@ object HiveThriftServer2 extends Logging { } private[thriftserver] object ExecutionState extends Enumeration { - val STARTED, COMPILED, FAILED, FINISHED, CLOSED = Value + val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value type ExecutionState = Value } @@ -174,16 +174,31 @@ object HiveThriftServer2 extends Logging { override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { server.stop() } - private var onlineSessionNum: Int = 0 private val sessionList = new mutable.LinkedHashMap[String, SessionInfo] private val executionList = new mutable.LinkedHashMap[String, ExecutionInfo] private val retainedStatements = conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT) private val retainedSessions = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT) - private var totalRunning = 0 - def getOnlineSessionNum: Int = synchronized { onlineSessionNum } + def getOnlineSessionNum: Int = synchronized { + sessionList.count(_._2.finishTimestamp == 0) + } - def getTotalRunning: Int = synchronized { totalRunning } + def isExecutionActive(execInfo: ExecutionInfo): Boolean = { + !(execInfo.state == ExecutionState.FAILED || + execInfo.state == ExecutionState.CANCELED || + execInfo.state == ExecutionState.CLOSED) + } + + /** + * When an error or a 
cancellation occurs, we set the finishTimestamp of the statement. + * Therefore, when we count the number of running statements, we need to exclude errors and + * cancellations and count all statements that have not been closed so far. + */ + def getTotalRunning: Int = synchronized { + executionList.count { + case (_, v) => isExecutionActive(v) + } + } def getSessionList: Seq[SessionInfo] = synchronized { sessionList.values.toSeq } @@ -208,14 +223,12 @@ object HiveThriftServer2 extends Logging { synchronized { val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName) sessionList.put(sessionId, info) - onlineSessionNum += 1 trimSessionIfNecessary() } } def onSessionClosed(sessionId: String): Unit = synchronized { sessionList(sessionId).finishTimestamp = System.currentTimeMillis - onlineSessionNum -= 1 trimSessionIfNecessary() } @@ -231,7 +244,6 @@ object HiveThriftServer2 extends Logging { trimExecutionIfNecessary() sessionList(sessionId).totalExecution += 1 executionList(id).groupId = groupId - totalRunning += 1 } def onStatementParsed(id: String, executionPlan: String): Unit = synchronized { @@ -239,20 +251,22 @@ object HiveThriftServer2 extends Logging { executionList(id).state = ExecutionState.COMPILED } - def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = { - synchronized { - executionList(id).finishTimestamp = System.currentTimeMillis - executionList(id).detail = errorMessage - executionList(id).state = ExecutionState.FAILED - totalRunning -= 1 - trimExecutionIfNecessary() - } + def onStatementCanceled(id: String): Unit = synchronized { + executionList(id).finishTimestamp = System.currentTimeMillis + executionList(id).state = ExecutionState.CANCELED + trimExecutionIfNecessary() + } + + def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = synchronized { + executionList(id).finishTimestamp = System.currentTimeMillis + executionList(id).detail = errorMsg + executionList(id).state = 
ExecutionState.FAILED + trimExecutionIfNecessary() } def onStatementFinish(id: String): Unit = synchronized { executionList(id).finishTimestamp = System.currentTimeMillis executionList(id).state = ExecutionState.FINISHED - totalRunning -= 1 trimExecutionIfNecessary() } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 2f011c25fe..69e85484cc 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -72,9 +72,8 @@ private[hive] class SparkExecuteStatementOperation( override def close(): Unit = { // RDDs will be cleaned automatically upon garbage collection. - logDebug(s"CLOSING $statementId") + logInfo(s"Close statement with $statementId") cleanup(OperationState.CLOSED) - sqlContext.sparkContext.clearJobGroup() HiveThriftServer2.listener.onOperationClosed(statementId) } @@ -159,6 +158,14 @@ private[hive] class SparkExecuteStatementOperation( override def runInternal(): Unit = { setState(OperationState.PENDING) + statementId = UUID.randomUUID().toString + logInfo(s"Submitting query '$statement' with $statementId") + HiveThriftServer2.listener.onStatementStart( + statementId, + parentSession.getSessionHandle.getSessionId.toString, + statement, + statementId, + parentSession.getUsername) setHasResultSet(true) // avoid no resultset for async run if (!runInBackground) { @@ -201,33 +208,38 @@ private[hive] class SparkExecuteStatementOperation( setBackgroundHandle(backgroundHandle) } catch { case rejected: RejectedExecutionException => + logError("Error submitting query in background, query rejected", rejected) setState(OperationState.ERROR) + HiveThriftServer2.listener.onStatementError( + 
statementId, rejected.getMessage, SparkUtils.exceptionString(rejected)) throw new HiveSQLException("The background threadpool cannot accept" + " new task for execution, please retry the operation", rejected) case NonFatal(e) => logError(s"Error executing query in background", e) setState(OperationState.ERROR) + HiveThriftServer2.listener.onStatementError( + statementId, e.getMessage, SparkUtils.exceptionString(e)) throw new HiveSQLException(e) } } } private def execute(): Unit = withSchedulerPool { - statementId = UUID.randomUUID().toString - logInfo(s"Running query '$statement' with $statementId") - setState(OperationState.RUNNING) - // Always use the latest class loader provided by executionHive's state. - val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader - Thread.currentThread().setContextClassLoader(executionHiveClassLoader) - - HiveThriftServer2.listener.onStatementStart( - statementId, - parentSession.getSessionHandle.getSessionId.toString, - statement, - statementId, - parentSession.getUsername) - sqlContext.sparkContext.setJobGroup(statementId, statement) try { + synchronized { + if (getStatus.getState.isTerminal) { + logInfo(s"Query with $statementId in terminal state before it started running") + return + } else { + logInfo(s"Running query with $statementId") + setState(OperationState.RUNNING) + } + } + // Always use the latest class loader provided by executionHive's state. 
+ val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader + Thread.currentThread().setContextClassLoader(executionHiveClassLoader) + + sqlContext.sparkContext.setJobGroup(statementId, statement) result = sqlContext.sql(statement) logDebug(result.queryExecution.toString()) result.queryExecution.logical match { @@ -249,32 +261,43 @@ private[hive] class SparkExecuteStatementOperation( } dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray } catch { - case e: HiveSQLException => - if (getStatus().getState() == OperationState.CANCELED) { - return - } else { - setState(OperationState.ERROR) - HiveThriftServer2.listener.onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw e - } // Actually do need to catch Throwable as some failures don't inherit from Exception and // HiveServer will silently swallow them. case e: Throwable => val currentState = getStatus().getState() - logError(s"Error executing query, currentState $currentState, ", e) - setState(OperationState.ERROR) - HiveThriftServer2.listener.onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw new HiveSQLException(e.toString) + if (currentState.isTerminal) { + // This may happen if the execution was cancelled, and then closed from another thread. 
+ logWarning(s"Ignore exception in terminal state with $statementId: $e") + } else { + logError(s"Error executing query with $statementId, currentState $currentState, ", e) + setState(OperationState.ERROR) + HiveThriftServer2.listener.onStatementError( + statementId, e.getMessage, SparkUtils.exceptionString(e)) + if (e.isInstanceOf[HiveSQLException]) { + throw e.asInstanceOf[HiveSQLException] + } else { + throw new HiveSQLException("Error running query: " + e.toString, e) + } + } + } finally { + synchronized { + if (!getStatus.getState.isTerminal) { + setState(OperationState.FINISHED) + HiveThriftServer2.listener.onStatementFinish(statementId) + } + } + sqlContext.sparkContext.clearJobGroup() } - setState(OperationState.FINISHED) - HiveThriftServer2.listener.onStatementFinish(statementId) } override def cancel(): Unit = { - logInfo(s"Cancel '$statement' with $statementId") - cleanup(OperationState.CANCELED) + synchronized { + if (!getStatus.getState.isTerminal) { + logInfo(s"Cancel query with $statementId") + cleanup(OperationState.CANCELED) + HiveThriftServer2.listener.onStatementCanceled(statementId) + } + } } private def cleanup(state: OperationState) {