spark-instrumented-optimizer/python/pyspark
Li Jin 9786ce66c5 [SPARK-22239][SQL][PYTHON] Enable grouped aggregate pandas UDFs as window functions with unbounded window frames
## What changes were proposed in this pull request?
This PR enables using a grouped aggregate pandas UDFs as window functions. The semantics is the same as using SQL aggregation function as window functions.

```
       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
       >>> from pyspark.sql import Window
       >>> df = spark.createDataFrame(
       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
       ...     ("id", "v"))
       >>> pandas_udf("double", PandasUDFType.GROUPED_AGG)
       ... def mean_udf(v):
       ...     return v.mean()
       >>> w = Window.partitionBy('id')
       >>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
       +---+----+------+
       | id|   v|mean_v|
       +---+----+------+
       |  1| 1.0|   1.5|
       |  1| 2.0|   1.5|
       |  2| 3.0|   6.0|
       |  2| 5.0|   6.0|
       |  2|10.0|   6.0|
       +---+----+------+
```

The scope of this PR is somewhat limited in terms of:
(1) Only supports unbounded window, which acts essentially as group by.
(2) Only supports aggregation functions, not "transform" like window functions (n -> n mapping)

Both of these are left as future work. Especially, (1) needs careful thinking w.r.t. how to pass rolling window data to python efficiently. (2) is a bit easier but does require more changes therefore I think it's better to leave it as a separate PR.

## How was this patch tested?

WindowPandasUDFTests

Author: Li Jin <ice.xelloss@gmail.com>

Closes #21082 from icexelloss/SPARK-22239-window-udf.
2018-06-13 09:10:52 +08:00
..
ml [SPARK-15064][ML] Locale support in StopWordsRemover 2018-06-12 08:16:37 -07:00
mllib [SPARK-15750][MLLIB][PYSPARK] Constructing FPGrowth fails when no numPartitions specified in pyspark 2018-05-07 14:47:58 -07:00
sql [SPARK-22239][SQL][PYTHON] Enable grouped aggregate pandas UDFs as window functions with unbounded window frames 2018-06-13 09:10:52 +08:00
streaming [SPARK-17756][PYTHON][STREAMING] Workaround to avoid return type mismatch in PythonTransformFunction 2018-06-09 01:27:51 +07:00
__init__.py [SPARK-23328][PYTHON] Disallow default value None in na.replace/replace when 'to_replace' is not a dictionary 2018-02-09 14:21:10 +08:00
_globals.py [SPARK-23328][PYTHON] Disallow default value None in na.replace/replace when 'to_replace' is not a dictionary 2018-02-09 14:21:10 +08:00
accumulators.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
broadcast.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
cloudpickle.py [SPARK-24303][PYTHON] Update cloudpickle to v0.4.4 2018-05-18 09:53:24 -07:00
conf.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
context.py [SPARK-21945][YARN][PYTHON] Make --py-files work with PySpark shell in Yarn client mode 2018-05-17 12:07:58 +08:00
daemon.py [PYSPARK] Update py4j to version 0.10.7. 2018-05-09 10:47:35 -07:00
files.py [SPARK-3309] [PySpark] Put all public API in __all__ 2014-09-03 11:49:45 -07:00
find_spark_home.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
heapq3.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
java_gateway.py [PYSPARK] Update py4j to version 0.10.7. 2018-05-09 10:47:35 -07:00
join.py [SPARK-14202] [PYTHON] Use generator expression instead of list comp in python_full_outer_jo… 2016-03-28 14:51:36 -07:00
profiler.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
rdd.py [SPARK-22239][SQL][PYTHON] Enable grouped aggregate pandas UDFs as window functions with unbounded window frames 2018-06-13 09:10:52 +08:00
rddsampler.py [SPARK-4897] [PySpark] Python 3 support 2015-04-16 16:20:57 -07:00
resultiterable.py [SPARK-3074] [PySpark] support groupByKey() with single huge key 2015-04-09 17:07:23 -07:00
serializers.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
shell.py [SPARK-16451][REPL] Fail shell if SparkSession fails to start. 2018-06-05 08:29:29 +07:00
shuffle.py [SPARK-23754][PYTHON] Re-raising StopIteration in client code 2018-05-30 18:11:33 +08:00
statcounter.py [SPARK-6919] [PYSPARK] Add asDict method to StatCounter 2015-09-29 13:38:15 -07:00
status.py [SPARK-4172] [PySpark] Progress API in Python 2015-02-17 13:36:43 -08:00
storagelevel.py [SPARK-13992][CORE][PYSPARK][FOLLOWUP] Update OFF_HEAP semantics for Java api and Python api 2016-04-12 23:06:55 -07:00
taskcontext.py [SPARK-24397][PYSPARK] Added TaskContext.getLocalProperty(key) in Python 2018-05-31 11:23:57 -07:00
tests.py [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration wrapping from driver to executor 2018-06-11 10:15:42 +08:00
traceback_utils.py [SPARK-1087] Move python traceback utilities into new traceback_utils.py file. 2014-09-15 19:28:17 -07:00
util.py [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration wrapping from driver to executor 2018-06-11 10:15:42 +08:00
version.py [SPARK-23028] Bump master branch version to 2.4.0-SNAPSHOT 2018-01-13 00:37:59 +08:00
worker.py [SPARK-22239][SQL][PYTHON] Enable grouped aggregate pandas UDFs as window functions with unbounded window frames 2018-06-13 09:10:52 +08:00