86100df54b
## What changes were proposed in this pull request? This PR implements a new feature - window aggregation Pandas UDF for bounded window. #### Doc: https://docs.google.com/document/d/14EjeY5z4-NC27-SmIP9CsMPCANeTcvxN44a7SIJtZPc/edit#heading=h.c87w44wcj3wj #### Example: ``` from pyspark.sql.functions import pandas_udf, PandasUDFType from pyspark.sql.window import Window df = spark.range(0, 10, 2).toDF('v') w1 = Window.partitionBy().orderBy('v').rangeBetween(-2, 4) w2 = Window.partitionBy().orderBy('v').rowsBetween(-2, 2) pandas_udf('double', PandasUDFType.GROUPED_AGG) def avg(v): return v.mean() df.withColumn('v_mean', avg(df['v']).over(w1)).show() # +---+------+ # | v|v_mean| # +---+------+ # | 0| 1.0| # | 2| 2.0| # | 4| 4.0| # | 6| 6.0| # | 8| 7.0| # +---+------+ df.withColumn('v_mean', avg(df['v']).over(w2)).show() # +---+------+ # | v|v_mean| # +---+------+ # | 0| 2.0| # | 2| 3.0| # | 4| 4.0| # | 6| 5.0| # | 8| 6.0| # +---+------+ ``` #### High level changes: This PR modifies the existing WindowInPandasExec physical node to deal with unbounded (growing, shrinking and sliding) windows. * `WindowInPandasExec` now share the same base class as `WindowExec` and share utility functions. See `WindowExecBase` * `WindowFunctionFrame` now has two new functions `currentLowerBound` and `currentUpperBound` - to return the lower and upper window bound for the current output row. It is also modified to allow `AggregateProcessor` == null. Null aggregator processor is used for `WindowInPandasExec` where we don't have an aggregator and only uses lower and upper bound functions from `WindowFunctionFrame` * The biggest change is in `WindowInPandasExec`, where it is modified to take `currentLowerBound` and `currentUpperBound` and write those values together with the input data to the python process for rolling window aggregation. See `WindowInPandasExec` for more details. #### Discussion In benchmarking, I found numpy variant of the rolling window UDF is much faster than the pandas version: Spark SQL window function: 20s Pandas variant: ~80s Numpy variant: 10s Numpy variant with numba: 4s Allowing numpy variant of the vectorized UDFs is something I want to discuss because of the performance improvement, but doesn't have to be in this PR. ## How was this patch tested? New tests Closes #22305 from icexelloss/SPARK-24561-bounded-window-udf. Authored-by: Li Jin <ice.xelloss@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> |
||
---|---|---|
.. | ||
docs | ||
lib | ||
pyspark | ||
test_coverage | ||
test_support | ||
.coveragerc | ||
.gitignore | ||
MANIFEST.in | ||
pylintrc | ||
README.md | ||
run-tests | ||
run-tests-with-coverage | ||
run-tests.py | ||
setup.cfg | ||
setup.py |
Apache Spark
Spark is a fast and general cluster computing system for Big Data. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for stream processing.
Online Documentation
You can find the latest Spark documentation, including a programming guide, on the project web page
Python Packaging
This README file only contains basic information related to pip installed PySpark. This packaging is currently experimental and may change in future versions (although we will do our best to keep compatibility). Using PySpark requires the Spark JARs, and if you are building this from source please see the builder instructions at "Building Spark".
The Python packaging for Spark is not intended to replace all of the other use cases. This Python packaged version of Spark is suitable for interacting with an existing cluster (be it Spark standalone, YARN, or Mesos) - but does not contain the tools required to set up your own standalone Spark cluster. You can download the full version of Spark from the Apache Spark downloads page.
NOTE: If you are using this with a Spark standalone cluster you must ensure that the version (including minor version) matches or you may experience odd errors.
Python Requirements
At its core PySpark depends on Py4J (currently version 0.10.8.1), but some additional sub-packages have their own extra requirements for some features (including numpy, pandas, and pyarrow).