[SPARK-35453][PYTHON] Move Koalas accessor to pandas_on_spark accessor

### What changes were proposed in this pull request?

This PR proposes renaming the existing "Koalas Accessor" to "Pandas API on Spark Accessor".

### Why are the changes needed?

Because we no longer use the name "Koalas"; we use "Pandas API on Spark" instead.

So, the related code needs to be changed accordingly.

### Does this PR introduce _any_ user-facing change?

Yes, the pandas API on Spark accessor is renamed from `df.koalas.[...]` to `df.pandas_on_spark.[...]`.

**Note:** `df.koalas.[...]` remains available for backward compatibility, but it emits deprecation warnings, as sketched below.
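
A minimal before/after sketch of the user-facing change (assuming `pyspark.pandas` as the import path; the deprecation-warning behavior follows the note above, though the exact warning text is not shown in this diff):

```python
import pyspark.pandas as ps

psdf = ps.DataFrame({"x": ["a", "b", "c"]})

# New accessor name:
psdf.pandas_on_spark.attach_id_column(id_type="sequence", column="id")

# Old name kept for backward compatibility; expected to emit a
# deprecation warning (exact warning text is an assumption here):
psdf.koalas.attach_id_column(id_type="sequence", column="id")
```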

### How was this patch tested?

Manually tested locally and checked the changes one by one.

Closes #32674 from itholic/SPARK-35453.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Commit 7e2717333b (parent 8e11f5f007) · 2021-06-01 10:33:10 +09:00
9 changed files with 155 additions and 140 deletions


@ -88,19 +88,19 @@ class PandasOnSparkFrameMethods(object):
Examples
--------
>>> df = ps.DataFrame({"x": ['a', 'b', 'c']})
>>> df.koalas.attach_id_column(id_type="sequence", column="id")
>>> df.pandas_on_spark.attach_id_column(id_type="sequence", column="id")
x id
0 a 0
1 b 1
2 c 2
>>> df.koalas.attach_id_column(id_type="distributed-sequence", column=0)
>>> df.pandas_on_spark.attach_id_column(id_type="distributed-sequence", column=0)
x 0
0 a 0
1 b 1
2 c 2
>>> df.koalas.attach_id_column(id_type="distributed", column=0.0)
>>> df.pandas_on_spark.attach_id_column(id_type="distributed", column=0.0)
... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
x 0.0
0 a ...
@ -110,14 +110,14 @@ class PandasOnSparkFrameMethods(object):
For multi-index columns:
>>> df = ps.DataFrame({("x", "y"): ['a', 'b', 'c']})
>>> df.koalas.attach_id_column(id_type="sequence", column=("id-x", "id-y"))
>>> df.pandas_on_spark.attach_id_column(id_type="sequence", column=("id-x", "id-y"))
x id-x
y id-y
0 a 0
1 b 1
2 c 2
>>> df.koalas.attach_id_column(id_type="distributed-sequence", column=(0, 1.0))
>>> df.pandas_on_spark.attach_id_column(id_type="distributed-sequence", column=(0, 1.0))
x 0
y 1.0
0 a 0
@ -205,7 +205,7 @@ class PandasOnSparkFrameMethods(object):
... return pd.DataFrame([len(pdf)])
...
>>> df = ps.DataFrame({'A': range(1000)})
>>> df.koalas.apply_batch(length) # doctest: +SKIP
>>> df.pandas_on_spark.apply_batch(length) # doctest: +SKIP
c0
0 83
1 83
@ -263,7 +263,7 @@ class PandasOnSparkFrameMethods(object):
DataFrame.applymap: For elementwise operations.
DataFrame.aggregate: Only perform aggregating type operations.
DataFrame.transform: Only perform transforming type operations.
Series.koalas.transform_batch: transform the search as each pandas chunks.
Series.pandas_on_spark.transform_batch: transform the search as each pandas chunks.
Examples
--------
@ -276,19 +276,19 @@ class PandasOnSparkFrameMethods(object):
>>> def query_func(pdf) -> ps.DataFrame[int, int]:
... return pdf.query('A == 1')
>>> df.koalas.apply_batch(query_func)
>>> df.pandas_on_spark.apply_batch(query_func)
c0 c1
0 1 2
>>> def query_func(pdf) -> ps.DataFrame["A": int, "B": int]:
... return pdf.query('A == 1')
>>> df.koalas.apply_batch(query_func)
>>> df.pandas_on_spark.apply_batch(query_func)
A B
0 1 2
You can also omit the type hints so pandas-on-Spark infers the return schema as below:
>>> df.koalas.apply_batch(lambda pdf: pdf.query('A == 1'))
>>> df.pandas_on_spark.apply_batch(lambda pdf: pdf.query('A == 1'))
A B
0 1 2
@ -296,7 +296,7 @@ class PandasOnSparkFrameMethods(object):
>>> def calculation(pdf, y, z) -> ps.DataFrame[int, int]:
... return pdf ** y + z
>>> df.koalas.apply_batch(calculation, args=(10,), z=20)
>>> df.pandas_on_spark.apply_batch(calculation, args=(10,), z=20)
c0 c1
0 21 1044
1 59069 1048596
@ -304,13 +304,13 @@ class PandasOnSparkFrameMethods(object):
You can also use ``np.ufunc`` and built-in functions as input.
>>> df.koalas.apply_batch(np.add, args=(10,))
>>> df.pandas_on_spark.apply_batch(np.add, args=(10,))
A B
0 11 12
1 13 14
2 15 16
>>> (df * -1).koalas.apply_batch(abs)
>>> (df * -1).pandas_on_spark.apply_batch(abs)
A B
0 1 2
1 3 4
@ -411,7 +411,7 @@ class PandasOnSparkFrameMethods(object):
... return pd.DataFrame([len(pdf)] * len(pdf))
...
>>> df = ps.DataFrame({'A': range(1000)})
>>> df.koalas.transform_batch(length) # doctest: +SKIP
>>> df.pandas_on_spark.transform_batch(length) # doctest: +SKIP
c0
0 83
1 83
@ -461,8 +461,8 @@ class PandasOnSparkFrameMethods(object):
See Also
--------
DataFrame.koalas.apply_batch: For row/columnwise operations.
Series.koalas.transform_batch: transform the search as each pandas chunks.
DataFrame.pandas_on_spark.apply_batch: For row/columnwise operations.
Series.pandas_on_spark.transform_batch: transform the search as each pandas chunks.
Examples
--------
@ -475,7 +475,7 @@ class PandasOnSparkFrameMethods(object):
>>> def plus_one_func(pdf) -> ps.DataFrame[int, int]:
... return pdf + 1
>>> df.koalas.transform_batch(plus_one_func)
>>> df.pandas_on_spark.transform_batch(plus_one_func)
c0 c1
0 2 3
1 4 5
@ -483,7 +483,7 @@ class PandasOnSparkFrameMethods(object):
>>> def plus_one_func(pdf) -> ps.DataFrame['A': int, 'B': int]:
... return pdf + 1
>>> df.koalas.transform_batch(plus_one_func)
>>> df.pandas_on_spark.transform_batch(plus_one_func)
A B
0 2 3
1 4 5
@ -491,7 +491,7 @@ class PandasOnSparkFrameMethods(object):
>>> def plus_one_func(pdf) -> ps.Series[int]:
... return pdf.B + 1
>>> df.koalas.transform_batch(plus_one_func)
>>> df.pandas_on_spark.transform_batch(plus_one_func)
0 3
1 5
2 7
@ -499,13 +499,13 @@ class PandasOnSparkFrameMethods(object):
You can also omit the type hints so pandas-on-Spark infers the return schema as below:
>>> df.koalas.transform_batch(lambda pdf: pdf + 1)
>>> df.pandas_on_spark.transform_batch(lambda pdf: pdf + 1)
A B
0 2 3
1 4 5
2 6 7
>>> (df * -1).koalas.transform_batch(abs)
>>> (df * -1).pandas_on_spark.transform_batch(abs)
A B
0 1 2
1 3 4
@ -513,7 +513,7 @@ class PandasOnSparkFrameMethods(object):
Note that you should not transform the index. The index information will not change.
>>> df.koalas.transform_batch(lambda pdf: pdf.B + 1)
>>> df.pandas_on_spark.transform_batch(lambda pdf: pdf.B + 1)
0 3
1 5
2 7
@ -521,7 +521,7 @@ class PandasOnSparkFrameMethods(object):
You can also specify extra arguments as below.
>>> df.koalas.transform_batch(lambda pdf, a, b, c: pdf.B + a + b + c, 1, 2, c=3)
>>> df.pandas_on_spark.transform_batch(lambda pdf, a, b, c: pdf.B + a + b + c, 1, 2, c=3)
0 8
1 10
2 12
@ -720,7 +720,7 @@ class PandasOnSparkSeriesMethods(object):
... return pd.Series([len(pser)] * len(pser))
...
>>> df = ps.DataFrame({'A': range(1000)})
>>> df.A.koalas.transform_batch(length) # doctest: +SKIP
>>> df.A.pandas_on_spark.transform_batch(length) # doctest: +SKIP
c0
0 83
1 83
@ -751,7 +751,8 @@ class PandasOnSparkSeriesMethods(object):
See Also
--------
DataFrame.koalas.apply_batch : Similar but it takes pandas DataFrame as its internal batch.
DataFrame.pandas_on_spark.apply_batch : Similar but it takes pandas DataFrame as its
internal batch.
Examples
--------
@ -764,7 +765,7 @@ class PandasOnSparkSeriesMethods(object):
>>> def plus_one_func(pser) -> ps.Series[np.int64]:
... return pser + 1
>>> df.A.koalas.transform_batch(plus_one_func)
>>> df.A.pandas_on_spark.transform_batch(plus_one_func)
0 2
1 4
2 6
@ -772,7 +773,7 @@ class PandasOnSparkSeriesMethods(object):
You can also omit the type hints so pandas-on-Spark infers the return schema as below:
>>> df.A.koalas.transform_batch(lambda pser: pser + 1)
>>> df.A.pandas_on_spark.transform_batch(lambda pser: pser + 1)
0 2
1 4
2 6
@ -782,7 +783,7 @@ class PandasOnSparkSeriesMethods(object):
>>> def plus_one_func(pser, a, b, c=3) -> ps.Series[np.int64]:
... return pser + a + b + c
>>> df.A.koalas.transform_batch(plus_one_func, 1, b=2)
>>> df.A.pandas_on_spark.transform_batch(plus_one_func, 1, b=2)
0 7
1 9
2 11
@ -790,13 +791,13 @@ class PandasOnSparkSeriesMethods(object):
You can also use ``np.ufunc`` and built-in functions as input.
>>> df.A.koalas.transform_batch(np.add, 10)
>>> df.A.pandas_on_spark.transform_batch(np.add, 10)
0 11
1 13
2 15
Name: A, dtype: int64
>>> (df * -1).A.koalas.transform_batch(abs)
>>> (df * -1).A.pandas_on_spark.transform_batch(abs)
0 1
1 3
2 5


@ -111,7 +111,7 @@ class DatetimeMethods(object):
def pandas_microsecond(s) -> "ps.Series[np.int64]":
return s.dt.microsecond
return self._data.koalas.transform_batch(pandas_microsecond)
return self._data.pandas_on_spark.transform_batch(pandas_microsecond)
@property
def nanosecond(self) -> "ps.Series":
@ -171,7 +171,7 @@ class DatetimeMethods(object):
def pandas_dayofweek(s) -> "ps.Series[np.int64]":
return s.dt.dayofweek
return self._data.koalas.transform_batch(pandas_dayofweek)
return self._data.pandas_on_spark.transform_batch(pandas_dayofweek)
@property
def weekday(self) -> "ps.Series":
@ -189,7 +189,7 @@ class DatetimeMethods(object):
def pandas_dayofyear(s) -> "ps.Series[np.int64]":
return s.dt.dayofyear
return self._data.koalas.transform_batch(pandas_dayofyear)
return self._data.pandas_on_spark.transform_batch(pandas_dayofyear)
@property
def quarter(self) -> "ps.Series":
@ -201,7 +201,7 @@ class DatetimeMethods(object):
def pandas_quarter(s) -> "ps.Series[np.int64]":
return s.dt.quarter
return self._data.koalas.transform_batch(pandas_quarter)
return self._data.pandas_on_spark.transform_batch(pandas_quarter)
@property
def is_month_start(self) -> "ps.Series":
@ -241,7 +241,7 @@ class DatetimeMethods(object):
def pandas_is_month_start(s) -> "ps.Series[bool]":
return s.dt.is_month_start
return self._data.koalas.transform_batch(pandas_is_month_start)
return self._data.pandas_on_spark.transform_batch(pandas_is_month_start)
@property
def is_month_end(self) -> "ps.Series":
@ -281,7 +281,7 @@ class DatetimeMethods(object):
def pandas_is_month_end(s) -> "ps.Series[bool]":
return s.dt.is_month_end
return self._data.koalas.transform_batch(pandas_is_month_end)
return self._data.pandas_on_spark.transform_batch(pandas_is_month_end)
@property
def is_quarter_start(self) -> "ps.Series":
@ -332,7 +332,7 @@ class DatetimeMethods(object):
def pandas_is_quarter_start(s) -> "ps.Series[bool]":
return s.dt.is_quarter_start
return self._data.koalas.transform_batch(pandas_is_quarter_start)
return self._data.pandas_on_spark.transform_batch(pandas_is_quarter_start)
@property
def is_quarter_end(self) -> "ps.Series":
@ -383,7 +383,7 @@ class DatetimeMethods(object):
def pandas_is_quarter_end(s) -> "ps.Series[bool]":
return s.dt.is_quarter_end
return self._data.koalas.transform_batch(pandas_is_quarter_end)
return self._data.pandas_on_spark.transform_batch(pandas_is_quarter_end)
@property
def is_year_start(self) -> "ps.Series":
@ -423,7 +423,7 @@ class DatetimeMethods(object):
def pandas_is_year_start(s) -> "ps.Series[bool]":
return s.dt.is_year_start
return self._data.koalas.transform_batch(pandas_is_year_start)
return self._data.pandas_on_spark.transform_batch(pandas_is_year_start)
@property
def is_year_end(self) -> "ps.Series":
@ -463,7 +463,7 @@ class DatetimeMethods(object):
def pandas_is_year_end(s) -> "ps.Series[bool]":
return s.dt.is_year_end
return self._data.koalas.transform_batch(pandas_is_year_end)
return self._data.pandas_on_spark.transform_batch(pandas_is_year_end)
@property
def is_leap_year(self) -> "ps.Series":
@ -503,7 +503,7 @@ class DatetimeMethods(object):
def pandas_is_leap_year(s) -> "ps.Series[bool]":
return s.dt.is_leap_year
return self._data.koalas.transform_batch(pandas_is_leap_year)
return self._data.pandas_on_spark.transform_batch(pandas_is_leap_year)
@property
def daysinmonth(self) -> "ps.Series":
@ -515,7 +515,7 @@ class DatetimeMethods(object):
def pandas_daysinmonth(s) -> "ps.Series[np.int64]":
return s.dt.daysinmonth
return self._data.koalas.transform_batch(pandas_daysinmonth)
return self._data.pandas_on_spark.transform_batch(pandas_daysinmonth)
@property
def days_in_month(self) -> "ps.Series":
@ -578,7 +578,7 @@ class DatetimeMethods(object):
def pandas_normalize(s) -> "ps.Series[np.datetime64]":
return s.dt.normalize()
return self._data.koalas.transform_batch(pandas_normalize)
return self._data.pandas_on_spark.transform_batch(pandas_normalize)
def strftime(self, date_format: str) -> "ps.Series":
"""
@ -627,7 +627,7 @@ class DatetimeMethods(object):
def pandas_strftime(s) -> "ps.Series[str]":
return s.dt.strftime(date_format)
return self._data.koalas.transform_batch(pandas_strftime)
return self._data.pandas_on_spark.transform_batch(pandas_strftime)
def round(self, freq: Union[str, DateOffset], *args: Any, **kwargs: Any) -> "ps.Series":
"""
@ -683,7 +683,7 @@ class DatetimeMethods(object):
def pandas_round(s) -> "ps.Series[np.datetime64]":
return s.dt.round(freq, *args, **kwargs)
return self._data.koalas.transform_batch(pandas_round)
return self._data.pandas_on_spark.transform_batch(pandas_round)
def floor(self, freq: Union[str, DateOffset], *args: Any, **kwargs: Any) -> "ps.Series":
"""
@ -739,7 +739,7 @@ class DatetimeMethods(object):
def pandas_floor(s) -> "ps.Series[np.datetime64]":
return s.dt.floor(freq, *args, **kwargs)
return self._data.koalas.transform_batch(pandas_floor)
return self._data.pandas_on_spark.transform_batch(pandas_floor)
def ceil(self, freq: Union[str, DateOffset], *args: Any, **kwargs: Any) -> "ps.Series":
"""
@ -795,7 +795,7 @@ class DatetimeMethods(object):
def pandas_ceil(s) -> "ps.Series[np.datetime64]":
return s.dt.ceil(freq, *args, **kwargs)
return self._data.koalas.transform_batch(pandas_ceil)
return self._data.pandas_on_spark.transform_batch(pandas_ceil)
def month_name(self, locale: Optional[str] = None) -> "ps.Series":
"""
@ -832,7 +832,7 @@ class DatetimeMethods(object):
def pandas_month_name(s) -> "ps.Series[str]":
return s.dt.month_name(locale=locale)
return self._data.koalas.transform_batch(pandas_month_name)
return self._data.pandas_on_spark.transform_batch(pandas_month_name)
def day_name(self, locale: Optional[str] = None) -> "ps.Series":
"""
@ -869,7 +869,7 @@ class DatetimeMethods(object):
def pandas_day_name(s) -> "ps.Series[str]":
return s.dt.day_name(locale=locale)
return self._data.koalas.transform_batch(pandas_day_name)
return self._data.pandas_on_spark.transform_batch(pandas_day_name)
def _test() -> None:


@ -882,6 +882,9 @@ class DataFrame(Frame, Generic[T]):
spark = CachedAccessor("spark", SparkFrameMethods)
# create accessor for pandas-on-Spark specific methods.
pandas_on_spark = CachedAccessor("pandas_on_spark", PandasOnSparkFrameMethods)
# keep the name "koalas" for backward compatibility.
koalas = CachedAccessor("koalas", PandasOnSparkFrameMethods)
def hist(self, bins=10, **kwds):
@ -2732,7 +2735,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
as_nullable_spark_type(psdf._internal.spark_type_for(output_label))
)
applied.append(
psser.koalas._transform_batch(
psser.pandas_on_spark._transform_batch(
func=lambda c: func(c, *args, **kwargs),
return_type=SeriesType(dtype, return_schema),
)
@ -2744,7 +2747,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
return DataFrame(internal)
else:
return self._apply_series_op(
lambda psser: psser.koalas.transform_batch(func, *args, **kwargs)
lambda psser: psser.pandas_on_spark.transform_batch(func, *args, **kwargs)
)
def pop(self, item) -> "DataFrame":
@ -3042,7 +3045,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
# default index, which will never be used. So use "distributed" index as a dummy to
# avoid overhead.
with option_context("compute.default_index_type", "distributed"):
psdf = psdf.koalas.apply_batch(pandas_between_time)
psdf = psdf.pandas_on_spark.apply_batch(pandas_between_time)
return DataFrame(
self._internal.copy(
@ -3124,7 +3127,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
# a default index, which will never be used. So use "distributed" index as a dummy
# to avoid overhead.
with option_context("compute.default_index_type", "distributed"):
psdf = psdf.koalas.apply_batch(pandas_at_time)
psdf = psdf.pandas_on_spark.apply_batch(pandas_at_time)
return DataFrame(
self._internal.copy(
@ -10739,7 +10742,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
.. note:: This API delegates to Spark SQL so the syntax follows Spark SQL. Therefore, the
pandas specific syntax such as `@` is not supported. If you want the pandas syntax,
you can work around with :meth:`DataFrame.koalas.apply_batch`, but you should
you can work around with :meth:`DataFrame.pandas_on_spark.apply_batch`, but you should
be aware that `query_func` will be executed at different nodes in a distributed manner.
So, for example, to use `@` syntax, make sure the variable is serialized by, for
example, putting it within the closure as below.
@ -10748,7 +10751,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
>>> def query_func(pdf):
... num = 1995
... return pdf.query('A > @num')
>>> df.koalas.apply_batch(query_func)
>>> df.pandas_on_spark.apply_batch(query_func)
A B
1996 1996 1996
1997 1997 1997
@ -11020,7 +11023,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
result_inner = pd.Series(result_inner).to_frame()
return result_inner
result = self.koalas.apply_batch(eval_func)
result = self.pandas_on_spark.apply_batch(eval_func)
if inplace:
# Here, the result is always a frame because the error is thrown during schema inference
# from pandas.


@ -688,11 +688,11 @@ class DatetimeIndex(Index):
psdf = self.to_frame()[[]]
id_column_name = verify_temp_column_name(psdf, "__id_column__")
psdf = psdf.koalas.attach_id_column("distributed-sequence", id_column_name)
psdf = psdf.pandas_on_spark.attach_id_column("distributed-sequence", id_column_name)
with ps.option_context("compute.default_index_type", "distributed"):
# The attached index in the statement below will be dropped soon,
# so we enforce “distributed” default index type
psdf = psdf.koalas.apply_batch(pandas_between_time)
psdf = psdf.pandas_on_spark.apply_batch(pandas_between_time)
return ps.Index(first_series(psdf).rename(self.name))
def indexer_at_time(self, time: Union[datetime.time, str], asof: bool = False) -> Index:
@ -734,11 +734,11 @@ class DatetimeIndex(Index):
psdf = self.to_frame()[[]]
id_column_name = verify_temp_column_name(psdf, "__id_column__")
psdf = psdf.koalas.attach_id_column("distributed-sequence", id_column_name)
psdf = psdf.pandas_on_spark.attach_id_column("distributed-sequence", id_column_name)
with ps.option_context("compute.default_index_type", "distributed"):
# The attached index in the statement below will be dropped soon,
# so we enforce “distributed” default index type
psdf = psdf.koalas.apply_batch(pandas_at_time)
psdf = psdf.pandas_on_spark.apply_batch(pandas_at_time)
return ps.Index(first_series(psdf).rename(self.name))


@ -1592,10 +1592,10 @@ def to_datetime(
)
if isinstance(arg, Series):
return arg.koalas.transform_batch(pandas_to_datetime)
return arg.pandas_on_spark.transform_batch(pandas_to_datetime)
if isinstance(arg, DataFrame):
psdf = arg[["year", "month", "day"]]
return psdf.koalas.transform_batch(pandas_to_datetime)
return psdf.pandas_on_spark.transform_batch(pandas_to_datetime)
return pd.to_datetime(
arg,
errors=errors,


@ -650,6 +650,9 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
)
# create accessor for pandas-on-Spark specific methods.
pandas_on_spark = CachedAccessor("pandas_on_spark", PandasOnSparkSeriesMethods)
# keep the name "koalas" for backward compatibility.
koalas = CachedAccessor("koalas", PandasOnSparkSeriesMethods)
# Comparison Operators
@ -3098,7 +3101,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
apply_each = wraps(func)(lambda s: s.apply(func, args=args, **kwds))
if should_infer_schema:
return self.koalas._transform_batch(apply_each, None)
return self.pandas_on_spark._transform_batch(apply_each, None)
else:
sig_return = infer_return_type(func)
if not isinstance(sig_return, ScalarType):
@ -3107,7 +3110,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
"but found type {}".format(sig_return)
)
return_type = cast(ScalarType, sig_return)
return self.koalas._transform_batch(apply_each, return_type)
return self.pandas_on_spark._transform_batch(apply_each, return_type)
# TODO: not all arguments are implemented comparing to pandas' for now.
def aggregate(self, func: Union[str, List[str]]) -> Union[Scalar, "Series"]:


@ -77,7 +77,7 @@ class StringMethods(object):
def pandas_capitalize(s) -> "ps.Series[str]":
return s.str.capitalize()
return self._data.koalas.transform_batch(pandas_capitalize)
return self._data.pandas_on_spark.transform_batch(pandas_capitalize)
def title(self) -> "ps.Series":
"""
@ -105,7 +105,7 @@ class StringMethods(object):
def pandas_title(s) -> "ps.Series[str]":
return s.str.title()
return self._data.koalas.transform_batch(pandas_title)
return self._data.pandas_on_spark.transform_batch(pandas_title)
def lower(self) -> "ps.Series":
"""
@ -179,7 +179,7 @@ class StringMethods(object):
def pandas_swapcase(s) -> "ps.Series[str]":
return s.str.swapcase()
return self._data.koalas.transform_batch(pandas_swapcase)
return self._data.pandas_on_spark.transform_batch(pandas_swapcase)
def startswith(self, pattern: str, na: Optional[Any] = None) -> "ps.Series":
"""
@ -231,7 +231,7 @@ class StringMethods(object):
def pandas_startswith(s) -> "ps.Series[bool]":
return s.str.startswith(pattern, na)
return self._data.koalas.transform_batch(pandas_startswith)
return self._data.pandas_on_spark.transform_batch(pandas_startswith)
def endswith(self, pattern: str, na: Optional[Any] = None) -> "ps.Series":
"""
@ -283,7 +283,7 @@ class StringMethods(object):
def pandas_endswith(s) -> "ps.Series[bool]":
return s.str.endswith(pattern, na)
return self._data.koalas.transform_batch(pandas_endswith)
return self._data.pandas_on_spark.transform_batch(pandas_endswith)
def strip(self, to_strip: Optional[str] = None) -> "ps.Series":
"""
@ -336,7 +336,7 @@ class StringMethods(object):
def pandas_strip(s) -> "ps.Series[str]":
return s.str.strip(to_strip)
return self._data.koalas.transform_batch(pandas_strip)
return self._data.pandas_on_spark.transform_batch(pandas_strip)
def lstrip(self, to_strip: Optional[str] = None) -> "ps.Series":
"""
@ -377,7 +377,7 @@ class StringMethods(object):
def pandas_lstrip(s) -> "ps.Series[str]":
return s.str.lstrip(to_strip)
return self._data.koalas.transform_batch(pandas_lstrip)
return self._data.pandas_on_spark.transform_batch(pandas_lstrip)
def rstrip(self, to_strip: Optional[str] = None) -> "ps.Series":
"""
@ -418,7 +418,7 @@ class StringMethods(object):
def pandas_rstrip(s) -> "ps.Series[str]":
return s.str.rstrip(to_strip)
return self._data.koalas.transform_batch(pandas_rstrip)
return self._data.pandas_on_spark.transform_batch(pandas_rstrip)
def get(self, i: int) -> "ps.Series":
"""
@ -473,7 +473,7 @@ class StringMethods(object):
def pandas_get(s) -> "ps.Series[str]":
return s.str.get(i)
return self._data.koalas.transform_batch(pandas_get)
return self._data.pandas_on_spark.transform_batch(pandas_get)
def isalnum(self) -> "ps.Series":
"""
@ -510,7 +510,7 @@ class StringMethods(object):
def pandas_isalnum(s) -> "ps.Series[bool]":
return s.str.isalnum()
return self._data.koalas.transform_batch(pandas_isalnum)
return self._data.pandas_on_spark.transform_batch(pandas_isalnum)
def isalpha(self) -> "ps.Series":
"""
@ -536,7 +536,7 @@ class StringMethods(object):
def pandas_isalpha(s) -> "ps.Series[bool]":
return s.str.isalpha()
return self._data.koalas.transform_batch(pandas_isalpha)
return self._data.pandas_on_spark.transform_batch(pandas_isalpha)
def isdigit(self) -> "ps.Series":
"""
@ -587,7 +587,7 @@ class StringMethods(object):
def pandas_isdigit(s) -> "ps.Series[bool]":
return s.str.isdigit()
return self._data.koalas.transform_batch(pandas_isdigit)
return self._data.pandas_on_spark.transform_batch(pandas_isdigit)
def isspace(self) -> "ps.Series":
"""
@ -611,7 +611,7 @@ class StringMethods(object):
def pandas_isspace(s) -> "ps.Series[bool]":
return s.str.isspace()
return self._data.koalas.transform_batch(pandas_isspace)
return self._data.pandas_on_spark.transform_batch(pandas_isspace)
def islower(self) -> "ps.Series":
"""
@ -636,7 +636,7 @@ class StringMethods(object):
def pandas_isspace(s) -> "ps.Series[bool]":
return s.str.islower()
return self._data.koalas.transform_batch(pandas_isspace)
return self._data.pandas_on_spark.transform_batch(pandas_isspace)
def isupper(self) -> "ps.Series":
"""
@ -661,7 +661,7 @@ class StringMethods(object):
def pandas_isspace(s) -> "ps.Series[bool]":
return s.str.isupper()
return self._data.koalas.transform_batch(pandas_isspace)
return self._data.pandas_on_spark.transform_batch(pandas_isspace)
def istitle(self) -> "ps.Series":
"""
@ -692,7 +692,7 @@ class StringMethods(object):
def pandas_istitle(s) -> "ps.Series[bool]":
return s.str.istitle()
return self._data.koalas.transform_batch(pandas_istitle)
return self._data.pandas_on_spark.transform_batch(pandas_istitle)
def isnumeric(self) -> "ps.Series":
"""
@ -751,7 +751,7 @@ class StringMethods(object):
def pandas_isnumeric(s) -> "ps.Series[bool]":
return s.str.isnumeric()
return self._data.koalas.transform_batch(pandas_isnumeric)
return self._data.pandas_on_spark.transform_batch(pandas_isnumeric)
def isdecimal(self) -> "ps.Series":
"""
@ -802,7 +802,7 @@ class StringMethods(object):
def pandas_isdecimal(s) -> "ps.Series[bool]":
return s.str.isdecimal()
return self._data.koalas.transform_batch(pandas_isdecimal)
return self._data.pandas_on_spark.transform_batch(pandas_isdecimal)
@no_type_check
def cat(self, others=None, sep=None, na_rep=None, join=None) -> "ps.Series":
@ -846,7 +846,7 @@ class StringMethods(object):
def pandas_center(s) -> "ps.Series[str]":
return s.str.center(width, fillchar)
return self._data.koalas.transform_batch(pandas_center)
return self._data.pandas_on_spark.transform_batch(pandas_center)
def contains(
self, pat: str, case: bool = True, flags: int = 0, na: Any = None, regex: bool = True
@ -966,7 +966,7 @@ class StringMethods(object):
def pandas_contains(s) -> "ps.Series[bool]":
return s.str.contains(pat, case, flags, na, regex)
return self._data.koalas.transform_batch(pandas_contains)
return self._data.pandas_on_spark.transform_batch(pandas_contains)
def count(self, pat: str, flags: int = 0) -> "ps.Series":
"""
@ -1017,7 +1017,7 @@ class StringMethods(object):
def pandas_count(s) -> "ps.Series[int]":
return s.str.count(pat, flags)
return self._data.koalas.transform_batch(pandas_count)
return self._data.pandas_on_spark.transform_batch(pandas_count)
@no_type_check
def decode(self, encoding, errors="strict") -> "ps.Series":
@ -1101,7 +1101,7 @@ class StringMethods(object):
def pandas_find(s) -> "ps.Series[int]":
return s.str.find(sub, start, end)
return self._data.koalas.transform_batch(pandas_find)
return self._data.pandas_on_spark.transform_batch(pandas_find)
def findall(self, pat: str, flags: int = 0) -> "ps.Series":
"""
@ -1231,7 +1231,7 @@ class StringMethods(object):
def pandas_index(s) -> "ps.Series[np.int64]":
return s.str.index(sub, start, end)
return self._data.koalas.transform_batch(pandas_index)
return self._data.pandas_on_spark.transform_batch(pandas_index)
def join(self, sep: str) -> "ps.Series":
"""
@ -1281,7 +1281,7 @@ class StringMethods(object):
def pandas_join(s) -> "ps.Series[str]":
return s.str.join(sep)
return self._data.koalas.transform_batch(pandas_join)
return self._data.pandas_on_spark.transform_batch(pandas_join)
def len(self) -> "ps.Series":
"""
@ -1352,7 +1352,7 @@ class StringMethods(object):
def pandas_ljust(s) -> "ps.Series[str]":
return s.str.ljust(width, fillchar)
return self._data.koalas.transform_batch(pandas_ljust)
return self._data.pandas_on_spark.transform_batch(pandas_ljust)
def match(self, pat: str, case: bool = True, flags: int = 0, na: Any = np.NaN) -> "ps.Series":
"""
@ -1419,7 +1419,7 @@ class StringMethods(object):
def pandas_match(s) -> "ps.Series[bool]":
return s.str.match(pat, case, flags, na)
return self._data.koalas.transform_batch(pandas_match)
return self._data.pandas_on_spark.transform_batch(pandas_match)
def normalize(self, form: str) -> "ps.Series":
"""
@ -1443,7 +1443,7 @@ class StringMethods(object):
def pandas_normalize(s) -> "ps.Series[str]":
return s.str.normalize(form)
return self._data.koalas.transform_batch(pandas_normalize)
return self._data.pandas_on_spark.transform_batch(pandas_normalize)
def pad(self, width: int, side: str = "left", fillchar: str = " ") -> "ps.Series":
"""
@ -1492,7 +1492,7 @@ class StringMethods(object):
def pandas_pad(s) -> "ps.Series[str]":
return s.str.pad(width, side, fillchar)
return self._data.koalas.transform_batch(pandas_pad)
return self._data.pandas_on_spark.transform_batch(pandas_pad)
def partition(self, sep: str = " ", expand: bool = True) -> "ps.Series":
"""
@ -1638,7 +1638,7 @@ class StringMethods(object):
def pandas_replace(s) -> "ps.Series[str]":
return s.str.replace(pat, repl, n=n, case=case, flags=flags, regex=regex)
return self._data.koalas.transform_batch(pandas_replace)
return self._data.pandas_on_spark.transform_batch(pandas_replace)
def rfind(self, sub: str, start: int = 0, end: Optional[int] = None) -> "ps.Series":
"""
@ -1694,7 +1694,7 @@ class StringMethods(object):
def pandas_rfind(s) -> "ps.Series[int]":
return s.str.rfind(sub, start, end)
return self._data.koalas.transform_batch(pandas_rfind)
return self._data.pandas_on_spark.transform_batch(pandas_rfind)
def rindex(self, sub: str, start: int = 0, end: Optional[int] = None) -> "ps.Series":
"""
@ -1738,7 +1738,7 @@ class StringMethods(object):
def pandas_rindex(s) -> "ps.Series[np.int64]":
return s.str.rindex(sub, start, end)
return self._data.koalas.transform_batch(pandas_rindex)
return self._data.pandas_on_spark.transform_batch(pandas_rindex)
def rjust(self, width: int, fillchar: str = " ") -> "ps.Series":
"""
@ -1780,7 +1780,7 @@ class StringMethods(object):
def pandas_rjust(s) -> "ps.Series[str]":
return s.str.rjust(width, fillchar)
return self._data.koalas.transform_batch(pandas_rjust)
return self._data.pandas_on_spark.transform_batch(pandas_rjust)
def rpartition(self, sep: str = " ", expand: bool = True) -> "ps.Series":
"""
@ -1846,7 +1846,7 @@ class StringMethods(object):
def pandas_slice(s) -> "ps.Series[str]":
return s.str.slice(start, stop, step)
return self._data.koalas.transform_batch(pandas_slice)
return self._data.pandas_on_spark.transform_batch(pandas_slice)
def slice_replace(
self, start: Optional[int] = None, stop: Optional[int] = None, repl: Optional[str] = None
@ -1923,7 +1923,7 @@ class StringMethods(object):
def pandas_slice_replace(s) -> "ps.Series[str]":
return s.str.slice_replace(start, stop, repl)
return self._data.koalas.transform_batch(pandas_slice_replace)
return self._data.pandas_on_spark.transform_batch(pandas_slice_replace)
def split(
self, pat: Optional[str] = None, n: int = -1, expand: bool = False
@ -2247,7 +2247,7 @@ class StringMethods(object):
def pandas_translate(s) -> "ps.Series[str]":
return s.str.translate(table)
return self._data.koalas.transform_batch(pandas_translate)
return self._data.pandas_on_spark.transform_batch(pandas_translate)
def wrap(self, width: int, **kwargs: bool) -> "ps.Series":
"""
@ -2299,7 +2299,7 @@ class StringMethods(object):
def pandas_wrap(s) -> "ps.Series[str]":
return s.str.wrap(width, **kwargs)
return self._data.koalas.transform_batch(pandas_wrap)
return self._data.pandas_on_spark.transform_batch(pandas_wrap)
def zfill(self, width: int) -> "ps.Series":
"""
@ -2350,7 +2350,7 @@ class StringMethods(object):
def pandas_zfill(s) -> "ps.Series[str]":
return s.str.zfill(width)
return self._data.koalas.transform_batch(pandas_zfill)
return self._data.pandas_on_spark.transform_batch(pandas_zfill)
@no_type_check
def get_dummies(self, sep: str = "|") -> "ps.DataFrame":

View file

@ -301,7 +301,7 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils):
pdf, psdf = self.df_pair
self.assert_eq(
psdf.koalas.apply_batch(lambda pdf: pdf.astype(str)).sort_index(),
psdf.pandas_on_spark.apply_batch(lambda pdf: pdf.astype(str)).sort_index(),
pdf.astype(str).sort_index(),
)
@ -313,7 +313,7 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils):
dtype = CategoricalDtype(categories=["a", "b", "c", "d"])
self.assert_eq(
psdf.koalas.apply_batch(lambda pdf: pdf.astype(dtype)).sort_index(),
psdf.pandas_on_spark.apply_batch(lambda pdf: pdf.astype(dtype)).sort_index(),
pdf.astype(dtype).sort_index(),
)
@ -327,7 +327,7 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils):
return pdf.astype(str)
self.assert_eq(
psdf.koalas.apply_batch(to_str).sort_values(["a", "b"]).reset_index(drop=True),
psdf.pandas_on_spark.apply_batch(to_str).sort_values(["a", "b"]).reset_index(drop=True),
to_str(pdf).sort_values(["a", "b"]).reset_index(drop=True),
)
@ -343,7 +343,9 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils):
return pdf.astype(dtype)
self.assert_eq(
psdf.koalas.apply_batch(to_category).sort_values(["a", "b"]).reset_index(drop=True),
psdf.pandas_on_spark.apply_batch(to_category)
.sort_values(["a", "b"])
.reset_index(drop=True),
to_category(pdf).sort_values(["a", "b"]).reset_index(drop=True),
)
@ -351,11 +353,11 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils):
pdf, psdf = self.df_pair
self.assert_eq(
psdf.koalas.transform_batch(lambda pdf: pdf.astype(str)).sort_index(),
psdf.pandas_on_spark.transform_batch(lambda pdf: pdf.astype(str)).sort_index(),
pdf.astype(str).sort_index(),
)
self.assert_eq(
psdf.koalas.transform_batch(lambda pdf: pdf.b.cat.codes).sort_index(),
psdf.pandas_on_spark.transform_batch(lambda pdf: pdf.b.cat.codes).sort_index(),
pdf.b.cat.codes.sort_index(),
)
@ -367,11 +369,11 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils):
dtype = CategoricalDtype(categories=["a", "b", "c", "d"])
self.assert_eq(
psdf.koalas.transform_batch(lambda pdf: pdf.astype(dtype)).sort_index(),
psdf.pandas_on_spark.transform_batch(lambda pdf: pdf.astype(dtype)).sort_index(),
pdf.astype(dtype).sort_index(),
)
self.assert_eq(
psdf.koalas.transform_batch(lambda pdf: pdf.b.astype(dtype)).sort_index(),
psdf.pandas_on_spark.transform_batch(lambda pdf: pdf.b.astype(dtype)).sort_index(),
pdf.b.astype(dtype).sort_index(),
)
@ -385,14 +387,14 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils):
return pdf.astype(str)
self.assert_eq(
psdf.koalas.transform_batch(to_str).sort_index(), to_str(pdf).sort_index(),
psdf.pandas_on_spark.transform_batch(to_str).sort_index(), to_str(pdf).sort_index(),
)
def to_codes(pdf) -> ps.Series[np.int8]:
return pdf.b.cat.codes
self.assert_eq(
psdf.koalas.transform_batch(to_codes).sort_index(), to_codes(pdf).sort_index(),
psdf.pandas_on_spark.transform_batch(to_codes).sort_index(), to_codes(pdf).sort_index(),
)
pdf = pd.DataFrame(
@ -407,14 +409,15 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils):
return pdf.astype(dtype)
self.assert_eq(
psdf.koalas.transform_batch(to_category).sort_index(), to_category(pdf).sort_index(),
psdf.pandas_on_spark.transform_batch(to_category).sort_index(),
to_category(pdf).sort_index(),
)
def to_category(pdf) -> ps.Series[dtype]:
return pdf.b.astype(dtype)
self.assert_eq(
psdf.koalas.transform_batch(to_category).sort_index(),
psdf.pandas_on_spark.transform_batch(to_category).sort_index(),
to_category(pdf).rename().sort_index(),
)
@ -422,7 +425,7 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils):
pdf, psdf = self.df_pair
self.assert_eq(
psdf.a.koalas.transform_batch(lambda pser: pser.astype(str)).sort_index(),
psdf.a.pandas_on_spark.transform_batch(lambda pser: pser.astype(str)).sort_index(),
pdf.a.astype(str).sort_index(),
)
@ -434,7 +437,7 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils):
dtype = CategoricalDtype(categories=["a", "b", "c", "d"])
self.assert_eq(
psdf.a.koalas.transform_batch(lambda pser: pser.astype(dtype)).sort_index(),
psdf.a.pandas_on_spark.transform_batch(lambda pser: pser.astype(dtype)).sort_index(),
pdf.a.astype(dtype).sort_index(),
)
@ -448,7 +451,7 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils):
return pser.astype(str)
self.assert_eq(
psdf.a.koalas.transform_batch(to_str).sort_index(), to_str(pdf.a).sort_index()
psdf.a.pandas_on_spark.transform_batch(to_str).sort_index(), to_str(pdf.a).sort_index()
)
pdf = pd.DataFrame(
@ -462,7 +465,8 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils):
return pser.astype(dtype)
self.assert_eq(
psdf.a.koalas.transform_batch(to_category).sort_index(), to_category(pdf.a).sort_index()
psdf.a.pandas_on_spark.transform_batch(to_category).sort_index(),
to_category(pdf.a).sort_index(),
)


@ -4325,30 +4325,31 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
psdf = ps.DataFrame(pdf)
self.assert_eq(
psdf.koalas.apply_batch(lambda pdf, a: pdf + a, args=(1,)).sort_index(),
psdf.pandas_on_spark.apply_batch(lambda pdf, a: pdf + a, args=(1,)).sort_index(),
(pdf + 1).sort_index(),
)
with option_context("compute.shortcut_limit", 500):
self.assert_eq(
psdf.koalas.apply_batch(lambda pdf: pdf + 1).sort_index(), (pdf + 1).sort_index()
psdf.pandas_on_spark.apply_batch(lambda pdf: pdf + 1).sort_index(),
(pdf + 1).sort_index(),
)
self.assert_eq(
psdf.koalas.apply_batch(lambda pdf, b: pdf + b, b=1).sort_index(),
psdf.pandas_on_spark.apply_batch(lambda pdf, b: pdf + b, b=1).sort_index(),
(pdf + 1).sort_index(),
)
with self.assertRaisesRegex(AssertionError, "the first argument should be a callable"):
psdf.koalas.apply_batch(1)
psdf.pandas_on_spark.apply_batch(1)
with self.assertRaisesRegex(TypeError, "The given function.*frame as its type hints"):
def f2(_) -> ps.Series[int]:
pass
psdf.koalas.apply_batch(f2)
psdf.pandas_on_spark.apply_batch(f2)
with self.assertRaisesRegex(ValueError, "The given function should return a frame"):
psdf.koalas.apply_batch(lambda pdf: 1)
psdf.pandas_on_spark.apply_batch(lambda pdf: 1)
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
@ -4356,11 +4357,12 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
psdf.columns = columns
self.assert_eq(
psdf.koalas.apply_batch(lambda x: x + 1).sort_index(), (pdf + 1).sort_index()
psdf.pandas_on_spark.apply_batch(lambda x: x + 1).sort_index(), (pdf + 1).sort_index()
)
with option_context("compute.shortcut_limit", 500):
self.assert_eq(
psdf.koalas.apply_batch(lambda x: x + 1).sort_index(), (pdf + 1).sort_index()
psdf.pandas_on_spark.apply_batch(lambda x: x + 1).sort_index(),
(pdf + 1).sort_index(),
)
def test_transform_batch(self):
@ -4376,46 +4378,46 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
psdf = ps.DataFrame(pdf)
self.assert_eq(
psdf.koalas.transform_batch(lambda pdf: pdf.c + 1).sort_index(),
psdf.pandas_on_spark.transform_batch(lambda pdf: pdf.c + 1).sort_index(),
(pdf.c + 1).sort_index(),
)
self.assert_eq(
psdf.koalas.transform_batch(lambda pdf, a: pdf + a, 1).sort_index(),
psdf.pandas_on_spark.transform_batch(lambda pdf, a: pdf + a, 1).sort_index(),
(pdf + 1).sort_index(),
)
self.assert_eq(
psdf.koalas.transform_batch(lambda pdf, a: pdf.c + a, a=1).sort_index(),
psdf.pandas_on_spark.transform_batch(lambda pdf, a: pdf.c + a, a=1).sort_index(),
(pdf.c + 1).sort_index(),
)
with option_context("compute.shortcut_limit", 500):
self.assert_eq(
psdf.koalas.transform_batch(lambda pdf: pdf + 1).sort_index(),
psdf.pandas_on_spark.transform_batch(lambda pdf: pdf + 1).sort_index(),
(pdf + 1).sort_index(),
)
self.assert_eq(
psdf.koalas.transform_batch(lambda pdf: pdf.b + 1).sort_index(),
psdf.pandas_on_spark.transform_batch(lambda pdf: pdf.b + 1).sort_index(),
(pdf.b + 1).sort_index(),
)
self.assert_eq(
psdf.koalas.transform_batch(lambda pdf, a: pdf + a, 1).sort_index(),
psdf.pandas_on_spark.transform_batch(lambda pdf, a: pdf + a, 1).sort_index(),
(pdf + 1).sort_index(),
)
self.assert_eq(
psdf.koalas.transform_batch(lambda pdf, a: pdf.c + a, a=1).sort_index(),
psdf.pandas_on_spark.transform_batch(lambda pdf, a: pdf.c + a, a=1).sort_index(),
(pdf.c + 1).sort_index(),
)
with self.assertRaisesRegex(AssertionError, "the first argument should be a callable"):
psdf.koalas.transform_batch(1)
psdf.pandas_on_spark.transform_batch(1)
with self.assertRaisesRegex(ValueError, "The given function should return a frame"):
psdf.koalas.transform_batch(lambda pdf: 1)
psdf.pandas_on_spark.transform_batch(lambda pdf: 1)
with self.assertRaisesRegex(
ValueError, "transform_batch cannot produce aggregated results"
):
psdf.koalas.transform_batch(lambda pdf: pd.Series(1))
psdf.pandas_on_spark.transform_batch(lambda pdf: pd.Series(1))
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
@ -4423,16 +4425,18 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
psdf.columns = columns
self.assert_eq(
psdf.koalas.transform_batch(lambda x: x + 1).sort_index(), (pdf + 1).sort_index()
psdf.pandas_on_spark.transform_batch(lambda x: x + 1).sort_index(),
(pdf + 1).sort_index(),
)
with option_context("compute.shortcut_limit", 500):
self.assert_eq(
psdf.koalas.transform_batch(lambda x: x + 1).sort_index(), (pdf + 1).sort_index()
psdf.pandas_on_spark.transform_batch(lambda x: x + 1).sort_index(),
(pdf + 1).sort_index(),
)
def test_transform_batch_same_anchor(self):
psdf = ps.range(10)
psdf["d"] = psdf.koalas.transform_batch(lambda pdf: pdf.id + 1)
psdf["d"] = psdf.pandas_on_spark.transform_batch(lambda pdf: pdf.id + 1)
self.assert_eq(
psdf,
pd.DataFrame({"id": list(range(10)), "d": list(range(1, 11))}, columns=["id", "d"]),
@ -4443,7 +4447,7 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
def plus_one(pdf) -> ps.Series[np.int64]:
return pdf.id + 1
psdf["d"] = psdf.koalas.transform_batch(plus_one)
psdf["d"] = psdf.pandas_on_spark.transform_batch(plus_one)
self.assert_eq(
psdf,
pd.DataFrame({"id": list(range(10)), "d": list(range(1, 11))}, columns=["id", "d"]),
@ -4454,7 +4458,7 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
def plus_one(ser) -> ps.Series[np.int64]:
return ser + 1
psdf["d"] = psdf.id.koalas.transform_batch(plus_one)
psdf["d"] = psdf.id.pandas_on_spark.transform_batch(plus_one)
self.assert_eq(
psdf,
pd.DataFrame({"id": list(range(10)), "d": list(range(1, 11))}, columns=["id", "d"]),