spark-instrumented-optimizer/python/pyspark
edorigatti 3e5b4ae63a [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration wrapping from driver to executor
## What changes were proposed in this pull request?
SPARK-23754 was fixed in #21383 by changing the UDF code to wrap the user function, but this required a hack to save its argspec. This PR reverts this change and fixes the `StopIteration` bug in the worker

## How does this work?

The root of the problem is that when an user-supplied function raises a `StopIteration`, pyspark might stop processing data, if this function is used in a for-loop. The solution is to catch `StopIteration`s exceptions and re-raise them as `RuntimeError`s, so that the execution fails and the error is reported to the user. This is done using the `fail_on_stopiteration` wrapper, in different ways depending on where the function is used:
 - In RDDs, the user function is wrapped in the driver, because this function is also called in the driver itself.
 - In SQL UDFs, the function is wrapped in the worker, since all processing happens there. Moreover, the worker needs the signature of the user function, which is lost when wrapping it, but passing this signature to the worker requires a not so nice hack.

## How was this patch tested?

Same tests, plus tests for pandas UDFs

Author: edorigatti <emilio.dorigatti@gmail.com>

Closes #21467 from e-dorigatti/fix_udf_hack.
2018-06-11 10:15:42 +08:00
..
ml [SPARK-24477][SPARK-24454][ML][PYTHON] Imports submodule in ml/__init__.py and add ImageSchema into __all__ 2018-06-08 09:32:11 -07:00
mllib [SPARK-15750][MLLIB][PYSPARK] Constructing FPGrowth fails when no numPartitions specified in pyspark 2018-05-07 14:47:58 -07:00
sql [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration wrapping from driver to executor 2018-06-11 10:15:42 +08:00
streaming [SPARK-17756][PYTHON][STREAMING] Workaround to avoid return type mismatch in PythonTransformFunction 2018-06-09 01:27:51 +07:00
__init__.py [SPARK-23328][PYTHON] Disallow default value None in na.replace/replace when 'to_replace' is not a dictionary 2018-02-09 14:21:10 +08:00
_globals.py [SPARK-23328][PYTHON] Disallow default value None in na.replace/replace when 'to_replace' is not a dictionary 2018-02-09 14:21:10 +08:00
accumulators.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
broadcast.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
cloudpickle.py [SPARK-24303][PYTHON] Update cloudpickle to v0.4.4 2018-05-18 09:53:24 -07:00
conf.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
context.py [SPARK-21945][YARN][PYTHON] Make --py-files work with PySpark shell in Yarn client mode 2018-05-17 12:07:58 +08:00
daemon.py [PYSPARK] Update py4j to version 0.10.7. 2018-05-09 10:47:35 -07:00
files.py [SPARK-3309] [PySpark] Put all public API in __all__ 2014-09-03 11:49:45 -07:00
find_spark_home.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
heapq3.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
java_gateway.py [PYSPARK] Update py4j to version 0.10.7. 2018-05-09 10:47:35 -07:00
join.py [SPARK-14202] [PYTHON] Use generator expression instead of list comp in python_full_outer_jo… 2016-03-28 14:51:36 -07:00
profiler.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
rdd.py [SPARK-23754][PYTHON] Re-raising StopIteration in client code 2018-05-30 18:11:33 +08:00
rddsampler.py [SPARK-4897] [PySpark] Python 3 support 2015-04-16 16:20:57 -07:00
resultiterable.py [SPARK-3074] [PySpark] support groupByKey() with single huge key 2015-04-09 17:07:23 -07:00
serializers.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
shell.py [SPARK-16451][REPL] Fail shell if SparkSession fails to start. 2018-06-05 08:29:29 +07:00
shuffle.py [SPARK-23754][PYTHON] Re-raising StopIteration in client code 2018-05-30 18:11:33 +08:00
statcounter.py [SPARK-6919] [PYSPARK] Add asDict method to StatCounter 2015-09-29 13:38:15 -07:00
status.py [SPARK-4172] [PySpark] Progress API in Python 2015-02-17 13:36:43 -08:00
storagelevel.py [SPARK-13992][CORE][PYSPARK][FOLLOWUP] Update OFF_HEAP semantics for Java api and Python api 2016-04-12 23:06:55 -07:00
taskcontext.py [SPARK-24397][PYSPARK] Added TaskContext.getLocalProperty(key) in Python 2018-05-31 11:23:57 -07:00
tests.py [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration wrapping from driver to executor 2018-06-11 10:15:42 +08:00
traceback_utils.py [SPARK-1087] Move python traceback utilities into new traceback_utils.py file. 2014-09-15 19:28:17 -07:00
util.py [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration wrapping from driver to executor 2018-06-11 10:15:42 +08:00
version.py [SPARK-23028] Bump master branch version to 2.4.0-SNAPSHOT 2018-01-13 00:37:59 +08:00
worker.py [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration wrapping from driver to executor 2018-06-11 10:15:42 +08:00