[SPARK-36785][PYTHON] Fix DataFrame.isin when DataFrame has NaN value

### What changes were proposed in this pull request?
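Fix `DataFrame.isin` when the DataFrame has NaN values or the values to check contain `None`. The result of the Spark `isin` expression is now coalesced to `False`, and the resulting columns are declared as non-nullable booleans.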

### Why are the changes needed?
When the DataFrame has NaN values (or the values to check contain `None`), `DataFrame.isin` can return `None` where pandas returns `False`:

``` python
>>> psdf = ps.DataFrame(
...     {"a": [None, 2, 3, 4, 5, 6, 7, 8, None], "b": [None, 5, None, 3, 2, 1, None, 0, 0], "c": [1, 5, 1, 3, 2, 1, 1, 0, 0]},
... )
>>> psdf
     a    b  c
0  NaN  NaN  1
1  2.0  5.0  5
2  3.0  NaN  1
3  4.0  3.0  3
4  5.0  2.0  2
5  6.0  1.0  1
6  7.0  NaN  1
7  8.0  0.0  0
8  NaN  0.0  0
>>> other = [1, 2, None]

>>> psdf.isin(other)
      a     b     c
0  None  None  True
1  True  None  None
2  None  None  True
3  None  None  None
4  None  True  True
5  None  True  True
6  None  None  True
7  None  None  None
8  None  None  None

>>> psdf.to_pandas().isin(other)
       a      b      c
0  False  False   True
1   True  False  False
2  False  False   True
3  False  False  False
4  False   True   True
5  False   True   True
6  False  False   True
7  False  False  False
8  False  False  False
```

### Does this PR introduce _any_ user-facing change?
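Yes. After this PR, `DataFrame.isin` returns `False` where it previously returned `None`, matching the pandas result: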

``` python
>>> psdf = ps.DataFrame(
...     {"a": [None, 2, 3, 4, 5, 6, 7, 8, None], "b": [None, 5, None, 3, 2, 1, None, 0, 0], "c": [1, 5, 1, 3, 2, 1, 1, 0, 0]},
... )
>>> psdf
     a    b  c
0  NaN  NaN  1
1  2.0  5.0  5
2  3.0  NaN  1
3  4.0  3.0  3
4  5.0  2.0  2
5  6.0  1.0  1
6  7.0  NaN  1
7  8.0  0.0  0
8  NaN  0.0  0
>>> other = [1, 2, None]

>>> psdf.isin(other)
       a      b      c
0  False  False   True
1   True  False  False
2  False  False   True
3  False  False  False
4  False   True   True
5  False   True   True
6  False  False   True
7  False  False  False
8  False  False  False
```

### How was this patch tested?
Added unit tests covering `DataFrame.isin` with list and dict values that contain `None`.

Closes #34040 from dgd-contributor/SPARK-36785_dataframe.isin_fix.

Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(cherry picked from commit cc182fe6f6)
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
This commit is contained in:
dgd-contributor 2021-09-20 17:52:51 -07:00 committed by Takuya UESHIN
parent 5d0e51e943
commit 3d47c692d2
2 changed files with 55 additions and 14 deletions

View file

@ -7394,31 +7394,37 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
if col in values:
item = values[col]
item = item.tolist() if isinstance(item, np.ndarray) else list(item)
data_spark_columns.append(
self._internal.spark_column_for(self._internal.column_labels[i])
.isin(item)
.alias(self._internal.data_spark_column_names[i])
scol = self._internal.spark_column_for(self._internal.column_labels[i]).isin(
[SF.lit(v) for v in item]
)
scol = F.coalesce(scol, F.lit(False))
else:
data_spark_columns.append(
SF.lit(False).alias(self._internal.data_spark_column_names[i])
)
scol = SF.lit(False)
data_spark_columns.append(scol.alias(self._internal.data_spark_column_names[i]))
elif is_list_like(values):
values = (
cast(np.ndarray, values).tolist()
if isinstance(values, np.ndarray)
else list(values)
)
data_spark_columns += [
self._internal.spark_column_for(label)
.isin(values)
.alias(self._internal.spark_column_name_for(label))
for label in self._internal.column_labels
]
for label in self._internal.column_labels:
scol = self._internal.spark_column_for(label).isin([SF.lit(v) for v in values])
scol = F.coalesce(scol, F.lit(False))
data_spark_columns.append(scol.alias(self._internal.spark_column_name_for(label)))
else:
raise TypeError("Values should be iterable, Series, DataFrame or dict.")
return DataFrame(self._internal.with_new_columns(data_spark_columns))
return DataFrame(
self._internal.with_new_columns(
data_spark_columns,
data_fields=[
field.copy(dtype=np.dtype("bool"), spark_type=BooleanType(), nullable=False)
for field in self._internal.data_fields
],
)
)
@property
def shape(self) -> Tuple[int, int]:

View file

@ -1954,6 +1954,41 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
with self.assertRaisesRegex(TypeError, msg):
psdf.isin(1)
pdf = pd.DataFrame(
{
"a": [4, 2, 3, 4, 8, 6],
"b": [1, None, 9, 4, None, 4],
"c": [None, 5, None, 3, 2, 1],
},
)
psdf = ps.from_pandas(pdf)
if LooseVersion(pd.__version__) >= LooseVersion("1.2"):
self.assert_eq(psdf.isin([4, 3, 1, 1, None]), pdf.isin([4, 3, 1, 1, None]))
else:
expected = pd.DataFrame(
{
"a": [True, False, True, True, False, False],
"b": [True, False, False, True, False, True],
"c": [False, False, False, True, False, True],
}
)
self.assert_eq(psdf.isin([4, 3, 1, 1, None]), expected)
if LooseVersion(pd.__version__) >= LooseVersion("1.2"):
self.assert_eq(
psdf.isin({"b": [4, 3, 1, 1, None]}), pdf.isin({"b": [4, 3, 1, 1, None]})
)
else:
expected = pd.DataFrame(
{
"a": [False, False, False, False, False, False],
"b": [True, False, False, True, False, True],
"c": [False, False, False, False, False, False],
}
)
self.assert_eq(psdf.isin({"b": [4, 3, 1, 1, None]}), expected)
def test_merge(self):
left_pdf = pd.DataFrame(
{