[SPARK-29283][SQL] Error message is hidden when query from JDBC, especially enabled adaptive execution

### What changes were proposed in this pull request?
When adaptive execution is enabled, the Spark users who connected from JDBC always get an adaptive execution error, whatever the underlying root cause is. It's very confusing. We have to check the driver log to find out why.
```shell
0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v;
SELECT * FROM testData join testData2 ON key = v;
Error: Error running query: org.apache.spark.SparkException: Adaptive execution failed due to stage materialization failures. (state=,code=0)
0: jdbc:hive2://localhost:10000>
```

For example, a job queried from JDBC failed due to HDFS missing block. User still get the error message `Adaptive execution failed due to stage materialization failures`.

The easiest way to reproduce is to change the code of `AdaptiveSparkPlanExec` so that it throws an exception when it faces `StageSuccess`.
```scala
  case class AdaptiveSparkPlanExec(
      events.drainTo(rem)
         (Seq(nextMsg) ++ rem.asScala).foreach {
           case StageSuccess(stage, res) =>
//            stage.resultOption = Some(res)
            val ex = new SparkException("Wrapper Exception",
              new IllegalArgumentException("Root cause is IllegalArgumentException for Test"))
            errors.append(
              new SparkException(s"Failed to materialize query stage: ${stage.treeString}", ex))
           case StageFailure(stage, ex) =>
             errors.append(
               new SparkException(s"Failed to materialize query stage: ${stage.treeString}", ex))
```

### Why are the changes needed?
To make the error message more user-friendly and more useful for queries from JDBC.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Manually tested with the following queries:
```shell
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY VIEW testData (key, value) AS SELECT explode(array(1, 2, 3, 4)), cast(substring(rand(), 3, 4) as string);
CREATE TEMPORARY VIEW testData (key, value) AS SELECT explode(array(1, 2, 3, 4)), cast(substring(rand(), 3, 4) as string);
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.225 seconds)
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY VIEW testData2 (k, v) AS SELECT explode(array(1, 1, 2, 2)), cast(substring(rand(), 3, 4) as int);
CREATE TEMPORARY VIEW testData2 (k, v) AS SELECT explode(array(1, 1, 2, 2)), cast(substring(rand(), 3, 4) as int);
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.043 seconds)
```
Before:
```shell
0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v;
SELECT * FROM testData join testData2 ON key = v;
Error: Error running query: org.apache.spark.SparkException: Adaptive execution failed due to stage materialization failures. (state=,code=0)
0: jdbc:hive2://localhost:10000>
```
After:
```shell
0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v;
SELECT * FROM testData join testData2 ON key = v;
Error: Error running query: java.lang.IllegalArgumentException: Root cause is IllegalArgumentException for Test (state=,code=0)
0: jdbc:hive2://localhost:10000>
```

Closes #25960 from LantaoJin/SPARK-29283.

Authored-by: lajin <lajin@ebay.com>
Signed-off-by: Yuming Wang <wgyumg@gmail.com>
This commit is contained in:
lajin 2019-10-16 19:51:56 -07:00 committed by Yuming Wang
parent 8eb8f7478c
commit fda4070ea9
8 changed files with 109 additions and 35 deletions

View file

@ -26,6 +26,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.shims.Utils
import org.apache.hive.service.cli._
@ -312,12 +313,16 @@ private[hive] class SparkExecuteStatementOperation(
} else {
logError(s"Error executing query with $statementId, currentState $currentState, ", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
if (e.isInstanceOf[HiveSQLException]) {
throw e.asInstanceOf[HiveSQLException]
} else {
throw new HiveSQLException("Error running query: " + e.toString, e)
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.listener.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.listener.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error running query: " + root.toString, root)
}
}
} finally {

View file

@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
import java.util.UUID
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hive.service.cli.{HiveSQLException, OperationState}
import org.apache.hive.service.cli.operation.GetCatalogsOperation
@ -68,11 +69,20 @@ private[hive] class SparkGetCatalogsOperation(
}
setState(OperationState.FINISHED)
} catch {
case e: HiveSQLException =>
case e: Throwable =>
logError(s"Error executing get catalogs operation with $statementId", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.listener.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.listener.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting catalogs: " + root.toString, root)
}
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}

View file

@ -22,6 +22,7 @@ import java.util.regex.Pattern
import scala.collection.JavaConverters.seqAsJavaListConverter
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObject}
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType
import org.apache.hive.service.cli._
@ -129,11 +130,20 @@ private[hive] class SparkGetColumnsOperation(
}
setState(OperationState.FINISHED)
} catch {
case e: HiveSQLException =>
case e: Throwable =>
logError(s"Error executing get columns operation with $statementId", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.listener.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.listener.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting columns: " + root.toString, root)
}
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}

View file

@ -22,6 +22,7 @@ import java.util.UUID
import scala.collection.JavaConverters.seqAsJavaListConverter
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObjectUtils}
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.GetFunctionsOperation
@ -104,11 +105,20 @@ private[hive] class SparkGetFunctionsOperation(
}
setState(OperationState.FINISHED)
} catch {
case e: HiveSQLException =>
case e: Throwable =>
logError(s"Error executing get functions operation with $statementId", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.listener.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.listener.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting functions: " + root.toString, root)
}
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}

View file

@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.thriftserver
import java.util.UUID
import java.util.regex.Pattern
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.GetSchemasOperation
@ -87,11 +88,20 @@ private[hive] class SparkGetSchemasOperation(
}
setState(OperationState.FINISHED)
} catch {
case e: HiveSQLException =>
case e: Throwable =>
logError(s"Error executing get schemas operation with $statementId", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.listener.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.listener.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting schemas: " + root.toString, root)
}
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}

View file

@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
import java.util.UUID
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.GetTableTypesOperation
@ -74,11 +75,20 @@ private[hive] class SparkGetTableTypesOperation(
}
setState(OperationState.FINISHED)
} catch {
case e: HiveSQLException =>
case e: Throwable =>
logError(s"Error executing get table types operation with $statementId", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.listener.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.listener.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting table types: " + root.toString, root)
}
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}

View file

@ -22,6 +22,7 @@ import java.util.regex.Pattern
import scala.collection.JavaConverters._
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObjectUtils
import org.apache.hive.service.cli._
@ -30,7 +31,6 @@ import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.util.{Utils => SparkUtils}
@ -119,11 +119,20 @@ private[hive] class SparkGetTablesOperation(
}
setState(OperationState.FINISHED)
} catch {
case e: HiveSQLException =>
case e: Throwable =>
logError(s"Error executing get tables operation with $statementId", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.listener.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.listener.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting tables: " + root.toString, root)
}
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}

View file

@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
import java.util.UUID
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hive.service.cli.{HiveSQLException, OperationState}
import org.apache.hive.service.cli.operation.GetTypeInfoOperation
@ -92,11 +93,20 @@ private[hive] class SparkGetTypeInfoOperation(
})
setState(OperationState.FINISHED)
} catch {
case e: HiveSQLException =>
case e: Throwable =>
logError(s"Error executing get type info with $statementId", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.listener.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.listener.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting type info: " + root.toString, root)
}
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}