[SPARK-36214][PYTHON] Add add_categories to CategoricalAccessor and CategoricalIndex

### What changes were proposed in this pull request?

Add `add_categories` to `CategoricalAccessor` and `CategoricalIndex`.

### Why are the changes needed?

We should implement `add_categories` in `CategoricalAccessor` and `CategoricalIndex`.

### Does this PR introduce _any_ user-facing change?

Yes, users will be able to use `add_categories`.

### How was this patch tested?

Added some tests.

Closes #33470 from ueshin/issues/SPARK-36214/add_categories.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
This commit is contained in:
Takuya UESHIN 2021-07-21 22:34:04 -07:00
parent f3e29574d9
commit dcc0aaa3ef
7 changed files with 158 additions and 5 deletions

View file

@ -175,6 +175,7 @@ Categorical components
CategoricalIndex.codes
CategoricalIndex.categories
CategoricalIndex.ordered
CategoricalIndex.add_categories
CategoricalIndex.as_ordered
CategoricalIndex.as_unordered

View file

@ -401,6 +401,7 @@ the ``Series.cat`` accessor.
Series.cat.categories
Series.cat.ordered
Series.cat.codes
Series.cat.add_categories
Series.cat.as_ordered
Series.cat.as_unordered

View file

@ -14,10 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import List, Optional, Union, TYPE_CHECKING, cast
from typing import Any, List, Optional, Union, TYPE_CHECKING, cast
import pandas as pd
from pandas.api.types import CategoricalDtype
from pandas.api.types import CategoricalDtype, is_list_like
from pyspark.pandas.internal import InternalField
from pyspark.sql.types import StructField
@ -165,8 +165,84 @@ class CategoricalAccessor(object):
),
).rename()
# Pre-change stub (the removed side of this diff): before this commit,
# add_categories on the accessor was unimplemented and always raised.
def add_categories(self, new_categories: pd.Index, inplace: bool = False) -> "ps.Series":
    raise NotImplementedError()
def add_categories(
    self, new_categories: Union[pd.Index, Any, List], inplace: bool = False
) -> Optional["ps.Series"]:
    """
    Add new categories.

    `new_categories` will be included at the last/highest place in the
    categories and will be unused directly after this call.

    Parameters
    ----------
    new_categories : category or list-like of category
        The new categories to be included.
    inplace : bool, default False
        Whether or not to add the categories inplace or return a copy of
        this categorical with added categories.

    Returns
    -------
    Series or None
        Categorical with new categories added or None if ``inplace=True``.

    Raises
    ------
    ValueError
        If the new categories include old categories or do not validate as
        categories

    Examples
    --------
    >>> s = ps.Series(list("abbccc"), dtype="category")
    >>> s  # doctest: +SKIP
    0    a
    1    b
    2    b
    3    c
    4    c
    5    c
    dtype: category
    Categories (3, object): ['a', 'b', 'c']

    >>> s.cat.add_categories('x')  # doctest: +SKIP
    0    a
    1    b
    2    b
    3    c
    4    c
    5    c
    dtype: category
    Categories (4, object): ['a', 'b', 'c', 'x']
    """
    from pyspark.pandas.frame import DataFrame

    # Normalize the argument to a plain list of category values.
    to_add: List = list(new_categories) if is_list_like(new_categories) else [new_categories]

    # Values that already exist among the current categories are rejected,
    # mirroring pandas' behavior.
    overlap = set(str(cat) for cat in to_add if cat in self.categories)
    if overlap:
        raise ValueError(
            "new categories must not include old categories: {{{cats}}}".format(
                cats=", ".join(overlap)
            )
        )

    # Appending to the dtype's category list is enough: the data itself is
    # unchanged, only the field's CategoricalDtype is widened.  Note that
    # CategoricalDtype itself raises ValueError on duplicated new categories.
    widened = CategoricalDtype(list(self.categories) + to_add, ordered=self.ordered)
    internal = self._data._psdf._internal.with_new_spark_column(
        self._data._column_label,
        self._data.spark.column,
        field=self._data._internal.data_fields[0].copy(dtype=widened),
    )

    if inplace:
        self._data._psdf._update_internal_frame(internal)
        return None
    psser = DataFrame(internal)._psser_for(self._data._column_label)
    return psser._with_new_scol(psser.spark.column, field=psser._internal.data_fields[0])
def _set_ordered(self, *, ordered: bool, inplace: bool) -> Optional["ps.Series"]:
from pyspark.pandas.frame import DataFrame

View file

@ -204,6 +204,52 @@ class CategoricalIndex(Index):
"""
return self.dtype.ordered
def add_categories(
    self, new_categories: Union[pd.Index, Any, List], inplace: bool = False
) -> Optional["CategoricalIndex"]:
    """
    Add new categories.

    `new_categories` will be included at the last/highest place in the
    categories and will be unused directly after this call.

    Parameters
    ----------
    new_categories : category or list-like of category
        The new categories to be included.
    inplace : bool, default False
        Whether or not to add the categories inplace or return a copy of
        this categorical with added categories.

    Returns
    -------
    CategoricalIndex or None
        Categorical with new categories added or None if ``inplace=True``.

    Raises
    ------
    ValueError
        If the new categories include old categories or do not validate as
        categories

    Examples
    --------
    >>> idx = ps.CategoricalIndex(list("abbccc"))
    >>> idx  # doctest: +NORMALIZE_WHITESPACE
    CategoricalIndex(['a', 'b', 'b', 'c', 'c', 'c'],
                     categories=['a', 'b', 'c'], ordered=False, dtype='category')

    >>> idx.add_categories('x')  # doctest: +NORMALIZE_WHITESPACE
    CategoricalIndex(['a', 'b', 'b', 'c', 'c', 'c'],
                     categories=['a', 'b', 'c', 'x'], ordered=False, dtype='category')
    """
    # In-place mutation of an index is never supported.
    if inplace:
        raise ValueError("cannot use inplace with CategoricalIndex")

    # Delegate to the Series categorical accessor, then rebuild the index
    # and restore its original name.
    widened = self.to_series().cat.add_categories(new_categories=new_categories)
    return CategoricalIndex(widened).rename(self.name)
def as_ordered(self, inplace: bool = False) -> Optional["CategoricalIndex"]:
"""
Set the Categorical to be ordered.

View file

@ -125,7 +125,6 @@ class MissingPandasLikeCategoricalIndex(MissingPandasLikeIndex):
# Functions
rename_categories = _unsupported_function("rename_categories", cls="CategoricalIndex")
reorder_categories = _unsupported_function("reorder_categories", cls="CategoricalIndex")
add_categories = _unsupported_function("add_categories", cls="CategoricalIndex")
remove_categories = _unsupported_function("remove_categories", cls="CategoricalIndex")
remove_unused_categories = _unsupported_function(
"remove_unused_categories", cls="CategoricalIndex"

View file

@ -94,6 +94,18 @@ class CategoricalIndexTest(PandasOnSparkTestCase, TestUtils):
with self.assertRaises(ValueError):
psidx.categories = [1, 2, 3, 4]
def test_add_categories(self):
    """add_categories on CategoricalIndex matches pandas and rejects bad input."""
    pidx = pd.CategoricalIndex([1, 2, 3], categories=[3, 2, 1])
    psidx = ps.from_pandas(pidx)

    # Scalar, list, and empty-list arguments should all match pandas.
    self.assert_eq(pidx.add_categories(4), psidx.add_categories(4))
    self.assert_eq(pidx.add_categories([4, 5]), psidx.add_categories([4, 5]))
    self.assert_eq(pidx.add_categories([]), psidx.add_categories([]))

    # inplace is not supported on an index.
    with self.assertRaises(ValueError):
        psidx.add_categories(4, inplace=True)
    # Existing categories must be rejected.
    with self.assertRaises(ValueError):
        psidx.add_categories(3)
    # Duplicates within the new categories must be rejected.
    with self.assertRaises(ValueError):
        psidx.add_categories([4, 4])
def test_as_ordered_unordered(self):
pidx = pd.CategoricalIndex(["x", "y", "z"], categories=["z", "y", "x"])
psidx = ps.from_pandas(pidx)

View file

@ -79,6 +79,24 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils):
with self.assertRaises(ValueError):
psser.cat.categories = [1, 2, 3, 4]
def test_add_categories(self):
    """add_categories via the .cat accessor matches pandas, including inplace."""
    pdf, psdf = self.df_pair
    pser, psser = pdf.a, psdf.a

    # Scalar, list, and empty-list arguments should all match pandas.
    self.assert_eq(pser.cat.add_categories(4), psser.cat.add_categories(4))
    self.assert_eq(pser.cat.add_categories([4, 5]), psser.cat.add_categories([4, 5]))
    self.assert_eq(pser.cat.add_categories([]), psser.cat.add_categories([]))

    # Mutate both sides in place; the series and their parent frames must
    # still agree afterwards.
    pser.cat.add_categories(4, inplace=True)
    psser.cat.add_categories(4, inplace=True)
    self.assert_eq(pser, psser)
    self.assert_eq(pdf, psdf)

    # 4 is now an existing category; re-adding it must fail, as must
    # duplicated new categories.
    with self.assertRaises(ValueError):
        psser.cat.add_categories(4)
    with self.assertRaises(ValueError):
        psser.cat.add_categories([5, 5])
def test_as_ordered_unordered(self):
pdf, psdf = self.df_pair