[SPARK-35474] Enable disallow_untyped_defs mypy check for pyspark.pandas.indexing

### What changes were proposed in this pull request?

Adds more type annotations to the file
`python/pyspark/pandas/indexing.py`
and fixes the mypy check failures.
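
For context, most of the changes add return-type and parameter annotations of the following shape; this is a simplified, hypothetical sketch (`_ExampleIndexer` is not a real pyspark class), not the PR's actual code:

```python
from typing import Any, List, Union


class _ExampleIndexer:
    """Illustrative stand-in for the indexer classes touched by this PR."""

    def __init__(self, data: List[int]) -> None:
        self._data = data

    @property
    def _is_list(self) -> bool:
        # Annotated property, analogous to _is_df / _is_series in the diff below.
        return isinstance(self._data, list)

    def __getitem__(self, key: Any) -> Union[int, List[int]]:
        # `key: Any` mirrors the __getitem__ signatures annotated in the PR.
        if isinstance(key, slice):
            return self._data[key]
        return self._data[int(key)]
```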

### Why are the changes needed?

We should enable more disallow_untyped_defs mypy checks.

### Does this PR introduce _any_ user-facing change?

Yes.
This PR adds more type annotations to the pandas APIs on Spark module, which can affect how development tools (type checkers, IDEs) interact with the code for users.

### How was this patch tested?

The mypy check with the new configuration and the existing tests should pass:
`./dev/lint-python`
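
For a quicker, module-scoped check, mypy can also be invoked programmatically; a minimal sketch, assuming it runs from the Spark repository root and that the project configuration lives at `python/mypy.ini`:

```python
# Runs mypy only on the indexing module with the project configuration.
# `./dev/lint-python` remains the canonical way to run the full lint suite.
from mypy import api

stdout, stderr, exit_status = api.run(
    ["--config-file", "python/mypy.ini", "python/pyspark/pandas/indexing.py"]
)
print(stdout)
print("exit status:", exit_status)
```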

Closes #32738 from pingsutw/SPARK-35474.

Authored-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
3 changed files with 38 additions and 33 deletions

python/mypy.ini

@@ -174,9 +174,6 @@ disallow_untyped_defs = False
[mypy-pyspark.pandas.groupby]
disallow_untyped_defs = False
[mypy-pyspark.pandas.indexing]
disallow_untyped_defs = False
[mypy-pyspark.pandas.namespace]
disallow_untyped_defs = False
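
Removing the `[mypy-pyspark.pandas.indexing]` override above means the module now falls under the repository-wide `disallow_untyped_defs = True` default, so mypy rejects any function in it that is not fully annotated. Roughly (a generic illustration, not code from this PR):

```python
def untyped(x):  # flagged under disallow_untyped_defs: function is missing type annotations
    return x + 1


def typed(x: int) -> int:  # accepted: parameters and return type are annotated
    return x + 1
```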

python/pyspark/pandas/generic.py

@@ -3064,25 +3064,25 @@ class Frame(object, metaclass=ABCMeta):
@property
def at(self) -> AtIndexer:
return AtIndexer(self)
return AtIndexer(self) # type: ignore
at.__doc__ = AtIndexer.__doc__
@property
def iat(self) -> iAtIndexer:
return iAtIndexer(self)
return iAtIndexer(self) # type: ignore
iat.__doc__ = iAtIndexer.__doc__
@property
def iloc(self) -> iLocIndexer:
return iLocIndexer(self)
return iLocIndexer(self) # type: ignore
iloc.__doc__ = iLocIndexer.__doc__
@property
def loc(self) -> LocIndexer:
return LocIndexer(self)
return LocIndexer(self) # type: ignore
loc.__doc__ = LocIndexer.__doc__

python/pyspark/pandas/indexing.py

@@ -57,7 +57,7 @@ if TYPE_CHECKING:
class IndexerLike(object):
def __init__(self, psdf_or_psser):
def __init__(self, psdf_or_psser: Union["Series", "DataFrame"]):
from pyspark.pandas.frame import DataFrame
from pyspark.pandas.series import Series
@@ -67,27 +67,27 @@ class IndexerLike(object):
self._psdf_or_psser = psdf_or_psser
@property
def _is_df(self):
def _is_df(self) -> bool:
from pyspark.pandas.frame import DataFrame
return isinstance(self._psdf_or_psser, DataFrame)
@property
def _is_series(self):
def _is_series(self) -> bool:
from pyspark.pandas.series import Series
return isinstance(self._psdf_or_psser, Series)
@property
def _psdf(self):
def _psdf(self) -> "DataFrame":
if self._is_df:
return self._psdf_or_psser
return cast("DataFrame", self._psdf_or_psser)
else:
assert self._is_series
return self._psdf_or_psser._psdf
@property
def _internal(self):
def _internal(self) -> InternalFrame:
return self._psdf._internal
@@ -130,7 +130,7 @@ class AtIndexer(IndexerLike):
array([ 4, 20])
"""
def __getitem__(self, key) -> Union["Series", "DataFrame", Scalar]:
def __getitem__(self, key: Any) -> Union["Series", "DataFrame", Scalar]:
if self._is_df:
if not isinstance(key, tuple) or len(key) != 2:
raise TypeError("Use DataFrame.at like .at[row_index, column_name]")
@@ -170,7 +170,7 @@ class AtIndexer(IndexerLike):
if len(pdf) < 1:
raise KeyError(name_like_string(row_sel))
values = pdf.iloc[:, 0].values
values = cast(pd.DataFrame, pdf).iloc[:, 0].values
return (
values if (len(row_sel) < self._internal.index_level or len(values) > 1) else values[0]
)
@@ -217,7 +217,7 @@ class iAtIndexer(IndexerLike):
2
"""
def __getitem__(self, key) -> Union["Series", "DataFrame", Scalar]:
def __getitem__(self, key: Any) -> Union["Series", "DataFrame", Scalar]:
if self._is_df:
if not isinstance(key, tuple) or len(key) != 2:
raise TypeError(
@@ -431,7 +431,7 @@ class LocIndexerLike(IndexerLike, metaclass=ABCMeta):
"""Select columns by other type key."""
pass
def __getitem__(self, key) -> Union["Series", "DataFrame"]:
def __getitem__(self, key: Any) -> Union["Series", "DataFrame"]:
from pyspark.pandas.frame import DataFrame
from pyspark.pandas.series import Series, first_series
@@ -465,7 +465,7 @@ class LocIndexerLike(IndexerLike, metaclass=ABCMeta):
if isinstance(rows_sel, Series) and not same_anchor(rows_sel, self._psdf_or_psser):
psdf = self._psdf_or_psser.copy()
temp_col = verify_temp_column_name(psdf, "__temp_col__")
temp_col = verify_temp_column_name(cast("DataFrame", psdf), "__temp_col__")
psdf[temp_col] = rows_sel
return type(self)(psdf)[psdf[temp_col], cols_sel][list(self._psdf_or_psser.columns)]
@@ -534,7 +534,7 @@ class LocIndexerLike(IndexerLike, metaclass=ABCMeta):
except AnalysisException:
raise KeyError(
"[{}] don't exist in columns".format(
[col._jc.toString() for col in data_spark_columns]
[col._jc.toString() for col in data_spark_columns] # type: ignore
)
)
@@ -569,7 +569,7 @@ class LocIndexerLike(IndexerLike, metaclass=ABCMeta):
else:
return psdf_or_psser
def __setitem__(self, key, value):
def __setitem__(self, key: Any, value: Any) -> None:
from pyspark.pandas.frame import DataFrame
from pyspark.pandas.series import Series, first_series
@@ -628,7 +628,9 @@ class LocIndexerLike(IndexerLike, metaclass=ABCMeta):
if cond is None:
cond = F.lit(True)
if limit is not None:
cond = cond & (self._internal.spark_frame[self._sequence_col] < F.lit(limit))
cond = cond & (
self._internal.spark_frame[cast(iLocIndexer, self)._sequence_col] < F.lit(limit)
)
if isinstance(value, (Series, spark.Column)):
if remaining_index is not None and remaining_index == 0:
@@ -675,7 +677,7 @@ class LocIndexerLike(IndexerLike, metaclass=ABCMeta):
isinstance(value, Series)
and (isinstance(self, iLocIndexer) or not same_anchor(value, self._psdf_or_psser))
):
psdf = self._psdf_or_psser.copy()
psdf = cast(DataFrame, self._psdf_or_psser.copy())
temp_natural_order = verify_temp_column_name(psdf, "__temp_natural_order__")
temp_key_col = verify_temp_column_name(psdf, "__temp_key_col__")
temp_value_col = verify_temp_column_name(psdf, "__temp_value_col__")
@@ -705,13 +707,15 @@ class LocIndexerLike(IndexerLike, metaclass=ABCMeta):
return
cond, limit, remaining_index = self._select_rows(rows_sel)
missing_keys = []
missing_keys = [] # type: Optional[List[Tuple]]
_, data_spark_columns, _, _, _ = self._select_cols(cols_sel, missing_keys=missing_keys)
if cond is None:
cond = F.lit(True)
if limit is not None:
cond = cond & (self._internal.spark_frame[self._sequence_col] < F.lit(limit))
cond = cond & (
self._internal.spark_frame[cast(iLocIndexer, self)._sequence_col] < F.lit(limit)
)
if isinstance(value, (Series, spark.Column)):
if remaining_index is not None and remaining_index == 0:
@@ -972,7 +976,7 @@ class LocIndexer(LocIndexerLike):
"""
@staticmethod
def _NotImplemented(description):
def _NotImplemented(description: str) -> SparkPandasNotImplementedError:
return SparkPandasNotImplementedError(
description=description,
pandas_function=".loc[..., ...]",
@@ -1158,7 +1162,11 @@ class LocIndexer(LocIndexerLike):
)
def _get_from_multiindex_column(
self, key, missing_keys, labels=None, recursed=0
self,
key: Optional[Tuple],
missing_keys: Optional[List[Tuple]],
labels: Optional[List[Tuple]] = None,
recursed: int = 0,
) -> Tuple[
List[Tuple], Optional[List[spark.Column]], List[InternalField], bool, Optional[Tuple]
]:
@@ -1186,7 +1194,7 @@ class LocIndexer(LocIndexerLike):
else:
returns_series = all(lbl is None or len(lbl) == 0 for _, lbl in labels)
if returns_series:
labels = set(label for label, _ in labels)
labels = set(label for label, _ in labels) # type: ignore
assert len(labels) == 1
label = list(labels)[0]
column_labels = [label]
@@ -1513,7 +1521,7 @@ class iLocIndexer(LocIndexerLike):
"""
@staticmethod
def _NotImplemented(description):
def _NotImplemented(description: str) -> SparkPandasNotImplementedError:
return SparkPandasNotImplementedError(
description=description,
pandas_function=".iloc[..., ...]",
@@ -1521,7 +1529,7 @@ class iLocIndexer(LocIndexerLike):
)
@lazy_property
def _internal(self):
def _internal(self) -> "InternalFrame":
# Use resolved_copy to fix the natural order.
internal = super()._internal.resolved_copy
sdf, force_nullable = InternalFrame.attach_distributed_sequence_column(
@@ -1542,7 +1550,7 @@ class iLocIndexer(LocIndexerLike):
)
@lazy_property
def _sequence_col(self):
def _sequence_col(self) -> Union[Any, Tuple]:
# Use resolved_copy to fix the natural order.
internal = super()._internal.resolved_copy
return verify_temp_column_name(internal.spark_frame, "__distributed_sequence_column__")
@@ -1568,7 +1576,7 @@ class iLocIndexer(LocIndexerLike):
def _select_rows_by_slice(
self, rows_sel: slice
) -> Tuple[Optional[spark.Column], Optional[int], Optional[int]]:
def verify_type(i):
def verify_type(i: int) -> None:
if not isinstance(i, int):
raise TypeError(
"cannot do slice indexing with these indexers [{}] of {}".format(i, type(i))
@@ -1780,7 +1788,7 @@ class iLocIndexer(LocIndexerLike):
"listlike of integers, boolean array] types, got {}".format(cols_sel)
)
def __setitem__(self, key, value):
def __setitem__(self, key: Any, value: Any) -> None:
if is_list_like(value) and not isinstance(value, spark.Column):
iloc_item = self[key]
if not is_list_like(key) or not is_list_like(iloc_item):
@@ -1811,7 +1819,7 @@ class iLocIndexer(LocIndexerLike):
del self._sequence_col
def _test():
def _test() -> None:
import os
import doctest
import sys
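
A closing note on the recurring `cast(...)` calls in the diff: `typing.cast` only informs the type checker and is a no-op at runtime. The `_psdf` change above follows this common pattern when mypy cannot narrow a union by itself; a minimal generic sketch under assumed names (`Holder`, `Series`, `DataFrame` here are placeholders, not the pyspark classes):

```python
from typing import Union, cast


class Series:
    pass


class DataFrame:
    pass


class Holder:
    def __init__(self, obj: Union[Series, DataFrame]) -> None:
        self._obj = obj

    @property
    def _is_frame(self) -> bool:
        return isinstance(self._obj, DataFrame)

    @property
    def frame(self) -> DataFrame:
        if self._is_frame:
            # mypy cannot narrow the Union through the _is_frame property,
            # so cast() states the narrowed type; it does nothing at runtime.
            return cast(DataFrame, self._obj)
        raise TypeError("not a DataFrame")
```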