[SPARK-35057][SQL] Group exception messages in hive/thriftserver

### What changes were proposed in this pull request?
This PR groups the exception messages in `sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver`.

### Why are the changes needed?
It will largely help with the standardization of error messages and their maintenance.

### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests - pass all original tests to make sure it doesn't break any existing behavior.

Closes #32646 from beliefer/SPARK-35057.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
gengjiaan 2021-05-27 07:31:14 +00:00 committed by Wenchen Fan
parent 5cc17ba0c7
commit 3e190807bc
8 changed files with 92 additions and 17 deletions

View file

@ -1217,4 +1217,17 @@ object QueryExecutionErrors {
s"Parse Mode: ${FailFastMode.name}. To process malformed records as null " +
"result, try setting the option 'mode' as 'PERMISSIVE'.", e)
}
/**
 * Creates the error reported when a remote operation is requested.
 *
 * @return a `RuntimeException` carrying the fixed "not supported" message
 */
def remoteOperationsUnsupportedError(): Throwable =
  new RuntimeException("Remote operations not supported")
/**
 * Creates the error reported when the HiveServer2 Kerberos principal or keytab
 * is not correctly configured.
 *
 * @return an `IOException` carrying the fixed configuration-error message
 */
def invalidKerberosConfigForHiveServer2Error(): Throwable = {
  val message = "HiveServer2 Kerberos principal or keytab is not correctly configured"
  new IOException(message)
}
/**
 * Creates the error reported when no parent SparkUI is available to attach a tab to.
 *
 * @return a `SparkException` carrying the fixed "not found" message
 */
def parentSparkUIToAttachTabNotFoundError(): Throwable = {
  val message = "Parent SparkUI to attach this tab to not found!"
  new SparkException(message)
}
}

View file

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.thriftserver
import java.io.IOException
import java.util.concurrent.RejectedExecutionException
import org.apache.hive.service.ServiceException
import org.apache.hive.service.cli.{HiveSQLException, OperationType}
/**
 * Central place for the exceptions (and their messages) raised by the Hive
 * thrift server code in this package.
 *
 * Every method only constructs the exception and returns it as a `Throwable`;
 * throwing it is left to the caller.
 */
object HiveThriftServerErrors {

  /**
   * Error for a task that the background thread pool refused to accept.
   *
   * @param rejected the rejection raised by the executor, kept as the cause
   */
  def taskExecutionRejectedError(rejected: RejectedExecutionException): Throwable = {
    val message =
      "The background threadpool cannot accept new task for execution, please retry the operation"
    new HiveSQLException(message, rejected)
  }

  /**
   * Error wrapping a failure that happened while running a query.
   *
   * @param e the underlying failure; its `toString` is embedded in the message
   */
  def runningQueryError(e: Throwable): Throwable =
    new HiveSQLException("Error running query: " + e.toString, e)

  /**
   * Error wrapping a failure of a Hive operation.
   *
   * @param operationType the kind of operation that failed
   * @param e the underlying failure; its message is appended to the text
   */
  def hiveOperatingError(operationType: OperationType, e: Throwable): Throwable = {
    val message = s"Error operating ${operationType} ${e.getMessage}"
    new HiveSQLException(message, e)
  }

  /**
   * Error wrapping a failure to open a new session.
   *
   * @param e the underlying failure, also rendered into the message
   */
  def failedToOpenNewSessionError(e: Throwable): Throwable =
    new HiveSQLException("Failed to open new session: " + e, e)

  /**
   * Error for a failed Kerberos login with the given principal/keytab.
   *
   * @param e the underlying failure, kept as the cause
   */
  def cannotLoginToKerberosError(e: Throwable): Throwable = {
    val message = "Unable to login to kerberos with given principal/keytab"
    new ServiceException(message, e)
  }

  /**
   * Error for a failed SPNEGO login.
   *
   * @param principal the principal used for the login attempt
   * @param keyTabFile the keytab used for the login attempt
   * @param e the underlying I/O failure, also rendered into the message
   */
  def cannotLoginToSpnegoError(
      principal: String,
      keyTabFile: String,
      e: IOException): Throwable = {
    val message =
      s"Unable to login to spnego with given principal $principal and keytab $keyTabFile: $e"
    new ServiceException(message, e)
  }

  /**
   * Error for a service that could not be started.
   *
   * @param serviceName name of the service that failed to start
   * @param e the underlying failure, kept as the cause
   */
  def failedToStartServiceError(serviceName: String, e: Throwable): Throwable =
    new ServiceException("Failed to Start " + serviceName, e)
}

View file

@ -255,8 +255,7 @@ private[hive] class SparkExecuteStatementOperation(
setState(OperationState.ERROR)
HiveThriftServer2.eventManager.onStatementError(
statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
throw new HiveSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected)
throw HiveThriftServerErrors.taskExecutionRejectedError(rejected)
case NonFatal(e) =>
logError(s"Error executing query in background", e)
setState(OperationState.ERROR)
@ -322,7 +321,7 @@ private[hive] class SparkExecuteStatementOperation(
statementId, e.getMessage, SparkUtils.exceptionString(e))
e match {
case _: HiveSQLException => throw e
case _ => throw new HiveSQLException("Error running query: " + e.toString, e)
case _ => throw HiveThriftServerErrors.runningQueryError(e)
}
}
} finally {

View file

@ -102,7 +102,7 @@ private[hive] trait SparkOperation extends Operation with Logging {
statementId, e.getMessage, Utils.exceptionString(e))
e match {
case _: HiveSQLException => throw e
case _ => throw new HiveSQLException(s"Error operating $getType ${e.getMessage}", e)
case _ => throw HiveThriftServerErrors.hiveOperatingError(getType, e)
}
}
}

View file

@ -44,6 +44,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.hive.security.HiveDelegationTokenProvider
@ -143,7 +144,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
if (isRemoteMode(sessionState)) {
// Hive 1.2 + not supported in CLI
throw new RuntimeException("Remote operations not supported")
throw QueryExecutionErrors.remoteOperationsUnsupportedError()
}
// Respect the configurations set by --hiveconf from the command line
// (based on Hive's CliDriver).
@ -330,7 +331,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
}
} else {
// Hive 1.2 + not supported in CLI
throw new RuntimeException("Remote operations not supported")
throw QueryExecutionErrors.remoteOperationsUnsupportedError()
}
override def setHiveVariables(hiveVariables: java.util.Map[String, String]): Unit = {

View file

@ -28,7 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.shims.Utils
import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation}
import org.apache.hive.service.{AbstractService, CompositeService, Service, ServiceException}
import org.apache.hive.service.{AbstractService, CompositeService, Service}
import org.apache.hive.service.Service.STATE
import org.apache.hive.service.auth.HiveAuthFactory
import org.apache.hive.service.cli._
@ -36,6 +36,7 @@ import org.apache.hive.service.server.HiveServer2
import org.slf4j.Logger
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLContext)
@ -56,8 +57,7 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLC
val principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL)
val keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB)
if (principal.isEmpty || keyTabFile.isEmpty) {
throw new IOException(
"HiveServer2 Kerberos principal or keytab is not correctly configured")
throw QueryExecutionErrors.invalidKerberosConfigForHiveServer2Error()
}
val originalUgi = UserGroupInformation.getCurrentUser
@ -72,7 +72,7 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLC
setSuperField(this, "serviceUGI", sparkServiceUGI)
} catch {
case e @ (_: IOException | _: LoginException) =>
throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
throw HiveThriftServerErrors.cannotLoginToKerberosError(e)
}
// Try creating spnego UGI if it is configured.
@ -84,8 +84,7 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLC
setSuperField(this, "httpUGI", httpUGI)
} catch {
case e: IOException =>
throw new ServiceException("Unable to login to spnego with given principal " +
s"$principal and keytab $keyTabFile: $e", e)
throw HiveThriftServerErrors.cannotLoginToSpnegoError(principal, keyTabFile, e)
}
}
}
@ -149,7 +148,7 @@ private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
logError(s"Error starting services $getName", e)
invoke(classOf[CompositeService], this, "stop",
classOf[Int] -> new Integer(serviceStartCount))
throw new ServiceException("Failed to Start " + getName, e)
throw HiveThriftServerErrors.failedToStartServiceError(getName, e)
}
}
}

View file

@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.thriftserver
import scala.util.control.NonFatal
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hive.service.cli.{HiveSQLException, SessionHandle}
import org.apache.hive.service.cli.SessionHandle
import org.apache.hive.service.cli.session.SessionManager
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.hive.service.server.HiveServer2
@ -80,7 +80,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
case NonFatal(inner) =>
logWarning("Error closing session", inner)
}
throw new HiveSQLException("Failed to open new session: " + e, e)
throw HiveThriftServerErrors.failedToOpenNewSessionError(e)
}
}

View file

@ -17,8 +17,9 @@
package org.apache.spark.sql.hive.thriftserver.ui
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.ui.{SparkUI, SparkUITab}
/**
@ -44,7 +45,7 @@ private[thriftserver] class ThriftServerTab(
private[thriftserver] object ThriftServerTab {
def getSparkUI(sparkContext: SparkContext): SparkUI = {
sparkContext.ui.getOrElse {
throw new SparkException("Parent SparkUI to attach this tab to not found!")
throw QueryExecutionErrors.parentSparkUIToAttachTabNotFoundError()
}
}
}