spark-instrumented-optimizer/python/pyspark/sql
Liang-Chi Hsieh 6a5a7254dc [SPARK-18667][PYSPARK][SQL] Change the way to group row in BatchEvalPythonExec so input_file_name function can work with UDF in pyspark
## What changes were proposed in this pull request?

`input_file_name` doesn't return filename when working with UDF in PySpark. An example shows the problem:

    from pyspark.sql.functions import *
    from pyspark.sql.types import *

    def filename(path):
        return path

    sourceFile = udf(filename, StringType())
    spark.read.json("tmp.json").select(sourceFile(input_file_name())).show()

    +---------------------------+
    |filename(input_file_name())|
    +---------------------------+
    |                           |
    +---------------------------+

The cause of this issue is, we group rows in `BatchEvalPythonExec` for batching processing of PythonUDF. Currently we group rows first and then evaluate expressions on the rows. If the data is less than the required number of rows for a group, the iterator will be consumed to the end before the evaluation. However, once the iterator reaches the end, we will unset input filename. So the input_file_name expression can't return correct filename.

This patch fixes the approach to group the batch of rows. We evaluate the expression first and then group evaluated results to batch.

## How was this patch tested?

Added unit test to PySpark.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16115 from viirya/fix-py-udf-input-filename.
2016-12-08 23:22:18 +08:00
..
__init__.py [SPARK-16772][PYTHON][DOCS] Fix API doc references to UDFRegistration + Update "important classes" 2016-08-06 05:02:59 +01:00
catalog.py [SPARK-17338][SQL][FOLLOW-UP] add global temp view 2016-10-11 15:21:28 +08:00
column.py [SPARK-17215][SQL] Method SQLContext.parseDataType(dataTypeString: String) could be removed. 2016-08-24 23:36:04 -07:00
conf.py [SPARK-15464][ML][MLLIB][SQL][TESTS] Replace SQLContext and SparkContext with SparkSession using builder pattern in python test code 2016-05-23 18:14:48 -07:00
context.py [SPARK-11775][PYSPARK][SQL] Allow PySpark to register Java UDF 2016-10-14 15:50:35 -07:00
dataframe.py [SPARK-18447][DOCS] Fix the markdown for Note:/NOTE:/Note that across Python API documentation 2016-11-22 11:40:18 +00:00
functions.py [SPARK-18447][DOCS] Fix the markdown for Note:/NOTE:/Note that across Python API documentation 2016-11-22 11:40:18 +00:00
group.py [MINOR][PYSPARK][DOC] Fix wrongly formatted examples in PySpark documentation 2016-07-06 10:45:51 -07:00
readwriter.py [SPARK-17764][SQL] Add to_json supporting to convert nested struct column to JSON string 2016-11-01 12:46:41 -07:00
session.py [SPARK-17720][SQL] introduce static SQL conf 2016-10-11 20:27:08 -07:00
streaming.py [SPARK-18754][SS] Rename recentProgresses to recentProgress 2016-12-07 15:36:29 -08:00
tests.py [SPARK-18667][PYSPARK][SQL] Change the way to group row in BatchEvalPythonExec so input_file_name function can work with UDF in pyspark 2016-12-08 23:22:18 +08:00
types.py [SPARK-17215][SQL] Method SQLContext.parseDataType(dataTypeString: String) could be removed. 2016-08-24 23:36:04 -07:00
utils.py [SPARK-15953][WIP][STREAMING] Renamed ContinuousQuery to StreamingQuery 2016-06-15 10:46:07 -07:00
window.py [SPARK-18690][PYTHON][SQL] Backward compatibility of unbounded frames 2016-12-02 17:39:28 -08:00