From 7e2717333b980b7c9a258ea3dff1619cf2351cbf Mon Sep 17 00:00:00 2001
From: itholic
Date: Tue, 1 Jun 2021 10:33:10 +0900
Subject: [PATCH] [SPARK-35453][PYTHON] Move Koalas accessor to pandas_on_spark accessor

### What changes were proposed in this pull request?

This PR proposes renaming the existing "Koalas Accessor" to "Pandas API on Spark Accessor".

### Why are the changes needed?

Because the name "Koalas" is no longer used in favor of "Pandas API on Spark", the related parts of the code base all need to be renamed accordingly.

### Does this PR introduce _any_ user-facing change?

Yes, the usage of the pandas API on Spark accessor changes from `df.koalas.[...]` to `df.pandas_on_spark.[...]`.

**Note:** `df.koalas.[...]` is still available but shows a deprecation warning (a short usage sketch appears after the diff below).

### How was this patch tested?

Manually tested locally and checked one by one.

Closes #32674 from itholic/SPARK-35453.

Authored-by: itholic
Signed-off-by: Hyukjin Kwon
---
 python/pyspark/pandas/accessors.py | 61 +++++++--------
 python/pyspark/pandas/datetimes.py | 38 +++++-----
 python/pyspark/pandas/frame.py | 17 +++--
 python/pyspark/pandas/indexes/datetimes.py | 8 +-
 python/pyspark/pandas/namespace.py | 4 +-
 python/pyspark/pandas/series.py | 7 +-
 python/pyspark/pandas/strings.py | 74 +++++++++----------
 .../pyspark/pandas/tests/test_categorical.py | 36 +++++----
 python/pyspark/pandas/tests/test_dataframe.py | 50 +++++++------
 9 files changed, 155 insertions(+), 140 deletions(-)

diff --git a/python/pyspark/pandas/accessors.py b/python/pyspark/pandas/accessors.py
index d63654fef4..d2f6771ea9 100644
--- a/python/pyspark/pandas/accessors.py
+++ b/python/pyspark/pandas/accessors.py
@@ -88,19 +88,19 @@ class PandasOnSparkFrameMethods(object): Examples -------- >>> df = ps.DataFrame({"x": ['a', 'b', 'c']}) - >>> df.koalas.attach_id_column(id_type="sequence", column="id") + >>> df.pandas_on_spark.attach_id_column(id_type="sequence", column="id") x id 0 a 0 1 b 1 2 c 2 - >>> df.koalas.attach_id_column(id_type="distributed-sequence", column=0) + >>> df.pandas_on_spark.attach_id_column(id_type="distributed-sequence", column=0) x 0 0 a 0 1 b 1 2 c 2 - >>> df.koalas.attach_id_column(id_type="distributed", column=0.0) + >>> df.pandas_on_spark.attach_id_column(id_type="distributed", column=0.0) ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE x 0.0 0 a ...
@@ -110,14 +110,14 @@ class PandasOnSparkFrameMethods(object): For multi-index columns: >>> df = ps.DataFrame({("x", "y"): ['a', 'b', 'c']}) - >>> df.koalas.attach_id_column(id_type="sequence", column=("id-x", "id-y")) + >>> df.pandas_on_spark.attach_id_column(id_type="sequence", column=("id-x", "id-y")) x id-x y id-y 0 a 0 1 b 1 2 c 2 - >>> df.koalas.attach_id_column(id_type="distributed-sequence", column=(0, 1.0)) + >>> df.pandas_on_spark.attach_id_column(id_type="distributed-sequence", column=(0, 1.0)) x 0 y 1.0 0 a 0
@@ -205,7 +205,7 @@ class PandasOnSparkFrameMethods(object): ... return pd.DataFrame([len(pdf)]) ... >>> df = ps.DataFrame({'A': range(1000)}) - >>> df.koalas.apply_batch(length) # doctest: +SKIP + >>> df.pandas_on_spark.apply_batch(length) # doctest: +SKIP c0 0 83 1 83
@@ -263,7 +263,7 @@ class PandasOnSparkFrameMethods(object): DataFrame.applymap: For elementwise operations. DataFrame.aggregate: Only perform aggregating type operations. DataFrame.transform: Only perform transforming type operations. - Series.koalas.transform_batch: transform the search as each pandas chunks. + Series.pandas_on_spark.transform_batch: transform the search as each pandas chunks.
Examples -------- @@ -276,19 +276,19 @@ class PandasOnSparkFrameMethods(object): >>> def query_func(pdf) -> ps.DataFrame[int, int]: ... return pdf.query('A == 1') - >>> df.koalas.apply_batch(query_func) + >>> df.pandas_on_spark.apply_batch(query_func) c0 c1 0 1 2 >>> def query_func(pdf) -> ps.DataFrame["A": int, "B": int]: ... return pdf.query('A == 1') - >>> df.koalas.apply_batch(query_func) + >>> df.pandas_on_spark.apply_batch(query_func) A B 0 1 2 You can also omit the type hints so pandas-on-Spark infers the return schema as below: - >>> df.koalas.apply_batch(lambda pdf: pdf.query('A == 1')) + >>> df.pandas_on_spark.apply_batch(lambda pdf: pdf.query('A == 1')) A B 0 1 2 @@ -296,7 +296,7 @@ class PandasOnSparkFrameMethods(object): >>> def calculation(pdf, y, z) -> ps.DataFrame[int, int]: ... return pdf ** y + z - >>> df.koalas.apply_batch(calculation, args=(10,), z=20) + >>> df.pandas_on_spark.apply_batch(calculation, args=(10,), z=20) c0 c1 0 21 1044 1 59069 1048596 @@ -304,13 +304,13 @@ class PandasOnSparkFrameMethods(object): You can also use ``np.ufunc`` and built-in functions as input. - >>> df.koalas.apply_batch(np.add, args=(10,)) + >>> df.pandas_on_spark.apply_batch(np.add, args=(10,)) A B 0 11 12 1 13 14 2 15 16 - >>> (df * -1).koalas.apply_batch(abs) + >>> (df * -1).pandas_on_spark.apply_batch(abs) A B 0 1 2 1 3 4 @@ -411,7 +411,7 @@ class PandasOnSparkFrameMethods(object): ... return pd.DataFrame([len(pdf)] * len(pdf)) ... >>> df = ps.DataFrame({'A': range(1000)}) - >>> df.koalas.transform_batch(length) # doctest: +SKIP + >>> df.pandas_on_spark.transform_batch(length) # doctest: +SKIP c0 0 83 1 83 @@ -461,8 +461,8 @@ class PandasOnSparkFrameMethods(object): See Also -------- - DataFrame.koalas.apply_batch: For row/columnwise operations. - Series.koalas.transform_batch: transform the search as each pandas chunks. + DataFrame.pandas_on_spark.apply_batch: For row/columnwise operations. + Series.pandas_on_spark.transform_batch: transform the search as each pandas chunks. Examples -------- @@ -475,7 +475,7 @@ class PandasOnSparkFrameMethods(object): >>> def plus_one_func(pdf) -> ps.DataFrame[int, int]: ... return pdf + 1 - >>> df.koalas.transform_batch(plus_one_func) + >>> df.pandas_on_spark.transform_batch(plus_one_func) c0 c1 0 2 3 1 4 5 @@ -483,7 +483,7 @@ class PandasOnSparkFrameMethods(object): >>> def plus_one_func(pdf) -> ps.DataFrame['A': int, 'B': int]: ... return pdf + 1 - >>> df.koalas.transform_batch(plus_one_func) + >>> df.pandas_on_spark.transform_batch(plus_one_func) A B 0 2 3 1 4 5 @@ -491,7 +491,7 @@ class PandasOnSparkFrameMethods(object): >>> def plus_one_func(pdf) -> ps.Series[int]: ... return pdf.B + 1 - >>> df.koalas.transform_batch(plus_one_func) + >>> df.pandas_on_spark.transform_batch(plus_one_func) 0 3 1 5 2 7 @@ -499,13 +499,13 @@ class PandasOnSparkFrameMethods(object): You can also omit the type hints so pandas-on-Spark infers the return schema as below: - >>> df.koalas.transform_batch(lambda pdf: pdf + 1) + >>> df.pandas_on_spark.transform_batch(lambda pdf: pdf + 1) A B 0 2 3 1 4 5 2 6 7 - >>> (df * -1).koalas.transform_batch(abs) + >>> (df * -1).pandas_on_spark.transform_batch(abs) A B 0 1 2 1 3 4 @@ -513,7 +513,7 @@ class PandasOnSparkFrameMethods(object): Note that you should not transform the index. The index information will not change. 
- >>> df.koalas.transform_batch(lambda pdf: pdf.B + 1) + >>> df.pandas_on_spark.transform_batch(lambda pdf: pdf.B + 1) 0 3 1 5 2 7 @@ -521,7 +521,7 @@ class PandasOnSparkFrameMethods(object): You can also specify extra arguments as below. - >>> df.koalas.transform_batch(lambda pdf, a, b, c: pdf.B + a + b + c, 1, 2, c=3) + >>> df.pandas_on_spark.transform_batch(lambda pdf, a, b, c: pdf.B + a + b + c, 1, 2, c=3) 0 8 1 10 2 12 @@ -720,7 +720,7 @@ class PandasOnSparkSeriesMethods(object): ... return pd.Series([len(pser)] * len(pser)) ... >>> df = ps.DataFrame({'A': range(1000)}) - >>> df.A.koalas.transform_batch(length) # doctest: +SKIP + >>> df.A.pandas_on_spark.transform_batch(length) # doctest: +SKIP c0 0 83 1 83 @@ -751,7 +751,8 @@ class PandasOnSparkSeriesMethods(object): See Also -------- - DataFrame.koalas.apply_batch : Similar but it takes pandas DataFrame as its internal batch. + DataFrame.pandas_on_spark.apply_batch : Similar but it takes pandas DataFrame as its + internal batch. Examples -------- @@ -764,7 +765,7 @@ class PandasOnSparkSeriesMethods(object): >>> def plus_one_func(pser) -> ps.Series[np.int64]: ... return pser + 1 - >>> df.A.koalas.transform_batch(plus_one_func) + >>> df.A.pandas_on_spark.transform_batch(plus_one_func) 0 2 1 4 2 6 @@ -772,7 +773,7 @@ class PandasOnSparkSeriesMethods(object): You can also omit the type hints so pandas-on-Spark infers the return schema as below: - >>> df.A.koalas.transform_batch(lambda pser: pser + 1) + >>> df.A.pandas_on_spark.transform_batch(lambda pser: pser + 1) 0 2 1 4 2 6 @@ -782,7 +783,7 @@ class PandasOnSparkSeriesMethods(object): >>> def plus_one_func(pser, a, b, c=3) -> ps.Series[np.int64]: ... return pser + a + b + c - >>> df.A.koalas.transform_batch(plus_one_func, 1, b=2) + >>> df.A.pandas_on_spark.transform_batch(plus_one_func, 1, b=2) 0 7 1 9 2 11 @@ -790,13 +791,13 @@ class PandasOnSparkSeriesMethods(object): You can also use ``np.ufunc`` and built-in functions as input. 
- >>> df.A.koalas.transform_batch(np.add, 10) + >>> df.A.pandas_on_spark.transform_batch(np.add, 10) 0 11 1 13 2 15 Name: A, dtype: int64 - >>> (df * -1).A.koalas.transform_batch(abs) + >>> (df * -1).A.pandas_on_spark.transform_batch(abs) 0 1 1 3 2 5 diff --git a/python/pyspark/pandas/datetimes.py b/python/pyspark/pandas/datetimes.py index 6062a1a760..d9a801f291 100644 --- a/python/pyspark/pandas/datetimes.py +++ b/python/pyspark/pandas/datetimes.py @@ -111,7 +111,7 @@ class DatetimeMethods(object): def pandas_microsecond(s) -> "ps.Series[np.int64]": return s.dt.microsecond - return self._data.koalas.transform_batch(pandas_microsecond) + return self._data.pandas_on_spark.transform_batch(pandas_microsecond) @property def nanosecond(self) -> "ps.Series": @@ -171,7 +171,7 @@ class DatetimeMethods(object): def pandas_dayofweek(s) -> "ps.Series[np.int64]": return s.dt.dayofweek - return self._data.koalas.transform_batch(pandas_dayofweek) + return self._data.pandas_on_spark.transform_batch(pandas_dayofweek) @property def weekday(self) -> "ps.Series": @@ -189,7 +189,7 @@ class DatetimeMethods(object): def pandas_dayofyear(s) -> "ps.Series[np.int64]": return s.dt.dayofyear - return self._data.koalas.transform_batch(pandas_dayofyear) + return self._data.pandas_on_spark.transform_batch(pandas_dayofyear) @property def quarter(self) -> "ps.Series": @@ -201,7 +201,7 @@ class DatetimeMethods(object): def pandas_quarter(s) -> "ps.Series[np.int64]": return s.dt.quarter - return self._data.koalas.transform_batch(pandas_quarter) + return self._data.pandas_on_spark.transform_batch(pandas_quarter) @property def is_month_start(self) -> "ps.Series": @@ -241,7 +241,7 @@ class DatetimeMethods(object): def pandas_is_month_start(s) -> "ps.Series[bool]": return s.dt.is_month_start - return self._data.koalas.transform_batch(pandas_is_month_start) + return self._data.pandas_on_spark.transform_batch(pandas_is_month_start) @property def is_month_end(self) -> "ps.Series": @@ -281,7 +281,7 @@ class DatetimeMethods(object): def pandas_is_month_end(s) -> "ps.Series[bool]": return s.dt.is_month_end - return self._data.koalas.transform_batch(pandas_is_month_end) + return self._data.pandas_on_spark.transform_batch(pandas_is_month_end) @property def is_quarter_start(self) -> "ps.Series": @@ -332,7 +332,7 @@ class DatetimeMethods(object): def pandas_is_quarter_start(s) -> "ps.Series[bool]": return s.dt.is_quarter_start - return self._data.koalas.transform_batch(pandas_is_quarter_start) + return self._data.pandas_on_spark.transform_batch(pandas_is_quarter_start) @property def is_quarter_end(self) -> "ps.Series": @@ -383,7 +383,7 @@ class DatetimeMethods(object): def pandas_is_quarter_end(s) -> "ps.Series[bool]": return s.dt.is_quarter_end - return self._data.koalas.transform_batch(pandas_is_quarter_end) + return self._data.pandas_on_spark.transform_batch(pandas_is_quarter_end) @property def is_year_start(self) -> "ps.Series": @@ -423,7 +423,7 @@ class DatetimeMethods(object): def pandas_is_year_start(s) -> "ps.Series[bool]": return s.dt.is_year_start - return self._data.koalas.transform_batch(pandas_is_year_start) + return self._data.pandas_on_spark.transform_batch(pandas_is_year_start) @property def is_year_end(self) -> "ps.Series": @@ -463,7 +463,7 @@ class DatetimeMethods(object): def pandas_is_year_end(s) -> "ps.Series[bool]": return s.dt.is_year_end - return self._data.koalas.transform_batch(pandas_is_year_end) + return self._data.pandas_on_spark.transform_batch(pandas_is_year_end) @property def is_leap_year(self) -> 
"ps.Series": @@ -503,7 +503,7 @@ class DatetimeMethods(object): def pandas_is_leap_year(s) -> "ps.Series[bool]": return s.dt.is_leap_year - return self._data.koalas.transform_batch(pandas_is_leap_year) + return self._data.pandas_on_spark.transform_batch(pandas_is_leap_year) @property def daysinmonth(self) -> "ps.Series": @@ -515,7 +515,7 @@ class DatetimeMethods(object): def pandas_daysinmonth(s) -> "ps.Series[np.int64]": return s.dt.daysinmonth - return self._data.koalas.transform_batch(pandas_daysinmonth) + return self._data.pandas_on_spark.transform_batch(pandas_daysinmonth) @property def days_in_month(self) -> "ps.Series": @@ -578,7 +578,7 @@ class DatetimeMethods(object): def pandas_normalize(s) -> "ps.Series[np.datetime64]": return s.dt.normalize() - return self._data.koalas.transform_batch(pandas_normalize) + return self._data.pandas_on_spark.transform_batch(pandas_normalize) def strftime(self, date_format: str) -> "ps.Series": """ @@ -627,7 +627,7 @@ class DatetimeMethods(object): def pandas_strftime(s) -> "ps.Series[str]": return s.dt.strftime(date_format) - return self._data.koalas.transform_batch(pandas_strftime) + return self._data.pandas_on_spark.transform_batch(pandas_strftime) def round(self, freq: Union[str, DateOffset], *args: Any, **kwargs: Any) -> "ps.Series": """ @@ -683,7 +683,7 @@ class DatetimeMethods(object): def pandas_round(s) -> "ps.Series[np.datetime64]": return s.dt.round(freq, *args, **kwargs) - return self._data.koalas.transform_batch(pandas_round) + return self._data.pandas_on_spark.transform_batch(pandas_round) def floor(self, freq: Union[str, DateOffset], *args: Any, **kwargs: Any) -> "ps.Series": """ @@ -739,7 +739,7 @@ class DatetimeMethods(object): def pandas_floor(s) -> "ps.Series[np.datetime64]": return s.dt.floor(freq, *args, **kwargs) - return self._data.koalas.transform_batch(pandas_floor) + return self._data.pandas_on_spark.transform_batch(pandas_floor) def ceil(self, freq: Union[str, DateOffset], *args: Any, **kwargs: Any) -> "ps.Series": """ @@ -795,7 +795,7 @@ class DatetimeMethods(object): def pandas_ceil(s) -> "ps.Series[np.datetime64]": return s.dt.ceil(freq, *args, **kwargs) - return self._data.koalas.transform_batch(pandas_ceil) + return self._data.pandas_on_spark.transform_batch(pandas_ceil) def month_name(self, locale: Optional[str] = None) -> "ps.Series": """ @@ -832,7 +832,7 @@ class DatetimeMethods(object): def pandas_month_name(s) -> "ps.Series[str]": return s.dt.month_name(locale=locale) - return self._data.koalas.transform_batch(pandas_month_name) + return self._data.pandas_on_spark.transform_batch(pandas_month_name) def day_name(self, locale: Optional[str] = None) -> "ps.Series": """ @@ -869,7 +869,7 @@ class DatetimeMethods(object): def pandas_day_name(s) -> "ps.Series[str]": return s.dt.day_name(locale=locale) - return self._data.koalas.transform_batch(pandas_day_name) + return self._data.pandas_on_spark.transform_batch(pandas_day_name) def _test() -> None: diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py index 709696eb05..baa3ccf389 100644 --- a/python/pyspark/pandas/frame.py +++ b/python/pyspark/pandas/frame.py @@ -882,6 +882,9 @@ class DataFrame(Frame, Generic[T]): spark = CachedAccessor("spark", SparkFrameMethods) # create accessor for pandas-on-Spark specific methods. + pandas_on_spark = CachedAccessor("pandas_on_spark", PandasOnSparkFrameMethods) + + # keep the name "koalas" for backward compatibility. 
koalas = CachedAccessor("koalas", PandasOnSparkFrameMethods) def hist(self, bins=10, **kwds): @@ -2732,7 +2735,7 @@ defaultdict(, {'col..., 'col...})] as_nullable_spark_type(psdf._internal.spark_type_for(output_label)) ) applied.append( - psser.koalas._transform_batch( + psser.pandas_on_spark._transform_batch( func=lambda c: func(c, *args, **kwargs), return_type=SeriesType(dtype, return_schema), ) @@ -2744,7 +2747,7 @@ defaultdict(, {'col..., 'col...})] return DataFrame(internal) else: return self._apply_series_op( - lambda psser: psser.koalas.transform_batch(func, *args, **kwargs) + lambda psser: psser.pandas_on_spark.transform_batch(func, *args, **kwargs) ) def pop(self, item) -> "DataFrame": @@ -3042,7 +3045,7 @@ defaultdict(, {'col..., 'col...})] # default index, which will never be used. So use "distributed" index as a dummy to # avoid overhead. with option_context("compute.default_index_type", "distributed"): - psdf = psdf.koalas.apply_batch(pandas_between_time) + psdf = psdf.pandas_on_spark.apply_batch(pandas_between_time) return DataFrame( self._internal.copy( @@ -3124,7 +3127,7 @@ defaultdict(, {'col..., 'col...})] # a default index, which will never be used. So use "distributed" index as a dummy # to avoid overhead. with option_context("compute.default_index_type", "distributed"): - psdf = psdf.koalas.apply_batch(pandas_at_time) + psdf = psdf.pandas_on_spark.apply_batch(pandas_at_time) return DataFrame( self._internal.copy( @@ -10739,7 +10742,7 @@ defaultdict(, {'col..., 'col...})] .. note:: This API delegates to Spark SQL so the syntax follows Spark SQL. Therefore, the pandas specific syntax such as `@` is not supported. If you want the pandas syntax, - you can work around with :meth:`DataFrame.koalas.apply_batch`, but you should + you can work around with :meth:`DataFrame.pandas_on_spark.apply_batch`, but you should be aware that `query_func` will be executed at different nodes in a distributed manner. So, for example, to use `@` syntax, make sure the variable is serialized by, for example, putting it within the closure as below. @@ -10748,7 +10751,7 @@ defaultdict(, {'col..., 'col...})] >>> def query_func(pdf): ... num = 1995 ... return pdf.query('A > @num') - >>> df.koalas.apply_batch(query_func) + >>> df.pandas_on_spark.apply_batch(query_func) A B 1996 1996 1996 1997 1997 1997 @@ -11020,7 +11023,7 @@ defaultdict(, {'col..., 'col...})] result_inner = pd.Series(result_inner).to_frame() return result_inner - result = self.koalas.apply_batch(eval_func) + result = self.pandas_on_spark.apply_batch(eval_func) if inplace: # Here, the result is always a frame because the error is thrown during schema inference # from pandas. 
diff --git a/python/pyspark/pandas/indexes/datetimes.py b/python/pyspark/pandas/indexes/datetimes.py index 6cec5b0f4b..6998adf99d 100644 --- a/python/pyspark/pandas/indexes/datetimes.py +++ b/python/pyspark/pandas/indexes/datetimes.py @@ -688,11 +688,11 @@ class DatetimeIndex(Index): psdf = self.to_frame()[[]] id_column_name = verify_temp_column_name(psdf, "__id_column__") - psdf = psdf.koalas.attach_id_column("distributed-sequence", id_column_name) + psdf = psdf.pandas_on_spark.attach_id_column("distributed-sequence", id_column_name) with ps.option_context("compute.default_index_type", "distributed"): # The attached index in the statement below will be dropped soon, # so we enforce “distributed” default index type - psdf = psdf.koalas.apply_batch(pandas_between_time) + psdf = psdf.pandas_on_spark.apply_batch(pandas_between_time) return ps.Index(first_series(psdf).rename(self.name)) def indexer_at_time(self, time: Union[datetime.time, str], asof: bool = False) -> Index: @@ -734,11 +734,11 @@ class DatetimeIndex(Index): psdf = self.to_frame()[[]] id_column_name = verify_temp_column_name(psdf, "__id_column__") - psdf = psdf.koalas.attach_id_column("distributed-sequence", id_column_name) + psdf = psdf.pandas_on_spark.attach_id_column("distributed-sequence", id_column_name) with ps.option_context("compute.default_index_type", "distributed"): # The attached index in the statement below will be dropped soon, # so we enforce “distributed” default index type - psdf = psdf.koalas.apply_batch(pandas_at_time) + psdf = psdf.pandas_on_spark.apply_batch(pandas_at_time) return ps.Index(first_series(psdf).rename(self.name)) diff --git a/python/pyspark/pandas/namespace.py b/python/pyspark/pandas/namespace.py index 961d888566..a74456d5c8 100644 --- a/python/pyspark/pandas/namespace.py +++ b/python/pyspark/pandas/namespace.py @@ -1592,10 +1592,10 @@ def to_datetime( ) if isinstance(arg, Series): - return arg.koalas.transform_batch(pandas_to_datetime) + return arg.pandas_on_spark.transform_batch(pandas_to_datetime) if isinstance(arg, DataFrame): psdf = arg[["year", "month", "day"]] - return psdf.koalas.transform_batch(pandas_to_datetime) + return psdf.pandas_on_spark.transform_batch(pandas_to_datetime) return pd.to_datetime( arg, errors=errors, diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py index e20ddbc7ee..a4f3aa4fe5 100644 --- a/python/pyspark/pandas/series.py +++ b/python/pyspark/pandas/series.py @@ -650,6 +650,9 @@ class Series(Frame, IndexOpsMixin, Generic[T]): ) # create accessor for pandas-on-Spark specific methods. + pandas_on_spark = CachedAccessor("pandas_on_spark", PandasOnSparkSeriesMethods) + + # keep the name "koalas" for backward compatibility. koalas = CachedAccessor("koalas", PandasOnSparkSeriesMethods) # Comparison Operators @@ -3098,7 +3101,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]): apply_each = wraps(func)(lambda s: s.apply(func, args=args, **kwds)) if should_infer_schema: - return self.koalas._transform_batch(apply_each, None) + return self.pandas_on_spark._transform_batch(apply_each, None) else: sig_return = infer_return_type(func) if not isinstance(sig_return, ScalarType): @@ -3107,7 +3110,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]): "but found type {}".format(sig_return) ) return_type = cast(ScalarType, sig_return) - return self.koalas._transform_batch(apply_each, return_type) + return self.pandas_on_spark._transform_batch(apply_each, return_type) # TODO: not all arguments are implemented comparing to pandas' for now. 
def aggregate(self, func: Union[str, List[str]]) -> Union[Scalar, "Series"]: diff --git a/python/pyspark/pandas/strings.py b/python/pyspark/pandas/strings.py index b047a7ca2e..1d9b581447 100644 --- a/python/pyspark/pandas/strings.py +++ b/python/pyspark/pandas/strings.py @@ -77,7 +77,7 @@ class StringMethods(object): def pandas_capitalize(s) -> "ps.Series[str]": return s.str.capitalize() - return self._data.koalas.transform_batch(pandas_capitalize) + return self._data.pandas_on_spark.transform_batch(pandas_capitalize) def title(self) -> "ps.Series": """ @@ -105,7 +105,7 @@ class StringMethods(object): def pandas_title(s) -> "ps.Series[str]": return s.str.title() - return self._data.koalas.transform_batch(pandas_title) + return self._data.pandas_on_spark.transform_batch(pandas_title) def lower(self) -> "ps.Series": """ @@ -179,7 +179,7 @@ class StringMethods(object): def pandas_swapcase(s) -> "ps.Series[str]": return s.str.swapcase() - return self._data.koalas.transform_batch(pandas_swapcase) + return self._data.pandas_on_spark.transform_batch(pandas_swapcase) def startswith(self, pattern: str, na: Optional[Any] = None) -> "ps.Series": """ @@ -231,7 +231,7 @@ class StringMethods(object): def pandas_startswith(s) -> "ps.Series[bool]": return s.str.startswith(pattern, na) - return self._data.koalas.transform_batch(pandas_startswith) + return self._data.pandas_on_spark.transform_batch(pandas_startswith) def endswith(self, pattern: str, na: Optional[Any] = None) -> "ps.Series": """ @@ -283,7 +283,7 @@ class StringMethods(object): def pandas_endswith(s) -> "ps.Series[bool]": return s.str.endswith(pattern, na) - return self._data.koalas.transform_batch(pandas_endswith) + return self._data.pandas_on_spark.transform_batch(pandas_endswith) def strip(self, to_strip: Optional[str] = None) -> "ps.Series": """ @@ -336,7 +336,7 @@ class StringMethods(object): def pandas_strip(s) -> "ps.Series[str]": return s.str.strip(to_strip) - return self._data.koalas.transform_batch(pandas_strip) + return self._data.pandas_on_spark.transform_batch(pandas_strip) def lstrip(self, to_strip: Optional[str] = None) -> "ps.Series": """ @@ -377,7 +377,7 @@ class StringMethods(object): def pandas_lstrip(s) -> "ps.Series[str]": return s.str.lstrip(to_strip) - return self._data.koalas.transform_batch(pandas_lstrip) + return self._data.pandas_on_spark.transform_batch(pandas_lstrip) def rstrip(self, to_strip: Optional[str] = None) -> "ps.Series": """ @@ -418,7 +418,7 @@ class StringMethods(object): def pandas_rstrip(s) -> "ps.Series[str]": return s.str.rstrip(to_strip) - return self._data.koalas.transform_batch(pandas_rstrip) + return self._data.pandas_on_spark.transform_batch(pandas_rstrip) def get(self, i: int) -> "ps.Series": """ @@ -473,7 +473,7 @@ class StringMethods(object): def pandas_get(s) -> "ps.Series[str]": return s.str.get(i) - return self._data.koalas.transform_batch(pandas_get) + return self._data.pandas_on_spark.transform_batch(pandas_get) def isalnum(self) -> "ps.Series": """ @@ -510,7 +510,7 @@ class StringMethods(object): def pandas_isalnum(s) -> "ps.Series[bool]": return s.str.isalnum() - return self._data.koalas.transform_batch(pandas_isalnum) + return self._data.pandas_on_spark.transform_batch(pandas_isalnum) def isalpha(self) -> "ps.Series": """ @@ -536,7 +536,7 @@ class StringMethods(object): def pandas_isalpha(s) -> "ps.Series[bool]": return s.str.isalpha() - return self._data.koalas.transform_batch(pandas_isalpha) + return self._data.pandas_on_spark.transform_batch(pandas_isalpha) def isdigit(self) -> 
"ps.Series": """ @@ -587,7 +587,7 @@ class StringMethods(object): def pandas_isdigit(s) -> "ps.Series[bool]": return s.str.isdigit() - return self._data.koalas.transform_batch(pandas_isdigit) + return self._data.pandas_on_spark.transform_batch(pandas_isdigit) def isspace(self) -> "ps.Series": """ @@ -611,7 +611,7 @@ class StringMethods(object): def pandas_isspace(s) -> "ps.Series[bool]": return s.str.isspace() - return self._data.koalas.transform_batch(pandas_isspace) + return self._data.pandas_on_spark.transform_batch(pandas_isspace) def islower(self) -> "ps.Series": """ @@ -636,7 +636,7 @@ class StringMethods(object): def pandas_isspace(s) -> "ps.Series[bool]": return s.str.islower() - return self._data.koalas.transform_batch(pandas_isspace) + return self._data.pandas_on_spark.transform_batch(pandas_isspace) def isupper(self) -> "ps.Series": """ @@ -661,7 +661,7 @@ class StringMethods(object): def pandas_isspace(s) -> "ps.Series[bool]": return s.str.isupper() - return self._data.koalas.transform_batch(pandas_isspace) + return self._data.pandas_on_spark.transform_batch(pandas_isspace) def istitle(self) -> "ps.Series": """ @@ -692,7 +692,7 @@ class StringMethods(object): def pandas_istitle(s) -> "ps.Series[bool]": return s.str.istitle() - return self._data.koalas.transform_batch(pandas_istitle) + return self._data.pandas_on_spark.transform_batch(pandas_istitle) def isnumeric(self) -> "ps.Series": """ @@ -751,7 +751,7 @@ class StringMethods(object): def pandas_isnumeric(s) -> "ps.Series[bool]": return s.str.isnumeric() - return self._data.koalas.transform_batch(pandas_isnumeric) + return self._data.pandas_on_spark.transform_batch(pandas_isnumeric) def isdecimal(self) -> "ps.Series": """ @@ -802,7 +802,7 @@ class StringMethods(object): def pandas_isdecimal(s) -> "ps.Series[bool]": return s.str.isdecimal() - return self._data.koalas.transform_batch(pandas_isdecimal) + return self._data.pandas_on_spark.transform_batch(pandas_isdecimal) @no_type_check def cat(self, others=None, sep=None, na_rep=None, join=None) -> "ps.Series": @@ -846,7 +846,7 @@ class StringMethods(object): def pandas_center(s) -> "ps.Series[str]": return s.str.center(width, fillchar) - return self._data.koalas.transform_batch(pandas_center) + return self._data.pandas_on_spark.transform_batch(pandas_center) def contains( self, pat: str, case: bool = True, flags: int = 0, na: Any = None, regex: bool = True @@ -966,7 +966,7 @@ class StringMethods(object): def pandas_contains(s) -> "ps.Series[bool]": return s.str.contains(pat, case, flags, na, regex) - return self._data.koalas.transform_batch(pandas_contains) + return self._data.pandas_on_spark.transform_batch(pandas_contains) def count(self, pat: str, flags: int = 0) -> "ps.Series": """ @@ -1017,7 +1017,7 @@ class StringMethods(object): def pandas_count(s) -> "ps.Series[int]": return s.str.count(pat, flags) - return self._data.koalas.transform_batch(pandas_count) + return self._data.pandas_on_spark.transform_batch(pandas_count) @no_type_check def decode(self, encoding, errors="strict") -> "ps.Series": @@ -1101,7 +1101,7 @@ class StringMethods(object): def pandas_find(s) -> "ps.Series[int]": return s.str.find(sub, start, end) - return self._data.koalas.transform_batch(pandas_find) + return self._data.pandas_on_spark.transform_batch(pandas_find) def findall(self, pat: str, flags: int = 0) -> "ps.Series": """ @@ -1231,7 +1231,7 @@ class StringMethods(object): def pandas_index(s) -> "ps.Series[np.int64]": return s.str.index(sub, start, end) - return 
self._data.koalas.transform_batch(pandas_index) + return self._data.pandas_on_spark.transform_batch(pandas_index) def join(self, sep: str) -> "ps.Series": """ @@ -1281,7 +1281,7 @@ class StringMethods(object): def pandas_join(s) -> "ps.Series[str]": return s.str.join(sep) - return self._data.koalas.transform_batch(pandas_join) + return self._data.pandas_on_spark.transform_batch(pandas_join) def len(self) -> "ps.Series": """ @@ -1352,7 +1352,7 @@ class StringMethods(object): def pandas_ljust(s) -> "ps.Series[str]": return s.str.ljust(width, fillchar) - return self._data.koalas.transform_batch(pandas_ljust) + return self._data.pandas_on_spark.transform_batch(pandas_ljust) def match(self, pat: str, case: bool = True, flags: int = 0, na: Any = np.NaN) -> "ps.Series": """ @@ -1419,7 +1419,7 @@ class StringMethods(object): def pandas_match(s) -> "ps.Series[bool]": return s.str.match(pat, case, flags, na) - return self._data.koalas.transform_batch(pandas_match) + return self._data.pandas_on_spark.transform_batch(pandas_match) def normalize(self, form: str) -> "ps.Series": """ @@ -1443,7 +1443,7 @@ class StringMethods(object): def pandas_normalize(s) -> "ps.Series[str]": return s.str.normalize(form) - return self._data.koalas.transform_batch(pandas_normalize) + return self._data.pandas_on_spark.transform_batch(pandas_normalize) def pad(self, width: int, side: str = "left", fillchar: str = " ") -> "ps.Series": """ @@ -1492,7 +1492,7 @@ class StringMethods(object): def pandas_pad(s) -> "ps.Series[str]": return s.str.pad(width, side, fillchar) - return self._data.koalas.transform_batch(pandas_pad) + return self._data.pandas_on_spark.transform_batch(pandas_pad) def partition(self, sep: str = " ", expand: bool = True) -> "ps.Series": """ @@ -1638,7 +1638,7 @@ class StringMethods(object): def pandas_replace(s) -> "ps.Series[str]": return s.str.replace(pat, repl, n=n, case=case, flags=flags, regex=regex) - return self._data.koalas.transform_batch(pandas_replace) + return self._data.pandas_on_spark.transform_batch(pandas_replace) def rfind(self, sub: str, start: int = 0, end: Optional[int] = None) -> "ps.Series": """ @@ -1694,7 +1694,7 @@ class StringMethods(object): def pandas_rfind(s) -> "ps.Series[int]": return s.str.rfind(sub, start, end) - return self._data.koalas.transform_batch(pandas_rfind) + return self._data.pandas_on_spark.transform_batch(pandas_rfind) def rindex(self, sub: str, start: int = 0, end: Optional[int] = None) -> "ps.Series": """ @@ -1738,7 +1738,7 @@ class StringMethods(object): def pandas_rindex(s) -> "ps.Series[np.int64]": return s.str.rindex(sub, start, end) - return self._data.koalas.transform_batch(pandas_rindex) + return self._data.pandas_on_spark.transform_batch(pandas_rindex) def rjust(self, width: int, fillchar: str = " ") -> "ps.Series": """ @@ -1780,7 +1780,7 @@ class StringMethods(object): def pandas_rjust(s) -> "ps.Series[str]": return s.str.rjust(width, fillchar) - return self._data.koalas.transform_batch(pandas_rjust) + return self._data.pandas_on_spark.transform_batch(pandas_rjust) def rpartition(self, sep: str = " ", expand: bool = True) -> "ps.Series": """ @@ -1846,7 +1846,7 @@ class StringMethods(object): def pandas_slice(s) -> "ps.Series[str]": return s.str.slice(start, stop, step) - return self._data.koalas.transform_batch(pandas_slice) + return self._data.pandas_on_spark.transform_batch(pandas_slice) def slice_replace( self, start: Optional[int] = None, stop: Optional[int] = None, repl: Optional[str] = None @@ -1923,7 +1923,7 @@ class StringMethods(object): def 
pandas_slice_replace(s) -> "ps.Series[str]": return s.str.slice_replace(start, stop, repl) - return self._data.koalas.transform_batch(pandas_slice_replace) + return self._data.pandas_on_spark.transform_batch(pandas_slice_replace) def split( self, pat: Optional[str] = None, n: int = -1, expand: bool = False @@ -2247,7 +2247,7 @@ class StringMethods(object): def pandas_translate(s) -> "ps.Series[str]": return s.str.translate(table) - return self._data.koalas.transform_batch(pandas_translate) + return self._data.pandas_on_spark.transform_batch(pandas_translate) def wrap(self, width: int, **kwargs: bool) -> "ps.Series": """ @@ -2299,7 +2299,7 @@ class StringMethods(object): def pandas_wrap(s) -> "ps.Series[str]": return s.str.wrap(width, **kwargs) - return self._data.koalas.transform_batch(pandas_wrap) + return self._data.pandas_on_spark.transform_batch(pandas_wrap) def zfill(self, width: int) -> "ps.Series": """ @@ -2350,7 +2350,7 @@ class StringMethods(object): def pandas_zfill(s) -> "ps.Series[str]": return s.str.zfill(width) - return self._data.koalas.transform_batch(pandas_zfill) + return self._data.pandas_on_spark.transform_batch(pandas_zfill) @no_type_check def get_dummies(self, sep: str = "|") -> "ps.DataFrame": diff --git a/python/pyspark/pandas/tests/test_categorical.py b/python/pyspark/pandas/tests/test_categorical.py index 593d4502f6..a6c6c230a0 100644 --- a/python/pyspark/pandas/tests/test_categorical.py +++ b/python/pyspark/pandas/tests/test_categorical.py @@ -301,7 +301,7 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils): pdf, psdf = self.df_pair self.assert_eq( - psdf.koalas.apply_batch(lambda pdf: pdf.astype(str)).sort_index(), + psdf.pandas_on_spark.apply_batch(lambda pdf: pdf.astype(str)).sort_index(), pdf.astype(str).sort_index(), ) @@ -313,7 +313,7 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils): dtype = CategoricalDtype(categories=["a", "b", "c", "d"]) self.assert_eq( - psdf.koalas.apply_batch(lambda pdf: pdf.astype(dtype)).sort_index(), + psdf.pandas_on_spark.apply_batch(lambda pdf: pdf.astype(dtype)).sort_index(), pdf.astype(dtype).sort_index(), ) @@ -327,7 +327,7 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils): return pdf.astype(str) self.assert_eq( - psdf.koalas.apply_batch(to_str).sort_values(["a", "b"]).reset_index(drop=True), + psdf.pandas_on_spark.apply_batch(to_str).sort_values(["a", "b"]).reset_index(drop=True), to_str(pdf).sort_values(["a", "b"]).reset_index(drop=True), ) @@ -343,7 +343,9 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils): return pdf.astype(dtype) self.assert_eq( - psdf.koalas.apply_batch(to_category).sort_values(["a", "b"]).reset_index(drop=True), + psdf.pandas_on_spark.apply_batch(to_category) + .sort_values(["a", "b"]) + .reset_index(drop=True), to_category(pdf).sort_values(["a", "b"]).reset_index(drop=True), ) @@ -351,11 +353,11 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils): pdf, psdf = self.df_pair self.assert_eq( - psdf.koalas.transform_batch(lambda pdf: pdf.astype(str)).sort_index(), + psdf.pandas_on_spark.transform_batch(lambda pdf: pdf.astype(str)).sort_index(), pdf.astype(str).sort_index(), ) self.assert_eq( - psdf.koalas.transform_batch(lambda pdf: pdf.b.cat.codes).sort_index(), + psdf.pandas_on_spark.transform_batch(lambda pdf: pdf.b.cat.codes).sort_index(), pdf.b.cat.codes.sort_index(), ) @@ -367,11 +369,11 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils): dtype = CategoricalDtype(categories=["a", "b", "c", "d"]) self.assert_eq( - psdf.koalas.transform_batch(lambda pdf: 
pdf.astype(dtype)).sort_index(), + psdf.pandas_on_spark.transform_batch(lambda pdf: pdf.astype(dtype)).sort_index(), pdf.astype(dtype).sort_index(), ) self.assert_eq( - psdf.koalas.transform_batch(lambda pdf: pdf.b.astype(dtype)).sort_index(), + psdf.pandas_on_spark.transform_batch(lambda pdf: pdf.b.astype(dtype)).sort_index(), pdf.b.astype(dtype).sort_index(), ) @@ -385,14 +387,14 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils): return pdf.astype(str) self.assert_eq( - psdf.koalas.transform_batch(to_str).sort_index(), to_str(pdf).sort_index(), + psdf.pandas_on_spark.transform_batch(to_str).sort_index(), to_str(pdf).sort_index(), ) def to_codes(pdf) -> ps.Series[np.int8]: return pdf.b.cat.codes self.assert_eq( - psdf.koalas.transform_batch(to_codes).sort_index(), to_codes(pdf).sort_index(), + psdf.pandas_on_spark.transform_batch(to_codes).sort_index(), to_codes(pdf).sort_index(), ) pdf = pd.DataFrame( @@ -407,14 +409,15 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils): return pdf.astype(dtype) self.assert_eq( - psdf.koalas.transform_batch(to_category).sort_index(), to_category(pdf).sort_index(), + psdf.pandas_on_spark.transform_batch(to_category).sort_index(), + to_category(pdf).sort_index(), ) def to_category(pdf) -> ps.Series[dtype]: return pdf.b.astype(dtype) self.assert_eq( - psdf.koalas.transform_batch(to_category).sort_index(), + psdf.pandas_on_spark.transform_batch(to_category).sort_index(), to_category(pdf).rename().sort_index(), ) @@ -422,7 +425,7 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils): pdf, psdf = self.df_pair self.assert_eq( - psdf.a.koalas.transform_batch(lambda pser: pser.astype(str)).sort_index(), + psdf.a.pandas_on_spark.transform_batch(lambda pser: pser.astype(str)).sort_index(), pdf.a.astype(str).sort_index(), ) @@ -434,7 +437,7 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils): dtype = CategoricalDtype(categories=["a", "b", "c", "d"]) self.assert_eq( - psdf.a.koalas.transform_batch(lambda pser: pser.astype(dtype)).sort_index(), + psdf.a.pandas_on_spark.transform_batch(lambda pser: pser.astype(dtype)).sort_index(), pdf.a.astype(dtype).sort_index(), ) @@ -448,7 +451,7 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils): return pser.astype(str) self.assert_eq( - psdf.a.koalas.transform_batch(to_str).sort_index(), to_str(pdf.a).sort_index() + psdf.a.pandas_on_spark.transform_batch(to_str).sort_index(), to_str(pdf.a).sort_index() ) pdf = pd.DataFrame( @@ -462,7 +465,8 @@ class CategoricalTest(PandasOnSparkTestCase, TestUtils): return pser.astype(dtype) self.assert_eq( - psdf.a.koalas.transform_batch(to_category).sort_index(), to_category(pdf.a).sort_index() + psdf.a.pandas_on_spark.transform_batch(to_category).sort_index(), + to_category(pdf.a).sort_index(), ) diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py index 5e6b6b924f..41e9ec30bc 100644 --- a/python/pyspark/pandas/tests/test_dataframe.py +++ b/python/pyspark/pandas/tests/test_dataframe.py @@ -4325,30 +4325,31 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils): psdf = ps.DataFrame(pdf) self.assert_eq( - psdf.koalas.apply_batch(lambda pdf, a: pdf + a, args=(1,)).sort_index(), + psdf.pandas_on_spark.apply_batch(lambda pdf, a: pdf + a, args=(1,)).sort_index(), (pdf + 1).sort_index(), ) with option_context("compute.shortcut_limit", 500): self.assert_eq( - psdf.koalas.apply_batch(lambda pdf: pdf + 1).sort_index(), (pdf + 1).sort_index() + psdf.pandas_on_spark.apply_batch(lambda pdf: pdf + 
1).sort_index(), + (pdf + 1).sort_index(), ) self.assert_eq( - psdf.koalas.apply_batch(lambda pdf, b: pdf + b, b=1).sort_index(), + psdf.pandas_on_spark.apply_batch(lambda pdf, b: pdf + b, b=1).sort_index(), (pdf + 1).sort_index(), ) with self.assertRaisesRegex(AssertionError, "the first argument should be a callable"): - psdf.koalas.apply_batch(1) + psdf.pandas_on_spark.apply_batch(1) with self.assertRaisesRegex(TypeError, "The given function.*frame as its type hints"): def f2(_) -> ps.Series[int]: pass - psdf.koalas.apply_batch(f2) + psdf.pandas_on_spark.apply_batch(f2) with self.assertRaisesRegex(ValueError, "The given function should return a frame"): - psdf.koalas.apply_batch(lambda pdf: 1) + psdf.pandas_on_spark.apply_batch(lambda pdf: 1) # multi-index columns columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")]) @@ -4356,11 +4357,12 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils): psdf.columns = columns self.assert_eq( - psdf.koalas.apply_batch(lambda x: x + 1).sort_index(), (pdf + 1).sort_index() + psdf.pandas_on_spark.apply_batch(lambda x: x + 1).sort_index(), (pdf + 1).sort_index() ) with option_context("compute.shortcut_limit", 500): self.assert_eq( - psdf.koalas.apply_batch(lambda x: x + 1).sort_index(), (pdf + 1).sort_index() + psdf.pandas_on_spark.apply_batch(lambda x: x + 1).sort_index(), + (pdf + 1).sort_index(), ) def test_transform_batch(self): @@ -4376,46 +4378,46 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils): psdf = ps.DataFrame(pdf) self.assert_eq( - psdf.koalas.transform_batch(lambda pdf: pdf.c + 1).sort_index(), + psdf.pandas_on_spark.transform_batch(lambda pdf: pdf.c + 1).sort_index(), (pdf.c + 1).sort_index(), ) self.assert_eq( - psdf.koalas.transform_batch(lambda pdf, a: pdf + a, 1).sort_index(), + psdf.pandas_on_spark.transform_batch(lambda pdf, a: pdf + a, 1).sort_index(), (pdf + 1).sort_index(), ) self.assert_eq( - psdf.koalas.transform_batch(lambda pdf, a: pdf.c + a, a=1).sort_index(), + psdf.pandas_on_spark.transform_batch(lambda pdf, a: pdf.c + a, a=1).sort_index(), (pdf.c + 1).sort_index(), ) with option_context("compute.shortcut_limit", 500): self.assert_eq( - psdf.koalas.transform_batch(lambda pdf: pdf + 1).sort_index(), + psdf.pandas_on_spark.transform_batch(lambda pdf: pdf + 1).sort_index(), (pdf + 1).sort_index(), ) self.assert_eq( - psdf.koalas.transform_batch(lambda pdf: pdf.b + 1).sort_index(), + psdf.pandas_on_spark.transform_batch(lambda pdf: pdf.b + 1).sort_index(), (pdf.b + 1).sort_index(), ) self.assert_eq( - psdf.koalas.transform_batch(lambda pdf, a: pdf + a, 1).sort_index(), + psdf.pandas_on_spark.transform_batch(lambda pdf, a: pdf + a, 1).sort_index(), (pdf + 1).sort_index(), ) self.assert_eq( - psdf.koalas.transform_batch(lambda pdf, a: pdf.c + a, a=1).sort_index(), + psdf.pandas_on_spark.transform_batch(lambda pdf, a: pdf.c + a, a=1).sort_index(), (pdf.c + 1).sort_index(), ) with self.assertRaisesRegex(AssertionError, "the first argument should be a callable"): - psdf.koalas.transform_batch(1) + psdf.pandas_on_spark.transform_batch(1) with self.assertRaisesRegex(ValueError, "The given function should return a frame"): - psdf.koalas.transform_batch(lambda pdf: 1) + psdf.pandas_on_spark.transform_batch(lambda pdf: 1) with self.assertRaisesRegex( ValueError, "transform_batch cannot produce aggregated results" ): - psdf.koalas.transform_batch(lambda pdf: pd.Series(1)) + psdf.pandas_on_spark.transform_batch(lambda pdf: pd.Series(1)) # multi-index columns columns = pd.MultiIndex.from_tuples([("x", 
"a"), ("x", "b"), ("y", "c")]) @@ -4423,16 +4425,18 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils): psdf.columns = columns self.assert_eq( - psdf.koalas.transform_batch(lambda x: x + 1).sort_index(), (pdf + 1).sort_index() + psdf.pandas_on_spark.transform_batch(lambda x: x + 1).sort_index(), + (pdf + 1).sort_index(), ) with option_context("compute.shortcut_limit", 500): self.assert_eq( - psdf.koalas.transform_batch(lambda x: x + 1).sort_index(), (pdf + 1).sort_index() + psdf.pandas_on_spark.transform_batch(lambda x: x + 1).sort_index(), + (pdf + 1).sort_index(), ) def test_transform_batch_same_anchor(self): psdf = ps.range(10) - psdf["d"] = psdf.koalas.transform_batch(lambda pdf: pdf.id + 1) + psdf["d"] = psdf.pandas_on_spark.transform_batch(lambda pdf: pdf.id + 1) self.assert_eq( psdf, pd.DataFrame({"id": list(range(10)), "d": list(range(1, 11))}, columns=["id", "d"]), @@ -4443,7 +4447,7 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils): def plus_one(pdf) -> ps.Series[np.int64]: return pdf.id + 1 - psdf["d"] = psdf.koalas.transform_batch(plus_one) + psdf["d"] = psdf.pandas_on_spark.transform_batch(plus_one) self.assert_eq( psdf, pd.DataFrame({"id": list(range(10)), "d": list(range(1, 11))}, columns=["id", "d"]), @@ -4454,7 +4458,7 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils): def plus_one(ser) -> ps.Series[np.int64]: return ser + 1 - psdf["d"] = psdf.id.koalas.transform_batch(plus_one) + psdf["d"] = psdf.id.pandas_on_spark.transform_batch(plus_one) self.assert_eq( psdf, pd.DataFrame({"id": list(range(10)), "d": list(range(1, 11))}, columns=["id", "d"]),