3959f0d987

### What changes were proposed in this pull request?

This PR proposes to migrate to [NumPy documentation style](https://numpydoc.readthedocs.io/en/latest/format.html), see also SPARK-33243. While I am migrating, I also fixed some Python type hints accordingly.

### Why are the changes needed?

For better documentation as text itself, and generated HTMLs.

### Does this PR introduce _any_ user-facing change?

Yes, they will see a better format of HTMLs, and better text format. See SPARK-33243.

### How was this patch tested?

Manually tested via running `./dev/lint-python`.

Closes #30181 from HyukjinKwon/SPARK-33250.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
361 lines · 14 KiB · Python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
import warnings

from pyspark.rdd import PythonEvalType
from pyspark.sql.column import Column
from pyspark.sql.dataframe import DataFrame


class PandasGroupedOpsMixin(object):
    """
    Mix-in for pandas grouped operations. Currently, only :class:`GroupedData`
    can use this class.
    """

    def apply(self, udf):
        """
        It is an alias of :meth:`pyspark.sql.GroupedData.applyInPandas`; however, it takes a
        :meth:`pyspark.sql.functions.pandas_udf` whereas
        :meth:`pyspark.sql.GroupedData.applyInPandas` takes a Python native function.

        .. versionadded:: 2.3.0

        Parameters
        ----------
        udf : :func:`pyspark.sql.functions.pandas_udf`
            a grouped map user-defined function returned by
            :func:`pyspark.sql.functions.pandas_udf`.

        Notes
        -----
        It is preferred to use :meth:`pyspark.sql.GroupedData.applyInPandas` over this
        API. This API will be deprecated in future releases.

        Examples
        --------
        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
        >>> df = spark.createDataFrame(
        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ...     ("id", "v"))
        >>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
        ... def normalize(pdf):
        ...     v = pdf.v
        ...     return pdf.assign(v=(v - v.mean()) / v.std())
        >>> df.groupby("id").apply(normalize).show()  # doctest: +SKIP
        +---+-------------------+
        | id|                  v|
        +---+-------------------+
        |  1|-0.7071067811865475|
        |  1| 0.7071067811865475|
        |  2|-0.8320502943378437|
        |  2|-0.2773500981126146|
        |  2| 1.1094003924504583|
        +---+-------------------+

        See Also
        --------
        pyspark.sql.functions.pandas_udf
        """
        # Columns are special because hasattr always returns True
        if isinstance(udf, Column) or not hasattr(udf, 'func') \
                or udf.evalType != PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
            raise ValueError("Invalid udf: the udf argument must be a pandas_udf of type "
                             "GROUPED_MAP.")

        warnings.warn(
            "It is preferred to use 'applyInPandas' over this "
            "API. This API will be deprecated in future releases. See SPARK-28264 for "
            "more details.", UserWarning)
        return self.applyInPandas(udf.func, schema=udf.returnType)

    def applyInPandas(self, func, schema):
        """
        Maps each group of the current :class:`DataFrame` using a pandas udf and returns the
        result as a :class:`DataFrame`.

        The function should take a `pandas.DataFrame` and return another
        `pandas.DataFrame`. For each group, all columns are passed together as a
        `pandas.DataFrame` to the user-function and the returned `pandas.DataFrame`\\s are
        combined as a :class:`DataFrame`.

        The `schema` should be a :class:`StructType` describing the schema of the returned
        `pandas.DataFrame`. The column labels of the returned `pandas.DataFrame` must either match
        the field names in the defined schema if specified as strings, or match the
        field data types by position if not strings, e.g. integer indices.
        The length of the returned `pandas.DataFrame` can be arbitrary.

        .. versionadded:: 3.0.0

        Parameters
        ----------
        func : function
            a Python native function that takes a `pandas.DataFrame`, and outputs a
            `pandas.DataFrame`.
        schema : :class:`pyspark.sql.types.DataType` or str
            the return type of the `func` in PySpark. The value can be either a
            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.

        Examples
        --------
        >>> import pandas as pd  # doctest: +SKIP
        >>> from pyspark.sql.functions import pandas_udf, ceil
        >>> df = spark.createDataFrame(
        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ...     ("id", "v"))  # doctest: +SKIP
        >>> def normalize(pdf):
        ...     v = pdf.v
        ...     return pdf.assign(v=(v - v.mean()) / v.std())
        >>> df.groupby("id").applyInPandas(
        ...     normalize, schema="id long, v double").show()  # doctest: +SKIP
        +---+-------------------+
        | id|                  v|
        +---+-------------------+
        |  1|-0.7071067811865475|
        |  1| 0.7071067811865475|
        |  2|-0.8320502943378437|
        |  2|-0.2773500981126146|
        |  2| 1.1094003924504583|
        +---+-------------------+

        Alternatively, the user can pass a function that takes two arguments.
        In this case, the grouping key(s) will be passed as the first argument and the data will
        be passed as the second argument. The grouping key(s) will be passed as a tuple of numpy
        data types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in
        as a `pandas.DataFrame` containing all columns from the original Spark DataFrame.
        This is useful when the user does not want to hardcode grouping key(s) in the function.

        >>> df = spark.createDataFrame(
        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ...     ("id", "v"))  # doctest: +SKIP
        >>> def mean_func(key, pdf):
        ...     # key is a tuple of one numpy.int64, which is the value
        ...     # of 'id' for the current group
        ...     return pd.DataFrame([key + (pdf.v.mean(),)])
        >>> df.groupby('id').applyInPandas(
        ...     mean_func, schema="id long, v double").show()  # doctest: +SKIP
        +---+---+
        | id|  v|
        +---+---+
        |  1|1.5|
        |  2|6.0|
        +---+---+

        >>> def sum_func(key, pdf):
        ...     # key is a tuple of two numpy.int64s, which is the values
        ...     # of 'id' and 'ceil(df.v / 2)' for the current group
        ...     return pd.DataFrame([key + (pdf.v.sum(),)])
        >>> df.groupby(df.id, ceil(df.v / 2)).applyInPandas(
        ...     sum_func, schema="id long, `ceil(v / 2)` long, v double").show()  # doctest: +SKIP
        +---+-----------+----+
        | id|ceil(v / 2)|   v|
        +---+-----------+----+
        |  2|          5|10.0|
        |  1|          1| 3.0|
        |  2|          3| 5.0|
        |  2|          2| 3.0|
        +---+-----------+----+

        Notes
        -----
        This function requires a full shuffle. All the data of a group will be loaded
        into memory, so the user should be aware of the potential OOM risk if data is skewed
        and certain groups are too large to fit in memory.

        If returning a new `pandas.DataFrame` constructed with a dictionary, it is
        recommended to explicitly index the columns by name to ensure the positions are correct,
        or alternatively use an `OrderedDict`.
        For example, `pd.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])` or
        `pd.DataFrame(OrderedDict([('id', ids), ('a', data)]))`.

        This API is experimental.

        See Also
        --------
        pyspark.sql.functions.pandas_udf
        """
        from pyspark.sql import GroupedData
        from pyspark.sql.functions import pandas_udf, PandasUDFType

        assert isinstance(self, GroupedData)
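
        # Wrap the Python native function into a grouped map pandas UDF that carries
        # the declared result schema.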
        udf = pandas_udf(
            func, returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
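        # Apply the UDF over every column of the DataFrame so each group reaches the
        # function as a whole `pandas.DataFrame`, then run the grouped map on the JVM side.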
        df = self._df
        udf_column = udf(*[df[col] for col in df.columns])
        jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
        return DataFrame(jdf, self.sql_ctx)

    def cogroup(self, other):
        """
        Cogroups this group with another group so that we can run cogrouped operations.

        .. versionadded:: 3.0.0

        See :class:`PandasCogroupedOps` for the operations that can be run.
        """
        from pyspark.sql import GroupedData

        assert isinstance(self, GroupedData)
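
        # The cogrouped operation itself is lazy: PandasCogroupedOps just pairs the two
        # grouped datasets until applyInPandas is called on the result.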
        return PandasCogroupedOps(self, other)


class PandasCogroupedOps(object):
    """
    A logical grouping of two :class:`GroupedData`,
    created by :func:`GroupedData.cogroup`.

    .. versionadded:: 3.0.0

    Notes
    -----
    This API is experimental.
    """

    def __init__(self, gd1, gd2):
        self._gd1 = gd1
        self._gd2 = gd2
        self.sql_ctx = gd1.sql_ctx

    def applyInPandas(self, func, schema):
        """
        Applies a function to each cogroup using pandas and returns the result
        as a `DataFrame`.

        The function should take two `pandas.DataFrame`\\s and return another
        `pandas.DataFrame`. For each side of the cogroup, all columns are passed together as a
        `pandas.DataFrame` to the user-function and the returned `pandas.DataFrame`\\s are
        combined as a :class:`DataFrame`.

        The `schema` should be a :class:`StructType` describing the schema of the returned
        `pandas.DataFrame`. The column labels of the returned `pandas.DataFrame` must either match
        the field names in the defined schema if specified as strings, or match the
        field data types by position if not strings, e.g. integer indices.
        The length of the returned `pandas.DataFrame` can be arbitrary.

        .. versionadded:: 3.0.0

        Parameters
        ----------
        func : function
            a Python native function that takes two `pandas.DataFrame`\\s, and
            outputs a `pandas.DataFrame`, or that takes one tuple (grouping keys) and two
            pandas ``DataFrame``\\s, and outputs a pandas ``DataFrame``.
        schema : :class:`pyspark.sql.types.DataType` or str
            the return type of the `func` in PySpark. The value can be either a
            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.

        Examples
        --------
        >>> import pandas as pd  # doctest: +SKIP
        >>> from pyspark.sql.functions import pandas_udf
        >>> df1 = spark.createDataFrame(
        ...     [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
        ...     ("time", "id", "v1"))
        >>> df2 = spark.createDataFrame(
        ...     [(20000101, 1, "x"), (20000101, 2, "y")],
        ...     ("time", "id", "v2"))
        >>> def asof_join(l, r):
        ...     return pd.merge_asof(l, r, on="time", by="id")
        >>> df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
        ...     asof_join, schema="time int, id int, v1 double, v2 string"
        ... ).show()  # doctest: +SKIP
        +--------+---+---+---+
        |    time| id| v1| v2|
        +--------+---+---+---+
        |20000101|  1|1.0|  x|
        |20000102|  1|3.0|  x|
        |20000101|  2|2.0|  y|
        |20000102|  2|4.0|  y|
        +--------+---+---+---+

        Alternatively, the user can define a function that takes three arguments. In this case,
        the grouping key(s) will be passed as the first argument and the data will be passed as the
        second and third arguments. The grouping key(s) will be passed as a tuple of numpy data
        types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in as two
        `pandas.DataFrame`\\s containing all columns from the original Spark DataFrames.

        >>> def asof_join(k, l, r):
        ...     if k == (1,):
        ...         return pd.merge_asof(l, r, on="time", by="id")
        ...     else:
        ...         return pd.DataFrame(columns=['time', 'id', 'v1', 'v2'])
        >>> df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
        ...     asof_join, "time int, id int, v1 double, v2 string").show()  # doctest: +SKIP
        +--------+---+---+---+
        |    time| id| v1| v2|
        +--------+---+---+---+
        |20000101|  1|1.0|  x|
        |20000102|  1|3.0|  x|
        +--------+---+---+---+

        Notes
        -----
        This function requires a full shuffle. All the data of a cogroup will be loaded
        into memory, so the user should be aware of the potential OOM risk if data is skewed
        and certain groups are too large to fit in memory.

        If returning a new `pandas.DataFrame` constructed with a dictionary, it is
        recommended to explicitly index the columns by name to ensure the positions are correct,
        or alternatively use an `OrderedDict`.
        For example, `pd.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])` or
        `pd.DataFrame(OrderedDict([('id', ids), ('a', data)]))`.

        This API is experimental.

        See Also
        --------
        pyspark.sql.functions.pandas_udf
        """
        from pyspark.sql.pandas.functions import pandas_udf

        udf = pandas_udf(
            func, returnType=schema, functionType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF)
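        # Gather the columns of both grouped DataFrames so the UDF receives each side of
        # the cogroup as a complete `pandas.DataFrame`, then run the cogrouped map on the JVM.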
        all_cols = self._extract_cols(self._gd1) + self._extract_cols(self._gd2)
        udf_column = udf(*all_cols)
        jdf = self._gd1._jgd.flatMapCoGroupsInPandas(self._gd2._jgd, udf_column._jc.expr())
        return DataFrame(jdf, self.sql_ctx)

    @staticmethod
    def _extract_cols(gd):
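        # Return one Column per field of the group's underlying DataFrame, in schema
        # order, so the UDF arguments line up with the original columns.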
        df = gd._df
        return [df[col] for col in df.columns]


def _test():
    import doctest
    from pyspark.sql import SparkSession
    import pyspark.sql.pandas.group_ops
    globs = pyspark.sql.pandas.group_ops.__dict__.copy()
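    # The doctests reference a `spark` global, so create a local SparkSession for them.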
    spark = SparkSession.builder\
        .master("local[4]")\
        .appName("sql.pandas.group tests")\
        .getOrCreate()
    globs['spark'] = spark
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.pandas.group_ops, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()