ef7a5e0bca
#### What changes were proposed in this pull request? This follow-up PR is to address the remaining comments in https://github.com/apache/spark/pull/12385 The major change in this PR is to issue better error messages in PySpark by using the mechanism that was proposed by davies in https://github.com/apache/spark/pull/7135 For example, in PySpark, if we input the following statement: ```python >>> l = [('Alice', 1)] >>> df = sqlContext.createDataFrame(l) >>> df.createTempView("people") >>> df.createTempView("people") ``` Before this PR, the exception we will get is like ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/dataframe.py", line 152, in createTempView self._jdf.createTempView(name) File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__ File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o35.createTempView. 
: org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException: Temporary table 'people' already exists; at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTempView(SessionCatalog.scala:324) at org.apache.spark.sql.SparkSession.createTempView(SparkSession.scala:523) at org.apache.spark.sql.Dataset.createTempView(Dataset.scala:2328) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:211) at java.lang.Thread.run(Thread.java:745) ``` After this PR, the exception we will get becomes cleaner: ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/dataframe.py", line 152, in createTempView self._jdf.createTempView(name) File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__ File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/utils.py", line 75, in deco raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: u"Temporary table 'people' already exists;" ``` #### How was this patch tested? Fixed an existing PySpark test case Author: gatorsmile <gatorsmile@gmail.com> Closes #13126 from gatorsmile/followup-14684.
113 lines
3.9 KiB
Python
113 lines
3.9 KiB
Python
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
# this work for additional information regarding copyright ownership.
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
# (the "License"); you may not use this file except in compliance with
|
|
# the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
import py4j
|
|
|
|
|
|
class CapturedException(Exception):
    """
    Base class for exceptions thrown by the JVM and re-raised on the Python side.

    :param desc: short description of the Java exception (its message part)
    :param stackTrace: the Java stack trace, pre-formatted as a single string
    """

    def __init__(self, desc, stackTrace):
        self.desc = desc
        self.stackTrace = stackTrace

    def __str__(self):
        # Mirror the JVM-side message; repr keeps quoting consistent
        # across Python 2 (unicode) and Python 3 (str).
        return "{!r}".format(self.desc)
|
|
|
|
|
|
class AnalysisException(CapturedException):
    """
    Raised when Spark fails to analyze a SQL query plan.
    """
|
|
|
|
|
|
class ParseException(CapturedException):
    """
    Raised when Spark fails to parse a SQL command.
    """
|
|
|
|
|
|
class IllegalArgumentException(CapturedException):
    """
    Raised when an illegal or inappropriate argument was passed to the JVM.
    """
|
|
|
|
|
|
class ContinuousQueryException(CapturedException):
    """
    Exception that stopped a :class:`ContinuousQuery`.
    """
|
|
|
|
|
|
class QueryExecutionException(CapturedException):
    """
    Raised when query execution fails on the JVM side.
    """
|
|
|
|
|
|
def capture_sql_exception(f):
    """
    Wrap ``f`` so that well-known JVM exceptions surfaced through Py4j are
    translated into the matching Python exception classes defined above.

    Any :class:`py4j.protocol.Py4JJavaError` whose Java class name matches one
    of the known prefixes is re-raised as the corresponding Python exception,
    carrying the Java message and a formatted Java stack trace. Unrecognized
    errors are re-raised unchanged.
    """
    def deco(*a, **kw):
        try:
            return f(*a, **kw)
        except py4j.protocol.Py4JJavaError as e:
            s = e.java_exception.toString()
            stackTrace = '\n\t at '.join(map(lambda x: x.toString(),
                                             e.java_exception.getStackTrace()))
            # Ordered mapping from Java exception-name prefix to the Python
            # class we raise instead; first match wins. Note the bare
            # 'catalyst.analysis' prefix intentionally has no ': ' suffix so
            # it matches any exception class in that package.
            converters = [
                ('org.apache.spark.sql.AnalysisException: ', AnalysisException),
                ('org.apache.spark.sql.catalyst.analysis', AnalysisException),
                ('org.apache.spark.sql.catalyst.parser.ParseException: ', ParseException),
                ('org.apache.spark.sql.ContinuousQueryException: ', ContinuousQueryException),
                ('org.apache.spark.sql.execution.QueryExecutionException: ',
                 QueryExecutionException),
                ('java.lang.IllegalArgumentException: ', IllegalArgumentException),
            ]
            for prefix, exc_class in converters:
                if s.startswith(prefix):
                    # Drop the leading 'java.class.Name: ' part of the message.
                    raise exc_class(s.split(': ', 1)[1], stackTrace)
            raise
    return deco
|
|
|
|
|
|
def install_exception_handler():
    """
    Hook an exception handler into Py4j, which could capture some SQL exceptions in Java.

    When calling Java API, it will call `get_return_value` to parse the returned object.
    If any exception happened in JVM, the result will be Java exception object, it raise
    py4j.protocol.Py4JJavaError. We replace the original `get_return_value` with one that
    could capture the Java exception and throw a Python one (with the same error message).

    It's idempotent, could be called multiple times.
    """
    # Always wrap the pristine function kept on py4j.protocol — never the
    # already-patched one in py4j.java_gateway — so repeated calls do not
    # stack wrappers.
    unpatched = py4j.protocol.get_return_value
    # Patch only the reference used by py4j.java_gateway (the Java-call path).
    py4j.java_gateway.get_return_value = capture_sql_exception(unpatched)
|
|
|
|
|
|
def toJArray(gateway, jtype, arr):
    """
    Convert a Python list into a Java array of the given element type.

    :param gateway: Py4j Gateway
    :param jtype: java type of element in array
    :param arr: python type list
    """
    jarr = gateway.new_array(jtype, len(arr))
    for i, value in enumerate(arr):
        jarr[i] = value
    return jarr
|