Revert "[SPARK-32412][SQL] Unify error handling for spark thrift serv…

…er operations"

### What changes were proposed in this pull request?

This reverts commit 510a1656e6 ("[SPARK-32412][SQL] Unify error handling for spark thrift server operations"), restoring the per-operation error handling that the original change had unified into `SparkOperation.onError()`.

### Why are the changes needed?

See the review discussion at https://github.com/apache/spark/pull/29204#discussion_r475716547.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Passes the existing CI checks.

Closes #29531 from yaooqinn/revert.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Committed on 2020-08-25 05:57:14 +00:00
parent 3f1e56d4ca
commit c26a97637f
3 changed files with 76 additions and 78 deletions

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala

```diff
@@ -19,9 +19,11 @@ package org.apache.spark.sql.hive.thriftserver

 import java.security.PrivilegedExceptionAction
 import java.util.{Arrays, Map => JMap}
+import java.util.concurrent.RejectedExecutionException

 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal

 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.shims.Utils
@@ -36,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.VariableSubstitution
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.util.{Utils => SparkUtils}

 private[hive] class SparkExecuteStatementOperation(
     val sqlContext: SQLContext,
@@ -110,7 +113,7 @@ private[hive] class SparkExecuteStatementOperation(
   }

   def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
-    logInfo(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
+    log.info(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
       s"with ${statementId}")
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
@@ -179,7 +182,7 @@ private[hive] class SparkExecuteStatementOperation(
       resultOffset += 1
     }
     previousFetchEndOffset = resultOffset
-    logInfo(s"Returning result set with ${curRow} rows from offsets " +
+    log.info(s"Returning result set with ${curRow} rows from offsets " +
       s"[$previousFetchStartOffset, $previousFetchEndOffset) with $statementId")
     resultRowSet
   }
@@ -216,9 +219,7 @@ private[hive] class SparkExecuteStatementOperation(
                 execute()
               }
             } catch {
-              case e: HiveSQLException =>
-                setOperationException(e)
-                logError(s"Error executing query with $statementId,", e)
+              case e: HiveSQLException => setOperationException(e)
             }
           }
         }
@@ -238,7 +239,21 @@ private[hive] class SparkExecuteStatementOperation(
         val backgroundHandle =
           parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
         setBackgroundHandle(backgroundHandle)
-      } catch onError()
+      } catch {
+        case rejected: RejectedExecutionException =>
+          logError("Error submitting query in background, query rejected", rejected)
+          setState(OperationState.ERROR)
+          HiveThriftServer2.eventManager.onStatementError(
+            statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
+          throw new HiveSQLException("The background threadpool cannot accept" +
+            " new task for execution, please retry the operation", rejected)
+        case NonFatal(e) =>
+          logError(s"Error executing query in background", e)
+          setState(OperationState.ERROR)
+          HiveThriftServer2.eventManager.onStatementError(
+            statementId, e.getMessage, SparkUtils.exceptionString(e))
+          throw new HiveSQLException(e)
+      }
     }
   }
@@ -279,7 +294,30 @@ private[hive] class SparkExecuteStatementOperation(
       }
       dataTypes = result.schema.fields.map(_.dataType)
     } catch {
-      onError(needCancel = true)
+      // Actually do need to catch Throwable as some failures don't inherit from Exception and
+      // HiveServer will silently swallow them.
+      case e: Throwable =>
+        // When cancel() or close() is called very quickly after the query is started,
+        // then they may both call cleanup() before Spark Jobs are started. But before background
+        // task interrupted, it may have start some spark job, so we need to cancel again to
+        // make sure job was cancelled when background thread was interrupted
+        if (statementId != null) {
+          sqlContext.sparkContext.cancelJobGroup(statementId)
+        }
+        val currentState = getStatus().getState()
+        if (currentState.isTerminal) {
+          // This may happen if the execution was cancelled, and then closed from another thread.
+          logWarning(s"Ignore exception in terminal state with $statementId: $e")
+        } else {
+          logError(s"Error executing query with $statementId, currentState $currentState, ", e)
+          setState(OperationState.ERROR)
+          HiveThriftServer2.eventManager.onStatementError(
+            statementId, e.getMessage, SparkUtils.exceptionString(e))
+          e match {
+            case _: HiveSQLException => throw e
+            case _ => throw new HiveSQLException("Error running query: " + e.toString, e)
+          }
+        }
     } finally {
       synchronized {
         if (!getStatus.getState.isTerminal) {
@@ -310,7 +348,9 @@ private[hive] class SparkExecuteStatementOperation(
       }
     }
     // RDDs will be cleaned automatically upon garbage collection.
-    sqlContext.sparkContext.cancelJobGroup(statementId)
+    if (statementId != null) {
+      sqlContext.sparkContext.cancelJobGroup(statementId)
+    }
   }
 }
```

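A note on the block restored above: submission failures are split into two cases. A `RejectedExecutionException` (the background pool is saturated or shut down) is surfaced to the client as retryable, while any other non-fatal error is wrapped and rethrown. Below is a minimal standalone sketch of that shape; `ClientFacingException` stands in for Hive's `HiveSQLException` and a plain `Executors` pool stands in for the session manager's background pool, so nothing here is Spark's actual code:

```scala
import java.util.concurrent.{Executors, RejectedExecutionException}

import scala.util.control.NonFatal

// Stand-in for org.apache.hive.service.cli.HiveSQLException (illustrative only).
class ClientFacingException(msg: String, cause: Throwable) extends Exception(msg, cause)

object BackgroundSubmitSketch {
  private val pool = Executors.newFixedThreadPool(2)

  // Mirrors the restored shape: a rejected submission is reported as retryable,
  // while any other non-fatal failure is wrapped for the RPC layer to ship.
  def submit(task: Runnable): Unit = {
    try {
      pool.submit(task)
    } catch {
      case rejected: RejectedExecutionException =>
        throw new ClientFacingException(
          "The background threadpool cannot accept new task for execution, " +
            "please retry the operation", rejected)
      case NonFatal(e) =>
        throw new ClientFacingException(e.toString, e)
    }
  }

  def main(args: Array[String]): Unit = {
    submit(() => println("statement running in background"))
    pool.shutdown()
  }
}
```

Keeping the two cases separate preserves the "please retry" hint that a single generic wrapper would lose.
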
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala

```diff
@@ -17,8 +17,6 @@

 package org.apache.spark.sql.hive.thriftserver

-import java.util.concurrent.RejectedExecutionException
-
 import org.apache.hive.service.cli.{HiveSQLException, OperationState}
 import org.apache.hive.service.cli.operation.Operation
@@ -96,32 +94,15 @@ private[hive] trait SparkOperation extends Operation with Logging {
       throw new IllegalArgumentException(s"Unknown table type is found: $t")
   }

-  protected def onError(needCancel: Boolean = false): PartialFunction[Throwable, Unit] = {
-    // Actually do need to catch Throwable as some failures don't inherit from Exception and
-    // HiveServer will silently swallow them.
+  protected def onError(): PartialFunction[Throwable, Unit] = {
     case e: Throwable =>
-      // When cancel() or close() is called very quickly after the query is started,
-      // then they may both call cleanup() before Spark Jobs are started. But before background
-      // task interrupted, it may have start some spark job, so we need to cancel again to
-      // make sure job was cancelled when background thread was interrupted
-      if (needCancel) sqlContext.sparkContext.cancelJobGroup(statementId)
-      val currentState = getStatus.getState
-      if (currentState.isTerminal) {
-        // This may happen if the execution was cancelled, and then closed from another thread.
-        logWarning(s"Ignore exception in terminal state with $statementId: $e")
-      } else {
-        super.setState(OperationState.ERROR)
-        HiveThriftServer2.eventManager.onStatementError(
-          statementId, e.getMessage, Utils.exceptionString(e))
-        e match {
-          case _: HiveSQLException => throw e
-          case rejected: RejectedExecutionException =>
-            throw new HiveSQLException("The background threadpool cannot accept" +
-              " new task for execution, please retry the operation", rejected)
-          case _ =>
-            val tips = if (shouldRunAsync()) " in background" else ""
-            throw new HiveSQLException(s"Error operating $getType$tips: ${e.getMessage}", e)
-        }
-      }
+      logError(s"Error operating $getType with $statementId", e)
+      super.setState(OperationState.ERROR)
+      HiveThriftServer2.eventManager.onStatementError(
+        statementId, e.getMessage, Utils.exceptionString(e))
+      e match {
+        case _: HiveSQLException => throw e
+        case _ => throw new HiveSQLException(s"Error operating $getType ${e.getMessage}", e)
+      }
   }
 }
```

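The caller-side idiom this revert removes, `} catch onError()`, works because a Scala `catch` clause accepts any expression of type `PartialFunction[Throwable, Unit]`, not only literal `case` clauses; a method returning such a partial function can therefore be shared across call sites. A minimal sketch of the idiom follows (the `onError` below is illustrative, not the Spark method):

```scala
import scala.util.control.NonFatal

object CatchHandlerSketch {
  // A reusable catch handler. Scala's `catch` accepts any
  // PartialFunction[Throwable, Unit], which is what made
  // `} catch onError()` legal at the call sites in the diff.
  def onError(opType: String): PartialFunction[Throwable, Unit] = {
    case NonFatal(e) =>
      System.err.println(s"Error operating $opType: ${e.getMessage}")
      throw new RuntimeException(s"Error operating $opType", e)
  }

  def run(): Unit = {
    try {
      require(false, "boom") // throws IllegalArgumentException, matched by NonFatal
    } catch onError("EXECUTE_STATEMENT")
  }

  def main(args: Array[String]): Unit = {
    try run() catch { case e: RuntimeException => println(s"caught: ${e.getMessage}") }
  }
}
```

This is the reuse that SPARK-32412 introduced; the revert trades it for the more explicit per-call-site handling shown in the first file.
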
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala

```diff
@@ -21,9 +21,6 @@ import java.sql.SQLException

 import org.apache.hive.service.cli.HiveSQLException

-import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.types._
-
 trait ThriftServerWithSparkContextSuite extends SharedThriftServer {

   test("the scratch dir will be deleted during server start but recreated with new operation") {
@@ -55,51 +52,31 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
   test("Full stack traces as error message for jdbc or thrift client") {
     val sql = "select date_sub(date'2011-11-11', '1.2')"
-    val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
-
-    withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "false")) {
-      withCLIServiceClient { client =>
-        val sessionHandle = client.openSession(user, "")
-        val e = intercept[HiveSQLException] {
-          client.executeStatement(sessionHandle, sql, confOverlay)
-        }
-        assert(e.getMessage
-          .contains("The second argument of 'date_sub' function needs to be an integer."))
-        assert(!e.getMessage
-          .contains("java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
-      }
-    }
+    withCLIServiceClient { client =>
+      val sessionHandle = client.openSession(user, "")
+      val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
+      val e = intercept[HiveSQLException] {
+        client.executeStatement(
+          sessionHandle,
+          sql,
+          confOverlay)
+      }
+      assert(e.getMessage
+        .contains("The second argument of 'date_sub' function needs to be an integer."))
+      assert(!e.getMessage.contains("" +
+        "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+    }

-    withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "true")) {
-      withCLIServiceClient { client =>
-        val sessionHandle = client.openSession(user, "")
-        val opHandle = client.executeStatementAsync(sessionHandle, sql, confOverlay)
-        var status = client.getOperationStatus(opHandle)
-        while (!status.getState.isTerminal) {
-          Thread.sleep(10)
-          status = client.getOperationStatus(opHandle)
-        }
-        val e = status.getOperationException
-        assert(e.getMessage
-          .contains("The second argument of 'date_sub' function needs to be an integer."))
-        assert(e.getMessage
-          .contains("java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
-      }
-    }
-
-    Seq("true", "false").foreach { value =>
-      withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, value)) {
-        withJdbcStatement { statement =>
-          val e = intercept[SQLException] {
-            statement.executeQuery(sql)
-          }
-          assert(e.getMessage.contains(
-            "The second argument of 'date_sub' function needs to be an integer."))
-          assert(e.getMessage.contains(
-            "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
-        }
-      }
-    }
+    withJdbcStatement { statement =>
+      val e = intercept[SQLException] {
+        statement.executeQuery(sql)
+      }
+      assert(e.getMessage
+        .contains("The second argument of 'date_sub' function needs to be an integer."))
+      assert(e.getMessage.contains("" +
+        "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+    }
   }
 }
```
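
To observe the restored behavior outside the test suite, the JDBC-side check can be reproduced roughly as below. The endpoint, credentials, and the presence of the Hive JDBC driver on the classpath are all assumptions for illustration, not part of this patch:

```scala
import java.sql.{DriverManager, SQLException}

object JdbcErrorMessageSketch {
  def main(args: Array[String]): Unit = {
    // Assumed endpoint; adjust to your running Thrift server.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    try {
      val stmt = conn.createStatement()
      try {
        stmt.executeQuery("select date_sub(date'2011-11-11', '1.2')")
      } catch {
        case e: SQLException =>
          // Mirrors the restored assertions: the analysis error is present,
          // and the full cause chain travels with the message over JDBC.
          assert(e.getMessage.contains(
            "The second argument of 'date_sub' function needs to be an integer."))
          assert(e.getMessage.contains("java.lang.NumberFormatException"))
      }
    } finally {
      conn.close()
    }
  }
}
```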