spark-instrumented-optimizer/python/pyspark/sql
WeichenXu 31e7c37354 [SPARK-28185][PYTHON][SQL] Closes the generator when Python UDFs stop early
## What changes were proposed in this pull request?

 Closes the generator when Python UDFs stop early.

### Manually verification on pandas iterator UDF and mapPartitions

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import col, udf
from pyspark.taskcontext import TaskContext
import time
import os

spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
spark.conf.set('spark.sql.pandas.udf.buffer.size', '4')

pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi1(it):
    try:
        for batch in it:
            yield batch + 100
            time.sleep(1.0)
    except BaseException as be:
        print("Debug: exception raised: " + str(type(be)))
        raise be
    finally:
        open("/tmp/000001.tmp", "a").close()

df1 = spark.range(10).select(col('id').alias('a')).repartition(1)

# will see log Debug: exception raised: <class 'GeneratorExit'>
# and file "/tmp/000001.tmp" generated.
df1.select(col('a'), fi1('a')).limit(2).collect()

def mapper(it):
    try:
        for batch in it:
                yield batch
    except BaseException as be:
        print("Debug: exception raised: " + str(type(be)))
        raise be
    finally:
        open("/tmp/000002.tmp", "a").close()

df2 = spark.range(10000000).repartition(1)

# will see log Debug: exception raised: <class 'GeneratorExit'>
# and file "/tmp/000002.tmp" generated.
df2.rdd.mapPartitions(mapper).take(2)

```

## How was this patch tested?

Unit test added.

Please review https://spark.apache.org/contributing.html before opening a pull request.

Closes #24986 from WeichenXu123/pandas_iter_udf_limit.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-28 17:10:25 +09:00
..
avro [SPARK-26856][PYSPARK][FOLLOWUP] Fix UT failure due to wrong patterns for Kinesis assembly 2019-04-02 14:52:56 +09:00
tests [SPARK-28185][PYTHON][SQL] Closes the generator when Python UDFs stop early 2019-06-28 17:10:25 +09:00
__init__.py [SPARK-22369][PYTHON][DOCS] Exposes catalog API documentation in PySpark 2017-11-02 15:22:52 +01:00
catalog.py [SPARK-24665][PYSPARK][FOLLOWUP] Use SQLConf in PySpark to manage all sql configs 2018-08-17 10:18:08 +08:00
column.py [SPARK-28031][PYSPARK][TEST] Improve doctest on over function of Column 2019-06-13 11:04:41 +09:00
conf.py [SPARK-23698][PYTHON] Resolve undefined names in Python 3 2018-08-22 10:06:59 -07:00
context.py [SPARK-26640][CORE][ML][SQL][STREAMING][PYSPARK] Code cleanup from lgtm.com analysis 2019-01-17 19:40:39 -06:00
dataframe.py [SPARK-27992][PYTHON] Allow Python to join with connection thread to propagate errors 2019-06-26 13:05:41 -07:00
functions.py [SPARK-28132][PYTHON] Update document type conversion for Pandas UDFs (pyarrow 0.13.0, pandas 0.24.2, Python 3.7) 2019-06-21 10:47:54 -07:00
group.py [SPARK-24722][SQL] pivot() with Column type argument 2018-08-04 14:17:32 +08:00
readwriter.py [SPARK-28058][DOC] Add a note to doc of mode of CSV for column pruning 2019-06-18 13:48:32 +09:00
session.py [SPARK-27995][PYTHON] Note the difference between str of Python 2 and 3 at Arrow optimized 2019-06-11 18:43:59 +09:00
streaming.py [SPARK-27627][SQL] Make option "pathGlobFilter" as a general option for all file sources 2019-05-09 08:41:43 +09:00
types.py [SPARK-23299][SQL][PYSPARK] Fix __repr__ behaviour for Rows 2019-05-06 10:00:49 -07:00
udf.py [SPARK-26412][PYSPARK][SQL] Allow Pandas UDF to take an iterator of pd.Series or an iterator of tuple of pd.Series 2019-06-15 08:29:20 -07:00
utils.py [SPARK-28041][PYTHON] Increase minimum supported Pandas to 0.23.2 2019-06-18 09:10:58 +09:00
window.py [MINOR][PYSPARK][SQL][DOC] Fix rowsBetween doc in Window 2019-06-14 09:56:37 +09:00