From c15462917114c9066001a8c003597fd609fc50e4 Mon Sep 17 00:00:00 2001 From: HyukjinKwon Date: Fri, 28 Aug 2020 15:09:06 +0900 Subject: [PATCH] [SPARK-32183][DOCS][PYTHON] User Guide - PySpark Usage Guide for Pandas with Apache Arrow ### What changes were proposed in this pull request? This PR proposes to move Arrow usage guide from Spark documentation site to PySpark documentation site (at "User Guide"). Here is the demo for reviewing quicker: https://hyukjin-spark.readthedocs.io/en/stable/user_guide/arrow_pandas.html ### Why are the changes needed? To have a single place for PySpark users, and better documentation. ### Does this PR introduce _any_ user-facing change? Yes, it will move https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html to our PySpark documentation. ### How was this patch tested? ```bash cd docs SKIP_SCALADOC=1 SKIP_RDOC=1 SKIP_SQLDOC=1 jekyll serve --watch ``` and ```bash cd python/docs make clean html ``` Closes #29548 from HyukjinKwon/SPARK-32183. Authored-by: HyukjinKwon Signed-off-by: HyukjinKwon --- docs/sql-pyspark-pandas-with-arrow.md | 349 +-------------- examples/src/main/python/sql/arrow.py | 22 +- python/docs/source/reference/pyspark.sql.rst | 2 + .../docs/source/user_guide/arrow_pandas.rst | 411 ++++++++++++++++++ python/docs/source/user_guide/index.rst | 5 + python/pyspark/sql/pandas/group_ops.py | 4 +- .../apache/spark/sql/internal/SQLConf.scala | 2 +- 7 files changed, 426 insertions(+), 369 deletions(-) create mode 100644 python/docs/source/user_guide/arrow_pandas.rst diff --git a/docs/sql-pyspark-pandas-with-arrow.md b/docs/sql-pyspark-pandas-with-arrow.md index a11162963b..6895376dfb 100644 --- a/docs/sql-pyspark-pandas-with-arrow.md +++ b/docs/sql-pyspark-pandas-with-arrow.md @@ -19,351 +19,4 @@ license: | limitations under the License. --- -* Table of contents -{:toc} - -## Apache Arrow in PySpark - -Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer -data between JVM and Python processes. This currently is most beneficial to Python users that -work with Pandas/NumPy data. Its usage is not automatic and might require some minor -changes to configuration or code to take full advantage and ensure compatibility. This guide will -give a high-level description of how to use Arrow in Spark and highlight any differences when -working with Arrow-enabled data. - -### Ensure PyArrow Installed - -To use Apache Arrow in PySpark, [the recommended version of PyArrow](#recommended-pandas-and-pyarrow-versions) -should be installed. -If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the -SQL module with the command `pip install pyspark[sql]`. Otherwise, you must ensure that PyArrow -is installed and available on all cluster nodes. -You can install using pip or conda from the conda-forge channel. See PyArrow -[installation](https://arrow.apache.org/docs/python/install.html) for details. - -## Enabling for Conversion to/from Pandas - -Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame -using the call `toPandas()` and when creating a Spark DataFrame from a Pandas DataFrame with -`createDataFrame(pandas_df)`. To use Arrow when executing these calls, users need to first set -the Spark configuration `spark.sql.execution.arrow.pyspark.enabled` to `true`. This is disabled by default. 
- -In addition, optimizations enabled by `spark.sql.execution.arrow.pyspark.enabled` could fallback automatically -to non-Arrow optimization implementation if an error occurs before the actual computation within Spark. -This can be controlled by `spark.sql.execution.arrow.pyspark.fallback.enabled`. - -
-
-{% include_example dataframe_with_arrow python/sql/arrow.py %} -
-
- -Using the above optimizations with Arrow will produce the same results as when Arrow is not -enabled. Note that even with Arrow, `toPandas()` results in the collection of all records in the -DataFrame to the driver program and should be done on a small subset of the data. Not all Spark -data types are currently supported and an error can be raised if a column has an unsupported type, -see [Supported SQL Types](#supported-sql-types). If an error occurs during `createDataFrame()`, -Spark will fall back to create the DataFrame without Arrow. - -## Pandas UDFs (a.k.a. Vectorized UDFs) - -Pandas UDFs are user defined functions that are executed by Spark using -Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas -UDF is defined using the `pandas_udf` as a decorator or to wrap the function, and no additional -configuration is required. A Pandas UDF behaves as a regular PySpark function API in general. - -Before Spark 3.0, Pandas UDFs used to be defined with `PandasUDFType`. From Spark 3.0 -with Python 3.6+, you can also use [Python type hints](https://www.python.org/dev/peps/pep-0484). -Using Python type hints are preferred and using `PandasUDFType` will be deprecated in -the future release. - -Note that the type hint should use `pandas.Series` in all cases but there is one variant -that `pandas.DataFrame` should be used for its input or output type hint instead when the input -or output column is of `StructType`. The following example shows a Pandas UDF which takes long -column, string column and struct column, and outputs a struct column. It requires the function to -specify the type hints of `pandas.Series` and `pandas.DataFrame` as below: - -

-

-
-{% include_example ser_to_frame_pandas_udf python/sql/arrow.py %} -
-
-

- -In the following sections, it describes the combinations of the supported type hints. For simplicity, -`pandas.DataFrame` variant is omitted. - -### Series to Series - -The type hint can be expressed as `pandas.Series`, ... -> `pandas.Series`. - -By using `pandas_udf` with the function having such type hints above, it creates a Pandas UDF where the given -function takes one or more `pandas.Series` and outputs one `pandas.Series`. The output of the function should -always be of the same length as the input. Internally, PySpark will execute a Pandas UDF by splitting -columns into batches and calling the function for each batch as a subset of the data, then concatenating -the results together. - -The following example shows how to create this Pandas UDF that computes the product of 2 columns. - -
-
-{% include_example ser_to_ser_pandas_udf python/sql/arrow.py %} -
-
- -For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) - -### Iterator of Series to Iterator of Series - -The type hint can be expressed as `Iterator[pandas.Series]` -> `Iterator[pandas.Series]`. - -By using `pandas_udf` with the function having such type hints above, it creates a Pandas UDF where the given -function takes an iterator of `pandas.Series` and outputs an iterator of `pandas.Series`. The -length of the entire output from the function should be the same length of the entire input; therefore, it can -prefetch the data from the input iterator as long as the lengths are the same. -In this case, the created Pandas UDF requires one input column when the Pandas UDF is called. To use -multiple input columns, a different type hint is required. See Iterator of Multiple Series to Iterator -of Series. - -It is also useful when the UDF execution requires initializing some states although internally it works -identically as Series to Series case. The pseudocode below illustrates the example. - -{% highlight python %} -@pandas_udf("long") -def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]: - # Do some expensive initialization with a state - state = very_expensive_initialization() - for x in iterator: - # Use that state for whole iterator. - yield calculate_with_state(x, state) - -df.select(calculate("value")).show() -{% endhighlight %} - -The following example shows how to create this Pandas UDF: - -
-
-{% include_example iter_ser_to_iter_ser_pandas_udf python/sql/arrow.py %} -
-
- -For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) - -### Iterator of Multiple Series to Iterator of Series - -The type hint can be expressed as `Iterator[Tuple[pandas.Series, ...]]` -> `Iterator[pandas.Series]`. - -By using `pandas_udf` with the function having such type hints above, it creates a Pandas UDF where the -given function takes an iterator of a tuple of multiple `pandas.Series` and outputs an iterator of `pandas.Series`. -In this case, the created pandas UDF requires multiple input columns as many as the series in the tuple -when the Pandas UDF is called. Otherwise, it has the same characteristics and restrictions as Iterator of Series -to Iterator of Series case. - -The following example shows how to create this Pandas UDF: - -
-
-{% include_example iter_sers_to_iter_ser_pandas_udf python/sql/arrow.py %} -
-
- -For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) - -### Series to Scalar - -The type hint can be expressed as `pandas.Series`, ... -> `Any`. - -By using `pandas_udf` with the function having such type hints above, it creates a Pandas UDF similar -to PySpark's aggregate functions. The given function takes `pandas.Series` and returns a scalar value. -The return type should be a primitive data type, and the returned scalar can be either a python -primitive type, e.g., `int` or `float` or a numpy data type, e.g., `numpy.int64` or `numpy.float64`. -`Any` should ideally be a specific scalar type accordingly. - -This UDF can be also used with `groupBy().agg()` and [`pyspark.sql.Window`](api/python/pyspark.sql.html#pyspark.sql.Window). -It defines an aggregation from one or more `pandas.Series` to a scalar value, where each `pandas.Series` -represents a column within the group or window. - -Note that this type of UDF does not support partial aggregation and all data for a group or window -will be loaded into memory. Also, only unbounded window is supported with Grouped aggregate Pandas -UDFs currently. The following example shows how to use this type of UDF to compute mean with a group-by -and window operations: - -
-
-{% include_example ser_to_scalar_pandas_udf python/sql/arrow.py %} -
-
- -For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) - - -## Pandas Function APIs - -Pandas Function APIs can directly apply a Python native function against the whole `DataFrame` by -using Pandas instances. Internally it works similarly with Pandas UDFs by using Arrow to transfer -data and Pandas to work with the data, which allows vectorized operations. However, A Pandas Function -API behaves as a regular API under PySpark `DataFrame` instead of `Column`, and Python type hints in Pandas -Functions APIs are optional and do not affect how it works internally at this moment although they -might be required in the future. - -From Spark 3.0, grouped map pandas UDF is now categorized as a separate Pandas Function API, -`DataFrame.groupby().applyInPandas()`. It is still possible to use it with `PandasUDFType` -and `DataFrame.groupby().apply()` as it was; however, it is preferred to use -`DataFrame.groupby().applyInPandas()` directly. Using `PandasUDFType` will be deprecated -in the future. - -### Grouped Map - -Grouped map operations with Pandas instances are supported by `DataFrame.groupby().applyInPandas()` -which requires a Python function that takes a `pandas.DataFrame` and return another `pandas.DataFrame`. -It maps each group to each `pandas.DataFrame` in the Python function. - -This API implements the "split-apply-combine" pattern which consists of three steps: -* Split the data into groups by using `DataFrame.groupBy`. -* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The - input data contains all the rows and columns for each group. -* Combine the results into a new PySpark `DataFrame`. - -To use `groupBy().applyInPandas()`, the user needs to define the following: -* A Python function that defines the computation for each group. -* A `StructType` object or a string that defines the schema of the output PySpark `DataFrame`. - -The column labels of the returned `pandas.DataFrame` must either match the field names in the -defined output schema if specified as strings, or match the field data types by position if not -strings, e.g. integer indices. See [pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame) -on how to label columns when constructing a `pandas.DataFrame`. - -Note that all data for a group will be loaded into memory before the function is applied. This can -lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for -[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied on groups and it is up to the user -to ensure that the grouped data will fit into the available memory. - -The following example shows how to use `groupby().applyInPandas()` to subtract the mean from each value -in the group. - -
-
-{% include_example grouped_apply_in_pandas python/sql/arrow.py %} -
-
- -For detailed usage, please see [`pyspark.sql.GroupedData.applyInPandas`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.applyInPandas). - -### Map - -Map operations with Pandas instances are supported by `DataFrame.mapInPandas()` which maps an iterator -of `pandas.DataFrame`s to another iterator of `pandas.DataFrame`s that represents the current -PySpark `DataFrame` and returns the result as a PySpark `DataFrame`. The functions takes and outputs -an iterator of `pandas.DataFrame`. It can return the output of arbitrary length in contrast to some -Pandas UDFs although internally it works similarly with Series to Series Pandas UDF. - -The following example shows how to use `mapInPandas()`: - -
-
-{% include_example map_in_pandas python/sql/arrow.py %} -
-
- -For detailed usage, please see [`pyspark.sql.DataFrame.mapsInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas). - -### Co-grouped Map - -Co-grouped map operations with Pandas instances are supported by `DataFrame.groupby().cogroup().applyInPandas()` which -allows two PySpark `DataFrame`s to be cogrouped by a common key and then a Python function applied to each -cogroup. It consists of the following steps: -* Shuffle the data such that the groups of each dataframe which share a key are cogrouped together. -* Apply a function to each cogroup. The input of the function is two `pandas.DataFrame` (with an optional tuple -representing the key). The output of the function is a `pandas.DataFrame`. -* Combine the `pandas.DataFrame`s from all groups into a new PySpark `DataFrame`. - -To use `groupBy().cogroup().applyInPandas()`, the user needs to define the following: -* A Python function that defines the computation for each cogroup. -* A `StructType` object or a string that defines the schema of the output PySpark `DataFrame`. - -The column labels of the returned `pandas.DataFrame` must either match the field names in the -defined output schema if specified as strings, or match the field data types by position if not -strings, e.g. integer indices. See [pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame) -on how to label columns when constructing a `pandas.DataFrame`. - -Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of -memory exceptions, especially if the group sizes are skewed. The configuration for [maxRecordsPerBatch](#setting-arrow-batch-size) -is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory. - -The following example shows how to use `groupby().cogroup().applyInPandas()` to perform an asof join between two datasets. - -
-
-{% include_example cogrouped_apply_in_pandas python/sql/arrow.py %} -
-
- -For detailed usage, please see [`pyspark.sql.PandasCogroupedOps.applyInPandas()`](api/python/pyspark.sql.html#pyspark.sql.PandasCogroupedOps.applyInPandas). - - -## Usage Notes - -### Supported SQL Types - -Currently, all Spark SQL data types are supported by Arrow-based conversion except `MapType`, -`ArrayType` of `TimestampType`, and nested `StructType`. - -### Setting Arrow Batch Size - -Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to -high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow -record batches can be adjusted by setting the conf "spark.sql.execution.arrow.maxRecordsPerBatch" -to an integer that will determine the maximum number of rows for each batch. The default value is -10,000 records per batch. If the number of columns is large, the value should be adjusted -accordingly. Using this limit, each data partition will be made into 1 or more record batches for -processing. - -### Timestamp with Time Zone Semantics - -Spark internally stores timestamps as UTC values, and timestamp data that is brought in without -a specified time zone is converted as local time to UTC with microsecond resolution. When timestamp -data is exported or displayed in Spark, the session time zone is used to localize the timestamp -values. The session time zone is set with the configuration 'spark.sql.session.timeZone' and will -default to the JVM system local time zone if not set. Pandas uses a `datetime64` type with nanosecond -resolution, `datetime64[ns]`, with optional time zone on a per-column basis. - -When timestamp data is transferred from Spark to Pandas it will be converted to nanoseconds -and each column will be converted to the Spark session time zone then localized to that time -zone, which removes the time zone and displays values as local time. This will occur -when calling `toPandas()` or `pandas_udf` with timestamp columns. - -When timestamp data is transferred from Pandas to Spark, it will be converted to UTC microseconds. This -occurs when calling `createDataFrame` with a Pandas DataFrame or when returning a timestamp from a -`pandas_udf`. These conversions are done automatically to ensure Spark will have data in the -expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond -values will be truncated. - -Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is -different than a Pandas timestamp. It is recommended to use Pandas time series functionality when -working with timestamps in `pandas_udf`s to get the best performance, see -[here](https://pandas.pydata.org/pandas-docs/stable/timeseries.html) for details. - -### Recommended Pandas and PyArrow Versions - -For usage with pyspark.sql, the supported versions of Pandas is 0.24.2 and PyArrow is 0.15.1. Higher -versions may be used, however, compatibility and data correctness can not be guaranteed and should -be verified by the user. - -### Compatibility Setting for PyArrow >= 0.15.0 and Spark 2.3.x, 2.4.x - -Since Arrow 0.15.0, a change in the binary IPC format requires an environment variable to be -compatible with previous versions of Arrow <= 0.14.1. This is only necessary to do for PySpark -users with versions 2.3.x and 2.4.x that have manually upgraded PyArrow to 0.15.0. 
The following -can be added to `conf/spark-env.sh` to use the legacy Arrow IPC format: - -``` -ARROW_PRE_0_15_IPC_FORMAT=1 -``` - -This will instruct PyArrow >= 0.15.0 to use the legacy IPC format with the older Arrow Java that -is in Spark 2.3.x and 2.4.x. Not setting this environment variable will lead to a similar error as -described in [SPARK-29367](https://issues.apache.org/jira/browse/SPARK-29367) when running -`pandas_udf`s or `toPandas()` with Arrow enabled. More information about the Arrow IPC change can -be read on the Arrow 0.15.0 release [blog](http://arrow.apache.org/blog/2019/10/06/0.15.0-release/#columnar-streaming-protocol-change-since-0140). +The Arrow usage guide is now archived on [this page](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html). diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py index e46449dbef..1789a54f02 100644 --- a/examples/src/main/python/sql/arrow.py +++ b/examples/src/main/python/sql/arrow.py @@ -21,6 +21,9 @@ Run with: ./bin/spark-submit examples/src/main/python/sql/arrow.py """ +# NOTE that this file is imported in user guide in PySpark documentation. +# The codes are referred via line numbers. See also `literalinclude` directive in Sphinx. + from pyspark.sql import SparkSession from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version @@ -29,7 +32,6 @@ require_minimum_pyarrow_version() def dataframe_with_arrow_example(spark): - # $example on:dataframe_with_arrow$ import numpy as np import pandas as pd @@ -44,12 +46,11 @@ def dataframe_with_arrow_example(spark): # Convert the Spark DataFrame back to a Pandas DataFrame using Arrow result_pdf = df.select("*").toPandas() - # $example off:dataframe_with_arrow$ + print("Pandas DataFrame result statistics:\n%s\n" % str(result_pdf.describe())) def ser_to_frame_pandas_udf_example(spark): - # $example on:ser_to_frame_pandas_udf$ import pandas as pd from pyspark.sql.functions import pandas_udf @@ -75,11 +76,9 @@ def ser_to_frame_pandas_udf_example(spark): # |-- func(long_col, string_col, struct_col): struct (nullable = true) # | |-- col1: string (nullable = true) # | |-- col2: long (nullable = true) - # $example off:ser_to_frame_pandas_udf$$ def ser_to_ser_pandas_udf_example(spark): - # $example on:ser_to_ser_pandas_udf$ import pandas as pd from pyspark.sql.functions import col, pandas_udf @@ -111,11 +110,9 @@ def ser_to_ser_pandas_udf_example(spark): # | 4| # | 9| # +-------------------+ - # $example off:ser_to_ser_pandas_udf$ def iter_ser_to_iter_ser_pandas_udf_example(spark): - # $example on:iter_ser_to_iter_ser_pandas_udf$ from typing import Iterator import pandas as pd @@ -139,11 +136,9 @@ def iter_ser_to_iter_ser_pandas_udf_example(spark): # | 3| # | 4| # +-----------+ - # $example off:iter_ser_to_iter_ser_pandas_udf$ def iter_sers_to_iter_ser_pandas_udf_example(spark): - # $example on:iter_sers_to_iter_ser_pandas_udf$ from typing import Iterator, Tuple import pandas as pd @@ -168,11 +163,9 @@ def iter_sers_to_iter_ser_pandas_udf_example(spark): # | 4| # | 9| # +-----------------------+ - # $example off:iter_sers_to_iter_ser_pandas_udf$ def ser_to_scalar_pandas_udf_example(spark): - # $example on:ser_to_scalar_pandas_udf$ import pandas as pd from pyspark.sql.functions import pandas_udf @@ -215,11 +208,9 @@ def ser_to_scalar_pandas_udf_example(spark): # | 2| 5.0| 6.0| # | 2|10.0| 6.0| # +---+----+------+ - # $example off:ser_to_scalar_pandas_udf$ def grouped_apply_in_pandas_example(spark): - # 
$example on:grouped_apply_in_pandas$ df = spark.createDataFrame( [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")) @@ -239,11 +230,9 @@ def grouped_apply_in_pandas_example(spark): # | 2|-1.0| # | 2| 4.0| # +---+----+ - # $example off:grouped_apply_in_pandas$ def map_in_pandas_example(spark): - # $example on:map_in_pandas$ df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age")) def filter_func(iterator): @@ -256,11 +245,9 @@ def map_in_pandas_example(spark): # +---+---+ # | 1| 21| # +---+---+ - # $example off:map_in_pandas$ def cogrouped_apply_in_pandas_example(spark): - # $example on:cogrouped_apply_in_pandas$ import pandas as pd df1 = spark.createDataFrame( @@ -284,7 +271,6 @@ def cogrouped_apply_in_pandas_example(spark): # |20000101| 2|2.0| y| # |20000102| 2|4.0| y| # +--------+---+---+---+ - # $example off:cogrouped_apply_in_pandas$ if __name__ == "__main__": diff --git a/python/docs/source/reference/pyspark.sql.rst b/python/docs/source/reference/pyspark.sql.rst index 7e0357cf9d..e5348c6c6e 100644 --- a/python/docs/source/reference/pyspark.sql.rst +++ b/python/docs/source/reference/pyspark.sql.rst @@ -33,6 +33,7 @@ Core Classes Column Row GroupedData + PandasCogroupedOps DataFrameNaFunctions DataFrameStatFunctions Window @@ -539,4 +540,5 @@ Grouping GroupedData.min GroupedData.pivot GroupedData.sum + PandasCogroupedOps.applyInPandas diff --git a/python/docs/source/user_guide/arrow_pandas.rst b/python/docs/source/user_guide/arrow_pandas.rst new file mode 100644 index 0000000000..b1768e3f0e --- /dev/null +++ b/python/docs/source/user_guide/arrow_pandas.rst @@ -0,0 +1,411 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +======================= +Apache Arrow in PySpark +======================= + +.. currentmodule:: pyspark.sql + +Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer +data between JVM and Python processes. This currently is most beneficial to Python users that +work with Pandas/NumPy data. Its usage is not automatic and might require some minor +changes to configuration or code to take full advantage and ensure compatibility. This guide will +give a high-level description of how to use Arrow in Spark and highlight any differences when +working with Arrow-enabled data. + +Ensure PyArrow Installed +------------------------ + +To use Apache Arrow in PySpark, `the recommended version of PyArrow `_ +should be installed. +If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the +SQL module with the command ``pip install pyspark[sql]``. Otherwise, you must ensure that PyArrow +is installed and available on all cluster nodes. +You can install using pip or conda from the conda-forge channel. See PyArrow +`installation `_ for details. 
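+
+As a quick, optional sanity check (assuming the packages are importable on the driver), you can
+print the installed versions and run the same minimum-version checks that the bundled examples use
+from ``pyspark.sql.pandas.utils``:
+
+.. code-block:: python
+
+    import pandas as pd
+    import pyarrow as pa
+
+    # Show which versions are actually picked up.
+    print(pd.__version__, pa.__version__)
+
+    # These helpers raise an error with a descriptive message when the
+    # installed versions are older than the minimum PySpark supports.
+    from pyspark.sql.pandas.utils import (
+        require_minimum_pandas_version,
+        require_minimum_pyarrow_version,
+    )
+    require_minimum_pandas_version()
+    require_minimum_pyarrow_version()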
+
+Enabling for Conversion to/from Pandas
+--------------------------------------
+
+Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame
+using the call :meth:`DataFrame.toPandas` and when creating a Spark DataFrame from a Pandas DataFrame with
+:meth:`SparkSession.createDataFrame`. To use Arrow when executing these calls, users need to first set
+the Spark configuration ``spark.sql.execution.arrow.pyspark.enabled`` to ``true``. This is disabled by default.
+
+In addition, optimizations enabled by ``spark.sql.execution.arrow.pyspark.enabled`` can automatically
+fall back to a non-Arrow implementation if an error occurs before the actual computation within Spark.
+This can be controlled by ``spark.sql.execution.arrow.pyspark.fallback.enabled``.
+
+.. literalinclude:: ../../../../examples/src/main/python/sql/arrow.py
+    :language: python
+    :lines: 35-48
+    :dedent: 4
+
+Using the above optimizations with Arrow will produce the same results as when Arrow is not
+enabled.
+
+Note that even with Arrow, :meth:`DataFrame.toPandas` results in the collection of all records in the
+DataFrame to the driver program and should be done on a small subset of the data. Not all Spark
+data types are currently supported and an error can be raised if a column has an unsupported type.
+If an error occurs during :meth:`SparkSession.createDataFrame`, Spark will fall back to create the
+DataFrame without Arrow.
+
+Pandas UDFs (a.k.a. Vectorized UDFs)
+------------------------------------
+
+.. currentmodule:: pyspark.sql.functions
+
+Pandas UDFs are user-defined functions that are executed by Spark using
+Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas
+UDF is defined using :meth:`pandas_udf` as a decorator or to wrap the function, and no additional
+configuration is required. A Pandas UDF behaves as a regular PySpark function API in general.
+
+Before Spark 3.0, Pandas UDFs used to be defined with ``pyspark.sql.functions.PandasUDFType``. From Spark 3.0
+with Python 3.6+, you can also use `Python type hints <https://www.python.org/dev/peps/pep-0484>`_.
+Using Python type hints is preferred and ``pyspark.sql.functions.PandasUDFType`` will be deprecated in
+a future release.
+
+.. currentmodule:: pyspark.sql.types
+
+Note that the type hint should use ``pandas.Series`` in all cases; the one exception is that
+``pandas.DataFrame`` should be used as the input or output type hint instead when the input
+or output column is of :class:`StructType`. The following example shows a Pandas UDF which takes a long
+column, a string column and a struct column, and outputs a struct column. It requires the function to
+specify the type hints of ``pandas.Series`` and ``pandas.DataFrame`` as below:
+
+.. literalinclude:: ../../../../examples/src/main/python/sql/arrow.py
+    :language: python
+    :lines: 54-78
+    :dedent: 4
+
+The following sections describe the supported combinations of type hints. For simplicity,
+the ``pandas.DataFrame`` variant is omitted.
+
+Series to Series
+~~~~~~~~~~~~~~~~
+
+.. currentmodule:: pyspark.sql.functions
+
+The type hint can be expressed as ``pandas.Series``, ... -> ``pandas.Series``.
+
+Using :func:`pandas_udf` with a function that has such type hints creates a Pandas UDF where the given
+function takes one or more ``pandas.Series`` and outputs one ``pandas.Series``. The output of the function should
+always be of the same length as the input.
+Internally, PySpark will execute a Pandas UDF by splitting
+columns into batches and calling the function for each batch as a subset of the data, then concatenating
+the results together.
+
+The following example shows how to create a Pandas UDF that computes the product of 2 columns.
+
+.. literalinclude:: ../../../../examples/src/main/python/sql/arrow.py
+    :language: python
+    :lines: 82-112
+    :dedent: 4
+
+For detailed usage, please see :func:`pandas_udf`.
+
+Iterator of Series to Iterator of Series
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. currentmodule:: pyspark.sql.functions
+
+The type hint can be expressed as ``Iterator[pandas.Series]`` -> ``Iterator[pandas.Series]``.
+
+Using :func:`pandas_udf` with a function that has such type hints creates a Pandas UDF where the given
+function takes an iterator of ``pandas.Series`` and outputs an iterator of ``pandas.Series``. The
+length of the entire output from the function should be the same as the length of the entire input; therefore, it can
+prefetch the data from the input iterator as long as the lengths are the same.
+In this case, the created Pandas UDF requires one input column when the Pandas UDF is called. To use
+multiple input columns, a different type hint is required. See Iterator of Multiple Series to Iterator
+of Series.
+
+It is also useful when the UDF execution requires initializing some state, although internally it works
+identically to the Series to Series case. The pseudocode below illustrates the example.
+
+.. code-block:: python
+
+    from typing import Iterator
+
+    import pandas as pd
+
+    from pyspark.sql.functions import pandas_udf
+
+    @pandas_udf("long")
+    def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+        # Do some expensive initialization with a state
+        state = very_expensive_initialization()
+        for x in iterator:
+            # Use that state for the whole iterator.
+            yield calculate_with_state(x, state)
+
+    df.select(calculate("value")).show()
+
+The following example shows how to create this Pandas UDF:
+
+.. literalinclude:: ../../../../examples/src/main/python/sql/arrow.py
+    :language: python
+    :lines: 116-138
+    :dedent: 4
+
+For detailed usage, please see :func:`pandas_udf`.
+
+Iterator of Multiple Series to Iterator of Series
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. currentmodule:: pyspark.sql.functions
+
+The type hint can be expressed as ``Iterator[Tuple[pandas.Series, ...]]`` -> ``Iterator[pandas.Series]``.
+
+Using :func:`pandas_udf` with a function that has such type hints creates a Pandas UDF where the
+given function takes an iterator of a tuple of multiple ``pandas.Series`` and outputs an iterator of ``pandas.Series``.
+In this case, the created Pandas UDF requires as many input columns as there are series in the tuple
+when the Pandas UDF is called. Otherwise, it has the same characteristics and restrictions as the Iterator of Series
+to Iterator of Series case.
+
+The following example shows how to create this Pandas UDF:
+
+.. literalinclude:: ../../../../examples/src/main/python/sql/arrow.py
+    :language: python
+    :lines: 142-165
+    :dedent: 4
+
+For detailed usage, please see :func:`pandas_udf`.
+
+Series to Scalar
+~~~~~~~~~~~~~~~~
+
+.. currentmodule:: pyspark.sql.functions
+
+The type hint can be expressed as ``pandas.Series``, ... -> ``Any``.
+
+Using :func:`pandas_udf` with a function that has such type hints creates a Pandas UDF similar
+to PySpark's aggregate functions. The given function takes ``pandas.Series`` and returns a scalar value.
+The return type should be a primitive data type, and the returned scalar can be either a Python
+primitive type, e.g., ``int`` or ``float``, or a NumPy data type, e.g., ``numpy.int64`` or ``numpy.float64``.
+``Any`` should ideally be a specific scalar type accordingly.
+
+.. currentmodule:: pyspark.sql
+
+This UDF can also be used with :meth:`GroupedData.agg` and :class:`Window`.
+It defines an aggregation from one or more ``pandas.Series`` to a scalar value, where each ``pandas.Series``
+represents a column within the group or window.
+
+Note that this type of UDF does not support partial aggregation and all data for a group or window
+will be loaded into memory. Also, only unbounded windows are currently supported with grouped aggregate Pandas
+UDFs. The following example shows how to use this type of UDF to compute the mean with group-by
+and window operations:
+
+.. literalinclude:: ../../../../examples/src/main/python/sql/arrow.py
+    :language: python
+    :lines: 169-210
+    :dedent: 4
+
+.. currentmodule:: pyspark.sql.functions
+
+For detailed usage, please see :func:`pandas_udf`.
+
+Pandas Function APIs
+--------------------
+
+.. currentmodule:: pyspark.sql
+
+Pandas Function APIs can directly apply a Python native function against the whole :class:`DataFrame` by
+using Pandas instances. Internally they work similarly to Pandas UDFs by using Arrow to transfer
+data and Pandas to work with the data, which allows vectorized operations. However, a Pandas Function
+API behaves as a regular API under PySpark :class:`DataFrame` instead of :class:`Column`, and Python type hints in Pandas
+Function APIs are optional and do not affect how it works internally at this moment although they
+might be required in the future.
+
+.. currentmodule:: pyspark.sql.functions
+
+From Spark 3.0, the grouped map Pandas UDF is now categorized as a separate Pandas Function API,
+``DataFrame.groupby().applyInPandas()``. It is still possible to use it with ``pyspark.sql.functions.PandasUDFType``
+and ``DataFrame.groupby().apply()`` as it was; however, it is preferred to use
+``DataFrame.groupby().applyInPandas()`` directly. Using ``pyspark.sql.functions.PandasUDFType`` will be deprecated
+in the future.
+
+Grouped Map
+~~~~~~~~~~~
+
+.. currentmodule:: pyspark.sql
+
+Grouped map operations with Pandas instances are supported by ``DataFrame.groupby().applyInPandas()``,
+which requires a Python function that takes a ``pandas.DataFrame`` and returns another ``pandas.DataFrame``.
+It maps each group to a ``pandas.DataFrame`` in the Python function.
+
+This API implements the "split-apply-combine" pattern which consists of three steps:
+
+* Split the data into groups by using :meth:`DataFrame.groupBy`.
+
+* Apply a function on each group. The input and output of the function are both ``pandas.DataFrame``. The input data contains all the rows and columns for each group.
+
+* Combine the results into a new PySpark :class:`DataFrame`.
+
+To use ``DataFrame.groupBy().applyInPandas()``, the user needs to define the following:
+
+* A Python function that defines the computation for each group.
+
+* A ``StructType`` object or a string that defines the schema of the output PySpark :class:`DataFrame`.
+
+The column labels of the returned ``pandas.DataFrame`` must either match the field names in the
+defined output schema if specified as strings, or match the field data types by position if not
+strings, e.g. integer indices. See `pandas.DataFrame <https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame>`_
+on how to label columns when constructing a ``pandas.DataFrame``.
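+
+As a minimal sketch of this rule (the function, the column name ``v2``, and the schema string below
+are illustrative only, and an active :class:`SparkSession` named ``spark`` is assumed), the labels of
+the returned ``pandas.DataFrame`` line up with the schema passed to ``applyInPandas``:
+
+.. code-block:: python
+
+    import pandas as pd
+
+    df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))
+
+    def double_v(pdf: pd.DataFrame) -> pd.DataFrame:
+        # The returned column labels match the field names in the schema string below.
+        return pd.DataFrame({"id": pdf.id, "v2": pdf.v * 2})
+
+    df.groupby("id").applyInPandas(double_v, schema="id long, v2 double").show()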
+
+Note that all data for a group will be loaded into memory before the function is applied. This can
+lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for
+`maxRecordsPerBatch <#setting-arrow-batch-size>`_ is not applied on groups and it is up to the user
+to ensure that the grouped data will fit into the available memory.
+
+The following example shows how to use ``DataFrame.groupby().applyInPandas()`` to subtract the mean from each value
+in the group.
+
+.. literalinclude:: ../../../../examples/src/main/python/sql/arrow.py
+    :language: python
+    :lines: 214-232
+    :dedent: 4
+
+For detailed usage, please see :meth:`GroupedData.applyInPandas`.
+
+Map
+~~~
+
+Map operations with Pandas instances are supported by :meth:`DataFrame.mapInPandas` which maps an iterator
+of ``pandas.DataFrame``\s to another iterator of ``pandas.DataFrame``\s that represents the current
+PySpark :class:`DataFrame` and returns the result as a PySpark :class:`DataFrame`. The function takes and outputs
+an iterator of ``pandas.DataFrame``. It can return output of arbitrary length, in contrast to some
+Pandas UDFs, although internally it works similarly to the Series to Series Pandas UDF.
+
+The following example shows how to use :meth:`DataFrame.mapInPandas`:
+
+.. literalinclude:: ../../../../examples/src/main/python/sql/arrow.py
+    :language: python
+    :lines: 236-247
+    :dedent: 4
+
+For detailed usage, please see :meth:`DataFrame.mapInPandas`.
+
+Co-grouped Map
+~~~~~~~~~~~~~~
+
+.. currentmodule:: pyspark.sql
+
+Co-grouped map operations with Pandas instances are supported by ``DataFrame.groupby().cogroup().applyInPandas()`` which
+allows two PySpark :class:`DataFrame`\s to be cogrouped by a common key and then a Python function applied to each
+cogroup. It consists of the following steps:
+
+* Shuffle the data such that the groups of each DataFrame which share a key are cogrouped together.
+
+* Apply a function to each cogroup. The input of the function is two ``pandas.DataFrame`` (with an optional tuple representing the key). The output of the function is a ``pandas.DataFrame``.
+
+* Combine the ``pandas.DataFrame``\s from all groups into a new PySpark :class:`DataFrame`.
+
+To use ``DataFrame.groupBy().cogroup().applyInPandas()``, the user needs to define the following:
+
+* A Python function that defines the computation for each cogroup.
+
+* A ``StructType`` object or a string that defines the schema of the output PySpark :class:`DataFrame`.
+
+The column labels of the returned ``pandas.DataFrame`` must either match the field names in the
+defined output schema if specified as strings, or match the field data types by position if not
+strings, e.g. integer indices. See `pandas.DataFrame <https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame>`_
+on how to label columns when constructing a ``pandas.DataFrame``.
+
+Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of
+memory exceptions, especially if the group sizes are skewed. The configuration for `maxRecordsPerBatch <#setting-arrow-batch-size>`_
+is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory.
+
+The following example shows how to use ``DataFrame.groupby().cogroup().applyInPandas()`` to perform an asof join between two datasets.
+
+.. literalinclude:: ../../../../examples/src/main/python/sql/arrow.py
+    :language: python
+    :lines: 251-273
+    :dedent: 4
+
+
+For detailed usage, please see :meth:`PandasCogroupedOps.applyInPandas`.
+
+Usage Notes
+-----------
+
+Supported SQL Types
+~~~~~~~~~~~~~~~~~~~
+
+.. currentmodule:: pyspark.sql.types
+
+Currently, all Spark SQL data types are supported by Arrow-based conversion except :class:`MapType`,
+:class:`ArrayType` of :class:`TimestampType`, and nested :class:`StructType`.
+
+Setting Arrow Batch Size
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to
+high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow
+record batches can be adjusted by setting the conf ``spark.sql.execution.arrow.maxRecordsPerBatch``
+to an integer that will determine the maximum number of rows for each batch. The default value is
+10,000 records per batch. If the number of columns is large, the value should be adjusted
+accordingly. Using this limit, each data partition will be made into one or more record batches for
+processing.
+
+Timestamp with Time Zone Semantics
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. currentmodule:: pyspark.sql
+
+Spark internally stores timestamps as UTC values, and timestamp data that is brought in without
+a specified time zone is converted as local time to UTC with microsecond resolution. When timestamp
+data is exported or displayed in Spark, the session time zone is used to localize the timestamp
+values. The session time zone is set with the configuration ``spark.sql.session.timeZone`` and will
+default to the JVM system local time zone if not set. Pandas uses a ``datetime64`` type with nanosecond
+resolution, ``datetime64[ns]``, with optional time zone on a per-column basis.
+
+When timestamp data is transferred from Spark to Pandas it will be converted to nanoseconds
+and each column will be converted to the Spark session time zone then localized to that time
+zone, which removes the time zone and displays values as local time. This will occur
+when calling :meth:`DataFrame.toPandas` or ``pandas_udf`` with timestamp columns.
+
+When timestamp data is transferred from Pandas to Spark, it will be converted to UTC microseconds. This
+occurs when calling :meth:`SparkSession.createDataFrame` with a Pandas DataFrame or when returning a timestamp from a
+``pandas_udf``. These conversions are done automatically to ensure Spark will have data in the
+expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond
+values will be truncated.
+
+Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is
+different from a Pandas timestamp. It is recommended to use Pandas time series functionality when
+working with timestamps in ``pandas_udf``\s to get the best performance, see
+`here <https://pandas.pydata.org/pandas-docs/stable/timeseries.html>`_ for details.
+
+Recommended Pandas and PyArrow Versions
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+For usage with pyspark.sql, the minimum supported versions are Pandas 0.24.2 and PyArrow 0.15.1. Higher
+versions may be used, however, compatibility and data correctness cannot be guaranteed and should
+be verified by the user.
+
+Compatibility Setting for PyArrow >= 0.15.0 and Spark 2.3.x, 2.4.x
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Since Arrow 0.15.0, a change in the binary IPC format requires an environment variable to be
+compatible with previous versions of Arrow <= 0.14.1.
This is only necessary to do for PySpark +users with versions 2.3.x and 2.4.x that have manually upgraded PyArrow to 0.15.0. The following +can be added to ``conf/spark-env.sh`` to use the legacy Arrow IPC format: + +.. code-block:: bash + + ARROW_PRE_0_15_IPC_FORMAT=1 + + +This will instruct PyArrow >= 0.15.0 to use the legacy IPC format with the older Arrow Java that +is in Spark 2.3.x and 2.4.x. Not setting this environment variable will lead to a similar error as +described in `SPARK-29367 `_ when running +``pandas_udf``\s or :meth:`DataFrame.toPandas` with Arrow enabled. More information about the Arrow IPC change can +be read on the Arrow 0.15.0 release `blog `_. + diff --git a/python/docs/source/user_guide/index.rst b/python/docs/source/user_guide/index.rst index e8a8d905f4..c39feace05 100644 --- a/python/docs/source/user_guide/index.rst +++ b/python/docs/source/user_guide/index.rst @@ -20,3 +20,8 @@ User Guide ========== +.. toctree:: + :maxdepth: 2 + + arrow_pandas + diff --git a/python/pyspark/sql/pandas/group_ops.py b/python/pyspark/sql/pandas/group_ops.py index b93f0516ca..ce021fac14 100644 --- a/python/pyspark/sql/pandas/group_ops.py +++ b/python/pyspark/sql/pandas/group_ops.py @@ -187,7 +187,7 @@ class PandasGroupedOpsMixin(object): """ Cogroups this group with another group so that we can run cogrouped operations. - See :class:`CoGroupedData` for the operations that can be run. + See :class:`PandasCogroupedOps` for the operations that can be run. """ from pyspark.sql import GroupedData @@ -230,7 +230,7 @@ class PandasCogroupedOps(object): :param func: a Python native function that takes two `pandas.DataFrame`\\s, and outputs a `pandas.DataFrame`, or that takes one tuple (grouping keys) and two - pandas ``DataFrame``s, and outputs a pandas ``DataFrame``. + pandas ``DataFrame``\\s, and outputs a pandas ``DataFrame``. :param schema: the return type of the `func` in PySpark. The value can be either a :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 47cd3c7d62..7b15fd8bcf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1810,7 +1810,7 @@ object SQLConf { "1. pyspark.sql.DataFrame.toPandas " + "2. pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame " + "The following data types are unsupported: " + - "BinaryType, MapType, ArrayType of TimestampType, and nested StructType.") + "MapType, ArrayType of TimestampType, and nested StructType.") .version("3.0.0") .fallbackConf(ARROW_EXECUTION_ENABLED)