For detailed usage, please see :func:`pandas_udf`.
Pandas Function APIs
--------------------
.. currentmodule:: pyspark.sql
Pandas Function APIs can directly apply a Python native function against the whole :class:`DataFrame` by
using Pandas instances. Internally it works similarly to Pandas UDFs, using Arrow to transfer
data and Pandas to work with the data, which allows vectorized operations. However, a Pandas Function
API behaves as a regular API under PySpark :class:`DataFrame` instead of :class:`Column`, and Python type hints in Pandas
Function APIs are optional and do not affect how it works internally at this moment, although they
might be required in the future.
.. currentmodule:: pyspark.sql.functions
As of Spark 3.0, the grouped map Pandas UDF is categorized as a separate Pandas Function API,
``DataFrame.groupby().applyInPandas()``. It is still possible to use it with ``pyspark.sql.functions.PandasUDFType``
and ``DataFrame.groupby().apply()`` as before; however, it is preferred to use
``DataFrame.groupby().applyInPandas()`` directly. Using ``pyspark.sql.functions.PandasUDFType`` will be deprecated
in the future.
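As a rough sketch of the migration, assuming an active SparkSession bound to ``spark`` and a
:class:`DataFrame` ``df`` with columns ``id`` and ``v`` (the function name ``normalize`` and the
sample data are illustrative, not part of the API):

.. code-block:: python

    from pyspark.sql.functions import pandas_udf, PandasUDFType

    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

    # Spark 2.x style: still works, but PandasUDFType will be deprecated.
    @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
    def normalize(pdf):
        v = pdf.v
        return pdf.assign(v=(v - v.mean()) / v.std())

    df.groupby("id").apply(normalize).show()

    # Preferred Spark 3.0+ style: a plain Python function plus an output schema.
    def normalize_fn(pdf):
        v = pdf.v
        return pdf.assign(v=(v - v.mean()) / v.std())

    df.groupby("id").applyInPandas(normalize_fn, schema=df.schema).show()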
Grouped Map
~~~~~~~~~~~
.. currentmodule:: pyspark.sql
Grouped map operations with Pandas instances are supported by ``DataFrame.groupby().applyInPandas()``,
which requires a Python function that takes a ``pandas.DataFrame`` and returns another ``pandas.DataFrame``.
Each group is mapped to a ``pandas.DataFrame`` in the Python function.
This API implements the "split-apply-combine" pattern which consists of three steps:
* Split the data into groups by using :meth:`DataFrame.groupBy`.
* Apply a function on each group. The input and output of the function are both ``pandas.DataFrame``. The input data contains all the rows and columns for each group.
* Combine the results into a new PySpark :class:`DataFrame`.
To use ``DataFrame.groupBy().applyInPandas()``, the user needs to define the following:
* A Python function that defines the computation for each group.
* A ``StructType`` object or a string that defines the schema of the output PySpark :class:`DataFrame`.
The column labels of the returned ``pandas.DataFrame`` must either match the field names in the
defined output schema if specified as strings, or match the field data types by position if not
strings, e.g. integer indices. See `pandas.DataFrame <https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame>`_
on how to label columns when constructing a ``pandas.DataFrame``.
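For instance, as a minimal sketch, if the declared output schema were the hypothetical
``"id long, v double"``, either of these return values would be accepted:

.. code-block:: python

    import pandas as pd

    # String column labels are matched to the schema by field name.
    pd.DataFrame({"id": [1], "v": [1.0]})

    # Default integer labels are matched to the schema by position instead.
    pd.DataFrame([[1, 1.0]])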
Note that all data for a group will be loaded into memory before the function is applied. This can
lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for
`maxRecordsPerBatch <arrow_pandas.rst#setting-arrow-batch-size>`_ is not applied on groups and it is up to the user
to ensure that the grouped data will fit into the available memory.
The following example shows how to use ``DataFrame.groupby().applyInPandas()`` to subtract the mean from each value
in the group:
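A minimal sketch, assuming an active SparkSession bound to ``spark``:

.. code-block:: python

    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))

    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame holding all rows and columns of one group
        v = pdf.v
        return pdf.assign(v=v - v.mean())

    df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
    # +---+----+
    # | id|   v|
    # +---+----+
    # |  1|-0.5|
    # |  1| 0.5|
    # |  2|-3.0|
    # |  2|-1.0|
    # |  2| 4.0|
    # +---+----+
    # (row order may vary)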
For detailed usage, please see :meth:`GroupedData.applyInPandas`.
Co-grouped Map
~~~~~~~~~~~~~~
.. currentmodule:: pyspark.sql
Co-grouped map operations with Pandas instances are supported by ``DataFrame.groupby().cogroup().applyInPandas()`` which
allows two PySpark :class:`DataFrame`\s to be cogrouped by a common key and then a Python function applied to each
cogroup. It consists of the following steps:
* Shuffle the data such that the groups of each dataframe which share a key are cogrouped together.
* Apply a function to each cogroup. The input of the function is two ``pandas.DataFrame`` (with an optional tuple representing the key). The output of the function is a ``pandas.DataFrame``.
* Combine the ``pandas.DataFrame``\s from all groups into a new PySpark :class:`DataFrame`.
To use ``groupBy().cogroup().applyInPandas()``, the user needs to define the following:
* A Python function that defines the computation for each cogroup.
* A ``StructType`` object or a string that defines the schema of the output PySpark :class:`DataFrame`.
The column labels of the returned ``pandas.DataFrame`` must either match the field names in the
defined output schema if specified as strings, or match the field data types by position if not
strings, e.g. integer indices. See `pandas.DataFrame <https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame>`_
on how to label columns when constructing a ``pandas.DataFrame``.
Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of
memory exceptions, especially if the group sizes are skewed. The configuration for `maxRecordsPerBatch <arrow_pandas.rst#setting-arrow-batch-size>`_
is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory.
The following example shows how to use ``DataFrame.groupby().cogroup().applyInPandas()`` to perform an asof join between two datasets.
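A minimal sketch, assuming an active SparkSession bound to ``spark``:

.. code-block:: python

    import pandas as pd

    df1 = spark.createDataFrame(
        [(20000101, 1, 1.0), (20000101, 2, 2.0),
         (20000102, 1, 3.0), (20000102, 2, 4.0)],
        ("time", "id", "v1"))

    df2 = spark.createDataFrame(
        [(20000101, 1, "x"), (20000101, 2, "y")],
        ("time", "id", "v2"))

    def asof_join(left, right):
        # left and right are the pandas.DataFrames of one cogroup
        return pd.merge_asof(left, right, on="time", by="id")

    df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
        asof_join, schema="time int, id int, v1 double, v2 string").show()

For detailed usage, please see :meth:`PandasCogroupedOps.applyInPandas`.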

Setting Arrow ``self_destruct`` for Memory Savings
--------------------------------------------------

Since Spark 3.2, the Spark configuration ``spark.sql.execution.arrow.pyspark.selfDestruct.enabled`` can be used to enable PyArrow's ``self_destruct`` feature, which can save memory when creating a Pandas DataFrame via ``toPandas`` by freeing Arrow-allocated memory while building the Pandas DataFrame.
This option is experimental, and some operations may fail on the resulting Pandas DataFrame due to immutable backing arrays.
Typically, you would see the error ``ValueError: buffer source array is read-only``.
Newer versions of Pandas may fix these errors by improving support for such cases.
You can work around this error by copying the column(s) beforehand.
Additionally, this conversion may be slower because it is single-threaded.
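A minimal sketch, assuming an active SparkSession bound to ``spark`` and a PySpark
:class:`DataFrame` ``df`` whose hypothetical column ``v`` is the one being modified afterwards:

.. code-block:: python

    # Enable PyArrow's self_destruct before converting to Pandas.
    spark.conf.set("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "true")

    pdf = df.toPandas()  # Arrow memory is freed while the Pandas DataFrame is built

    # Workaround for "ValueError: buffer source array is read-only":
    # copy the affected column first to get a mutable backing array.
    pdf["v"] = pdf["v"].copy()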