#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import datetime
import unittest

from collections import OrderedDict
from decimal import Decimal

from pyspark.sql import Row
from pyspark.sql.functions import array, explode, col, lit, udf, sum, pandas_udf, PandasUDFType, \
    window
from pyspark.sql.types import IntegerType, DoubleType, ArrayType, BinaryType, ByteType, \
    LongType, DecimalType, ShortType, FloatType, StringType, BooleanType, StructType, \
    StructField, NullType, MapType, TimestampType
from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
    pandas_requirement_message, pyarrow_requirement_message
from pyspark.testing.utils import QuietTest

if have_pandas:
    import pandas as pd
    from pandas.util.testing import assert_frame_equal

if have_pyarrow:
    import pyarrow as pa  # noqa: F401


@unittest.skipIf(
    not have_pandas or not have_pyarrow,
    pandas_requirement_message or pyarrow_requirement_message)  # type: ignore[arg-type]
class GroupedMapInPandasTests(ReusedSQLTestCase):

    @property
    def data(self):
        return self.spark.range(10).toDF('id') \
            .withColumn("vs", array([lit(i) for i in range(20, 30)])) \
            .withColumn("v", explode(col('vs'))).drop('vs')

    def test_supported_types(self):
        values = [
            1, 2, 3,
            4, 5, 1.1,
            2.2, Decimal('1.123'),
            [1, 2, 2], True, 'hello',
            bytearray([0x01, 0x02])
        ]
        output_fields = [
            ('id', IntegerType()), ('byte', ByteType()), ('short', ShortType()),
            ('int', IntegerType()), ('long', LongType()), ('float', FloatType()),
            ('double', DoubleType()), ('decim', DecimalType(10, 3)),
            ('array', ArrayType(IntegerType())), ('bool', BooleanType()), ('str', StringType()),
            ('bin', BinaryType())
        ]

        output_schema = StructType([StructField(*x) for x in output_fields])
        df = self.spark.createDataFrame([values], schema=output_schema)

        # Different forms of group map pandas UDF, results of these are the same
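        # udf1 takes only the pandas DataFrame; udf2 takes (key, pdf) and
        # ignores the key; udf3 uses the key to overwrite the id column.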
        udf1 = pandas_udf(
            lambda pdf: pdf.assign(
                byte=pdf.byte * 2,
                short=pdf.short * 2,
                int=pdf.int * 2,
                long=pdf.long * 2,
                float=pdf.float * 2,
                double=pdf.double * 2,
                decim=pdf.decim * 2,
                bool=False if pdf.bool else True,
                str=pdf.str + 'there',
                array=pdf.array,
                bin=pdf.bin
            ),
            output_schema,
            PandasUDFType.GROUPED_MAP
        )

        udf2 = pandas_udf(
            lambda _, pdf: pdf.assign(
                byte=pdf.byte * 2,
                short=pdf.short * 2,
                int=pdf.int * 2,
                long=pdf.long * 2,
                float=pdf.float * 2,
                double=pdf.double * 2,
                decim=pdf.decim * 2,
                bool=False if pdf.bool else True,
                str=pdf.str + 'there',
                array=pdf.array,
                bin=pdf.bin
            ),
            output_schema,
            PandasUDFType.GROUPED_MAP
        )

        udf3 = pandas_udf(
            lambda key, pdf: pdf.assign(
                id=key[0],
                byte=pdf.byte * 2,
                short=pdf.short * 2,
                int=pdf.int * 2,
                long=pdf.long * 2,
                float=pdf.float * 2,
                double=pdf.double * 2,
                decim=pdf.decim * 2,
                bool=False if pdf.bool else True,
                str=pdf.str + 'there',
                array=pdf.array,
                bin=pdf.bin
            ),
            output_schema,
            PandasUDFType.GROUPED_MAP
        )

        result1 = df.groupby('id').apply(udf1).sort('id').toPandas()
        expected1 = df.toPandas().groupby('id').apply(udf1.func).reset_index(drop=True)

        result2 = df.groupby('id').apply(udf2).sort('id').toPandas()
        expected2 = expected1

        result3 = df.groupby('id').apply(udf3).sort('id').toPandas()
        expected3 = expected1

        assert_frame_equal(expected1, result1)
        assert_frame_equal(expected2, result2)
        assert_frame_equal(expected3, result3)

    def test_array_type_correct(self):
        df = self.data.withColumn("arr", array(col("id"))).repartition(1, "id")

        output_schema = StructType(
            [StructField('id', LongType()),
             StructField('v', IntegerType()),
             StructField('arr', ArrayType(LongType()))])

        udf = pandas_udf(
            lambda pdf: pdf,
            output_schema,
            PandasUDFType.GROUPED_MAP
        )

        result = df.groupby('id').apply(udf).sort('id').toPandas()
        expected = df.toPandas().groupby('id').apply(udf.func).reset_index(drop=True)
        assert_frame_equal(expected, result)

    def test_register_grouped_map_udf(self):
        foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUPED_MAP)
        with QuietTest(self.sc):
            with self.assertRaisesRegexp(
                    ValueError,
                    'f.*SQL_BATCHED_UDF.*SQL_SCALAR_PANDAS_UDF.*SQL_GROUPED_AGG_PANDAS_UDF.*'):
                self.spark.catalog.registerFunction("foo_udf", foo_udf)
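
    # registerFunction only accepts the SQL_BATCHED_UDF, SQL_SCALAR_PANDAS_UDF
    # and SQL_GROUPED_AGG_PANDAS_UDF eval types listed in the regex above, so
    # registering a GROUPED_MAP UDF is rejected with a ValueError.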

    def test_decorator(self):
        df = self.data

        @pandas_udf(
            'id long, v int, v1 double, v2 long',
            PandasUDFType.GROUPED_MAP
        )
        def foo(pdf):
            return pdf.assign(v1=pdf.v * pdf.id * 1.0, v2=pdf.v + pdf.id)

        result = df.groupby('id').apply(foo).sort('id').toPandas()
        expected = df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True)
        assert_frame_equal(expected, result)

    def test_coerce(self):
        df = self.data

        foo = pandas_udf(
            lambda pdf: pdf,
            'id long, v double',
            PandasUDFType.GROUPED_MAP
        )

        result = df.groupby('id').apply(foo).sort('id').toPandas()
        expected = df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True)
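        # The schema declares v as double while the input column is int, so
        # Spark coerces the UDF output; mirror that cast on the pandas side.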
        expected = expected.assign(v=expected.v.astype('float64'))
        assert_frame_equal(expected, result)

    def test_complex_groupby(self):
        df = self.data

        @pandas_udf(
            'id long, v int, norm double',
            PandasUDFType.GROUPED_MAP
        )
        def normalize(pdf):
            v = pdf.v
            return pdf.assign(norm=(v - v.mean()) / v.std())

        result = df.groupby(col('id') % 2 == 0).apply(normalize).sort('id', 'v').toPandas()
        pdf = df.toPandas()
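        # Group by the same boolean expression on the pandas side;
        # as_index=False keeps the grouping key out of the index.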
        expected = pdf.groupby(pdf['id'] % 2 == 0, as_index=False).apply(normalize.func)
        expected = expected.sort_values(['id', 'v']).reset_index(drop=True)
        expected = expected.assign(norm=expected.norm.astype('float64'))
        assert_frame_equal(expected, result)

    def test_empty_groupby(self):
        df = self.data

        @pandas_udf(
            'id long, v int, norm double',
            PandasUDFType.GROUPED_MAP
        )
        def normalize(pdf):
            v = pdf.v
            return pdf.assign(norm=(v - v.mean()) / v.std())

        result = df.groupby().apply(normalize).sort('id', 'v').toPandas()
        pdf = df.toPandas()
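        # With no grouping columns the whole frame is a single group, so the
        # expected result is just normalize applied to the full pandas frame.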
        expected = normalize.func(pdf)
        expected = expected.sort_values(['id', 'v']).reset_index(drop=True)
        expected = expected.assign(norm=expected.norm.astype('float64'))
        assert_frame_equal(expected, result)

    def test_datatype_string(self):
        df = self.data

        foo_udf = pandas_udf(
            lambda pdf: pdf.assign(v1=pdf.v * pdf.id * 1.0, v2=pdf.v + pdf.id),
            'id long, v int, v1 double, v2 long',
            PandasUDFType.GROUPED_MAP
        )

        result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
        expected = df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
        assert_frame_equal(expected, result)

    def test_wrong_return_type(self):
        with QuietTest(self.sc):
            with self.assertRaisesRegexp(
                    NotImplementedError,
                    'Invalid return type.*grouped map Pandas UDF.*MapType'):
                pandas_udf(
                    lambda pdf: pdf,
                    'id long, v map<int, int>',
                    PandasUDFType.GROUPED_MAP)

    def test_wrong_args(self):
        df = self.data

        with QuietTest(self.sc):
            with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
                df.groupby('id').apply(lambda x: x)
            with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
                df.groupby('id').apply(udf(lambda x: x, DoubleType()))
            with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
                df.groupby('id').apply(sum(df.v))
            with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
                df.groupby('id').apply(df.v + 1)
            with self.assertRaisesRegexp(ValueError, 'Invalid function'):
                df.groupby('id').apply(
                    pandas_udf(lambda: 1, StructType([StructField("d", DoubleType())])))
            with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
                df.groupby('id').apply(pandas_udf(lambda x, y: x, DoubleType()))
            with self.assertRaisesRegexp(ValueError, 'Invalid udf.*GROUPED_MAP'):
                df.groupby('id').apply(
                    pandas_udf(lambda x, y: x, DoubleType(), PandasUDFType.SCALAR))

    def test_unsupported_types(self):
        common_err_msg = 'Invalid return type.*grouped map Pandas UDF.*'
        unsupported_types = [
            StructField('map', MapType(StringType(), IntegerType())),
            StructField('arr_ts', ArrayType(TimestampType())),
            StructField('null', NullType()),
            StructField('struct', StructType([StructField('l', LongType())])),
        ]

        for unsupported_type in unsupported_types:
            schema = StructType([StructField('id', LongType(), True), unsupported_type])
            with QuietTest(self.sc):
                with self.assertRaisesRegexp(NotImplementedError, common_err_msg):
                    pandas_udf(lambda x: x, schema, PandasUDFType.GROUPED_MAP)

    # Regression test for SPARK-23314
    def test_timestamp_dst(self):
        # Daylight saving time in Los Angeles ended Sun, Nov 1, 2015 at 2:00 am
        dt = [datetime.datetime(2015, 11, 1, 0, 30),
              datetime.datetime(2015, 11, 1, 1, 30),
              datetime.datetime(2015, 11, 1, 2, 30)]
        df = self.spark.createDataFrame(dt, 'timestamp').toDF('time')
        foo_udf = pandas_udf(lambda pdf: pdf, 'time timestamp', PandasUDFType.GROUPED_MAP)
        result = df.groupby('time').apply(foo_udf).sort('time')
        assert_frame_equal(df.toPandas(), result.toPandas())

    def test_udf_with_key(self):
        import numpy as np

        df = self.data
        pdf = df.toPandas()

        def foo1(key, pdf):
            assert type(key) == tuple
            assert type(key[0]) == np.int64

            return pdf.assign(v1=key[0],
                              v2=pdf.v * key[0],
                              v3=pdf.v * pdf.id,
                              v4=pdf.v * pdf.id.mean())

        def foo2(key, pdf):
            assert type(key) == tuple
            assert type(key[0]) == np.int64
            assert type(key[1]) == np.int32

            return pdf.assign(v1=key[0],
                              v2=key[1],
                              v3=pdf.v * key[0],
                              v4=pdf.v + key[1])

        def foo3(key, pdf):
            assert type(key) == tuple
            assert len(key) == 0
            return pdf.assign(v1=pdf.v * pdf.id)

        # v2 is int because numpy.int64 * pd.Series<int32> results in pd.Series<int32>
        # v3 is long because pd.Series<int64> * pd.Series<int32> results in pd.Series<int64>
        udf1 = pandas_udf(
            foo1,
            'id long, v int, v1 long, v2 int, v3 long, v4 double',
            PandasUDFType.GROUPED_MAP)

        udf2 = pandas_udf(
            foo2,
            'id long, v int, v1 long, v2 int, v3 int, v4 int',
            PandasUDFType.GROUPED_MAP)

        udf3 = pandas_udf(
            foo3,
            'id long, v int, v1 long',
            PandasUDFType.GROUPED_MAP)
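
        # The key passed to each function is a tuple with one element per
        # grouping expression, in grouping order (empty for df.groupby()).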

        # Test groupby column
        result1 = df.groupby('id').apply(udf1).sort('id', 'v').toPandas()
        expected1 = pdf.groupby('id', as_index=False)\
            .apply(lambda x: udf1.func((x.id.iloc[0],), x))\
            .sort_values(['id', 'v']).reset_index(drop=True)
        assert_frame_equal(expected1, result1)

        # Test groupby expression
        result2 = df.groupby(df.id % 2).apply(udf1).sort('id', 'v').toPandas()
        expected2 = pdf.groupby(pdf.id % 2, as_index=False)\
            .apply(lambda x: udf1.func((x.id.iloc[0] % 2,), x))\
            .sort_values(['id', 'v']).reset_index(drop=True)
        assert_frame_equal(expected2, result2)

        # Test complex groupby
        result3 = df.groupby(df.id, df.v % 2).apply(udf2).sort('id', 'v').toPandas()
        expected3 = pdf.groupby([pdf.id, pdf.v % 2], as_index=False)\
            .apply(lambda x: udf2.func((x.id.iloc[0], (x.v % 2).iloc[0],), x))\
            .sort_values(['id', 'v']).reset_index(drop=True)
        assert_frame_equal(expected3, result3)

        # Test empty groupby
        result4 = df.groupby().apply(udf3).sort('id', 'v').toPandas()
        expected4 = udf3.func((), pdf)
        assert_frame_equal(expected4, result4)

    def test_column_order(self):

        # Helper function to set column names from a list
        def rename_pdf(pdf, names):
            pdf.rename(columns={old: new for old, new in
                                zip(pdf.columns, names)}, inplace=True)

        df = self.data
        grouped_df = df.groupby('id')
        grouped_pdf = df.toPandas().groupby('id', as_index=False)

        # Function returns a pdf with the required column names, but in an
        # order that differs from the schema
        def change_col_order(pdf):
            # Constructing a DataFrame from a dict should result in the same order,
            # but use OrderedDict to ensure the pdf column order is different from the schema
            return pd.DataFrame.from_dict(OrderedDict([
                ('id', pdf.id),
                ('u', pdf.v * 2),
                ('v', pdf.v)]))

        ordered_udf = pandas_udf(
            change_col_order,
            'id long, v int, u int',
            PandasUDFType.GROUPED_MAP
        )

        # The UDF result should assign columns by name from the pdf
        result = grouped_df.apply(ordered_udf).sort('id', 'v')\
            .select('id', 'u', 'v').toPandas()
        pd_result = grouped_pdf.apply(change_col_order)
        expected = pd_result.sort_values(['id', 'v']).reset_index(drop=True)
        assert_frame_equal(expected, result)

        # Function returns a pdf with positional columns, indexed by range
        def range_col_order(pdf):
            # Create a DataFrame with positional columns, fix types to long
            return pd.DataFrame(list(zip(pdf.id, pdf.v * 3, pdf.v)), dtype='int64')

        range_udf = pandas_udf(
            range_col_order,
            'id long, u long, v long',
            PandasUDFType.GROUPED_MAP
        )

        # The UDF result uses positional columns from the pdf
        result = grouped_df.apply(range_udf).sort('id', 'v') \
            .select('id', 'u', 'v').toPandas()
        pd_result = grouped_pdf.apply(range_col_order)
        rename_pdf(pd_result, ['id', 'u', 'v'])
        expected = pd_result.sort_values(['id', 'v']).reset_index(drop=True)
        assert_frame_equal(expected, result)

        # Function returns a pdf with columns indexed with integers
        def int_index(pdf):
            return pd.DataFrame(OrderedDict([(0, pdf.id), (1, pdf.v * 4), (2, pdf.v)]))

        int_index_udf = pandas_udf(
            int_index,
            'id long, u int, v int',
            PandasUDFType.GROUPED_MAP
        )

        # The UDF result should assign columns by position of integer index
        result = grouped_df.apply(int_index_udf).sort('id', 'v') \
            .select('id', 'u', 'v').toPandas()
        pd_result = grouped_pdf.apply(int_index)
        rename_pdf(pd_result, ['id', 'u', 'v'])
        expected = pd_result.sort_values(['id', 'v']).reset_index(drop=True)
        assert_frame_equal(expected, result)
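
        # In short: string column labels are matched to the schema by name,
        # while range/integer labels fall back to positional assignment.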

        @pandas_udf('id long, v int', PandasUDFType.GROUPED_MAP)
        def column_name_typo(pdf):
            return pd.DataFrame({'iid': pdf.id, 'v': pdf.v})

        @pandas_udf('id long, v decimal', PandasUDFType.GROUPED_MAP)
        def invalid_positional_types(pdf):
            return pd.DataFrame([(1, datetime.date(2020, 10, 5))])

        with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}):
            with QuietTest(self.sc):
                with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
                    grouped_df.apply(column_name_typo).collect()
                with self.assertRaisesRegexp(Exception, "[D|d]ecimal.*got.*date"):
                    grouped_df.apply(invalid_positional_types).collect()
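
    # The legacy conf below disables assign-by-name, so the grouped map
    # result columns are matched to the schema purely by position,
    # regardless of the labels in the returned pdf.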

    def test_positional_assignment_conf(self):
        with self.sql_conf({
                "spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName": False}):

            @pandas_udf("a string, b float", PandasUDFType.GROUPED_MAP)
            def foo(_):
                return pd.DataFrame([('hi', 1)], columns=['x', 'y'])

            df = self.data
            result = df.groupBy('id').apply(foo).select('a', 'b').collect()
            for r in result:
                self.assertEqual(r.a, 'hi')
                self.assertEqual(r.b, 1)

    def test_self_join_with_pandas(self):
        @pandas_udf('key long, col string', PandasUDFType.GROUPED_MAP)
        def dummy_pandas_udf(df):
            return df[['key', 'col']]

        df = self.spark.createDataFrame([Row(key=1, col='A'), Row(key=1, col='B'),
                                         Row(key=2, col='C')])
        df_with_pandas = df.groupBy('key').apply(dummy_pandas_udf)

        # this was throwing an AnalysisException before SPARK-24208
        res = df_with_pandas.alias('temp0').join(df_with_pandas.alias('temp1'),
                                                 col('temp0.key') == col('temp1.key'))
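        # key=1 self-joins 2 x 2 rows and key=2 joins 1 x 1, hence 5 rows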
        self.assertEqual(res.count(), 5)

    def test_mixed_scalar_udfs_followed_by_groupby_apply(self):
        df = self.spark.range(0, 10).toDF('v1')
        df = df.withColumn('v2', udf(lambda x: x + 1, 'int')(df['v1'])) \
            .withColumn('v3', pandas_udf(lambda x: x + 2, 'int')(df['v1']))

        result = df.groupby() \
            .apply(pandas_udf(lambda x: pd.DataFrame([x.sum().sum()]),
                              'sum int',
                              PandasUDFType.GROUPED_MAP))
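
        # Summing all columns: v1 (0..9 = 45) + v2 (1..10 = 55) + v3 (2..11 = 65) = 165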
        self.assertEqual(result.collect()[0]['sum'], 165)

    def test_grouped_with_empty_partition(self):
        data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)]
        expected = [Row(id=1, x=5), Row(id=1, x=5), Row(id=2, x=4)]
        num_parts = len(data) + 1
        df = self.spark.createDataFrame(self.sc.parallelize(data, numSlices=num_parts))
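        # num_parts > len(data) guarantees at least one empty partition,
        # which the grouped map execution should skip (SPARK-28128).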

        f = pandas_udf(lambda pdf: pdf.assign(x=pdf['x'].sum()),
                       'id long, x int', PandasUDFType.GROUPED_MAP)

        result = df.groupBy('id').apply(f).collect()
        self.assertEqual(result, expected)

    def test_grouped_over_window(self):

        data = [(0, 1, "2018-03-10T00:00:00+00:00", [0]),
                (1, 2, "2018-03-11T00:00:00+00:00", [0]),
                (2, 2, "2018-03-12T00:00:00+00:00", [0]),
                (3, 3, "2018-03-15T00:00:00+00:00", [0]),
                (4, 3, "2018-03-16T00:00:00+00:00", [0]),
                (5, 3, "2018-03-17T00:00:00+00:00", [0]),
                (6, 3, "2018-03-21T00:00:00+00:00", [0])]

        expected = {0: [0],
                    1: [1, 2],
                    2: [1, 2],
                    3: [3, 4, 5],
                    4: [3, 4, 5],
                    5: [3, 4, 5],
                    6: [6]}
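
        # Each id maps to the full list of ids sharing its (group, 5-day
        # window) pair, e.g. ids 1 and 2 land in the same window for group 2.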
|
|
|
|
|
|
|
|
df = self.spark.createDataFrame(data, ['id', 'group', 'ts', 'result'])
|
|
|
|
df = df.select(col('id'), col('group'), col('ts').cast('timestamp'), col('result'))
|
|
|
|
|
|
|
|
def f(pdf):
|
|
|
|
# Assign each result element the ids of the windowed group
|
|
|
|
pdf['result'] = [pdf['id']] * len(pdf)
|
|
|
|
return pdf
|
|
|
|
|
[SPARK-28264][PYTHON][SQL] Support type hints in pandas UDF and rename/move inconsistent pandas UDF types
### What changes were proposed in this pull request?
This PR proposes to redesign pandas UDFs as described in [the proposal](https://docs.google.com/document/d/1-kV0FS_LF2zvaRh_GhkV32Uqksm_Sq8SvnBBmRyxm30/edit?usp=sharing).
```python
from pyspark.sql.functions import pandas_udf
import pandas as pd
pandas_udf("long")
def plug_one(s: pd.Series) -> pd.Series:
return s + 1
spark.range(10).select(plug_one("id")).show()
```
```
+------------+
|plug_one(id)|
+------------+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
+------------+
```
Note that, this PR address one of the future improvements described [here](https://docs.google.com/document/d/1-kV0FS_LF2zvaRh_GhkV32Uqksm_Sq8SvnBBmRyxm30/edit#heading=h.h3ncjpk6ujqu), "A couple of less-intuitive pandas UDF types" (by zero323) together.
In short,
- Adds new way with type hints as an alternative and experimental way.
```python
pandas_udf(schema='...')
def func(c1: Series, c2: Series) -> DataFrame:
pass
```
- Replace and/or add an alias for three types below from UDF, and make them as separate standalone APIs. So, `pandas_udf` is now consistent with regular `udf`s and other expressions.
`df.mapInPandas(udf)` -replace-> `df.mapInPandas(f, schema)`
`df.groupby.apply(udf)` -alias-> `df.groupby.applyInPandas(f, schema)`
`df.groupby.cogroup.apply(udf)` -replace-> `df.groupby.cogroup.applyInPandas(f, schema)`
*`df.groupby.apply` was added from 2.3 while the other were added in the master only.
- No deprecation for the existing ways for now.
```python
pandas_udf(schema='...', functionType=PandasUDFType.SCALAR)
def func(c1, c2):
pass
```
If users are happy with this, I plan to deprecate the existing way and declare using type hints is not experimental anymore.
One design goal in this PR was that, avoid touching the internal (since we didn't deprecate the old ways for now), but supports type hints with a minimised changes only at the interface.
- Once we deprecate or remove the old ways, I think it requires another refactoring for the internal in the future. At the very least, we should rename internal pandas evaluation types.
- If users find this experimental type hints isn't quite helpful, we should simply revert the changes at the interface level.
### Why are the changes needed?
In order to address old design issues. Please see [the proposal](https://docs.google.com/document/d/1-kV0FS_LF2zvaRh_GhkV32Uqksm_Sq8SvnBBmRyxm30/edit?usp=sharing).
### Does this PR introduce any user-facing change?
For behaviour changes, No.
It adds new ways to use pandas UDFs by using type hints. See below.
**SCALAR**:
```python
pandas_udf(schema='...')
def func(c1: Series, c2: DataFrame) -> Series:
pass # DataFrame represents a struct column
```
**SCALAR_ITER**:
```python
pandas_udf(schema='...')
def func(iter: Iterator[Tuple[Series, DataFrame, ...]]) -> Iterator[Series]:
pass # Same as SCALAR but wrapped by Iterator
```
**GROUPED_AGG**:
```python
pandas_udf(schema='...')
def func(c1: Series, c2: DataFrame) -> int:
pass # DataFrame represents a struct column
```
**GROUPED_MAP**:
This was added in Spark 2.3 as of SPARK-20396. As described above, it keeps the existing behaviour. Additionally, we now have a new alias `groupby.applyInPandas` for `groupby.apply`. See the example below:
```python
def func(pdf):
return pdf
df.groupby("...").applyInPandas(func, schema=df.schema)
```
**MAP_ITER**: this is not a pandas UDF anymore
This was added in Spark 3.0 as of SPARK-28198; and this PR replaces the usages. See the example below:
```python
def func(iter):
for df in iter:
yield df
df.mapInPandas(func, df.schema)
```
**COGROUPED_MAP**: this is not a pandas UDF anymore
This was added in Spark 3.0 as of SPARK-27463; and this PR replaces the usages. See the example below:
```python
def asof_join(left, right):
return pd.merge_asof(left, right, on="...", by="...")
df1.groupby("...").cogroup(df2.groupby("...")).applyInPandas(asof_join, schema="...")
```
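A self-contained variant with concrete (made-up) columns, assuming a running SparkSession `spark`:
```python
import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000102, 2, 2.0)], ("time", "id", "v1"))
df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2"))

def asof_join(left, right):
    # Both sides of each cogroup arrive as pandas DataFrames.
    return pd.merge_asof(left, right, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
```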
### How was this patch tested?
Unit tests were added and run against Python 2.7, 3.6 and 3.7.
Closes #27165 from HyukjinKwon/revisit-pandas.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
        result = df.groupby('group', window('ts', '5 days')).applyInPandas(f, df.schema)\
            .select('id', 'result').collect()

        for r in result:
            self.assertListEqual(expected[r[0]], r[1])
[SPARK-32162][PYTHON][TESTS] Improve error message of Pandas grouped map test with window
### What changes were proposed in this pull request?
Improve the error message in test GroupedMapInPandasTests.test_grouped_over_window_with_key to show the incorrect values.
### Why are the changes needed?
This test failure has come up often in Arrow testing because it exercises a struct with timestamp values through a pandas UDF. The current error message is unhelpful: it doesn't show the incorrect values, only that the check failed. This change instead raises an assertion error that includes the incorrect values on failure.
Before:
```
======================================================================
FAIL: test_grouped_over_window_with_key (pyspark.sql.tests.test_pandas_grouped_map.GroupedMapInPandasTests)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/spark/python/pyspark/sql/tests/test_pandas_grouped_map.py", line 588, in test_grouped_over_window_with_key
self.assertTrue(all([r[0] for r in result]))
AssertionError: False is not true
```
After:
```
======================================================================
ERROR: test_grouped_over_window_with_key (pyspark.sql.tests.test_pandas_grouped_map.GroupedMapInPandasTests)
----------------------------------------------------------------------
...
AssertionError: {'start': datetime.datetime(2018, 3, 20, 0, 0), 'end': datetime.datetime(2018, 3, 25, 0, 0)}, != {'start': datetime.datetime(2020, 3, 20, 0, 0), 'end': datetime.datetime(2020, 3, 25, 0, 0)}
```
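The shape of the fix is the standard plain-`assert` pattern that embeds the offending values in the failure message; a minimal illustration of the pattern (not the exact patch):
```python
expected = [1, 2, 3]
result = [1, 2, 3]

for actual, expect in zip(result, expected):
    # On mismatch this raises e.g. "AssertionError: 4 != 3",
    # instead of unittest's bare "False is not true".
    assert actual == expect, "{} != {}".format(actual, expect)
```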
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Improved existing test
Closes #28987 from BryanCutler/pandas-grouped-map-test-output-SPARK-32162.
Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
    def test_grouped_over_window_with_key(self):

        data = [(0, 1, "2018-03-10T00:00:00+00:00", [0]),
                (1, 2, "2018-03-11T00:00:00+00:00", [0]),
                (2, 2, "2018-03-12T00:00:00+00:00", [0]),
                (3, 3, "2018-03-15T00:00:00+00:00", [0]),
                (4, 3, "2018-03-16T00:00:00+00:00", [0]),
                (5, 3, "2018-03-17T00:00:00+00:00", [0]),
                (6, 3, "2018-03-21T00:00:00+00:00", [0])]

        expected_window = [
            {'start': datetime.datetime(2018, 3, 10, 0, 0),
             'end': datetime.datetime(2018, 3, 15, 0, 0)},
            {'start': datetime.datetime(2018, 3, 15, 0, 0),
             'end': datetime.datetime(2018, 3, 20, 0, 0)},
            {'start': datetime.datetime(2018, 3, 20, 0, 0),
             'end': datetime.datetime(2018, 3, 25, 0, 0)},
        ]
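        # Note: window('ts', '5 days') with no startTime yields tumbling windows
        # aligned to the Unix epoch; 2018-03-10 is 17600 days (a multiple of 5)
        # after 1970-01-01, so the 5-day windows above start at 03-10, 03-15
        # and 03-20.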
        expected_key = {0: (1, expected_window[0]),
                        1: (2, expected_window[0]),
                        2: (2, expected_window[0]),
                        3: (3, expected_window[1]),
                        4: (3, expected_window[1]),
                        5: (3, expected_window[1]),
                        6: (3, expected_window[2])}

        # id -> array of group with len of num records in window
        expected = {0: [1],
                    1: [2, 2],
                    2: [2, 2],
                    3: [3, 3, 3],
                    4: [3, 3, 3],
                    5: [3, 3, 3],
                    6: [3]}

        df = self.spark.createDataFrame(data, ['id', 'group', 'ts', 'result'])
        df = df.select(col('id'), col('group'), col('ts').cast('timestamp'), col('result'))

        def f(key, pdf):
            group = key[0]
            window_range = key[1]

            # Make sure the key with group and window values are correct
            for _, i in pdf.id.iteritems():
                assert expected_key[i][0] == group, "{} != {}".format(expected_key[i][0], group)
                assert expected_key[i][1] == window_range, \
                    "{} != {}".format(expected_key[i][1], window_range)
            return pdf.assign(result=[[group] * len(pdf)] * len(pdf))

        result = df.groupby('group', window('ts', '5 days')).applyInPandas(f, df.schema)\
            .select('id', 'result').collect()

        for r in result:
            self.assertListEqual(expected[r[0]], r[1])
[SPARK-31915][SQL][PYTHON] Resolve the grouping column properly per the case sensitivity in grouped and cogrouped pandas UDFs
### What changes were proposed in this pull request?
This is another approach to fix the issue; see the previous attempt in https://github.com/apache/spark/pull/28745, which was too invasive, so I took a more conservative approach.
This PR proposes to resolve the grouping attributes separately first, so they can be referenced without ambiguity when `FlatMapGroupsInPandas` and `FlatMapCoGroupsInPandas` are resolved.
Previously,
```python
from pyspark.sql.functions import *
df = spark.createDataFrame([[1, 1]], ["column", "Score"])
pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
return pdf.assign(Score=0.5)
df.groupby('COLUMN').apply(my_pandas_udf).show()
```
failed as below:
```
pyspark.sql.utils.AnalysisException: "Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;"
```
because the unresolved `COLUMN` in `FlatMapGroupsInPandas` doesn't know which reference to take from the child projection.
After this fix, the child projection is resolved first together with the grouping keys, and the positionally selected grouping attribute from that child projection is passed to `FlatMapGroupsInPandas`.
### Why are the changes needed?
To resolve grouping keys correctly.
### Does this PR introduce _any_ user-facing change?
Yes,
```python
from pyspark.sql.functions import *
df = spark.createDataFrame([[1, 1]], ["column", "Score"])
pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
def my_pandas_udf(pdf):
return pdf.assign(Score=0.5)
df.groupby('COLUMN').apply(my_pandas_udf).show()
```
```python
df1 = spark.createDataFrame([(1, 1)], ("column", "value"))
df2 = spark.createDataFrame([(1, 1)], ("column", "value"))
df1.groupby("COLUMN").cogroup(
df2.groupby("COLUMN")
).applyInPandas(lambda r, l: r + l, df1.schema).show()
```
Before:
```
pyspark.sql.utils.AnalysisException: Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;
```
```
pyspark.sql.utils.AnalysisException: cannot resolve '`COLUMN`' given input columns: [COLUMN, COLUMN, value, value];;
'FlatMapCoGroupsInPandas ['COLUMN], ['COLUMN], <lambda>(column#9L, value#10L, column#13L, value#14L), [column#22L, value#23L]
:- Project [COLUMN#9L, column#9L, value#10L]
: +- LogicalRDD [column#9L, value#10L], false
+- Project [COLUMN#13L, column#13L, value#14L]
+- LogicalRDD [column#13L, value#14L], false
```
After:
```
+------+-----+
|column|Score|
+------+-----+
| 1| 0.5|
+------+-----+
```
```
+------+-----+
|column|value|
+------+-----+
| 2| 2|
+------+-----+
```
### How was this patch tested?
Unit tests were added, and the change was manually tested.
Closes #28777 from HyukjinKwon/SPARK-31915-another.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
    def test_case_insensitive_grouping_column(self):
        # SPARK-31915: case-insensitive grouping column should work.
        def my_pandas_udf(pdf):
            return pdf.assign(score=0.5)

        df = self.spark.createDataFrame([[1, 1]], ["column", "score"])
        row = df.groupby('COLUMN').applyInPandas(
            my_pandas_udf, schema="column integer, score float").first()
        self.assertEqual(row.asDict(), Row(column=1, score=0.5).asDict())
if __name__ == "__main__":
|
2020-08-30 22:23:31 -04:00
|
|
|
from pyspark.sql.tests.test_pandas_grouped_map import * # noqa: F401
|
2018-11-14 01:51:11 -05:00
|
|
|
|
|
|
|
try:
|
2020-09-24 01:15:36 -04:00
|
|
|
import xmlrunner # type: ignore[import]
|
2019-06-23 20:58:17 -04:00
|
|
|
testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
|
2018-11-14 01:51:11 -05:00
|
|
|
except ImportError:
|
2018-11-14 23:30:52 -05:00
|
|
|
testRunner = None
|
|
|
|
unittest.main(testRunner=testRunner, verbosity=2)
|