# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # """ Base and utility classes for pandas-on-Spark objects. """ from abc import ABCMeta, abstractmethod from functools import wraps, partial from itertools import chain from typing import Any, Callable, Optional, Sequence, Tuple, TypeVar, Union, cast, TYPE_CHECKING import numpy as np import pandas as pd # noqa: F401 from pandas.api.types import is_list_like, CategoricalDtype from pyspark import sql as spark from pyspark.sql import functions as F, Window, Column from pyspark.sql.types import ( DoubleType, FloatType, LongType, ) from pyspark import pandas as ps # For running doctests and reference resolution in PyCharm. from pyspark.pandas.config import get_option, option_context from pyspark.pandas.internal import ( InternalField, InternalFrame, NATURAL_ORDER_COLUMN_NAME, SPARK_DEFAULT_INDEX_NAME, ) from pyspark.pandas.spark import functions as SF from pyspark.pandas.spark.accessors import SparkIndexOpsMethods from pyspark.pandas.typedef import ( Dtype, extension_dtypes, ) from pyspark.pandas.utils import ( combine_frames, same_anchor, scol_for, validate_axis, ERROR_MESSAGE_CANNOT_COMBINE, ) from pyspark.pandas.frame import DataFrame if TYPE_CHECKING: from pyspark.pandas.data_type_ops.base import DataTypeOps # noqa: F401 (SPARK-34943) from pyspark.pandas.indexes import Index # noqa: F401 (SPARK-34943) from pyspark.pandas.series import Series # noqa: F401 (SPARK-34943) T_IndexOps = TypeVar("T_IndexOps", bound="IndexOpsMixin") IndexOpsLike = Union["Series", "Index"] def should_alignment_for_column_op(self: IndexOpsLike, other: IndexOpsLike) -> bool: from pyspark.pandas.series import Series if isinstance(self, Series) and isinstance(other, Series): return not same_anchor(self, other) else: return self._internal.spark_frame is not other._internal.spark_frame def align_diff_index_ops( func: Callable[..., Column], this_index_ops: IndexOpsLike, *args: Any ) -> IndexOpsLike: """ Align the `IndexOpsMixin` objects and apply the function. Parameters ---------- func : The function to apply this_index_ops : IndexOpsMixin A base `IndexOpsMixin` object args : list of other arguments including other `IndexOpsMixin` objects Returns ------- `Index` if all `this_index_ops` and arguments are `Index`; otherwise `Series` """ from pyspark.pandas.indexes import Index from pyspark.pandas.series import Series, first_series cols = [arg for arg in args if isinstance(arg, IndexOpsMixin)] if isinstance(this_index_ops, Series) and all(isinstance(col, Series) for col in cols): combined = combine_frames( this_index_ops.to_frame(), *[cast(Series, col).rename(i) for i, col in enumerate(cols)], how="full" ) return column_op(func)( combined["this"]._psser_for(combined["this"]._internal.column_labels[0]), *[ combined["that"]._psser_for(label) for label in combined["that"]._internal.column_labels ] ).rename(this_index_ops.name) else: # This could cause as many counts, reset_index calls, joins for combining # as the number of `Index`s in `args`. So far it's fine since we can assume the ops # only work between at most two `Index`s. We might need to fix it in the future. self_len = len(this_index_ops) if any(len(col) != self_len for col in args if isinstance(col, IndexOpsMixin)): raise ValueError("operands could not be broadcast together with shapes") with option_context("compute.default_index_type", "distributed-sequence"): if isinstance(this_index_ops, Index) and all(isinstance(col, Index) for col in cols): return Index( column_op(func)( this_index_ops.to_series().reset_index(drop=True), *[ arg.to_series().reset_index(drop=True) if isinstance(arg, Index) else arg for arg in args ] ).sort_index(), name=this_index_ops.name, ) elif isinstance(this_index_ops, Series): this = cast(DataFrame, this_index_ops.reset_index()) that = [ cast(Series, col.to_series() if isinstance(col, Index) else col) .rename(i) .reset_index(drop=True) for i, col in enumerate(cols) ] combined = combine_frames(this, *that, how="full").sort_index() combined = combined.set_index( combined._internal.column_labels[: this_index_ops._internal.index_level] ) combined.index.names = this_index_ops._internal.index_names return column_op(func)( first_series(combined["this"]), *[ combined["that"]._psser_for(label) for label in combined["that"]._internal.column_labels ] ).rename(this_index_ops.name) else: this = cast(Index, this_index_ops).to_frame().reset_index(drop=True) that_series = next(col for col in cols if isinstance(col, Series)) that_frame = that_series._psdf[ [ cast(Series, col.to_series() if isinstance(col, Index) else col).rename(i) for i, col in enumerate(cols) ] ] combined = combine_frames(this, that_frame.reset_index()).sort_index() self_index = ( combined["this"].set_index(combined["this"]._internal.column_labels).index ) other = combined["that"].set_index( combined["that"]._internal.column_labels[: that_series._internal.index_level] ) other.index.names = that_series._internal.index_names return column_op(func)( self_index, *[ other._psser_for(label) for label, col in zip(other._internal.column_labels, cols) ] ).rename(that_series.name) def booleanize_null(scol: Column, f: Callable[..., Column]) -> Column: """ Booleanize Null in Spark Column """ comp_ops = [ getattr(Column, "__{}__".format(comp_op)) for comp_op in ["eq", "ne", "lt", "le", "ge", "gt"] ] if f in comp_ops: # if `f` is "!=", fill null with True otherwise False filler = f == Column.__ne__ scol = F.when(scol.isNull(), filler).otherwise(scol) return scol def column_op(f: Callable[..., Column]) -> Callable[..., IndexOpsLike]: """ A decorator that wraps APIs taking/returning Spark Column so that pandas-on-Spark Series can be supported too. If this decorator is used for the `f` function that takes Spark Column and returns Spark Column, decorated `f` takes pandas-on-Spark Series as well and returns pandas-on-Spark Series. :param f: a function that takes Spark Column and returns Spark Column. :param self: pandas-on-Spark Series :param args: arguments that the function `f` takes. """ @wraps(f) def wrapper(self: IndexOpsLike, *args: Any) -> IndexOpsLike: from pyspark.pandas.indexes.base import Index from pyspark.pandas.series import Series # It is possible for the function `f` takes other arguments than Spark Column. # To cover this case, explicitly check if the argument is pandas-on-Spark Series and # extract Spark Column. For other arguments, they are used as are. cols = [arg for arg in args if isinstance(arg, (Series, Index))] if all(not should_alignment_for_column_op(self, col) for col in cols): # Same DataFrame anchors scol = f( self.spark.column, *[arg.spark.column if isinstance(arg, IndexOpsMixin) else arg for arg in args] ) field = InternalField.from_struct_field( self._internal.spark_frame.select(scol).schema[0], use_extension_dtypes=any( isinstance(col.dtype, extension_dtypes) for col in [self] + cols ), ) if not field.is_extension_dtype: scol = booleanize_null(scol, f).alias(field.name) if isinstance(self, Series) or not any(isinstance(col, Series) for col in cols): index_ops = self._with_new_scol(scol, field=field) else: psser = next(col for col in cols if isinstance(col, Series)) index_ops = psser._with_new_scol(scol, field=field) elif get_option("compute.ops_on_diff_frames"): index_ops = align_diff_index_ops(f, self, *args) else: raise ValueError(ERROR_MESSAGE_CANNOT_COMBINE) if not all(self.name == col.name for col in cols): index_ops = index_ops.rename(None) return index_ops return wrapper def numpy_column_op(f: Callable[..., Column]) -> Callable[..., IndexOpsLike]: @wraps(f) def wrapper(self: IndexOpsLike, *args: Any) -> IndexOpsLike: # PySpark does not support NumPy type out of the box. For now, we convert NumPy types # into some primitive types understandable in PySpark. new_args = [] for arg in args: # TODO: This is a quick hack to support NumPy type. We should revisit this. if isinstance(self.spark.data_type, LongType) and isinstance(arg, np.timedelta64): new_args.append(float(arg / np.timedelta64(1, "s"))) else: new_args.append(arg) return column_op(f)(self, *new_args) return wrapper class IndexOpsMixin(object, metaclass=ABCMeta): """common ops mixin to support a unified interface / docs for Series / Index Assuming there are following attributes or properties and function. """ @property @abstractmethod def _internal(self) -> InternalFrame: pass @property @abstractmethod def _psdf(self) -> DataFrame: pass @abstractmethod def _with_new_scol( self: T_IndexOps, scol: spark.Column, *, field: Optional[InternalField] = None ) -> T_IndexOps: pass @property @abstractmethod def _column_label(self) -> Optional[Tuple]: pass @property @abstractmethod def spark(self: T_IndexOps) -> SparkIndexOpsMethods[T_IndexOps]: pass @property def _dtype_op(self) -> "DataTypeOps": from pyspark.pandas.data_type_ops.base import DataTypeOps return DataTypeOps(self.dtype, self.spark.data_type) @abstractmethod def copy(self: T_IndexOps) -> T_IndexOps: pass # arithmetic operators def __neg__(self: T_IndexOps) -> T_IndexOps: return cast(T_IndexOps, column_op(Column.__neg__)(self)) def __add__(self, other: Any) -> IndexOpsLike: return self._dtype_op.add(self, other) def __sub__(self, other: Any) -> IndexOpsLike: return self._dtype_op.sub(self, other) def __mul__(self, other: Any) -> IndexOpsLike: return self._dtype_op.mul(self, other) def __truediv__(self, other: Any) -> IndexOpsLike: """ __truediv__ has different behaviour between pandas and PySpark for several cases. 1. When divide np.inf by zero, PySpark returns null whereas pandas returns np.inf 2. When divide positive number by zero, PySpark returns null whereas pandas returns np.inf 3. When divide -np.inf by zero, PySpark returns null whereas pandas returns -np.inf 4. When divide negative number by zero, PySpark returns null whereas pandas returns -np.inf +-------------------------------------------+ | dividend (divisor: 0) | PySpark | pandas | |-----------------------|---------|---------| | np.inf | null | np.inf | | -np.inf | null | -np.inf | | 10 | null | np.inf | | -10 | null | -np.inf | +-----------------------|---------|---------+ """ return self._dtype_op.truediv(self, other) def __mod__(self, other: Any) -> IndexOpsLike: return self._dtype_op.mod(self, other) def __radd__(self, other: Any) -> IndexOpsLike: return self._dtype_op.radd(self, other) def __rsub__(self, other: Any) -> IndexOpsLike: return self._dtype_op.rsub(self, other) def __rmul__(self, other: Any) -> IndexOpsLike: return self._dtype_op.rmul(self, other) def __rtruediv__(self, other: Any) -> IndexOpsLike: return self._dtype_op.rtruediv(self, other) def __floordiv__(self, other: Any) -> IndexOpsLike: """ __floordiv__ has different behaviour between pandas and PySpark for several cases. 1. When divide np.inf by zero, PySpark returns null whereas pandas returns np.inf 2. When divide positive number by zero, PySpark returns null whereas pandas returns np.inf 3. When divide -np.inf by zero, PySpark returns null whereas pandas returns -np.inf 4. When divide negative number by zero, PySpark returns null whereas pandas returns -np.inf +-------------------------------------------+ | dividend (divisor: 0) | PySpark | pandas | |-----------------------|---------|---------| | np.inf | null | np.inf | | -np.inf | null | -np.inf | | 10 | null | np.inf | | -10 | null | -np.inf | +-----------------------|---------|---------+ """ return self._dtype_op.floordiv(self, other) def __rfloordiv__(self, other: Any) -> IndexOpsLike: return self._dtype_op.rfloordiv(self, other) def __rmod__(self, other: Any) -> IndexOpsLike: return self._dtype_op.rmod(self, other) def __pow__(self, other: Any) -> IndexOpsLike: return self._dtype_op.pow(self, other) def __rpow__(self, other: Any) -> IndexOpsLike: return self._dtype_op.rpow(self, other) def __abs__(self: T_IndexOps) -> T_IndexOps: return cast(T_IndexOps, column_op(F.abs)(self)) # comparison operators def __eq__(self, other: Any) -> IndexOpsLike: # type: ignore[override] return column_op(Column.__eq__)(self, other) def __ne__(self, other: Any) -> IndexOpsLike: # type: ignore[override] return column_op(Column.__ne__)(self, other) __lt__ = column_op(Column.__lt__) __le__ = column_op(Column.__le__) __ge__ = column_op(Column.__ge__) __gt__ = column_op(Column.__gt__) def __invert__(self: T_IndexOps) -> T_IndexOps: return cast(T_IndexOps, column_op(Column.__invert__)(self)) # `and`, `or`, `not` cannot be overloaded in Python, # so use bitwise operators as boolean operators def __and__(self, other: Any) -> IndexOpsLike: return self._dtype_op.__and__(self, other) def __or__(self, other: Any) -> IndexOpsLike: return self._dtype_op.__or__(self, other) def __rand__(self, other: Any) -> IndexOpsLike: return self._dtype_op.rand(self, other) def __ror__(self, other: Any) -> IndexOpsLike: return self._dtype_op.ror(self, other) def __len__(self) -> int: return len(self._psdf) # NDArray Compat def __array_ufunc__( self, ufunc: Callable, method: str, *inputs: Any, **kwargs: Any ) -> IndexOpsLike: from pyspark.pandas import numpy_compat # Try dunder methods first. result = numpy_compat.maybe_dispatch_ufunc_to_dunder_op( self, ufunc, method, *inputs, **kwargs ) # After that, we try with PySpark APIs. if result is NotImplemented: result = numpy_compat.maybe_dispatch_ufunc_to_spark_func( self, ufunc, method, *inputs, **kwargs ) if result is not NotImplemented: return cast(IndexOpsLike, result) else: # TODO: support more APIs? raise NotImplementedError( "pandas-on-Spark objects currently do not support %s." % ufunc ) @property def dtype(self) -> Dtype: """Return the dtype object of the underlying data. Examples -------- >>> s = ps.Series([1, 2, 3]) >>> s.dtype dtype('int64') >>> s = ps.Series(list('abc')) >>> s.dtype dtype('O') >>> s = ps.Series(pd.date_range('20130101', periods=3)) >>> s.dtype dtype('>> s.rename("a").to_frame().set_index("a").index.dtype dtype(' bool: """ Returns true if the current object is empty. Otherwise, returns false. >>> ps.range(10).id.empty False >>> ps.range(0).id.empty True >>> ps.DataFrame({}, index=list('abc')).index.empty False """ return self._internal.resolved_copy.spark_frame.rdd.isEmpty() @property def hasnans(self) -> bool: """ Return True if it has any missing values. Otherwise, it returns False. >>> ps.DataFrame({}, index=list('abc')).index.hasnans False >>> ps.Series(['a', None]).hasnans True >>> ps.Series([1.0, 2.0, np.nan]).hasnans True >>> ps.Series([1, 2, 3]).hasnans False >>> (ps.Series([1.0, 2.0, np.nan]) + 1).hasnans True >>> ps.Series([1, 2, 3]).rename("a").to_frame().set_index("a").index.hasnans False """ sdf = self._internal.spark_frame scol = self.spark.column if isinstance(self.spark.data_type, (DoubleType, FloatType)): return sdf.select(F.max(scol.isNull() | F.isnan(scol))).collect()[0][0] else: return sdf.select(F.max(scol.isNull())).collect()[0][0] @property def is_monotonic(self) -> bool: """ Return boolean if values in the object are monotonically increasing. .. note:: the current implementation of is_monotonic requires to shuffle and aggregate multiple times to check the order locally and globally, which is potentially expensive. In case of multi-index, all data are transferred to single node which can easily cause out-of-memory error currently. .. note:: Disable the Spark config `spark.sql.optimizer.nestedSchemaPruning.enabled` for multi-index if you're using pandas-on-Spark < 1.7.0 with PySpark 3.1.1. Returns ------- is_monotonic : bool Examples -------- >>> ser = ps.Series(['1/1/2018', '3/1/2018', '4/1/2018']) >>> ser.is_monotonic True >>> df = ps.DataFrame({'dates': [None, '1/1/2018', '2/1/2018', '3/1/2018']}) >>> df.dates.is_monotonic False >>> df.index.is_monotonic True >>> ser = ps.Series([1]) >>> ser.is_monotonic True >>> ser = ps.Series([]) >>> ser.is_monotonic True >>> ser.rename("a").to_frame().set_index("a").index.is_monotonic True >>> ser = ps.Series([5, 4, 3, 2, 1], index=[1, 2, 3, 4, 5]) >>> ser.is_monotonic False >>> ser.index.is_monotonic True Support for MultiIndex >>> midx = ps.MultiIndex.from_tuples( ... [('x', 'a'), ('x', 'b'), ('y', 'c'), ('y', 'd'), ('z', 'e')]) >>> midx # doctest: +SKIP MultiIndex([('x', 'a'), ('x', 'b'), ('y', 'c'), ('y', 'd'), ('z', 'e')], ) >>> midx.is_monotonic True >>> midx = ps.MultiIndex.from_tuples( ... [('z', 'a'), ('z', 'b'), ('y', 'c'), ('y', 'd'), ('x', 'e')]) >>> midx # doctest: +SKIP MultiIndex([('z', 'a'), ('z', 'b'), ('y', 'c'), ('y', 'd'), ('x', 'e')], ) >>> midx.is_monotonic False """ return self._is_monotonic("increasing") is_monotonic_increasing = is_monotonic @property def is_monotonic_decreasing(self) -> bool: """ Return boolean if values in the object are monotonically decreasing. .. note:: the current implementation of is_monotonic_decreasing requires to shuffle and aggregate multiple times to check the order locally and globally, which is potentially expensive. In case of multi-index, all data are transferred to single node which can easily cause out-of-memory error currently. .. note:: Disable the Spark config `spark.sql.optimizer.nestedSchemaPruning.enabled` for multi-index if you're using pandas-on-Spark < 1.7.0 with PySpark 3.1.1. Returns ------- is_monotonic : bool Examples -------- >>> ser = ps.Series(['4/1/2018', '3/1/2018', '1/1/2018']) >>> ser.is_monotonic_decreasing True >>> df = ps.DataFrame({'dates': [None, '3/1/2018', '2/1/2018', '1/1/2018']}) >>> df.dates.is_monotonic_decreasing False >>> df.index.is_monotonic_decreasing False >>> ser = ps.Series([1]) >>> ser.is_monotonic_decreasing True >>> ser = ps.Series([]) >>> ser.is_monotonic_decreasing True >>> ser.rename("a").to_frame().set_index("a").index.is_monotonic_decreasing True >>> ser = ps.Series([5, 4, 3, 2, 1], index=[1, 2, 3, 4, 5]) >>> ser.is_monotonic_decreasing True >>> ser.index.is_monotonic_decreasing False Support for MultiIndex >>> midx = ps.MultiIndex.from_tuples( ... [('x', 'a'), ('x', 'b'), ('y', 'c'), ('y', 'd'), ('z', 'e')]) >>> midx # doctest: +SKIP MultiIndex([('x', 'a'), ('x', 'b'), ('y', 'c'), ('y', 'd'), ('z', 'e')], ) >>> midx.is_monotonic_decreasing False >>> midx = ps.MultiIndex.from_tuples( ... [('z', 'e'), ('z', 'd'), ('y', 'c'), ('y', 'b'), ('x', 'a')]) >>> midx # doctest: +SKIP MultiIndex([('z', 'a'), ('z', 'b'), ('y', 'c'), ('y', 'd'), ('x', 'e')], ) >>> midx.is_monotonic_decreasing True """ return self._is_monotonic("decreasing") def _is_locally_monotonic_spark_column(self, order: str) -> Column: window = ( Window.partitionBy(F.col("__partition_id")) .orderBy(NATURAL_ORDER_COLUMN_NAME) .rowsBetween(-1, -1) ) if order == "increasing": return (F.col("__origin") >= F.lag(F.col("__origin"), 1).over(window)) & F.col( "__origin" ).isNotNull() else: return (F.col("__origin") <= F.lag(F.col("__origin"), 1).over(window)) & F.col( "__origin" ).isNotNull() def _is_monotonic(self, order: str) -> bool: assert order in ("increasing", "decreasing") sdf = self._internal.spark_frame sdf = ( sdf.select( F.spark_partition_id().alias( "__partition_id" ), # Make sure we use the same partition id in the whole job. F.col(NATURAL_ORDER_COLUMN_NAME), self.spark.column.alias("__origin"), ) .select( F.col("__partition_id"), F.col("__origin"), self._is_locally_monotonic_spark_column(order).alias( "__comparison_within_partition" ), ) .groupby(F.col("__partition_id")) .agg( F.min(F.col("__origin")).alias("__partition_min"), F.max(F.col("__origin")).alias("__partition_max"), F.min(F.coalesce(F.col("__comparison_within_partition"), SF.lit(True))).alias( "__comparison_within_partition" ), ) ) # Now we're windowing the aggregation results without partition specification. # The number of rows here will be as the same of partitions, which is expected # to be small. window = Window.orderBy(F.col("__partition_id")).rowsBetween(-1, -1) if order == "increasing": comparison_col = F.col("__partition_min") >= F.lag(F.col("__partition_max"), 1).over( window ) else: comparison_col = F.col("__partition_min") <= F.lag(F.col("__partition_max"), 1).over( window ) sdf = sdf.select( comparison_col.alias("__comparison_between_partitions"), F.col("__comparison_within_partition"), ) ret = sdf.select( F.min(F.coalesce(F.col("__comparison_between_partitions"), SF.lit(True))) & F.min(F.coalesce(F.col("__comparison_within_partition"), SF.lit(True))) ).collect()[0][0] if ret is None: return True else: return ret @property def ndim(self) -> int: """ Return an int representing the number of array dimensions. Return 1 for Series / Index / MultiIndex. Examples -------- For Series >>> s = ps.Series([None, 1, 2, 3, 4], index=[4, 5, 2, 1, 8]) >>> s.ndim 1 For Index >>> s.index.ndim 1 For MultiIndex >>> midx = pd.MultiIndex([['lama', 'cow', 'falcon'], ... ['speed', 'weight', 'length']], ... [[0, 0, 0, 1, 1, 1, 2, 2, 2], ... [1, 1, 1, 1, 1, 2, 1, 2, 2]]) >>> s = ps.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3], index=midx) >>> s.index.ndim 1 """ return 1 def astype(self: T_IndexOps, dtype: Union[str, type, Dtype]) -> T_IndexOps: """ Cast a pandas-on-Spark object to a specified dtype ``dtype``. Parameters ---------- dtype : data type Use a numpy.dtype or Python type to cast entire pandas object to the same type. Returns ------- casted : same type as caller See Also -------- to_datetime : Convert argument to datetime. Examples -------- >>> ser = ps.Series([1, 2], dtype='int32') >>> ser 0 1 1 2 dtype: int32 >>> ser.astype('int64') 0 1 1 2 dtype: int64 >>> ser.rename("a").to_frame().set_index("a").index.astype('int64') Int64Index([1, 2], dtype='int64', name='a') """ return cast(T_IndexOps, self._dtype_op.astype(cast(IndexOpsLike, self), dtype)) def isin(self: T_IndexOps, values: Sequence[Any]) -> T_IndexOps: """ Check whether `values` are contained in Series or Index. Return a boolean Series or Index showing whether each element in the Series matches an element in the passed sequence of `values` exactly. Parameters ---------- values : set or list-like The sequence of values to test. Returns ------- isin : Series (bool dtype) or Index (bool dtype) Examples -------- >>> s = ps.Series(['lama', 'cow', 'lama', 'beetle', 'lama', ... 'hippo'], name='animal') >>> s.isin(['cow', 'lama']) 0 True 1 True 2 True 3 False 4 True 5 False Name: animal, dtype: bool Passing a single string as ``s.isin('lama')`` will raise an error. Use a list of one element instead: >>> s.isin(['lama']) 0 True 1 False 2 True 3 False 4 True 5 False Name: animal, dtype: bool >>> s.rename("a").to_frame().set_index("a").index.isin(['lama']) Index([True, False, True, False, True, False], dtype='object', name='a') """ if not is_list_like(values): raise TypeError( "only list-like objects are allowed to be passed" " to isin(), you passed a [{values_type}]".format(values_type=type(values).__name__) ) values = values.tolist() if isinstance(values, np.ndarray) else list(values) return self._with_new_scol(self.spark.column.isin([SF.lit(v) for v in values])) def isnull(self: T_IndexOps) -> T_IndexOps: """ Detect existing (non-missing) values. Return a boolean same-sized object indicating if the values are NA. NA values, such as None or numpy.NaN, gets mapped to True values. Everything else gets mapped to False values. Characters such as empty strings '' or numpy.inf are not considered NA values (unless you set pandas.options.mode.use_inf_as_na = True). Returns ------- Series or Index : Mask of bool values for each element in Series that indicates whether an element is not an NA value. Examples -------- >>> ser = ps.Series([5, 6, np.NaN]) >>> ser.isna() # doctest: +NORMALIZE_WHITESPACE 0 False 1 False 2 True dtype: bool >>> ser.rename("a").to_frame().set_index("a").index.isna() Index([False, False, True], dtype='object', name='a') """ from pyspark.pandas.indexes import MultiIndex if isinstance(self, MultiIndex): raise NotImplementedError("isna is not defined for MultiIndex") return self._dtype_op.isnull(self) isna = isnull def notnull(self: T_IndexOps) -> T_IndexOps: """ Detect existing (non-missing) values. Return a boolean same-sized object indicating if the values are not NA. Non-missing values get mapped to True. Characters such as empty strings '' or numpy.inf are not considered NA values (unless you set pandas.options.mode.use_inf_as_na = True). NA values, such as None or numpy.NaN, get mapped to False values. Returns ------- Series or Index : Mask of bool values for each element in Series that indicates whether an element is not an NA value. Examples -------- Show which entries in a Series are not NA. >>> ser = ps.Series([5, 6, np.NaN]) >>> ser 0 5.0 1 6.0 2 NaN dtype: float64 >>> ser.notna() 0 True 1 True 2 False dtype: bool >>> ser.rename("a").to_frame().set_index("a").index.notna() Index([True, True, False], dtype='object', name='a') """ from pyspark.pandas.indexes import MultiIndex if isinstance(self, MultiIndex): raise NotImplementedError("notna is not defined for MultiIndex") return (~self.isnull()).rename(self.name) # type: ignore notna = notnull # TODO: axis, skipna, and many arguments should be implemented. def all(self, axis: Union[int, str] = 0) -> bool: """ Return whether all elements are True. Returns True unless there at least one element within a series that is False or equivalent (e.g. zero or empty) Parameters ---------- axis : {0 or 'index'}, default 0 Indicate which axis or axes should be reduced. * 0 / 'index' : reduce the index, return a Series whose index is the original column labels. Examples -------- >>> ps.Series([True, True]).all() True >>> ps.Series([True, False]).all() False >>> ps.Series([0, 1]).all() False >>> ps.Series([1, 2, 3]).all() True >>> ps.Series([True, True, None]).all() True >>> ps.Series([True, False, None]).all() False >>> ps.Series([]).all() True >>> ps.Series([np.nan]).all() True >>> df = ps.Series([True, False, None]).rename("a").to_frame() >>> df.set_index("a").index.all() False """ axis = validate_axis(axis) if axis != 0: raise NotImplementedError('axis should be either 0 or "index" currently.') sdf = self._internal.spark_frame.select(self.spark.column) col = scol_for(sdf, sdf.columns[0]) # Note that we're ignoring `None`s here for now. # any and every was added as of Spark 3.0 # ret = sdf.select(F.expr("every(CAST(`%s` AS BOOLEAN))" % sdf.columns[0])).collect()[0][0] # Here we use min as its alternative: ret = sdf.select(F.min(F.coalesce(col.cast("boolean"), SF.lit(True)))).collect()[0][0] if ret is None: return True else: return ret # TODO: axis, skipna, and many arguments should be implemented. def any(self, axis: Union[int, str] = 0) -> bool: """ Return whether any element is True. Returns False unless there at least one element within a series that is True or equivalent (e.g. non-zero or non-empty). Parameters ---------- axis : {0 or 'index'}, default 0 Indicate which axis or axes should be reduced. * 0 / 'index' : reduce the index, return a Series whose index is the original column labels. Examples -------- >>> ps.Series([False, False]).any() False >>> ps.Series([True, False]).any() True >>> ps.Series([0, 0]).any() False >>> ps.Series([0, 1, 2]).any() True >>> ps.Series([False, False, None]).any() False >>> ps.Series([True, False, None]).any() True >>> ps.Series([]).any() False >>> ps.Series([np.nan]).any() False >>> df = ps.Series([True, False, None]).rename("a").to_frame() >>> df.set_index("a").index.any() True """ axis = validate_axis(axis) if axis != 0: raise NotImplementedError('axis should be either 0 or "index" currently.') sdf = self._internal.spark_frame.select(self.spark.column) col = scol_for(sdf, sdf.columns[0]) # Note that we're ignoring `None`s here for now. # any and every was added as of Spark 3.0 # ret = sdf.select(F.expr("any(CAST(`%s` AS BOOLEAN))" % sdf.columns[0])).collect()[0][0] # Here we use max as its alternative: ret = sdf.select(F.max(F.coalesce(col.cast("boolean"), SF.lit(False)))).collect()[0][0] if ret is None: return False else: return ret # TODO: add frep and axis parameter def shift(self: T_IndexOps, periods: int = 1, fill_value: Optional[Any] = None) -> T_IndexOps: """ Shift Series/Index by desired number of periods. .. note:: the current implementation of shift uses Spark's Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset. Parameters ---------- periods : int Number of periods to shift. Can be positive or negative. fill_value : object, optional The scalar value to use for newly introduced missing values. The default depends on the dtype of self. For numeric data, np.nan is used. Returns ------- Copy of input Series/Index, shifted. Examples -------- >>> df = ps.DataFrame({'Col1': [10, 20, 15, 30, 45], ... 'Col2': [13, 23, 18, 33, 48], ... 'Col3': [17, 27, 22, 37, 52]}, ... columns=['Col1', 'Col2', 'Col3']) >>> df.Col1.shift(periods=3) 0 NaN 1 NaN 2 NaN 3 10.0 4 20.0 Name: Col1, dtype: float64 >>> df.Col2.shift(periods=3, fill_value=0) 0 0 1 0 2 0 3 13 4 23 Name: Col2, dtype: int64 >>> df.index.shift(periods=3, fill_value=0) Int64Index([0, 0, 0, 0, 1], dtype='int64') """ return self._shift(periods, fill_value).spark.analyzed def _shift( self: T_IndexOps, periods: int, fill_value: Any, *, part_cols: Sequence[Union[str, Column]] = () ) -> T_IndexOps: if not isinstance(periods, int): raise TypeError("periods should be an int; however, got [%s]" % type(periods).__name__) col = self.spark.column window = ( Window.partitionBy(*part_cols) .orderBy(NATURAL_ORDER_COLUMN_NAME) .rowsBetween(-periods, -periods) ) lag_col = F.lag(col, periods).over(window) col = F.when(lag_col.isNull() | F.isnan(lag_col), fill_value).otherwise(lag_col) return self._with_new_scol(col, field=self._internal.data_fields[0].copy(nullable=True)) # TODO: Update Documentation for Bins Parameter when its supported def value_counts( self, normalize: bool = False, sort: bool = True, ascending: bool = False, bins: None = None, dropna: bool = True, ) -> "Series": """ Return a Series containing counts of unique values. The resulting object will be in descending order so that the first element is the most frequently-occurring element. Excludes NA values by default. Parameters ---------- normalize : boolean, default False If True then the object returned will contain the relative frequencies of the unique values. sort : boolean, default True Sort by values. ascending : boolean, default False Sort in ascending order. bins : Not Yet Supported dropna : boolean, default True Don't include counts of NaN. Returns ------- counts : Series See Also -------- Series.count: Number of non-NA elements in a Series. Examples -------- For Series >>> df = ps.DataFrame({'x':[0, 0, 1, 1, 1, np.nan]}) >>> df.x.value_counts() # doctest: +NORMALIZE_WHITESPACE 1.0 3 0.0 2 Name: x, dtype: int64 With `normalize` set to `True`, returns the relative frequency by dividing all values by the sum of values. >>> df.x.value_counts(normalize=True) # doctest: +NORMALIZE_WHITESPACE 1.0 0.6 0.0 0.4 Name: x, dtype: float64 **dropna** With `dropna` set to `False` we can also see NaN index values. >>> df.x.value_counts(dropna=False) # doctest: +NORMALIZE_WHITESPACE 1.0 3 0.0 2 NaN 1 Name: x, dtype: int64 For Index >>> idx = ps.Index([3, 1, 2, 3, 4, np.nan]) >>> idx Float64Index([3.0, 1.0, 2.0, 3.0, 4.0, nan], dtype='float64') >>> idx.value_counts().sort_index() 1.0 1 2.0 1 3.0 2 4.0 1 dtype: int64 **sort** With `sort` set to `False`, the result wouldn't be sorted by number of count. >>> idx.value_counts(sort=True).sort_index() 1.0 1 2.0 1 3.0 2 4.0 1 dtype: int64 **normalize** With `normalize` set to `True`, returns the relative frequency by dividing all values by the sum of values. >>> idx.value_counts(normalize=True).sort_index() 1.0 0.2 2.0 0.2 3.0 0.4 4.0 0.2 dtype: float64 **dropna** With `dropna` set to `False` we can also see NaN index values. >>> idx.value_counts(dropna=False).sort_index() # doctest: +SKIP 1.0 1 2.0 1 3.0 2 4.0 1 NaN 1 dtype: int64 For MultiIndex. >>> midx = pd.MultiIndex([['lama', 'cow', 'falcon'], ... ['speed', 'weight', 'length']], ... [[0, 0, 0, 1, 1, 1, 2, 2, 2], ... [1, 1, 1, 1, 1, 2, 1, 2, 2]]) >>> s = ps.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3], index=midx) >>> s.index # doctest: +SKIP MultiIndex([( 'lama', 'weight'), ( 'lama', 'weight'), ( 'lama', 'weight'), ( 'cow', 'weight'), ( 'cow', 'weight'), ( 'cow', 'length'), ('falcon', 'weight'), ('falcon', 'length'), ('falcon', 'length')], ) >>> s.index.value_counts().sort_index() (cow, length) 1 (cow, weight) 2 (falcon, length) 2 (falcon, weight) 1 (lama, weight) 3 dtype: int64 >>> s.index.value_counts(normalize=True).sort_index() (cow, length) 0.111111 (cow, weight) 0.222222 (falcon, length) 0.222222 (falcon, weight) 0.111111 (lama, weight) 0.333333 dtype: float64 If Index has name, keep the name up. >>> idx = ps.Index([0, 0, 0, 1, 1, 2, 3], name='pandas-on-Spark') >>> idx.value_counts().sort_index() 0 3 1 2 2 1 3 1 Name: pandas-on-Spark, dtype: int64 """ from pyspark.pandas.series import first_series if bins is not None: raise NotImplementedError("value_counts currently does not support bins") if dropna: sdf_dropna = self._internal.spark_frame.select(self.spark.column).dropna() else: sdf_dropna = self._internal.spark_frame.select(self.spark.column) index_name = SPARK_DEFAULT_INDEX_NAME column_name = self._internal.data_spark_column_names[0] sdf = sdf_dropna.groupby(scol_for(sdf_dropna, column_name).alias(index_name)).count() if sort: if ascending: sdf = sdf.orderBy(F.col("count")) else: sdf = sdf.orderBy(F.col("count").desc()) if normalize: sum = sdf_dropna.count() sdf = sdf.withColumn("count", F.col("count") / SF.lit(sum)) internal = InternalFrame( spark_frame=sdf, index_spark_columns=[scol_for(sdf, index_name)], column_labels=self._internal.column_labels, data_spark_columns=[scol_for(sdf, "count")], column_label_names=self._internal.column_label_names, ) return first_series(DataFrame(internal)) def nunique(self, dropna: bool = True, approx: bool = False, rsd: float = 0.05) -> int: """ Return number of unique elements in the object. Excludes NA values by default. Parameters ---------- dropna : bool, default True Don’t include NaN in the count. approx: bool, default False If False, will use the exact algorithm and return the exact number of unique. If True, it uses the HyperLogLog approximate algorithm, which is significantly faster for large amount of data. Note: This parameter is specific to pandas-on-Spark and is not found in pandas. rsd: float, default 0.05 Maximum estimation error allowed in the HyperLogLog algorithm. Note: Just like ``approx`` this parameter is specific to pandas-on-Spark. Returns ------- int See Also -------- DataFrame.nunique: Method nunique for DataFrame. Series.count: Count non-NA/null observations in the Series. Examples -------- >>> ps.Series([1, 2, 3, np.nan]).nunique() 3 >>> ps.Series([1, 2, 3, np.nan]).nunique(dropna=False) 4 On big data, we recommend using the approximate algorithm to speed up this function. The result will be very close to the exact unique count. >>> ps.Series([1, 2, 3, np.nan]).nunique(approx=True) 3 >>> idx = ps.Index([1, 1, 2, None]) >>> idx Float64Index([1.0, 1.0, 2.0, nan], dtype='float64') >>> idx.nunique() 2 >>> idx.nunique(dropna=False) 3 """ res = self._internal.spark_frame.select([self._nunique(dropna, approx, rsd)]) return res.collect()[0][0] def _nunique(self, dropna: bool = True, approx: bool = False, rsd: float = 0.05) -> Column: colname = self._internal.data_spark_column_names[0] count_fn = cast( Callable[[Column], Column], partial(F.approx_count_distinct, rsd=rsd) if approx else F.countDistinct, ) if dropna: return count_fn(self.spark.column).alias(colname) else: return ( count_fn(self.spark.column) + F.when( F.count(F.when(self.spark.column.isNull(), 1).otherwise(None)) >= 1, 1 ).otherwise(0) ).alias(colname) def take(self: T_IndexOps, indices: Sequence[int]) -> T_IndexOps: """ Return the elements in the given *positional* indices along an axis. This means that we are not indexing according to actual values in the index attribute of the object. We are indexing according to the actual position of the element in the object. Parameters ---------- indices : array-like An array of ints indicating which positions to take. Returns ------- taken : same type as caller An array-like containing the elements taken from the object. See Also -------- DataFrame.loc : Select a subset of a DataFrame by labels. DataFrame.iloc : Select a subset of a DataFrame by positions. numpy.take : Take elements from an array along an axis. Examples -------- Series >>> psser = ps.Series([100, 200, 300, 400, 500]) >>> psser 0 100 1 200 2 300 3 400 4 500 dtype: int64 >>> psser.take([0, 2, 4]).sort_index() 0 100 2 300 4 500 dtype: int64 Index >>> psidx = ps.Index([100, 200, 300, 400, 500]) >>> psidx Int64Index([100, 200, 300, 400, 500], dtype='int64') >>> psidx.take([0, 2, 4]).sort_values() Int64Index([100, 300, 500], dtype='int64') MultiIndex >>> psmidx = ps.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("x", "c")]) >>> psmidx # doctest: +SKIP MultiIndex([('x', 'a'), ('x', 'b'), ('x', 'c')], ) >>> psmidx.take([0, 2]) # doctest: +SKIP MultiIndex([('x', 'a'), ('x', 'c')], ) """ if not is_list_like(indices) or isinstance(indices, (dict, set)): raise TypeError("`indices` must be a list-like except dict or set") if isinstance(self, ps.Series): return cast(T_IndexOps, self.iloc[indices]) else: return cast(T_IndexOps, self._psdf.iloc[indices].index) def factorize( self: T_IndexOps, sort: bool = True, na_sentinel: Optional[int] = -1 ) -> Tuple[T_IndexOps, pd.Index]: """ Encode the object as an enumerated type or categorical variable. This method is useful for obtaining a numeric representation of an array when all that matters is identifying distinct values. Parameters ---------- sort : bool, default True na_sentinel : int or None, default -1 Value to mark "not found". If None, will not drop the NaN from the uniques of the values. Returns ------- codes : Series or Index A Series or Index that's an indexer into `uniques`. ``uniques.take(codes)`` will have the same values as `values`. uniques : pd.Index The unique valid values. .. note :: Even if there's a missing value in `values`, `uniques` will *not* contain an entry for it. Examples -------- >>> psser = ps.Series(['b', None, 'a', 'c', 'b']) >>> codes, uniques = psser.factorize() >>> codes 0 1 1 -1 2 0 3 2 4 1 dtype: int32 >>> uniques Index(['a', 'b', 'c'], dtype='object') >>> codes, uniques = psser.factorize(na_sentinel=None) >>> codes 0 1 1 3 2 0 3 2 4 1 dtype: int32 >>> uniques Index(['a', 'b', 'c', None], dtype='object') >>> codes, uniques = psser.factorize(na_sentinel=-2) >>> codes 0 1 1 -2 2 0 3 2 4 1 dtype: int32 >>> uniques Index(['a', 'b', 'c'], dtype='object') For Index: >>> psidx = ps.Index(['b', None, 'a', 'c', 'b']) >>> codes, uniques = psidx.factorize() >>> codes Int64Index([1, -1, 0, 2, 1], dtype='int64') >>> uniques Index(['a', 'b', 'c'], dtype='object') """ from pyspark.pandas.series import first_series assert (na_sentinel is None) or isinstance(na_sentinel, int) assert sort is True if isinstance(self.dtype, CategoricalDtype): categories = self.dtype.categories if len(categories) == 0: scol = SF.lit(None) else: kvs = list( chain( *[ (SF.lit(code), SF.lit(category)) for code, category in enumerate(categories) ] ) ) map_scol = F.create_map(*kvs) scol = map_scol.getItem(self.spark.column) codes, uniques = self._with_new_scol( scol.alias(self._internal.data_spark_column_names[0]) ).factorize(na_sentinel=na_sentinel) return codes, uniques.astype(self.dtype) uniq_sdf = self._internal.spark_frame.select(self.spark.column).distinct() # Check number of uniques and constructs sorted `uniques_list` max_compute_count = get_option("compute.max_rows") if max_compute_count is not None: uniq_pdf = uniq_sdf.limit(max_compute_count + 1).toPandas() if len(uniq_pdf) > max_compute_count: raise ValueError( "Current Series has more then {0} unique values. " "Please set 'compute.max_rows' by using 'pyspark.pandas.config.set_option' " "to more than {0} rows. Note that, before changing the " "'compute.max_rows', this operation is considerably expensive.".format( max_compute_count ) ) else: uniq_pdf = uniq_sdf.toPandas() # pandas takes both NaN and null in Spark to np.nan, so de-duplication is required uniq_series = first_series(uniq_pdf).drop_duplicates() uniques_list = uniq_series.tolist() uniques_list = sorted(uniques_list, key=lambda x: (pd.isna(x), x)) # Constructs `unique_to_code` mapping non-na unique to code unique_to_code = {} if na_sentinel is not None: na_sentinel_code = na_sentinel code = 0 for unique in uniques_list: if pd.isna(unique): if na_sentinel is None: na_sentinel_code = code else: unique_to_code[unique] = code code += 1 kvs = list( chain(*([(SF.lit(unique), SF.lit(code)) for unique, code in unique_to_code.items()])) ) if len(kvs) == 0: # uniques are all missing values new_scol = SF.lit(na_sentinel_code) else: scol = self.spark.column if isinstance(self.spark.data_type, (FloatType, DoubleType)): cond = scol.isNull() | F.isnan(scol) else: cond = scol.isNull() map_scol = F.create_map(*kvs) null_scol = F.when(cond, SF.lit(na_sentinel_code)) new_scol = null_scol.otherwise(map_scol.getItem(scol)) codes = self._with_new_scol(new_scol.alias(self._internal.data_spark_column_names[0])) if na_sentinel is not None: # Drops the NaN from the uniques of the values uniques_list = [x for x in uniques_list if not pd.isna(x)] uniques = pd.Index(uniques_list) return codes, uniques def _test() -> None: import os import doctest import sys from pyspark.sql import SparkSession import pyspark.pandas.base os.chdir(os.environ["SPARK_HOME"]) globs = pyspark.pandas.base.__dict__.copy() globs["ps"] = pyspark.pandas spark = ( SparkSession.builder.master("local[4]").appName("pyspark.pandas.base tests").getOrCreate() ) (failure_count, test_count) = doctest.testmod( pyspark.pandas.base, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE, ) spark.stop() if failure_count: sys.exit(-1) if __name__ == "__main__": _test()