[SPARK-28264][PYTHON][SQL] Support type hints in pandas UDF and rename/move inconsistent pandas UDF types

### What changes were proposed in this pull request?

This PR proposes to redesign pandas UDFs as described in [the proposal](https://docs.google.com/document/d/1-kV0FS_LF2zvaRh_GhkV32Uqksm_Sq8SvnBBmRyxm30/edit?usp=sharing).

```python
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("long")
def plug_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.range(10).select(plug_one("id")).show()
```

```
+------------+
|plug_one(id)|
+------------+
|           1|
|           2|
|           3|
|           4|
|           5|
|           6|
|           7|
|           8|
|           9|
|          10|
+------------+
```

Note that this PR also addresses one of the future improvements described [here](https://docs.google.com/document/d/1-kV0FS_LF2zvaRh_GhkV32Uqksm_Sq8SvnBBmRyxm30/edit#heading=h.h3ncjpk6ujqu), "A couple of less-intuitive pandas UDF types" (pointed out by zero323).

In short,

- Adds type hints as an alternative, experimental way to define pandas UDFs:
    ```python
    @pandas_udf(schema='...')
    def func(c1: Series, c2: Series) -> DataFrame:
        pass
    ```

- Replaces and/or adds aliases for the three pandas UDF types below, making them separate standalone APIs, so `pandas_udf` is now consistent with regular `udf`s and other expressions (see the before/after sketch after this list).

    `df.mapInPandas(udf)`  -replace-> `df.mapInPandas(f, schema)`
    `df.groupby.apply(udf)`  -alias-> `df.groupby.applyInPandas(f, schema)`
    `df.groupby.cogroup.apply(udf)`  -replace-> `df.groupby.cogroup.applyInPandas(f, schema)`

    *`df.groupby.apply` was added in 2.3 while the others were added in the master branch only.

- No deprecation for the existing ways for now.
    ```python
    @pandas_udf(schema='...', functionType=PandasUDFType.SCALAR)
    def func(c1, c2):
        pass
    ```
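
To make the rename of the second bullet concrete, here is a hedged before/after sketch (`df`, the column names, and the schema are made up for illustration):

```python
# Before (Spark 2.3/2.4 style): pass a GROUPED_MAP pandas_udf object.
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    return pdf.assign(v=(pdf.v - pdf.v.mean()) / pdf.v.std())

df.groupby("id").apply(normalize)

# After (this PR): pass a plain Python function together with the schema.
def normalize(pdf):
    return pdf.assign(v=(pdf.v - pdf.v.mean()) / pdf.v.std())

df.groupby("id").applyInPandas(normalize, schema="id long, v double")
```
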
If users are happy with this, I plan to deprecate the existing way and declare that using type hints is no longer experimental.

One design goal in this PR was to avoid touching the internals (since we didn't deprecate the old ways for now) and to support type hints with minimised changes only at the interface.

- Once we deprecate or remove the old ways, another internal refactoring will be required. At the very least, we should rename the internal pandas evaluation types.
- If users find the experimental type hints aren't particularly helpful, we should simply revert the changes at the interface level.

### Why are the changes needed?

In order to address old design issues. Please see [the proposal](https://docs.google.com/document/d/1-kV0FS_LF2zvaRh_GhkV32Uqksm_Sq8SvnBBmRyxm30/edit?usp=sharing).

### Does this PR introduce any user-facing change?

For behaviour changes, No.

It adds new ways to use pandas UDFs by using type hints. See below.

**SCALAR**:

```python
@pandas_udf(schema='...')
def func(c1: Series, c2: DataFrame) -> Series:
    pass  # DataFrame represents a struct column
```
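
As a concrete illustration, here is a hedged, minimal sketch of a scalar pandas UDF that takes a plain column plus a struct column (the column names and the `spark` session are assumed for illustration; the struct column arrives as a `pandas.DataFrame`):

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, struct

@pandas_udf("long")
def sum_all(s: pd.Series, st: pd.DataFrame) -> pd.Series:
    # st has one pandas column per field of the struct column
    return s + st["v1"] + st["v2"]

df = spark.range(3).selectExpr("id", "id as v1", "id * 2 as v2")
df.select(sum_all("id", struct("v1", "v2"))).show()
```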

**SCALAR_ITER**:

```python
@pandas_udf(schema='...')
def func(iter: Iterator[Tuple[Series, DataFrame, ...]]) -> Iterator[Series]:
    pass  # Same as SCALAR but wrapped by Iterator
```
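
A hedged, minimal runnable sketch of the iterator form (the function name and column are made up; a `spark` session is assumed):

```python
from typing import Iterator

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def plus_one(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Each element of the iterator is one batch of the input column.
    for s in batches:
        yield s + 1

spark.range(10).select(plus_one("id")).show()
```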

**GROUPED_AGG**:

```python
@pandas_udf(schema='...')
def func(c1: Series, c2: DataFrame) -> int:
    pass  # DataFrame represents a struct column
```
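
A hedged, minimal sketch of the aggregating form (the data and names are made up; a `spark` session is assumed):

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    # Receives the group's values as one pandas Series and returns a scalar.
    return v.mean()

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))
df.groupby("id").agg(mean_udf("v")).show()
```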

**GROUPED_MAP**:

This was added in Spark 2.3 as of SPARK-20396. As described above, it keeps the existing behaviour. Additionally, we now have a new alias `groupby.applyInPandas` for `groupby.apply`. See the example below:

```python
def func(pdf):
    return pdf

df.groupby("...").applyInPandas(func, schema=df.schema)
```
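
The function may also take two arguments, in which case the grouping key is passed first as a tuple; a hedged sketch assuming the same `df`:

```python
def func_with_key(key, pdf):
    # key is a tuple of the grouping values for this group, e.g. (1,)
    return pdf

df.groupby("...").applyInPandas(func_with_key, schema=df.schema)
```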

**MAP_ITER**: this is not a pandas UDF anymore

This was added in Spark 3.0 as of SPARK-28198, and this PR replaces its usage. See the example below:

```python
def func(iter):
    for df in iter:
        yield df

df.mapInPandas(func, df.schema)
```

**COGROUPED_MAP**: this is not a pandas UDF anymore

This was added in Spark 3.0 as of SPARK-27463, and this PR replaces its usage. See the example below:

```python
def asof_join(left, right):
    return pd.merge_asof(left, right, on="...", by="...")

df1.groupby("...").cogroup(df2.groupby("...")).applyInPandas(asof_join, schema="...")
```

### How was this patch tested?

Unit tests were added and run against Python 2.7, 3.6 and 3.7.

Closes #27165 from HyukjinKwon/revisit-pandas.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>

View file

@ -368,6 +368,7 @@ pyspark_sql = Module(
"pyspark.sql.pandas.group_ops",
"pyspark.sql.pandas.types",
"pyspark.sql.pandas.serializers",
"pyspark.sql.pandas.typehints",
"pyspark.sql.pandas.utils",
# unittests
"pyspark.sql.tests.test_arrow",
@ -379,12 +380,13 @@ pyspark_sql = Module(
"pyspark.sql.tests.test_datasources",
"pyspark.sql.tests.test_functions",
"pyspark.sql.tests.test_group",
"pyspark.sql.tests.test_pandas_cogrouped_map",
"pyspark.sql.tests.test_pandas_grouped_map",
"pyspark.sql.tests.test_pandas_map",
"pyspark.sql.tests.test_pandas_udf",
"pyspark.sql.tests.test_pandas_udf_cogrouped_map",
"pyspark.sql.tests.test_pandas_udf_grouped_agg",
"pyspark.sql.tests.test_pandas_udf_grouped_map",
"pyspark.sql.tests.test_pandas_udf_iter",
"pyspark.sql.tests.test_pandas_udf_scalar",
"pyspark.sql.tests.test_pandas_udf_typehints",
"pyspark.sql.tests.test_pandas_udf_window",
"pyspark.sql.tests.test_readwriter",
"pyspark.sql.tests.test_serde",

View file

@ -17,11 +17,15 @@
import functools
import sys
import warnings
from pyspark import since
from pyspark.rdd import PythonEvalType
from pyspark.sql.pandas.typehints import infer_eval_type
from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
from pyspark.sql.types import DataType
from pyspark.sql.udf import _create_udf
from pyspark.util import _get_argspec
class PandasUDFType(object):
@ -33,12 +37,8 @@ class PandasUDFType(object):
GROUPED_MAP = PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF
COGROUPED_MAP = PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF
GROUPED_AGG = PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
MAP_ITER = PythonEvalType.SQL_MAP_PANDAS_ITER_UDF
@since(2.3)
def pandas_udf(f=None, returnType=None, functionType=None):
@ -51,6 +51,10 @@ def pandas_udf(f=None, returnType=None, functionType=None):
:param functionType: an enum value in :class:`pyspark.sql.functions.PandasUDFType`.
Default: SCALAR.
.. seealso:: :meth:`pyspark.sql.DataFrame.mapInPandas`
.. seealso:: :meth:`pyspark.sql.GroupedData.applyInPandas`
.. seealso:: :meth:`pyspark.sql.PandasCogroupedOps.applyInPandas`
The function type of the UDF can be one of the following:
1. SCALAR
@ -337,97 +341,6 @@ def pandas_udf(f=None, returnType=None, functionType=None):
.. seealso:: :meth:`pyspark.sql.GroupedData.agg` and :class:`pyspark.sql.Window`
5. MAP_ITER
A map iterator Pandas UDFs are used to transform data with an iterator of batches.
It can be used with :meth:`pyspark.sql.DataFrame.mapInPandas`.
It can return the output of arbitrary length in contrast to the scalar Pandas UDF.
It maps an iterator of batches in the current :class:`DataFrame` using a Pandas user-defined
function and returns the result as a :class:`DataFrame`.
The user-defined function should take an iterator of `pandas.DataFrame`\\s and return another
iterator of `pandas.DataFrame`\\s. All columns are passed together as an
iterator of `pandas.DataFrame`\\s to the user-defined function and the returned iterator of
`pandas.DataFrame`\\s are combined as a :class:`DataFrame`.
>>> df = spark.createDataFrame([(1, 21), (2, 30)],
... ("id", "age")) # doctest: +SKIP
>>> @pandas_udf(df.schema, PandasUDFType.MAP_ITER) # doctest: +SKIP
... def filter_func(batch_iter):
... for pdf in batch_iter:
... yield pdf[pdf.id == 1]
>>> df.mapInPandas(filter_func).show() # doctest: +SKIP
+---+---+
| id|age|
+---+---+
| 1| 21|
+---+---+
6. COGROUPED_MAP
A cogrouped map UDF defines transformation: (`pandas.DataFrame`, `pandas.DataFrame`) ->
`pandas.DataFrame`. The `returnType` should be a :class:`StructType` describing the schema
of the returned `pandas.DataFrame`. The column labels of the returned `pandas.DataFrame`
must either match the field names in the defined `returnType` schema if specified as strings,
or match the field data types by position if not strings, e.g. integer indices. The length
of the returned `pandas.DataFrame` can be arbitrary.
CoGrouped map UDFs are used with :meth:`pyspark.sql.CoGroupedData.apply`.
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df1 = spark.createDataFrame(
... [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
... ("time", "id", "v1"))
>>> df2 = spark.createDataFrame(
... [(20000101, 1, "x"), (20000101, 2, "y")],
... ("time", "id", "v2"))
>>> @pandas_udf("time int, id int, v1 double, v2 string",
... PandasUDFType.COGROUPED_MAP) # doctest: +SKIP
... def asof_join(l, r):
... return pd.merge_asof(l, r, on="time", by="id")
>>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show() # doctest: +SKIP
+---------+---+---+---+
| time| id| v1| v2|
+---------+---+---+---+
| 20000101| 1|1.0| x|
| 20000102| 1|3.0| x|
| 20000101| 2|2.0| y|
| 20000102| 2|4.0| y|
+---------+---+---+---+
Alternatively, the user can define a function that takes three arguments. In this case,
the grouping key(s) will be passed as the first argument and the data will be passed as the
second and third arguments. The grouping key(s) will be passed as a tuple of numpy data
types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in as two
`pandas.DataFrame` containing all columns from the original Spark DataFrames.
>>> @pandas_udf("time int, id int, v1 double, v2 string",
... PandasUDFType.COGROUPED_MAP) # doctest: +SKIP
... def asof_join(k, l, r):
... if k == (1,):
... return pd.merge_asof(l, r, on="time", by="id")
... else:
... return pd.DataFrame(columns=['time', 'id', 'v1', 'v2'])
>>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show() # doctest: +SKIP
+---------+---+---+---+
| time| id| v1| v2|
+---------+---+---+---+
| 20000101| 1|1.0| x|
| 20000102| 1|3.0| x|
+---------+---+---+---+
.. note:: The user-defined functions are considered deterministic by default. Due to
optimization, duplicate invocations may be eliminated or the function may even be invoked
more times than it is present in the query. If your function is not deterministic, call
`asNondeterministic` on the user defined function. E.g.:
>>> @pandas_udf('double', PandasUDFType.SCALAR) # doctest: +SKIP
... def random(v):
... import numpy as np
... import pandas as pd
... return pd.Series(np.random.randn(len(v))
>>> random = random.asNondeterministic() # doctest: +SKIP
.. note:: The user-defined functions do not support conditional expressions or short circuiting
in boolean expressions and it ends up with being executed all internally. If the functions
can fail on special rows, the workaround is to incorporate the condition into the functions.
@ -472,6 +385,8 @@ def pandas_udf(f=None, returnType=None, functionType=None):
# Note: Python 3.7.3, Pandas 0.24.2 and PyArrow 0.13.0 are used.
# Note: Timezone is KST.
# Note: 'X' means it throws an exception during the conversion.
require_minimum_pandas_version()
require_minimum_pyarrow_version()
# decorator @pandas_udf(returnType, functionType)
is_decorator = f is None or isinstance(f, (str, DataType))
@ -490,31 +405,81 @@ def pandas_udf(f=None, returnType=None, functionType=None):
eval_type = returnType
else:
# @pandas_udf(dataType) or @pandas_udf(returnType=dataType)
eval_type = PythonEvalType.SQL_SCALAR_PANDAS_UDF
eval_type = None
else:
return_type = returnType
if functionType is not None:
eval_type = functionType
else:
eval_type = PythonEvalType.SQL_SCALAR_PANDAS_UDF
eval_type = None
if return_type is None:
raise ValueError("Invalid returnType: returnType can not be None")
raise ValueError("Invalid return type: returnType can not be None")
if eval_type not in [PythonEvalType.SQL_SCALAR_PANDAS_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF]:
raise ValueError("Invalid functionType: "
PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
None]: # None means it should infer the type from type hints.
raise ValueError("Invalid function type: "
"functionType must be one the values from PandasUDFType")
if is_decorator:
return functools.partial(_create_udf, returnType=return_type, evalType=eval_type)
return functools.partial(_create_pandas_udf, returnType=return_type, evalType=eval_type)
else:
return _create_udf(f=f, returnType=return_type, evalType=eval_type)
return _create_pandas_udf(f=f, returnType=return_type, evalType=eval_type)
def _create_pandas_udf(f, returnType, evalType):
argspec = _get_argspec(f)
# pandas UDF by type hints.
if sys.version_info >= (3, 6):
from inspect import signature
if evalType in [PythonEvalType.SQL_SCALAR_PANDAS_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]:
warnings.warn(
"In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for "
"pandas UDF instead of specifying pandas UDF type which will be deprecated "
"in the future releases. See SPARK-28264 for more details.", UserWarning)
elif len(argspec.annotations) > 0:
evalType = infer_eval_type(signature(f))
assert evalType is not None
if evalType is None:
# Set default is scalar UDF.
evalType = PythonEvalType.SQL_SCALAR_PANDAS_UDF
if (evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF or
evalType == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF) and \
len(argspec.args) == 0 and \
argspec.varargs is None:
raise ValueError(
"Invalid function: 0-arg pandas_udfs are not supported. "
"Instead, create a 1-arg pandas_udf and ignore the arg in your function."
)
if evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF \
and len(argspec.args) not in (1, 2):
raise ValueError(
"Invalid function: pandas_udf with function type GROUPED_MAP or "
"the function in groupby.applyInPandas "
"must take either one argument (data) or two arguments (key, data).")
if evalType == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF \
and len(argspec.args) not in (2, 3):
raise ValueError(
"Invalid function: the function in cogroup.applyInPandas "
"must take either two arguments (left, right) "
"or three arguments (key, left, right).")
return _create_udf(f, returnType, evalType)
def _test():

View file

@ -15,6 +15,7 @@
# limitations under the License.
#
import sys
import warnings
from pyspark import since
from pyspark.rdd import PythonEvalType
@ -28,22 +29,15 @@ class PandasGroupedOpsMixin(object):
can use this class.
"""
@since(2.3)
def apply(self, udf):
"""
Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result
as a `DataFrame`.
It is an alias of :meth:`pyspark.sql.GroupedData.applyInPandas`; however, it takes a
:meth:`pyspark.sql.functions.pandas_udf` whereas
:meth:`pyspark.sql.GroupedData.applyInPandas` takes a Python native function.
The user-defined function should take a `pandas.DataFrame` and return another
`pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame`
to the user-function and the returned `pandas.DataFrame` are combined as a
:class:`DataFrame`.
The returned `pandas.DataFrame` can be of arbitrary length and its schema must match the
returnType of the pandas udf.
.. note:: This function requires a full shuffle. All the data of a group will be loaded
into memory, so the user should be aware of the potential OOM risk if data is skewed
and certain groups are too large to fit in memory.
.. note:: It is preferred to use :meth:`pyspark.sql.GroupedData.applyInPandas` over this
API. This API will be deprecated in the future releases.
:param udf: a grouped map user-defined function returned by
:func:`pyspark.sql.functions.pandas_udf`.
@ -69,16 +63,74 @@ class PandasGroupedOpsMixin(object):
.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
"""
# Columns are special because hasattr always return True
if isinstance(udf, Column) or not hasattr(udf, 'func') \
or udf.evalType != PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
raise ValueError("Invalid udf: the udf argument must be a pandas_udf of type "
"GROUPED_MAP.")
warnings.warn(
"It is preferred to use 'applyInPandas' over this "
"API. This API will be deprecated in the future releases. See SPARK-28264 for "
"more details.", UserWarning)
return self.applyInPandas(udf.func, schema=udf.returnType)
@since(3.0)
def applyInPandas(self, func, schema):
"""
Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result
as a `DataFrame`.
The function should take a `pandas.DataFrame` and return another
`pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame`
to the user-function and the returned `pandas.DataFrame` are combined as a
:class:`DataFrame`.
The returned `pandas.DataFrame` can be of arbitrary length and its schema must match the
returnType of the pandas udf.
.. note:: This function requires a full shuffle. All the data of a group will be loaded
into memory, so the user should be aware of the potential OOM risk if data is skewed
and certain groups are too large to fit in memory.
:param func: a Python native function that takes a `pandas.DataFrame`, and outputs a
`pandas.DataFrame`.
:param schema: the return type of the `func` in PySpark. The value can be either a
:class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
.. note:: Experimental
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))
>>> def normalize(pdf):
... v = pdf.v
... return pdf.assign(v=(v - v.mean()) / v.std())
>>> df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
... # doctest: +SKIP
+---+-------------------+
| id| v|
+---+-------------------+
| 1|-0.7071067811865475|
| 1| 0.7071067811865475|
| 2|-0.8320502943378437|
| 2|-0.2773500981126146|
| 2| 1.1094003924504583|
+---+-------------------+
.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
"""
from pyspark.sql import GroupedData
from pyspark.sql.functions import pandas_udf, PandasUDFType
assert isinstance(self, GroupedData)
# Columns are special because hasattr always return True
if isinstance(udf, Column) or not hasattr(udf, 'func') \
or udf.evalType != PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
raise ValueError("Invalid udf: the udf argument must be a pandas_udf of type "
"GROUPED_MAP.")
udf = pandas_udf(
func, returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
df = self._df
udf_column = udf(*[df[col] for col in df.columns])
jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
@ -114,12 +166,12 @@ class PandasCogroupedOps(object):
self.sql_ctx = gd1.sql_ctx
@since(3.0)
def apply(self, udf):
def applyInPandas(self, func, schema):
"""
Applies a function to each cogroup using a pandas udf and returns the result
Applies a function to each cogroup using pandas and returns the result
as a `DataFrame`.
The user-defined function should take two `pandas.DataFrame` and return another
The function should take two `pandas.DataFrame`\\s and return another
`pandas.DataFrame`. For each side of the cogroup, all columns are passed together as a
`pandas.DataFrame` to the user-function and the returned `pandas.DataFrame` are combined as
a :class:`DataFrame`.
@ -133,8 +185,11 @@ class PandasCogroupedOps(object):
.. note:: Experimental
:param udf: a cogrouped map user-defined function returned by
:func:`pyspark.sql.functions.pandas_udf`.
:param func: a Python native function that takes two `pandas.DataFrame`\\s, and
outputs a `pandas.DataFrame`, or that takes one tuple (grouping keys) and two
pandas ``DataFrame``s, and outputs a pandas ``DataFrame``.
:param schema: the return type of the `func` in PySpark. The value can be either a
:class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df1 = spark.createDataFrame(
@ -143,11 +198,11 @@ class PandasCogroupedOps(object):
>>> df2 = spark.createDataFrame(
... [(20000101, 1, "x"), (20000101, 2, "y")],
... ("time", "id", "v2"))
>>> @pandas_udf("time int, id int, v1 double, v2 string",
... PandasUDFType.COGROUPED_MAP) # doctest: +SKIP
... def asof_join(l, r):
>>> def asof_join(l, r):
... return pd.merge_asof(l, r, on="time", by="id")
>>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show() # doctest: +SKIP
>>> df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
... asof_join, schema="time int, id int, v1 double, v2 string"
... ).show() # doctest: +SKIP
+--------+---+---+---+
| time| id| v1| v2|
+--------+---+---+---+
@ -163,14 +218,13 @@ class PandasCogroupedOps(object):
types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in as two
`pandas.DataFrame` containing all columns from the original Spark DataFrames.
>>> @pandas_udf("time int, id int, v1 double, v2 string",
... PandasUDFType.COGROUPED_MAP) # doctest: +SKIP
... def asof_join(k, l, r):
>>> def asof_join(k, l, r):
... if k == (1,):
... return pd.merge_asof(l, r, on="time", by="id")
... else:
... return pd.DataFrame(columns=['time', 'id', 'v1', 'v2'])
>>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show() # doctest: +SKIP
>>> df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
... asof_join, "time int, id int, v1 double, v2 string").show() # doctest: +SKIP
+--------+---+---+---+
| time| id| v1| v2|
+--------+---+---+---+
@ -181,11 +235,10 @@ class PandasCogroupedOps(object):
.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
"""
# Columns are special because hasattr always return True
if isinstance(udf, Column) or not hasattr(udf, 'func') \
or udf.evalType != PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
raise ValueError("Invalid udf: the udf argument must be a pandas_udf of type "
"COGROUPED_MAP.")
from pyspark.sql.pandas.functions import pandas_udf
udf = pandas_udf(
func, returnType=schema, functionType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF)
all_cols = self._extract_cols(self._gd1) + self._extract_cols(self._gd2)
udf_column = udf(*all_cols)
jdf = self._gd1._jgd.flatMapCoGroupsInPandas(self._gd2._jgd, udf_column._jc.expr())

View file

@ -27,29 +27,30 @@ class PandasMapOpsMixin(object):
"""
@since(3.0)
def mapInPandas(self, udf):
def mapInPandas(self, func, schema):
"""
Maps an iterator of batches in the current :class:`DataFrame` using a Pandas user-defined
function and returns the result as a :class:`DataFrame`.
Maps an iterator of batches in the current :class:`DataFrame` using a Python native
function that takes and outputs a pandas DataFrame, and returns the result as a
:class:`DataFrame`.
The user-defined function should take an iterator of `pandas.DataFrame`\\s and return
The function should take an iterator of `pandas.DataFrame`\\s and return
another iterator of `pandas.DataFrame`\\s. All columns are passed
together as an iterator of `pandas.DataFrame`\\s to the user-defined function and the
together as an iterator of `pandas.DataFrame`\\s to the function and the
returned iterator of `pandas.DataFrame`\\s are combined as a :class:`DataFrame`.
Each `pandas.DataFrame` size can be controlled by
`spark.sql.execution.arrow.maxRecordsPerBatch`.
Its schema must match the returnType of the Pandas user-defined function.
:param udf: A function object returned by :meth:`pyspark.sql.functions.pandas_udf`
:param func: a Python native function that takes an iterator of `pandas.DataFrame`\\s, and
outputs an iterator of `pandas.DataFrame`\\s.
:param schema: the return type of the `func` in PySpark. The value can be either a
:class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df = spark.createDataFrame([(1, 21), (2, 30)],
... ("id", "age")) # doctest: +SKIP
>>> @pandas_udf(df.schema, PandasUDFType.MAP_ITER) # doctest: +SKIP
... def filter_func(batch_iter):
>>> df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
>>> def filter_func(batch_iter):
... for pdf in batch_iter:
... yield pdf[pdf.id == 1]
>>> df.mapInPandas(filter_func).show() # doctest: +SKIP
>>> df.mapInPandas(filter_func, df.schema).show() # doctest: +SKIP
+---+---+
| id|age|
+---+---+
@ -58,17 +59,15 @@ class PandasMapOpsMixin(object):
.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
.. note:: Experimental
"""
from pyspark.sql import Column, DataFrame
from pyspark.sql import DataFrame
from pyspark.sql.pandas.functions import pandas_udf
assert isinstance(self, DataFrame)
# Columns are special because hasattr always return True
if isinstance(udf, Column) or not hasattr(udf, 'func') \
or udf.evalType != PythonEvalType.SQL_MAP_PANDAS_ITER_UDF:
raise ValueError("Invalid udf: the udf argument must be a pandas_udf of type "
"MAP_ITER.")
udf = pandas_udf(
func, returnType=schema, functionType=PythonEvalType.SQL_MAP_PANDAS_ITER_UDF)
udf_column = udf(*[self[col] for col in self.columns])
jdf = self._jdf.mapInPandas(udf_column._jc.expr())
return DataFrame(jdf, self.sql_ctx)

View file

@ -0,0 +1,141 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from pyspark.sql.pandas.utils import require_minimum_pandas_version
def infer_eval_type(sig):
"""
Infers the evaluation type in :class:`pyspark.rdd.PythonEvalType` from
:class:`inspect.Signature` instance.
"""
from pyspark.sql.pandas.functions import PandasUDFType
require_minimum_pandas_version()
import pandas as pd
annotations = {}
for param in sig.parameters.values():
if param.annotation is not param.empty:
annotations[param.name] = param.annotation
# Check if all arguments have type hints
parameters_sig = [annotations[parameter] for parameter
in sig.parameters if parameter in annotations]
if len(parameters_sig) != len(sig.parameters):
raise ValueError(
"Type hints for all parameters should be specified; however, got %s" % sig)
# Check if the return has a type hint
return_annotation = sig.return_annotation
if sig.empty is return_annotation:
raise ValueError(
"Type hint for the return type should be specified; however, got %s" % sig)
# Series, Frame or Union[DataFrame, Series], ... -> Series or Frame
is_series_or_frame = (
all(a == pd.Series or # Series
a == pd.DataFrame or # DataFrame
check_union_annotation( # Union[DataFrame, Series]
a,
parameter_check_func=lambda na: na == pd.Series or na == pd.DataFrame)
for a in parameters_sig) and
(return_annotation == pd.Series or return_annotation == pd.DataFrame))
# Iterator[Tuple[Series, Frame or Union[DataFrame, Series], ...] -> Iterator[Series or Frame]
is_iterator_tuple_series_or_frame = (
len(parameters_sig) == 1 and
check_iterator_annotation( # Iterator
parameters_sig[0],
parameter_check_func=lambda a: check_tuple_annotation( # Tuple
a,
parameter_check_func=lambda ta: (
ta == Ellipsis or # ...
ta == pd.Series or # Series
ta == pd.DataFrame or # DataFrame
check_union_annotation( # Union[DataFrame, Series]
ta,
parameter_check_func=lambda na: (
na == pd.Series or na == pd.DataFrame))))) and
check_iterator_annotation(
return_annotation,
parameter_check_func=lambda a: a == pd.DataFrame or a == pd.Series))
# Iterator[Series, Frame or Union[DataFrame, Series]] -> Iterator[Series or Frame]
is_iterator_series_or_frame = (
len(parameters_sig) == 1 and
check_iterator_annotation(
parameters_sig[0],
parameter_check_func=lambda a: (
a == pd.Series or # Series
a == pd.DataFrame or # DataFrame
check_union_annotation( # Union[DataFrame, Series]
a,
parameter_check_func=lambda ua: ua == pd.Series or ua == pd.DataFrame))) and
check_iterator_annotation(
return_annotation,
parameter_check_func=lambda a: a == pd.DataFrame or a == pd.Series))
# Series, Frame or Union[DataFrame, Series], ... -> Any
is_series_or_frame_agg = (
all(a == pd.Series or # Series
a == pd.DataFrame or # DataFrame
check_union_annotation( # Union[DataFrame, Series]
a,
parameter_check_func=lambda ua: ua == pd.Series or ua == pd.DataFrame)
for a in parameters_sig) and (
# It's tricky to whitelist which types pd.Series constructor can take.
# Simply blacklist common types used here for now (which becomes object
# types Spark can't recognize).
return_annotation != pd.Series and
return_annotation != pd.DataFrame and
not check_iterator_annotation(return_annotation) and
not check_tuple_annotation(return_annotation)
))
if is_series_or_frame:
return PandasUDFType.SCALAR
elif is_iterator_tuple_series_or_frame or is_iterator_series_or_frame:
return PandasUDFType.SCALAR_ITER
elif is_series_or_frame_agg:
return PandasUDFType.GROUPED_AGG
else:
raise NotImplementedError("Unsupported signature: %s." % sig)
def check_tuple_annotation(annotation, parameter_check_func=None):
# Python 3.6 has `__name__`. Python 3.7 and 3.8 have `_name`.
# Check if the name is Tuple first. After that, check the generic types.
name = getattr(annotation, "_name", getattr(annotation, "__name__", None))
return name == "Tuple" and (
parameter_check_func is None or all(map(parameter_check_func, annotation.__args__)))
def check_iterator_annotation(annotation, parameter_check_func=None):
name = getattr(annotation, "_name", getattr(annotation, "__name__", None))
return name == "Iterator" and (
parameter_check_func is None or all(map(parameter_check_func, annotation.__args__)))
def check_union_annotation(annotation, parameter_check_func=None):
import typing
# Note that we cannot rely on '__origin__' in other type hints as it has changed from version
# to version. For example, it's abc.Iterator in Python 3.7 but typing.Iterator in Python 3.6.
origin = getattr(annotation, "__origin__", None)
return origin == typing.Union and (
parameter_check_func is None or all(map(parameter_check_func, annotation.__args__)))

View file

@ -40,7 +40,7 @@ _check_column_type = sys.version >= '3'
@unittest.skipIf(
not have_pandas or not have_pyarrow,
pandas_requirement_message or pyarrow_requirement_message)
class CoGroupedMapPandasUDFTests(ReusedSQLTestCase):
class CogroupedMapInPandasTests(ReusedSQLTestCase):
@property
def data1(self):
@ -94,13 +94,12 @@ class CoGroupedMapPandasUDFTests(ReusedSQLTestCase):
.createDataFrame(right) \
.groupby(col('id') % 2 == 0)
@pandas_udf('k long, v long, v2 long', PandasUDFType.COGROUPED_MAP)
def merge_pandas(l, r):
return pd.merge(l[['k', 'v']], r[['k', 'v2']], on=['k'])
result = left_gdf \
.cogroup(right_gdf) \
.apply(merge_pandas) \
.applyInPandas(merge_pandas, 'k long, v long, v2 long') \
.sort(['k']) \
.toPandas()
@ -116,12 +115,11 @@ class CoGroupedMapPandasUDFTests(ReusedSQLTestCase):
left = self.data1
right = self.data2
@pandas_udf('id long, k int, v int, v2 int', PandasUDFType.COGROUPED_MAP)
def merge_pandas(l, r):
return pd.merge(l, r, on=['id', 'k'])
result = left.groupby().cogroup(right.groupby())\
.apply(merge_pandas) \
.applyInPandas(merge_pandas, 'id long, k int, v int, v2 int') \
.sort(['id', 'k']) \
.toPandas()
@ -139,10 +137,9 @@ class CoGroupedMapPandasUDFTests(ReusedSQLTestCase):
df = df.withColumn('v2', udf(lambda x: x + 1, 'int')(df['v1'])) \
.withColumn('v3', pandas_udf(lambda x: x + 2, 'int')(df['v1']))
result = df.groupby().cogroup(df.groupby())\
.apply(pandas_udf(lambda x, y: pd.DataFrame([(x.sum().sum(), y.sum().sum())]),
'sum1 int, sum2 int',
PandasUDFType.COGROUPED_MAP)).collect()
result = df.groupby().cogroup(df.groupby()) \
.applyInPandas(lambda x, y: pd.DataFrame([(x.sum().sum(), y.sum().sum())]),
'sum1 int, sum2 int').collect()
self.assertEquals(result[0]['sum1'], 165)
self.assertEquals(result[0]['sum2'], 165)
@ -163,14 +160,13 @@ class CoGroupedMapPandasUDFTests(ReusedSQLTestCase):
def test_with_key_complex(self):
@pandas_udf('id long, k int, v int, key boolean', PandasUDFType.COGROUPED_MAP)
def left_assign_key(key, l, _):
return l.assign(key=key[0])
result = self.data1 \
.groupby(col('id') % 2 == 0)\
.cogroup(self.data2.groupby(col('id') % 2 == 0)) \
.apply(left_assign_key) \
.applyInPandas(left_assign_key, 'id long, k int, v int, key boolean') \
.sort(['id', 'k']) \
.toPandas()
@ -180,59 +176,33 @@ class CoGroupedMapPandasUDFTests(ReusedSQLTestCase):
assert_frame_equal(expected, result, check_column_type=_check_column_type)
def test_wrong_return_type(self):
with QuietTest(self.sc):
with self.assertRaisesRegexp(
NotImplementedError,
'Invalid returnType.*cogrouped map Pandas UDF.*MapType'):
pandas_udf(
lambda l, r: l,
'id long, v map<int, int>',
PandasUDFType.COGROUPED_MAP)
def test_wrong_args(self):
# Test that we get a sensible exception invalid values passed to apply
left = self.data1
right = self.data2
with QuietTest(self.sc):
# Function rather than a udf
with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
left.groupby('id').cogroup(right.groupby('id')).apply(lambda l, r: l)
with self.assertRaisesRegexp(
NotImplementedError,
'Invalid return type.*MapType'):
left.groupby('id').cogroup(right.groupby('id')).applyInPandas(
lambda l, r: l, 'id long, v map<int, int>')
# Udf missing return type
with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
left.groupby('id').cogroup(right.groupby('id'))\
.apply(udf(lambda l, r: l, DoubleType()))
# Pass in expression rather than udf
with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
left.groupby('id').cogroup(right.groupby('id')).apply(left.v + 1)
# Zero arg function
with self.assertRaisesRegexp(ValueError, 'Invalid function'):
left.groupby('id').cogroup(right.groupby('id'))\
.apply(pandas_udf(lambda: 1, StructType([StructField("d", DoubleType())])))
# Udf without PandasUDFType
with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
left.groupby('id').cogroup(right.groupby('id'))\
.apply(pandas_udf(lambda x, y: x, DoubleType()))
# Udf with incorrect PandasUDFType
with self.assertRaisesRegexp(ValueError, 'Invalid udf.*COGROUPED_MAP'):
left.groupby('id').cogroup(right.groupby('id'))\
.apply(pandas_udf(lambda x, y: x, DoubleType(), PandasUDFType.SCALAR))
def test_wrong_args(self):
left = self.data1
right = self.data2
with self.assertRaisesRegexp(ValueError, 'Invalid function'):
left.groupby('id').cogroup(right.groupby('id')) \
.applyInPandas(lambda: 1, StructType([StructField("d", DoubleType())]))
@staticmethod
def _test_with_key(left, right, isLeft):
@pandas_udf('id long, k int, v int, key long', PandasUDFType.COGROUPED_MAP)
def right_assign_key(key, l, r):
return l.assign(key=key[0]) if isLeft else r.assign(key=key[0])
result = left \
.groupby('id') \
.cogroup(right.groupby('id')) \
.apply(right_assign_key) \
.applyInPandas(right_assign_key, 'id long, k int, v int, key long') \
.toPandas()
expected = left.toPandas() if isLeft else right.toPandas()
@ -243,14 +213,13 @@ class CoGroupedMapPandasUDFTests(ReusedSQLTestCase):
@staticmethod
def _test_merge(left, right, output_schema='id long, k int, v int, v2 int'):
@pandas_udf(output_schema, PandasUDFType.COGROUPED_MAP)
def merge_pandas(l, r):
return pd.merge(l, r, on=['id', 'k'])
result = left \
.groupby('id') \
.cogroup(right.groupby('id')) \
.apply(merge_pandas)\
.applyInPandas(merge_pandas, output_schema)\
.sort(['id', 'k']) \
.toPandas()
@ -265,7 +234,7 @@ class CoGroupedMapPandasUDFTests(ReusedSQLTestCase):
if __name__ == "__main__":
from pyspark.sql.tests.test_pandas_udf_cogrouped_map import *
from pyspark.sql.tests.test_pandas_cogrouped_map import *
try:
import xmlrunner

View file

@ -46,7 +46,7 @@ _check_column_type = sys.version >= '3'
@unittest.skipIf(
not have_pandas or not have_pyarrow,
pandas_requirement_message or pyarrow_requirement_message)
class GroupedMapPandasUDFTests(ReusedSQLTestCase):
class GroupedMapInPandasTests(ReusedSQLTestCase):
@property
def data(self):
@ -250,7 +250,7 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
with QuietTest(self.sc):
with self.assertRaisesRegexp(
NotImplementedError,
'Invalid returnType.*grouped map Pandas UDF.*MapType'):
'Invalid return type.*grouped map Pandas UDF.*MapType'):
pandas_udf(
lambda pdf: pdf,
'id long, v map<int, int>',
@ -278,7 +278,7 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
pandas_udf(lambda x, y: x, DoubleType(), PandasUDFType.SCALAR))
def test_unsupported_types(self):
common_err_msg = 'Invalid returnType.*grouped map Pandas UDF.*'
common_err_msg = 'Invalid return type.*grouped map Pandas UDF.*'
unsupported_types = [
StructField('map', MapType(StringType(), IntegerType())),
StructField('arr_ts', ArrayType(TimestampType())),
@ -533,13 +533,12 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
df = self.spark.createDataFrame(data, ['id', 'group', 'ts', 'result'])
df = df.select(col('id'), col('group'), col('ts').cast('timestamp'), col('result'))
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def f(pdf):
# Assign each result element the ids of the windowed group
pdf['result'] = [pdf['id']] * len(pdf)
return pdf
result = df.groupby('group', window('ts', '5 days')).apply(f)\
result = df.groupby('group', window('ts', '5 days')).applyInPandas(f, df.schema)\
.select('id', 'result').collect()
for r in result:
self.assertListEqual(expected[r[0]], r[1])
@ -590,7 +589,7 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
if __name__ == "__main__":
from pyspark.sql.tests.test_pandas_udf_grouped_map import *
from pyspark.sql.tests.test_pandas_grouped_map import *
try:
import xmlrunner

View file

@ -33,7 +33,7 @@ if have_pandas:
@unittest.skipIf(
not have_pandas or not have_pyarrow,
pandas_requirement_message or pyarrow_requirement_message)
class ScalarPandasIterUDFTests(ReusedSQLTestCase):
class MapInPandasTests(ReusedSQLTestCase):
@classmethod
def setUpClass(cls):
@ -57,7 +57,6 @@ class ScalarPandasIterUDFTests(ReusedSQLTestCase):
ReusedSQLTestCase.tearDownClass()
def test_map_partitions_in_pandas(self):
@pandas_udf('id long', PandasUDFType.MAP_ITER)
def func(iterator):
for pdf in iterator:
assert isinstance(pdf, pd.DataFrame)
@ -65,7 +64,7 @@ class ScalarPandasIterUDFTests(ReusedSQLTestCase):
yield pdf
df = self.spark.range(10)
actual = df.mapInPandas(func).collect()
actual = df.mapInPandas(func, 'id long').collect()
expected = df.collect()
self.assertEquals(actual, expected)
@ -73,45 +72,40 @@ class ScalarPandasIterUDFTests(ReusedSQLTestCase):
data = [(1, "foo"), (2, None), (3, "bar"), (4, "bar")]
df = self.spark.createDataFrame(data, "a int, b string")
@pandas_udf(df.schema, PandasUDFType.MAP_ITER)
def func(iterator):
for pdf in iterator:
assert isinstance(pdf, pd.DataFrame)
assert [d.name for d in list(pdf.dtypes)] == ['int32', 'object']
yield pdf
actual = df.mapInPandas(func).collect()
actual = df.mapInPandas(func, df.schema).collect()
expected = df.collect()
self.assertEquals(actual, expected)
def test_different_output_length(self):
@pandas_udf('a long', PandasUDFType.MAP_ITER)
def func(iterator):
for _ in iterator:
yield pd.DataFrame({'a': list(range(100))})
df = self.spark.range(10)
actual = df.repartition(1).mapInPandas(func).collect()
actual = df.repartition(1).mapInPandas(func, 'a long').collect()
self.assertEquals(set((r.a for r in actual)), set(range(100)))
def test_empty_iterator(self):
@pandas_udf('a int, b string', PandasUDFType.MAP_ITER)
def empty_iter(_):
return iter([])
self.assertEqual(
self.spark.range(10).mapInPandas(empty_iter).count(), 0)
self.spark.range(10).mapInPandas(empty_iter, 'a int, b string').count(), 0)
def test_empty_rows(self):
@pandas_udf('a int', PandasUDFType.MAP_ITER)
def empty_rows(_):
return iter([pd.DataFrame({'a': []})])
self.assertEqual(
self.spark.range(10).mapInPandas(empty_rows).count(), 0)
self.spark.range(10).mapInPandas(empty_rows, 'a int').count(), 0)
def test_chain_map_partitions_in_pandas(self):
@pandas_udf('id long', PandasUDFType.MAP_ITER)
def func(iterator):
for pdf in iterator:
assert isinstance(pdf, pd.DataFrame)
@ -119,13 +113,13 @@ class ScalarPandasIterUDFTests(ReusedSQLTestCase):
yield pdf
df = self.spark.range(10)
actual = df.mapInPandas(func).mapInPandas(func).collect()
actual = df.mapInPandas(func, 'id long').mapInPandas(func, 'id long').collect()
expected = df.collect()
self.assertEquals(actual, expected)
if __name__ == "__main__":
from pyspark.sql.tests.test_pandas_udf_iter import *
from pyspark.sql.tests.test_pandas_map import *
try:
import xmlrunner

View file

@ -116,11 +116,11 @@ class PandasUDFTests(ReusedSQLTestCase):
@pandas_udf('blah')
def foo(x):
return x
with self.assertRaisesRegexp(ValueError, 'Invalid returnType.*None'):
with self.assertRaisesRegexp(ValueError, 'Invalid return type.*None'):
@pandas_udf(functionType=PandasUDFType.SCALAR)
def foo(x):
return x
with self.assertRaisesRegexp(ValueError, 'Invalid functionType'):
with self.assertRaisesRegexp(ValueError, 'Invalid function'):
@pandas_udf('double', 100)
def foo(x):
return x
@ -132,11 +132,11 @@ class PandasUDFTests(ReusedSQLTestCase):
def zero_with_type():
return 1
with self.assertRaisesRegexp(TypeError, 'Invalid returnType'):
with self.assertRaisesRegexp(TypeError, 'Invalid return type'):
@pandas_udf(returnType=PandasUDFType.GROUPED_MAP)
def foo(df):
return df
with self.assertRaisesRegexp(TypeError, 'Invalid returnType'):
with self.assertRaisesRegexp(TypeError, 'Invalid return type'):
@pandas_udf(returnType='double', functionType=PandasUDFType.GROUPED_MAP)
def foo(df):
return df

View file

@ -381,7 +381,7 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
with QuietTest(self.sc):
with self.assertRaisesRegexp(
Exception,
'Invalid returnType with scalar Pandas UDFs'):
'Invalid return type with scalar Pandas UDFs'):
pandas_udf(lambda x: x, returnType=nested_type, functionType=udf_type)
def test_vectorized_udf_complex(self):
@ -509,7 +509,7 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
with self.assertRaisesRegexp(
NotImplementedError,
'Invalid returnType.*scalar Pandas UDF.*MapType'):
'Invalid return type.*scalar Pandas UDF.*MapType'):
pandas_udf(lambda x: x, MapType(LongType(), LongType()), udf_type)
def test_vectorized_udf_return_scalar(self):
@ -582,11 +582,11 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
with self.assertRaisesRegexp(
NotImplementedError,
'Invalid returnType.*scalar Pandas UDF.*MapType'):
'Invalid return type.*scalar Pandas UDF.*MapType'):
pandas_udf(lambda x: x, MapType(StringType(), IntegerType()), udf_type)
with self.assertRaisesRegexp(
NotImplementedError,
'Invalid returnType.*scalar Pandas UDF.*ArrayType.StructType'):
'Invalid return type.*scalar Pandas UDF.*ArrayType.StructType'):
pandas_udf(lambda x: x,
ArrayType(StructType([StructField('a', IntegerType())])), udf_type)

View file

@ -0,0 +1,273 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
import unittest
import inspect
from pyspark.sql.functions import mean, lit
from pyspark.testing.sqlutils import ReusedSQLTestCase, \
have_pandas, have_pyarrow, pandas_requirement_message, \
pyarrow_requirement_message
from pyspark.sql.pandas.typehints import infer_eval_type
from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType
if have_pandas:
import pandas as pd
from pandas.util.testing import assert_frame_equal
python_requirement_message = "pandas UDF with type hints are supported with Python 3.6+."
@unittest.skipIf(
not have_pandas or not have_pyarrow or sys.version_info[:2] < (3, 6),
pandas_requirement_message or pyarrow_requirement_message or python_requirement_message)
class PandasUDFTypeHintsTests(ReusedSQLTestCase):
# Note that, we should remove `exec` once we drop Python 2 in this class.
def setUp(self):
self.local = {'pd': pd}
def test_type_annotation_scalar(self):
exec(
"def func(col: pd.Series) -> pd.Series: pass",
self.local)
self.assertEqual(
infer_eval_type(inspect.signature(self.local['func'])), PandasUDFType.SCALAR)
exec(
"def func(col: pd.DataFrame, col1: pd.Series) -> pd.DataFrame: pass",
self.local)
self.assertEqual(
infer_eval_type(inspect.signature(self.local['func'])), PandasUDFType.SCALAR)
exec(
"def func(col: pd.DataFrame, *args: pd.Series) -> pd.Series: pass",
self.local)
self.assertEqual(
infer_eval_type(inspect.signature(self.local['func'])), PandasUDFType.SCALAR)
exec(
"def func(col: pd.Series, *args: pd.Series, **kwargs: pd.DataFrame) -> pd.Series:\n"
" pass",
self.local)
self.assertEqual(
infer_eval_type(inspect.signature(self.local['func'])), PandasUDFType.SCALAR)
exec(
"def func(col: pd.Series, *, col2: pd.DataFrame) -> pd.DataFrame:\n"
" pass",
self.local)
self.assertEqual(
infer_eval_type(inspect.signature(self.local['func'])), PandasUDFType.SCALAR)
exec(
"from typing import Union\n"
"def func(col: Union[pd.Series, pd.DataFrame], *, col2: pd.DataFrame) -> pd.Series:\n"
" pass",
self.local)
self.assertEqual(
infer_eval_type(inspect.signature(self.local['func'])), PandasUDFType.SCALAR)
def test_type_annotation_scalar_iter(self):
exec(
"from typing import Iterator\n"
"def func(iter: Iterator[pd.Series]) -> Iterator[pd.Series]: pass",
self.local)
self.assertEqual(
infer_eval_type(inspect.signature(self.local['func'])), PandasUDFType.SCALAR_ITER)
exec(
"from typing import Iterator, Tuple\n"
"def func(iter: Iterator[Tuple[pd.DataFrame, pd.Series]]) -> Iterator[pd.DataFrame]:\n"
" pass",
self.local)
self.assertEqual(
infer_eval_type(inspect.signature(self.local['func'])), PandasUDFType.SCALAR_ITER)
exec(
"from typing import Iterator, Tuple\n"
"def func(iter: Iterator[Tuple[pd.DataFrame, ...]]) -> Iterator[pd.Series]: pass",
self.local)
self.assertEqual(
infer_eval_type(inspect.signature(self.local['func'])), PandasUDFType.SCALAR_ITER)
exec(
"from typing import Iterator, Tuple, Union\n"
"def func(iter: Iterator[Tuple[Union[pd.DataFrame, pd.Series], ...]])"
" -> Iterator[pd.Series]: pass",
self.local)
self.assertEqual(
infer_eval_type(inspect.signature(self.local['func'])), PandasUDFType.SCALAR_ITER)
def test_type_annotation_group_agg(self):
exec(
"def func(col: pd.Series) -> str: pass",
self.local)
self.assertEqual(
infer_eval_type(inspect.signature(self.local['func'])), PandasUDFType.GROUPED_AGG)
exec(
"def func(col: pd.DataFrame, col1: pd.Series) -> int: pass",
self.local)
self.assertEqual(
infer_eval_type(inspect.signature(self.local['func'])), PandasUDFType.GROUPED_AGG)
exec(
"from pyspark.sql import Row\n"
"def func(col: pd.DataFrame, *args: pd.Series) -> Row: pass",
self.local)
self.assertEqual(
infer_eval_type(inspect.signature(self.local['func'])), PandasUDFType.GROUPED_AGG)
exec(
"def func(col: pd.Series, *args: pd.Series, **kwargs: pd.DataFrame) -> str:\n"
" pass",
self.local)
self.assertEqual(
infer_eval_type(inspect.signature(self.local['func'])), PandasUDFType.GROUPED_AGG)
exec(
"def func(col: pd.Series, *, col2: pd.DataFrame) -> float:\n"
" pass",
self.local)
self.assertEqual(
infer_eval_type(inspect.signature(self.local['func'])), PandasUDFType.GROUPED_AGG)
exec(
"from typing import Union\n"
"def func(col: Union[pd.Series, pd.DataFrame], *, col2: pd.DataFrame) -> float:\n"
" pass",
self.local)
self.assertEqual(
infer_eval_type(inspect.signature(self.local['func'])), PandasUDFType.GROUPED_AGG)
def test_type_annotation_negative(self):
exec(
"def func(col: str) -> pd.Series: pass",
self.local)
self.assertRaisesRegex(
NotImplementedError,
"Unsupported signature.*str",
infer_eval_type, inspect.signature(self.local['func']))
exec(
"def func(col: pd.DataFrame, col1: int) -> pd.DataFrame: pass",
self.local)
self.assertRaisesRegex(
NotImplementedError,
"Unsupported signature.*int",
infer_eval_type, inspect.signature(self.local['func']))
exec(
"from typing import Union\n"
"def func(col: Union[pd.DataFrame, str], col1: int) -> pd.DataFrame: pass",
self.local)
self.assertRaisesRegex(
NotImplementedError,
"Unsupported signature.*str",
infer_eval_type, inspect.signature(self.local['func']))
exec(
"from typing import Tuple\n"
"def func(col: pd.Series) -> Tuple[pd.DataFrame]: pass",
self.local)
self.assertRaisesRegex(
NotImplementedError,
"Unsupported signature.*Tuple",
infer_eval_type, inspect.signature(self.local['func']))
exec(
"def func(col, *args: pd.Series) -> pd.Series: pass",
self.local)
self.assertRaisesRegex(
ValueError,
"should be specified.*Series",
infer_eval_type, inspect.signature(self.local['func']))
exec(
"def func(col: pd.Series, *args: pd.Series, **kwargs: pd.DataFrame):\n"
" pass",
self.local)
self.assertRaisesRegex(
ValueError,
"should be specified.*Series",
infer_eval_type, inspect.signature(self.local['func']))
exec(
"def func(col: pd.Series, *, col2) -> pd.DataFrame:\n"
" pass",
self.local)
self.assertRaisesRegex(
ValueError,
"should be specified.*Series",
infer_eval_type, inspect.signature(self.local['func']))
def test_scalar_udf_type_hint(self):
df = self.spark.range(10).selectExpr("id", "id as v")
exec(
"import typing\n"
"def plus_one(v: typing.Union[pd.Series, pd.DataFrame]) -> pd.Series:\n"
" return v + 1",
self.local)
plus_one = pandas_udf("long")(self.local["plus_one"])
actual = df.select(plus_one(df.v).alias("plus_one"))
expected = df.selectExpr("(v + 1) as plus_one")
assert_frame_equal(expected.toPandas(), actual.toPandas())
def test_scalar_iter_udf_type_hint(self):
df = self.spark.range(10).selectExpr("id", "id as v")
exec(
"import typing\n"
"def plus_one(itr: typing.Iterator[pd.Series]) -> typing.Iterator[pd.Series]:\n"
" for s in itr:\n"
" yield s + 1",
self.local)
plus_one = pandas_udf("long")(self.local["plus_one"])
actual = df.select(plus_one(df.v).alias("plus_one"))
expected = df.selectExpr("(v + 1) as plus_one")
assert_frame_equal(expected.toPandas(), actual.toPandas())
def test_group_agg_udf_type_hint(self):
df = self.spark.range(10).selectExpr("id", "id as v")
exec(
"import numpy as np\n"
"def weighted_mean(v: pd.Series, w: pd.Series) -> float:\n"
" return np.average(v, weights=w)",
self.local)
weighted_mean = pandas_udf("double")(self.local["weighted_mean"])
actual = df.groupby('id').agg(weighted_mean(df.v, lit(1.0))).sort('id')
expected = df.groupby('id').agg(mean(df.v).alias('weighted_mean(v, 1.0)')).sort('id')
assert_frame_equal(expected.toPandas(), actual.toPandas())
if __name__ == "__main__":
from pyspark.sql.tests.test_pandas_udf_typehints import *
try:
import xmlrunner
testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)

View file

@ -96,7 +96,7 @@ class UDFTests(ReusedSQLTestCase):
def test_udf_registration_return_type_not_none(self):
with QuietTest(self.sc):
with self.assertRaisesRegexp(TypeError, "Invalid returnType"):
with self.assertRaisesRegexp(TypeError, "Invalid return type"):
self.spark.catalog.registerFunction(
"f", UserDefinedFunction(lambda x, y: len(x) + y, StringType()), StringType())

View file

@ -25,7 +25,6 @@ from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType, ignore_unicode_
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql.types import StringType, DataType, StructType, _parse_datatype_string
from pyspark.sql.pandas.types import to_arrow_type
from pyspark.util import _get_argspec
__all__ = ["UDFRegistration"]
@ -38,41 +37,6 @@ def _wrap_function(sc, func, returnType):
def _create_udf(f, returnType, evalType):
if evalType in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
PythonEvalType.SQL_MAP_PANDAS_ITER_UDF):
from pyspark.sql.pandas.utils import require_minimum_pyarrow_version
require_minimum_pyarrow_version()
argspec = _get_argspec(f)
if (evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF or
evalType == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF) and \
len(argspec.args) == 0 and \
argspec.varargs is None:
raise ValueError(
"Invalid function: 0-arg pandas_udfs are not supported. "
"Instead, create a 1-arg pandas_udf and ignore the arg in your function."
)
if evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF \
and len(argspec.args) not in (1, 2):
raise ValueError(
"Invalid function: pandas_udfs with function type GROUPED_MAP "
"must take either one argument (data) or two arguments (key, data).")
if evalType == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF \
and len(argspec.args) not in (2, 3):
raise ValueError(
"Invalid function: pandas_udfs with function type COGROUPED_MAP "
"must take either two arguments (left, right) "
"or three arguments (key, left, right).")
# Set the name of the UserDefinedFunction object to be the name of function f
udf_obj = UserDefinedFunction(
f, returnType=returnType, name=None, evalType=evalType, deterministic=True)
@ -101,12 +65,12 @@ class UserDefinedFunction(object):
if not isinstance(returnType, (DataType, str)):
raise TypeError(
"Invalid returnType: returnType should be DataType or str "
"Invalid return type: returnType should be DataType or str "
"but is {}".format(returnType))
if not isinstance(evalType, int):
raise TypeError(
"Invalid evalType: evalType should be an int but is {}".format(evalType))
"Invalid evaluation type: evalType should be an int but is {}".format(evalType))
self.func = func
self._returnType = returnType
@ -135,7 +99,7 @@ class UserDefinedFunction(object):
to_arrow_type(self._returnType_placeholder)
except TypeError:
raise NotImplementedError(
"Invalid returnType with scalar Pandas UDFs: %s is "
"Invalid return type with scalar Pandas UDFs: %s is "
"not supported" % str(self._returnType_placeholder))
elif self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
if isinstance(self._returnType_placeholder, StructType):
@ -143,33 +107,35 @@ class UserDefinedFunction(object):
to_arrow_type(self._returnType_placeholder)
except TypeError:
raise NotImplementedError(
"Invalid returnType with grouped map Pandas UDFs: "
"%s is not supported" % str(self._returnType_placeholder))
"Invalid return type with grouped map Pandas UDFs or "
"at groupby.applyInPandas: %s is not supported" % str(
self._returnType_placeholder))
else:
raise TypeError("Invalid returnType for grouped map Pandas "
"UDFs: returnType must be a StructType.")
raise TypeError("Invalid return type for grouped map Pandas "
"UDFs or at groupby.applyInPandas: return type must be a "
"StructType.")
elif self.evalType == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF:
if isinstance(self._returnType_placeholder, StructType):
try:
to_arrow_type(self._returnType_placeholder)
except TypeError:
raise NotImplementedError(
"Invalid returnType with map iterator Pandas UDFs: "
"Invalid return type in mapInPandas: "
"%s is not supported" % str(self._returnType_placeholder))
else:
raise TypeError("Invalid returnType for map iterator Pandas "
"UDFs: returnType must be a StructType.")
raise TypeError("Invalid return type in mapInPandas: "
"return type must be a StructType.")
elif self.evalType == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
if isinstance(self._returnType_placeholder, StructType):
try:
to_arrow_type(self._returnType_placeholder)
except TypeError:
raise NotImplementedError(
"Invalid returnType with cogrouped map Pandas UDFs: "
"Invalid return type in cogroup.applyInPandas: "
"%s is not supported" % str(self._returnType_placeholder))
else:
raise TypeError("Invalid returnType for cogrouped map Pandas "
"UDFs: returnType must be a StructType.")
raise TypeError("Invalid return type in cogroup.applyInPandas: "
"return type must be a StructType.")
elif self.evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
try:
# StructType is not yet allowed as a return type, explicitly check here to fail fast
@ -178,7 +144,7 @@ class UserDefinedFunction(object):
to_arrow_type(self._returnType_placeholder)
except TypeError:
raise NotImplementedError(
"Invalid returnType with grouped aggregate Pandas UDFs: "
"Invalid return type with grouped aggregate Pandas UDFs: "
"%s is not supported" % str(self._returnType_placeholder))
return self._returnType_placeholder
@ -358,7 +324,7 @@ class UDFRegistration(object):
if hasattr(f, 'asNondeterministic'):
if returnType is not None:
raise TypeError(
"Invalid returnType: data type can not be specified when f is"
"Invalid return type: data type can not be specified when f is"
"a user-defined function, but got %s." % returnType)
if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_UDF,