From fda4070ea934cac081162f70d9ea7fe2e9a07cd4 Mon Sep 17 00:00:00 2001 From: lajin Date: Wed, 16 Oct 2019 19:51:56 -0700 Subject: [PATCH] [SPARK-29283][SQL] Error message is hidden when query from JDBC, especially enabled adaptive execution ### What changes were proposed in this pull request? When adaptive execution is enabled, Spark users who connect from JDBC always get an adaptive execution error, whatever the underlying root cause is. It's very confusing. We have to check the driver log to find out why. ```shell 0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v; SELECT * FROM testData join testData2 ON key = v; Error: Error running query: org.apache.spark.SparkException: Adaptive execution failed due to stage materialization failures. (state=,code=0) 0: jdbc:hive2://localhost:10000> ``` For example, a job queried from JDBC failed due to a missing HDFS block. Users still get the error message `Adaptive execution failed due to stage materialization failures`. The easiest way to reproduce is changing the code of `AdaptiveSparkPlanExec`, to let it throw an exception when it faces `StageSuccess`. ```scala case class AdaptiveSparkPlanExec( events.drainTo(rem) (Seq(nextMsg) ++ rem.asScala).foreach { case StageSuccess(stage, res) => // stage.resultOption = Some(res) val ex = new SparkException("Wrapper Exception", new IllegalArgumentException("Root cause is IllegalArgumentException for Test")) errors.append( new SparkException(s"Failed to materialize query stage: ${stage.treeString}", ex)) case StageFailure(stage, ex) => errors.append( new SparkException(s"Failed to materialize query stage: ${stage.treeString}", ex)) ``` ### Why are the changes needed? To make the error message more user-friendly and more useful for queries from JDBC. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? 
Manually test query: ```shell 0: jdbc:hive2://localhost:10000> CREATE TEMPORARY VIEW testData (key, value) AS SELECT explode(array(1, 2, 3, 4)), cast(substring(rand(), 3, 4) as string); CREATE TEMPORARY VIEW testData (key, value) AS SELECT explode(array(1, 2, 3, 4)), cast(substring(rand(), 3, 4) as string); +---------+--+ | Result | +---------+--+ +---------+--+ No rows selected (0.225 seconds) 0: jdbc:hive2://localhost:10000> CREATE TEMPORARY VIEW testData2 (k, v) AS SELECT explode(array(1, 1, 2, 2)), cast(substring(rand(), 3, 4) as int); CREATE TEMPORARY VIEW testData2 (k, v) AS SELECT explode(array(1, 1, 2, 2)), cast(substring(rand(), 3, 4) as int); +---------+--+ | Result | +---------+--+ +---------+--+ No rows selected (0.043 seconds) ``` Before: ```shell 0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v; SELECT * FROM testData join testData2 ON key = v; Error: Error running query: org.apache.spark.SparkException: Adaptive execution failed due to stage materialization failures. (state=,code=0) 0: jdbc:hive2://localhost:10000> ``` After: ```shell 0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v; SELECT * FROM testData join testData2 ON key = v; Error: Error running query: java.lang.IllegalArgumentException: Root cause is IllegalArgumentException for Test (state=,code=0) 0: jdbc:hive2://localhost:10000> ``` Closes #25960 from LantaoJin/SPARK-29283. 
Authored-by: lajin Signed-off-by: Yuming Wang --- .../SparkExecuteStatementOperation.scala | 17 +++++++++++------ .../SparkGetCatalogsOperation.scala | 18 ++++++++++++++---- .../SparkGetColumnsOperation.scala | 18 ++++++++++++++---- .../SparkGetFunctionsOperation.scala | 18 ++++++++++++++---- .../SparkGetSchemasOperation.scala | 18 ++++++++++++++---- .../SparkGetTableTypesOperation.scala | 18 ++++++++++++++---- .../SparkGetTablesOperation.scala | 19 ++++++++++++++----- .../SparkGetTypeInfoOperation.scala | 18 ++++++++++++++---- 8 files changed, 109 insertions(+), 35 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 90a428defd..68197a9de8 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -26,6 +26,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.shims.Utils import org.apache.hive.service.cli._ @@ -312,12 +313,16 @@ private[hive] class SparkExecuteStatementOperation( } else { logError(s"Error executing query with $statementId, currentState $currentState, ", e) setState(OperationState.ERROR) - HiveThriftServer2.listener.onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) - if (e.isInstanceOf[HiveSQLException]) { - throw e.asInstanceOf[HiveSQLException] - } else { - throw new HiveSQLException("Error running query: " + e.toString, e) + e match { + case hiveException: HiveSQLException => + 
HiveThriftServer2.listener.onStatementError( + statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException)) + throw hiveException + case _ => + val root = ExceptionUtils.getRootCause(e) + HiveThriftServer2.listener.onStatementError( + statementId, root.getMessage, SparkUtils.exceptionString(root)) + throw new HiveSQLException("Error running query: " + root.toString, root) } } } finally { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala index cde99fd35b..6c8a5b0099 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver import java.util.UUID +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType import org.apache.hive.service.cli.{HiveSQLException, OperationState} import org.apache.hive.service.cli.operation.GetCatalogsOperation @@ -68,11 +69,20 @@ private[hive] class SparkGetCatalogsOperation( } setState(OperationState.FINISHED) } catch { - case e: HiveSQLException => + case e: Throwable => + logError(s"Error executing get catalogs operation with $statementId", e) setState(OperationState.ERROR) - HiveThriftServer2.listener.onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw e + e match { + case hiveException: HiveSQLException => + HiveThriftServer2.listener.onStatementError( + statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException)) + throw hiveException + case _ => + val root = ExceptionUtils.getRootCause(e) + HiveThriftServer2.listener.onStatementError( + statementId, root.getMessage, 
SparkUtils.exceptionString(root)) + throw new HiveSQLException("Error getting catalogs: " + root.toString, root) + } } HiveThriftServer2.listener.onStatementFinish(statementId) } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala index 89faff2f6f..f845a2285b 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala @@ -22,6 +22,7 @@ import java.util.regex.Pattern import scala.collection.JavaConverters.seqAsJavaListConverter +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObject} import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType import org.apache.hive.service.cli._ @@ -129,11 +130,20 @@ private[hive] class SparkGetColumnsOperation( } setState(OperationState.FINISHED) } catch { - case e: HiveSQLException => + case e: Throwable => + logError(s"Error executing get columns operation with $statementId", e) setState(OperationState.ERROR) - HiveThriftServer2.listener.onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw e + e match { + case hiveException: HiveSQLException => + HiveThriftServer2.listener.onStatementError( + statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException)) + throw hiveException + case _ => + val root = ExceptionUtils.getRootCause(e) + HiveThriftServer2.listener.onStatementError( + statementId, root.getMessage, SparkUtils.exceptionString(root)) + throw new HiveSQLException("Error getting columns: " + root.toString, root) + } } HiveThriftServer2.listener.onStatementFinish(statementId) } 
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala index 462e57300e..1cdd891842 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala @@ -22,6 +22,7 @@ import java.util.UUID import scala.collection.JavaConverters.seqAsJavaListConverter +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObjectUtils} import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.GetFunctionsOperation @@ -104,11 +105,20 @@ private[hive] class SparkGetFunctionsOperation( } setState(OperationState.FINISHED) } catch { - case e: HiveSQLException => + case e: Throwable => + logError(s"Error executing get functions operation with $statementId", e) setState(OperationState.ERROR) - HiveThriftServer2.listener.onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw e + e match { + case hiveException: HiveSQLException => + HiveThriftServer2.listener.onStatementError( + statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException)) + throw hiveException + case _ => + val root = ExceptionUtils.getRootCause(e) + HiveThriftServer2.listener.onStatementError( + statementId, root.getMessage, SparkUtils.exceptionString(root)) + throw new HiveSQLException("Error getting functions: " + root.toString, root) + } } HiveThriftServer2.listener.onStatementFinish(statementId) } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala index 87ef154bcc..928610a6bc 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.thriftserver import java.util.UUID import java.util.regex.Pattern +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.GetSchemasOperation @@ -87,11 +88,20 @@ private[hive] class SparkGetSchemasOperation( } setState(OperationState.FINISHED) } catch { - case e: HiveSQLException => + case e: Throwable => + logError(s"Error executing get schemas operation with $statementId", e) setState(OperationState.ERROR) - HiveThriftServer2.listener.onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw e + e match { + case hiveException: HiveSQLException => + HiveThriftServer2.listener.onStatementError( + statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException)) + throw hiveException + case _ => + val root = ExceptionUtils.getRootCause(e) + HiveThriftServer2.listener.onStatementError( + statementId, root.getMessage, SparkUtils.exceptionString(root)) + throw new HiveSQLException("Error getting schemas: " + root.toString, root) + } } HiveThriftServer2.listener.onStatementFinish(statementId) } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala index 8f2257f77d..ec03f1e148 100644 --- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver import java.util.UUID +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.GetTableTypesOperation @@ -74,11 +75,20 @@ private[hive] class SparkGetTableTypesOperation( } setState(OperationState.FINISHED) } catch { - case e: HiveSQLException => + case e: Throwable => + logError(s"Error executing get table types operation with $statementId", e) setState(OperationState.ERROR) - HiveThriftServer2.listener.onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw e + e match { + case hiveException: HiveSQLException => + HiveThriftServer2.listener.onStatementError( + statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException)) + throw hiveException + case _ => + val root = ExceptionUtils.getRootCause(e) + HiveThriftServer2.listener.onStatementError( + statementId, root.getMessage, SparkUtils.exceptionString(root)) + throw new HiveSQLException("Error getting table types: " + root.toString, root) + } } HiveThriftServer2.listener.onStatementFinish(statementId) } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala index 6441dc50f4..bf9cf7ad46 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala @@ -22,6 +22,7 @@ import 
java.util.regex.Pattern import scala.collection.JavaConverters._ +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObjectUtils import org.apache.hive.service.cli._ @@ -30,7 +31,6 @@ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.Logging import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.util.{Utils => SparkUtils} @@ -119,11 +119,20 @@ private[hive] class SparkGetTablesOperation( } setState(OperationState.FINISHED) } catch { - case e: HiveSQLException => + case e: Throwable => + logError(s"Error executing get tables operation with $statementId", e) setState(OperationState.ERROR) - HiveThriftServer2.listener.onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw e + e match { + case hiveException: HiveSQLException => + HiveThriftServer2.listener.onStatementError( + statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException)) + throw hiveException + case _ => + val root = ExceptionUtils.getRootCause(e) + HiveThriftServer2.listener.onStatementError( + statementId, root.getMessage, SparkUtils.exceptionString(root)) + throw new HiveSQLException("Error getting tables: " + root.toString, root) + } } HiveThriftServer2.listener.onStatementFinish(statementId) } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala index 7a6a8c59b7..0d263b09d5 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala +++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver import java.util.UUID +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType import org.apache.hive.service.cli.{HiveSQLException, OperationState} import org.apache.hive.service.cli.operation.GetTypeInfoOperation @@ -92,11 +93,20 @@ private[hive] class SparkGetTypeInfoOperation( }) setState(OperationState.FINISHED) } catch { - case e: HiveSQLException => + case e: Throwable => + logError(s"Error executing get type info with $statementId", e) setState(OperationState.ERROR) - HiveThriftServer2.listener.onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw e + e match { + case hiveException: HiveSQLException => + HiveThriftServer2.listener.onStatementError( + statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException)) + throw hiveException + case _ => + val root = ExceptionUtils.getRootCause(e) + HiveThriftServer2.listener.onStatementError( + statementId, root.getMessage, SparkUtils.exceptionString(root)) + throw new HiveSQLException("Error getting type info: " + root.toString, root) + } } HiveThriftServer2.listener.onStatementFinish(statementId) }