7ff9d2e3ee
### What changes were proposed in this pull request? This PR proposes to rename Koalas to pandas-on-Spark in main codes ### Why are the changes needed? To have the correct name in PySpark. NOTE that the official name in the main documentation will be pandas APIs on Spark to be extra clear. pandas-on-Spark is not the official term. ### Does this PR introduce _any_ user-facing change? No, it's master-only change. It changes the docstring and class names. ### How was this patch tested? Manually tested via: ```bash ./python/run-tests --python-executable=python3 --modules pyspark-pandas ``` Closes #32166 from HyukjinKwon/rename-koalas. Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
1275 lines
42 KiB
Python
1275 lines
42 KiB
Python
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
# this work for additional information regarding copyright ownership.
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
# (the "License"); you may not use this file except in compliance with
|
|
# the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
"""
|
|
Spark related features. Usually, the features here are missing in pandas
|
|
but Spark has it.
|
|
"""
|
|
from abc import ABCMeta, abstractmethod
|
|
from typing import TYPE_CHECKING, Optional, Union, List, cast
|
|
|
|
from pyspark import StorageLevel
|
|
from pyspark.sql import Column, DataFrame as SparkDataFrame
|
|
from pyspark.sql.types import DataType, StructType
|
|
|
|
if TYPE_CHECKING:
|
|
import pyspark.pandas as ps # noqa: F401 (SPARK-34943)
|
|
from pyspark.pandas.base import IndexOpsMixin # noqa: F401 (SPARK-34943)
|
|
from pyspark.pandas.frame import CachedDataFrame # noqa: F401 (SPARK-34943)
|
|
|
|
|
|
class SparkIndexOpsMethods(object, metaclass=ABCMeta):
|
|
"""Spark related features. Usually, the features here are missing in pandas
|
|
but Spark has it."""
|
|
|
|
def __init__(self, data: Union["IndexOpsMixin"]):
|
|
self._data = data
|
|
|
|
@property
|
|
def data_type(self) -> DataType:
|
|
""" Returns the data type as defined by Spark, as a Spark DataType object."""
|
|
return self._data._internal.spark_type_for(self._data._column_label)
|
|
|
|
@property
|
|
def nullable(self) -> bool:
|
|
""" Returns the nullability as defined by Spark. """
|
|
return self._data._internal.spark_column_nullable_for(self._data._column_label)
|
|
|
|
@property
|
|
def column(self) -> Column:
|
|
"""
|
|
Spark Column object representing the Series/Index.
|
|
|
|
.. note:: This Spark Column object is strictly stick to its base DataFrame the Series/Index
|
|
was derived from.
|
|
"""
|
|
return self._data._internal.spark_column_for(self._data._column_label)
|
|
|
|
def transform(self, func) -> Union["ps.Series", "ps.Index"]:
|
|
"""
|
|
Applies a function that takes and returns a Spark column. It allows to natively
|
|
apply a Spark function and column APIs with the Spark column internally used
|
|
in Series or Index. The output length of the Spark column should be same as input's.
|
|
|
|
.. note:: It requires to have the same input and output length; therefore,
|
|
the aggregate Spark functions such as count does not work.
|
|
|
|
Parameters
|
|
----------
|
|
func : function
|
|
Function to use for transforming the data by using Spark columns.
|
|
|
|
Returns
|
|
-------
|
|
Series or Index
|
|
|
|
Raises
|
|
------
|
|
ValueError : If the output from the function is not a Spark column.
|
|
|
|
Examples
|
|
--------
|
|
>>> from pyspark.sql.functions import log
|
|
>>> df = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, columns=["a", "b"])
|
|
>>> df
|
|
a b
|
|
0 1 4
|
|
1 2 5
|
|
2 3 6
|
|
|
|
>>> df.a.spark.transform(lambda c: log(c))
|
|
0 0.000000
|
|
1 0.693147
|
|
2 1.098612
|
|
Name: a, dtype: float64
|
|
|
|
>>> df.index.spark.transform(lambda c: c + 10)
|
|
Int64Index([10, 11, 12], dtype='int64')
|
|
|
|
>>> df.a.spark.transform(lambda c: c + df.b.spark.column)
|
|
0 5
|
|
1 7
|
|
2 9
|
|
Name: a, dtype: int64
|
|
"""
|
|
from pyspark.pandas import MultiIndex
|
|
|
|
if isinstance(self._data, MultiIndex):
|
|
raise NotImplementedError("MultiIndex does not support spark.transform yet.")
|
|
output = func(self._data.spark.column)
|
|
if not isinstance(output, Column):
|
|
raise ValueError(
|
|
"The output of the function [%s] should be of a "
|
|
"pyspark.sql.Column; however, got [%s]." % (func, type(output))
|
|
)
|
|
new_ser = self._data._with_new_scol(scol=output)
|
|
# Trigger the resolution so it throws an exception if anything does wrong
|
|
# within the function, for example,
|
|
# `df1.a.spark.transform(lambda _: F.col("non-existent"))`.
|
|
new_ser._internal.to_internal_spark_frame
|
|
return new_ser
|
|
|
|
@property
|
|
@abstractmethod
|
|
def analyzed(self) -> Union["ps.Series", "ps.Index"]:
|
|
pass
|
|
|
|
|
|
class SparkSeriesMethods(SparkIndexOpsMethods):
|
|
def transform(self, func) -> "ps.Series":
|
|
return cast("ps.Series", super().transform(func))
|
|
|
|
transform.__doc__ = SparkIndexOpsMethods.transform.__doc__
|
|
|
|
def apply(self, func) -> "ps.Series":
|
|
"""
|
|
Applies a function that takes and returns a Spark column. It allows to natively
|
|
apply a Spark function and column APIs with the Spark column internally used
|
|
in Series or Index.
|
|
|
|
.. note:: It forces to lose the index and end up with using default index. It is
|
|
preferred to use :meth:`Series.spark.transform` or `:meth:`DataFrame.spark.apply`
|
|
with specifying the `inedx_col`.
|
|
|
|
.. note:: It does not require to have the same length of the input and output.
|
|
However, it requires to create a new DataFrame internally which will require
|
|
to set `compute.ops_on_diff_frames` to compute even with the same origin
|
|
DataFrame that is expensive, whereas :meth:`Series.spark.transform` does not
|
|
require it.
|
|
|
|
Parameters
|
|
----------
|
|
func : function
|
|
Function to apply the function against the data by using Spark columns.
|
|
|
|
Returns
|
|
-------
|
|
Series
|
|
|
|
Raises
|
|
------
|
|
ValueError : If the output from the function is not a Spark column.
|
|
|
|
Examples
|
|
--------
|
|
>>> from pyspark import pandas as ps
|
|
>>> from pyspark.sql.functions import count, lit
|
|
>>> df = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, columns=["a", "b"])
|
|
>>> df
|
|
a b
|
|
0 1 4
|
|
1 2 5
|
|
2 3 6
|
|
|
|
>>> df.a.spark.apply(lambda c: count(c))
|
|
0 3
|
|
Name: a, dtype: int64
|
|
|
|
>>> df.a.spark.apply(lambda c: c + df.b.spark.column)
|
|
0 5
|
|
1 7
|
|
2 9
|
|
Name: a, dtype: int64
|
|
"""
|
|
from pyspark.pandas.frame import DataFrame
|
|
from pyspark.pandas.series import Series, first_series
|
|
from pyspark.pandas.internal import HIDDEN_COLUMNS
|
|
|
|
output = func(self._data.spark.column)
|
|
if not isinstance(output, Column):
|
|
raise ValueError(
|
|
"The output of the function [%s] should be of a "
|
|
"pyspark.sql.Column; however, got [%s]." % (func, type(output))
|
|
)
|
|
assert isinstance(self._data, Series)
|
|
|
|
sdf = self._data._internal.spark_frame.drop(*HIDDEN_COLUMNS).select(output)
|
|
# Lose index.
|
|
return first_series(DataFrame(sdf)).rename(self._data.name)
|
|
|
|
@property
|
|
def analyzed(self) -> "ps.Series":
|
|
"""
|
|
Returns a new Series with the analyzed Spark DataFrame.
|
|
|
|
After multiple operations, the underlying Spark plan could grow huge
|
|
and make the Spark planner take a long time to finish the planning.
|
|
|
|
This function is for the workaround to avoid it.
|
|
|
|
.. note:: After analyzed, operations between the analyzed Series and the original one
|
|
will **NOT** work without setting a config `compute.ops_on_diff_frames` to `True`.
|
|
|
|
Returns
|
|
-------
|
|
Series
|
|
|
|
Examples
|
|
--------
|
|
>>> ser = ps.Series([1, 2, 3])
|
|
>>> ser
|
|
0 1
|
|
1 2
|
|
2 3
|
|
dtype: int64
|
|
|
|
The analyzed one should return the same value.
|
|
|
|
>>> ser.spark.analyzed
|
|
0 1
|
|
1 2
|
|
2 3
|
|
dtype: int64
|
|
|
|
However, it won't work with the same anchor Series.
|
|
|
|
>>> ser + ser.spark.analyzed
|
|
Traceback (most recent call last):
|
|
...
|
|
ValueError: ... enable 'compute.ops_on_diff_frames' option.
|
|
|
|
>>> with ps.option_context('compute.ops_on_diff_frames', True):
|
|
... (ser + ser.spark.analyzed).sort_index()
|
|
0 2
|
|
1 4
|
|
2 6
|
|
dtype: int64
|
|
"""
|
|
from pyspark.pandas.frame import DataFrame
|
|
from pyspark.pandas.series import first_series
|
|
|
|
return first_series(DataFrame(self._data._internal.resolved_copy))
|
|
|
|
|
|
class SparkIndexMethods(SparkIndexOpsMethods):
|
|
def transform(self, func) -> "ps.Index":
|
|
return cast("ps.Index", super().transform(func))
|
|
|
|
transform.__doc__ = SparkIndexOpsMethods.transform.__doc__
|
|
|
|
@property
|
|
def analyzed(self) -> "ps.Index":
|
|
"""
|
|
Returns a new Index with the analyzed Spark DataFrame.
|
|
|
|
After multiple operations, the underlying Spark plan could grow huge
|
|
and make the Spark planner take a long time to finish the planning.
|
|
|
|
This function is for the workaround to avoid it.
|
|
|
|
.. note:: After analyzed, operations between the analyzed Series and the original one
|
|
will **NOT** work without setting a config `compute.ops_on_diff_frames` to `True`.
|
|
|
|
Returns
|
|
-------
|
|
Index
|
|
|
|
Examples
|
|
--------
|
|
>>> idx = ps.Index([1, 2, 3])
|
|
>>> idx
|
|
Int64Index([1, 2, 3], dtype='int64')
|
|
|
|
The analyzed one should return the same value.
|
|
|
|
>>> idx.spark.analyzed
|
|
Int64Index([1, 2, 3], dtype='int64')
|
|
|
|
However, it won't work with the same anchor Index.
|
|
|
|
>>> idx + idx.spark.analyzed
|
|
Traceback (most recent call last):
|
|
...
|
|
ValueError: ... enable 'compute.ops_on_diff_frames' option.
|
|
|
|
>>> with ps.option_context('compute.ops_on_diff_frames', True):
|
|
... (idx + idx.spark.analyzed).sort_values()
|
|
Int64Index([2, 4, 6], dtype='int64')
|
|
"""
|
|
from pyspark.pandas.frame import DataFrame
|
|
|
|
return DataFrame(self._data._internal.resolved_copy).index
|
|
|
|
|
|
class SparkFrameMethods(object):
|
|
"""Spark related features. Usually, the features here are missing in pandas
|
|
but Spark has it."""
|
|
|
|
def __init__(self, frame: "ps.DataFrame"):
|
|
self._kdf = frame
|
|
|
|
def schema(self, index_col: Optional[Union[str, List[str]]] = None) -> StructType:
|
|
"""
|
|
Returns the underlying Spark schema.
|
|
|
|
Returns
|
|
-------
|
|
pyspark.sql.types.StructType
|
|
The underlying Spark schema.
|
|
|
|
Parameters
|
|
----------
|
|
index_col: str or list of str, optional, default: None
|
|
Column names to be used in Spark to represent pandas-on-Spark's index. The index name
|
|
in pandas-on-Spark is ignored. By default, the index is always lost.
|
|
|
|
Examples
|
|
--------
|
|
>>> df = ps.DataFrame({'a': list('abc'),
|
|
... 'b': list(range(1, 4)),
|
|
... 'c': np.arange(3, 6).astype('i1'),
|
|
... 'd': np.arange(4.0, 7.0, dtype='float64'),
|
|
... 'e': [True, False, True],
|
|
... 'f': pd.date_range('20130101', periods=3)},
|
|
... columns=['a', 'b', 'c', 'd', 'e', 'f'])
|
|
>>> df.spark.schema().simpleString()
|
|
'struct<a:string,b:bigint,c:tinyint,d:double,e:boolean,f:timestamp>'
|
|
>>> df.spark.schema(index_col='index').simpleString()
|
|
'struct<index:bigint,a:string,b:bigint,c:tinyint,d:double,e:boolean,f:timestamp>'
|
|
"""
|
|
return self.frame(index_col).schema
|
|
|
|
def print_schema(self, index_col: Optional[Union[str, List[str]]] = None) -> None:
|
|
"""
|
|
Prints out the underlying Spark schema in the tree format.
|
|
|
|
Parameters
|
|
----------
|
|
index_col: str or list of str, optional, default: None
|
|
Column names to be used in Spark to represent pandas-on-Spark's index. The index name
|
|
in pandas-on-Spark is ignored. By default, the index is always lost.
|
|
|
|
Returns
|
|
-------
|
|
None
|
|
|
|
Examples
|
|
--------
|
|
>>> df = ps.DataFrame({'a': list('abc'),
|
|
... 'b': list(range(1, 4)),
|
|
... 'c': np.arange(3, 6).astype('i1'),
|
|
... 'd': np.arange(4.0, 7.0, dtype='float64'),
|
|
... 'e': [True, False, True],
|
|
... 'f': pd.date_range('20130101', periods=3)},
|
|
... columns=['a', 'b', 'c', 'd', 'e', 'f'])
|
|
>>> df.spark.print_schema() # doctest: +NORMALIZE_WHITESPACE
|
|
root
|
|
|-- a: string (nullable = false)
|
|
|-- b: long (nullable = false)
|
|
|-- c: byte (nullable = false)
|
|
|-- d: double (nullable = false)
|
|
|-- e: boolean (nullable = false)
|
|
|-- f: timestamp (nullable = false)
|
|
>>> df.spark.print_schema(index_col='index') # doctest: +NORMALIZE_WHITESPACE
|
|
root
|
|
|-- index: long (nullable = false)
|
|
|-- a: string (nullable = false)
|
|
|-- b: long (nullable = false)
|
|
|-- c: byte (nullable = false)
|
|
|-- d: double (nullable = false)
|
|
|-- e: boolean (nullable = false)
|
|
|-- f: timestamp (nullable = false)
|
|
"""
|
|
self.frame(index_col).printSchema()
|
|
|
|
def frame(self, index_col: Optional[Union[str, List[str]]] = None) -> SparkDataFrame:
|
|
"""
|
|
Return the current DataFrame as a Spark DataFrame. :meth:`DataFrame.spark.frame` is an
|
|
alias of :meth:`DataFrame.to_spark`.
|
|
|
|
Parameters
|
|
----------
|
|
index_col: str or list of str, optional, default: None
|
|
Column names to be used in Spark to represent pandas-on-Spark's index. The index name
|
|
in pandas-on-Spark is ignored. By default, the index is always lost.
|
|
|
|
See Also
|
|
--------
|
|
DataFrame.to_spark
|
|
DataFrame.to_koalas
|
|
DataFrame.spark.frame
|
|
|
|
Examples
|
|
--------
|
|
By default, this method loses the index as below.
|
|
|
|
>>> df = ps.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6], 'c': [7, 8, 9]})
|
|
>>> df.to_spark().show() # doctest: +NORMALIZE_WHITESPACE
|
|
+---+---+---+
|
|
| a| b| c|
|
|
+---+---+---+
|
|
| 1| 4| 7|
|
|
| 2| 5| 8|
|
|
| 3| 6| 9|
|
|
+---+---+---+
|
|
|
|
>>> df = ps.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6], 'c': [7, 8, 9]})
|
|
>>> df.spark.frame().show() # doctest: +NORMALIZE_WHITESPACE
|
|
+---+---+---+
|
|
| a| b| c|
|
|
+---+---+---+
|
|
| 1| 4| 7|
|
|
| 2| 5| 8|
|
|
| 3| 6| 9|
|
|
+---+---+---+
|
|
|
|
If `index_col` is set, it keeps the index column as specified.
|
|
|
|
>>> df.to_spark(index_col="index").show() # doctest: +NORMALIZE_WHITESPACE
|
|
+-----+---+---+---+
|
|
|index| a| b| c|
|
|
+-----+---+---+---+
|
|
| 0| 1| 4| 7|
|
|
| 1| 2| 5| 8|
|
|
| 2| 3| 6| 9|
|
|
+-----+---+---+---+
|
|
|
|
Keeping index column is useful when you want to call some Spark APIs and
|
|
convert it back to pandas-on-Spark DataFrame without creating a default index, which
|
|
can affect performance.
|
|
|
|
>>> spark_df = df.to_spark(index_col="index")
|
|
>>> spark_df = spark_df.filter("a == 2")
|
|
>>> spark_df.to_koalas(index_col="index") # doctest: +NORMALIZE_WHITESPACE
|
|
a b c
|
|
index
|
|
1 2 5 8
|
|
|
|
In case of multi-index, specify a list to `index_col`.
|
|
|
|
>>> new_df = df.set_index("a", append=True)
|
|
>>> new_spark_df = new_df.to_spark(index_col=["index_1", "index_2"])
|
|
>>> new_spark_df.show() # doctest: +NORMALIZE_WHITESPACE
|
|
+-------+-------+---+---+
|
|
|index_1|index_2| b| c|
|
|
+-------+-------+---+---+
|
|
| 0| 1| 4| 7|
|
|
| 1| 2| 5| 8|
|
|
| 2| 3| 6| 9|
|
|
+-------+-------+---+---+
|
|
|
|
Likewise, can be converted to back to pandas-on-Spark DataFrame.
|
|
|
|
>>> new_spark_df.to_koalas(
|
|
... index_col=["index_1", "index_2"]) # doctest: +NORMALIZE_WHITESPACE
|
|
b c
|
|
index_1 index_2
|
|
0 1 4 7
|
|
1 2 5 8
|
|
2 3 6 9
|
|
"""
|
|
from pyspark.pandas.utils import name_like_string
|
|
|
|
kdf = self._kdf
|
|
|
|
data_column_names = []
|
|
data_columns = []
|
|
for i, (label, spark_column, column_name) in enumerate(
|
|
zip(
|
|
kdf._internal.column_labels,
|
|
kdf._internal.data_spark_columns,
|
|
kdf._internal.data_spark_column_names,
|
|
)
|
|
):
|
|
name = str(i) if label is None else name_like_string(label)
|
|
data_column_names.append(name)
|
|
if column_name != name:
|
|
spark_column = spark_column.alias(name)
|
|
data_columns.append(spark_column)
|
|
|
|
if index_col is None:
|
|
return kdf._internal.spark_frame.select(data_columns)
|
|
else:
|
|
if isinstance(index_col, str):
|
|
index_col = [index_col]
|
|
|
|
old_index_scols = kdf._internal.index_spark_columns
|
|
|
|
if len(index_col) != len(old_index_scols):
|
|
raise ValueError(
|
|
"length of index columns is %s; however, the length of the given "
|
|
"'index_col' is %s." % (len(old_index_scols), len(index_col))
|
|
)
|
|
|
|
if any(col in data_column_names for col in index_col):
|
|
raise ValueError("'index_col' cannot be overlapped with other columns.")
|
|
|
|
new_index_scols = [
|
|
index_scol.alias(col) for index_scol, col in zip(old_index_scols, index_col)
|
|
]
|
|
return kdf._internal.spark_frame.select(new_index_scols + data_columns)
|
|
|
|
def cache(self) -> "CachedDataFrame":
|
|
"""
|
|
Yields and caches the current DataFrame.
|
|
|
|
The pandas-on-Spark DataFrame is yielded as a protected resource and its corresponding
|
|
data is cached which gets uncached after execution goes of the context.
|
|
|
|
If you want to specify the StorageLevel manually, use :meth:`DataFrame.spark.persist`
|
|
|
|
See Also
|
|
--------
|
|
DataFrame.spark.persist
|
|
|
|
Examples
|
|
--------
|
|
>>> df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)],
|
|
... columns=['dogs', 'cats'])
|
|
>>> df
|
|
dogs cats
|
|
0 0.2 0.3
|
|
1 0.0 0.6
|
|
2 0.6 0.0
|
|
3 0.2 0.1
|
|
|
|
>>> with df.spark.cache() as cached_df:
|
|
... print(cached_df.count())
|
|
...
|
|
dogs 4
|
|
cats 4
|
|
dtype: int64
|
|
|
|
>>> df = df.spark.cache()
|
|
>>> df.to_pandas().mean(axis=1)
|
|
0 0.25
|
|
1 0.30
|
|
2 0.30
|
|
3 0.15
|
|
dtype: float64
|
|
|
|
To uncache the dataframe, use `unpersist` function
|
|
|
|
>>> df.spark.unpersist()
|
|
"""
|
|
from pyspark.pandas.frame import CachedDataFrame
|
|
|
|
self._kdf._update_internal_frame(
|
|
self._kdf._internal.resolved_copy, requires_same_anchor=False
|
|
)
|
|
return CachedDataFrame(self._kdf._internal)
|
|
|
|
def persist(
|
|
self, storage_level: StorageLevel = StorageLevel.MEMORY_AND_DISK
|
|
) -> "CachedDataFrame":
|
|
"""
|
|
Yields and caches the current DataFrame with a specific StorageLevel.
|
|
If a StogeLevel is not given, the `MEMORY_AND_DISK` level is used by default like PySpark.
|
|
|
|
The pandas-on-Spark DataFrame is yielded as a protected resource and its corresponding
|
|
data is cached which gets uncached after execution goes of the context.
|
|
|
|
See Also
|
|
--------
|
|
DataFrame.spark.cache
|
|
|
|
Examples
|
|
--------
|
|
>>> import pyspark
|
|
>>> df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)],
|
|
... columns=['dogs', 'cats'])
|
|
>>> df
|
|
dogs cats
|
|
0 0.2 0.3
|
|
1 0.0 0.6
|
|
2 0.6 0.0
|
|
3 0.2 0.1
|
|
|
|
Set the StorageLevel to `MEMORY_ONLY`.
|
|
|
|
>>> with df.spark.persist(pyspark.StorageLevel.MEMORY_ONLY) as cached_df:
|
|
... print(cached_df.spark.storage_level)
|
|
... print(cached_df.count())
|
|
...
|
|
Memory Serialized 1x Replicated
|
|
dogs 4
|
|
cats 4
|
|
dtype: int64
|
|
|
|
Set the StorageLevel to `DISK_ONLY`.
|
|
|
|
>>> with df.spark.persist(pyspark.StorageLevel.DISK_ONLY) as cached_df:
|
|
... print(cached_df.spark.storage_level)
|
|
... print(cached_df.count())
|
|
...
|
|
Disk Serialized 1x Replicated
|
|
dogs 4
|
|
cats 4
|
|
dtype: int64
|
|
|
|
If a StorageLevel is not given, it uses `MEMORY_AND_DISK` by default.
|
|
|
|
>>> with df.spark.persist() as cached_df:
|
|
... print(cached_df.spark.storage_level)
|
|
... print(cached_df.count())
|
|
...
|
|
Disk Memory Serialized 1x Replicated
|
|
dogs 4
|
|
cats 4
|
|
dtype: int64
|
|
|
|
>>> df = df.spark.persist()
|
|
>>> df.to_pandas().mean(axis=1)
|
|
0 0.25
|
|
1 0.30
|
|
2 0.30
|
|
3 0.15
|
|
dtype: float64
|
|
|
|
To uncache the dataframe, use `unpersist` function
|
|
|
|
>>> df.spark.unpersist()
|
|
"""
|
|
from pyspark.pandas.frame import CachedDataFrame
|
|
|
|
self._kdf._update_internal_frame(
|
|
self._kdf._internal.resolved_copy, requires_same_anchor=False
|
|
)
|
|
return CachedDataFrame(self._kdf._internal, storage_level=storage_level)
|
|
|
|
def hint(self, name: str, *parameters) -> "ps.DataFrame":
|
|
"""
|
|
Specifies some hint on the current DataFrame.
|
|
|
|
Parameters
|
|
----------
|
|
name : A name of the hint.
|
|
parameters : Optional parameters.
|
|
|
|
Returns
|
|
-------
|
|
ret : DataFrame with the hint.
|
|
|
|
See Also
|
|
--------
|
|
broadcast : Marks a DataFrame as small enough for use in broadcast joins.
|
|
|
|
Examples
|
|
--------
|
|
>>> df1 = ps.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'],
|
|
... 'value': [1, 2, 3, 5]},
|
|
... columns=['lkey', 'value']).set_index('lkey')
|
|
>>> df2 = ps.DataFrame({'rkey': ['foo', 'bar', 'baz', 'foo'],
|
|
... 'value': [5, 6, 7, 8]},
|
|
... columns=['rkey', 'value']).set_index('rkey')
|
|
>>> merged = df1.merge(df2.spark.hint("broadcast"), left_index=True, right_index=True)
|
|
>>> merged.spark.explain() # doctest: +ELLIPSIS
|
|
== Physical Plan ==
|
|
...
|
|
...BroadcastHashJoin...
|
|
...
|
|
"""
|
|
from pyspark.pandas.frame import DataFrame
|
|
|
|
internal = self._kdf._internal.resolved_copy
|
|
return DataFrame(internal.with_new_sdf(internal.spark_frame.hint(name, *parameters)))
|
|
|
|
def to_table(
|
|
self,
|
|
name: str,
|
|
format: Optional[str] = None,
|
|
mode: str = "overwrite",
|
|
partition_cols: Optional[Union[str, List[str]]] = None,
|
|
index_col: Optional[Union[str, List[str]]] = None,
|
|
**options
|
|
) -> None:
|
|
"""
|
|
Write the DataFrame into a Spark table. :meth:`DataFrame.spark.to_table`
|
|
is an alias of :meth:`DataFrame.to_table`.
|
|
|
|
Parameters
|
|
----------
|
|
name : str, required
|
|
Table name in Spark.
|
|
format : string, optional
|
|
Specifies the output data source format. Some common ones are:
|
|
|
|
- 'delta'
|
|
- 'parquet'
|
|
- 'orc'
|
|
- 'json'
|
|
- 'csv'
|
|
|
|
mode : str {'append', 'overwrite', 'ignore', 'error', 'errorifexists'}, default
|
|
'overwrite'. Specifies the behavior of the save operation when the table exists
|
|
already.
|
|
|
|
- 'append': Append the new data to existing data.
|
|
- 'overwrite': Overwrite existing data.
|
|
- 'ignore': Silently ignore this operation if data already exists.
|
|
- 'error' or 'errorifexists': Throw an exception if data already exists.
|
|
|
|
partition_cols : str or list of str, optional, default None
|
|
Names of partitioning columns
|
|
index_col: str or list of str, optional, default: None
|
|
Column names to be used in Spark to represent pandas-on-Spark's index. The index name
|
|
in pandas-on-Spark is ignored. By default, the index is always lost.
|
|
options
|
|
Additional options passed directly to Spark.
|
|
|
|
Returns
|
|
-------
|
|
None
|
|
|
|
See Also
|
|
--------
|
|
read_table
|
|
DataFrame.to_spark_io
|
|
DataFrame.spark.to_spark_io
|
|
DataFrame.to_parquet
|
|
|
|
Examples
|
|
--------
|
|
>>> df = ps.DataFrame(dict(
|
|
... date=list(pd.date_range('2012-1-1 12:00:00', periods=3, freq='M')),
|
|
... country=['KR', 'US', 'JP'],
|
|
... code=[1, 2 ,3]), columns=['date', 'country', 'code'])
|
|
>>> df
|
|
date country code
|
|
0 2012-01-31 12:00:00 KR 1
|
|
1 2012-02-29 12:00:00 US 2
|
|
2 2012-03-31 12:00:00 JP 3
|
|
|
|
>>> df.to_table('%s.my_table' % db, partition_cols='date')
|
|
"""
|
|
if "options" in options and isinstance(options.get("options"), dict) and len(options) == 1:
|
|
options = options.get("options") # type: ignore
|
|
|
|
self._kdf.spark.frame(index_col=index_col).write.saveAsTable(
|
|
name=name, format=format, mode=mode, partitionBy=partition_cols, **options
|
|
)
|
|
|
|
def to_spark_io(
|
|
self,
|
|
path: Optional[str] = None,
|
|
format: Optional[str] = None,
|
|
mode: str = "overwrite",
|
|
partition_cols: Optional[Union[str, List[str]]] = None,
|
|
index_col: Optional[Union[str, List[str]]] = None,
|
|
**options
|
|
) -> None:
|
|
"""Write the DataFrame out to a Spark data source. :meth:`DataFrame.spark.to_spark_io`
|
|
is an alias of :meth:`DataFrame.to_spark_io`.
|
|
|
|
Parameters
|
|
----------
|
|
path : string, optional
|
|
Path to the data source.
|
|
format : string, optional
|
|
Specifies the output data source format. Some common ones are:
|
|
|
|
- 'delta'
|
|
- 'parquet'
|
|
- 'orc'
|
|
- 'json'
|
|
- 'csv'
|
|
mode : str {'append', 'overwrite', 'ignore', 'error', 'errorifexists'}, default
|
|
'overwrite'. Specifies the behavior of the save operation when data already.
|
|
|
|
- 'append': Append the new data to existing data.
|
|
- 'overwrite': Overwrite existing data.
|
|
- 'ignore': Silently ignore this operation if data already exists.
|
|
- 'error' or 'errorifexists': Throw an exception if data already exists.
|
|
partition_cols : str or list of str, optional
|
|
Names of partitioning columns
|
|
index_col: str or list of str, optional, default: None
|
|
Column names to be used in Spark to represent pandas-on-Spark's index. The index name
|
|
in pandas-on-Spark is ignored. By default, the index is always lost.
|
|
options : dict
|
|
All other options passed directly into Spark's data source.
|
|
|
|
Returns
|
|
-------
|
|
None
|
|
|
|
See Also
|
|
--------
|
|
read_spark_io
|
|
DataFrame.to_delta
|
|
DataFrame.to_parquet
|
|
DataFrame.to_table
|
|
DataFrame.to_spark_io
|
|
DataFrame.spark.to_spark_io
|
|
|
|
Examples
|
|
--------
|
|
>>> df = ps.DataFrame(dict(
|
|
... date=list(pd.date_range('2012-1-1 12:00:00', periods=3, freq='M')),
|
|
... country=['KR', 'US', 'JP'],
|
|
... code=[1, 2 ,3]), columns=['date', 'country', 'code'])
|
|
>>> df
|
|
date country code
|
|
0 2012-01-31 12:00:00 KR 1
|
|
1 2012-02-29 12:00:00 US 2
|
|
2 2012-03-31 12:00:00 JP 3
|
|
|
|
>>> df.to_spark_io(path='%s/to_spark_io/foo.json' % path, format='json')
|
|
"""
|
|
if "options" in options and isinstance(options.get("options"), dict) and len(options) == 1:
|
|
options = options.get("options") # type: ignore
|
|
|
|
self._kdf.spark.frame(index_col=index_col).write.save(
|
|
path=path, format=format, mode=mode, partitionBy=partition_cols, **options
|
|
)
|
|
|
|
def explain(self, extended: Optional[bool] = None, mode: Optional[str] = None) -> None:
|
|
"""
|
|
Prints the underlying (logical and physical) Spark plans to the console for debugging
|
|
purpose.
|
|
|
|
Parameters
|
|
----------
|
|
extended : boolean, default ``False``.
|
|
If ``False``, prints only the physical plan.
|
|
mode : string, default ``None``.
|
|
The expected output format of plans.
|
|
|
|
Returns
|
|
-------
|
|
None
|
|
|
|
Examples
|
|
--------
|
|
>>> df = ps.DataFrame({'id': range(10)})
|
|
>>> df.spark.explain() # doctest: +ELLIPSIS
|
|
== Physical Plan ==
|
|
...
|
|
|
|
>>> df.spark.explain(True) # doctest: +ELLIPSIS
|
|
== Parsed Logical Plan ==
|
|
...
|
|
== Analyzed Logical Plan ==
|
|
...
|
|
== Optimized Logical Plan ==
|
|
...
|
|
== Physical Plan ==
|
|
...
|
|
|
|
>>> df.spark.explain("extended") # doctest: +ELLIPSIS
|
|
== Parsed Logical Plan ==
|
|
...
|
|
== Analyzed Logical Plan ==
|
|
...
|
|
== Optimized Logical Plan ==
|
|
...
|
|
== Physical Plan ==
|
|
...
|
|
|
|
>>> df.spark.explain(mode="extended") # doctest: +ELLIPSIS
|
|
== Parsed Logical Plan ==
|
|
...
|
|
== Analyzed Logical Plan ==
|
|
...
|
|
== Optimized Logical Plan ==
|
|
...
|
|
== Physical Plan ==
|
|
...
|
|
"""
|
|
self._kdf._internal.to_internal_spark_frame.explain(extended, mode)
|
|
|
|
def apply(self, func, index_col: Optional[Union[str, List[str]]] = None) -> "ps.DataFrame":
|
|
"""
|
|
Applies a function that takes and returns a Spark DataFrame. It allows natively
|
|
apply a Spark function and column APIs with the Spark column internally used
|
|
in Series or Index.
|
|
|
|
.. note:: set `index_col` and keep the column named as so in the output Spark
|
|
DataFrame to avoid using the default index to prevent performance penalty.
|
|
If you omit `index_col`, it will use default index which is potentially
|
|
expensive in general.
|
|
|
|
.. note:: it will lose column labels. This is a synonym of
|
|
``func(kdf.to_spark(index_col)).to_koalas(index_col)``.
|
|
|
|
Parameters
|
|
----------
|
|
func : function
|
|
Function to apply the function against the data by using Spark DataFrame.
|
|
|
|
Returns
|
|
-------
|
|
DataFrame
|
|
|
|
Raises
|
|
------
|
|
ValueError : If the output from the function is not a Spark DataFrame.
|
|
|
|
Examples
|
|
--------
|
|
>>> kdf = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, columns=["a", "b"])
|
|
>>> kdf
|
|
a b
|
|
0 1 4
|
|
1 2 5
|
|
2 3 6
|
|
|
|
>>> kdf.spark.apply(
|
|
... lambda sdf: sdf.selectExpr("a + b as c", "index"), index_col="index")
|
|
... # doctest: +NORMALIZE_WHITESPACE
|
|
c
|
|
index
|
|
0 5
|
|
1 7
|
|
2 9
|
|
|
|
The case below ends up with using the default index, which should be avoided
|
|
if possible.
|
|
|
|
>>> kdf.spark.apply(lambda sdf: sdf.groupby("a").count().sort("a"))
|
|
a count
|
|
0 1 1
|
|
1 2 1
|
|
2 3 1
|
|
"""
|
|
output = func(self.frame(index_col))
|
|
if not isinstance(output, SparkDataFrame):
|
|
raise ValueError(
|
|
"The output of the function [%s] should be of a "
|
|
"pyspark.sql.DataFrame; however, got [%s]." % (func, type(output))
|
|
)
|
|
return output.to_koalas(index_col)
|
|
|
|
def repartition(self, num_partitions: int) -> "ps.DataFrame":
|
|
"""
|
|
Returns a new DataFrame partitioned by the given partitioning expressions. The
|
|
resulting DataFrame is hash partitioned.
|
|
|
|
Parameters
|
|
----------
|
|
num_partitions : int
|
|
The target number of partitions.
|
|
|
|
Returns
|
|
-------
|
|
DataFrame
|
|
|
|
Examples
|
|
--------
|
|
>>> kdf = ps.DataFrame({"age": [5, 5, 2, 2],
|
|
... "name": ["Bob", "Bob", "Alice", "Alice"]}).set_index("age")
|
|
>>> kdf.sort_index() # doctest: +NORMALIZE_WHITESPACE
|
|
name
|
|
age
|
|
2 Alice
|
|
2 Alice
|
|
5 Bob
|
|
5 Bob
|
|
>>> new_kdf = kdf.spark.repartition(7)
|
|
>>> new_kdf.to_spark().rdd.getNumPartitions()
|
|
7
|
|
>>> new_kdf.sort_index() # doctest: +NORMALIZE_WHITESPACE
|
|
name
|
|
age
|
|
2 Alice
|
|
2 Alice
|
|
5 Bob
|
|
5 Bob
|
|
"""
|
|
from pyspark.pandas.frame import DataFrame
|
|
|
|
internal = self._kdf._internal.resolved_copy
|
|
repartitioned_sdf = internal.spark_frame.repartition(num_partitions)
|
|
return DataFrame(internal.with_new_sdf(repartitioned_sdf))
|
|
|
|
def coalesce(self, num_partitions: int) -> "ps.DataFrame":
|
|
"""
|
|
Returns a new DataFrame that has exactly `num_partitions` partitions.
|
|
|
|
.. note:: This operation results in a narrow dependency, e.g. if you go from 1000
|
|
partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new
|
|
partitions will claim 10 of the current partitions. If a larger number of partitions is
|
|
requested, it will stay at the current number of partitions. However, if you're doing a
|
|
drastic coalesce, e.g. to num_partitions = 1, this may result in your computation taking
|
|
place on fewer nodes than you like (e.g. one node in the case of num_partitions = 1). To
|
|
avoid this, you can call repartition(). This will add a shuffle step, but means the
|
|
current upstream partitions will be executed in parallel (per whatever the current
|
|
partitioning is).
|
|
|
|
Parameters
|
|
----------
|
|
num_partitions : int
|
|
The target number of partitions.
|
|
|
|
Returns
|
|
-------
|
|
DataFrame
|
|
|
|
Examples
|
|
--------
|
|
>>> kdf = ps.DataFrame({"age": [5, 5, 2, 2],
|
|
... "name": ["Bob", "Bob", "Alice", "Alice"]}).set_index("age")
|
|
>>> kdf.sort_index() # doctest: +NORMALIZE_WHITESPACE
|
|
name
|
|
age
|
|
2 Alice
|
|
2 Alice
|
|
5 Bob
|
|
5 Bob
|
|
>>> new_kdf = kdf.spark.coalesce(1)
|
|
>>> new_kdf.to_spark().rdd.getNumPartitions()
|
|
1
|
|
>>> new_kdf.sort_index() # doctest: +NORMALIZE_WHITESPACE
|
|
name
|
|
age
|
|
2 Alice
|
|
2 Alice
|
|
5 Bob
|
|
5 Bob
|
|
"""
|
|
from pyspark.pandas.frame import DataFrame
|
|
|
|
internal = self._kdf._internal.resolved_copy
|
|
coalesced_sdf = internal.spark_frame.coalesce(num_partitions)
|
|
return DataFrame(internal.with_new_sdf(coalesced_sdf))
|
|
|
|
def checkpoint(self, eager: bool = True) -> "ps.DataFrame":
|
|
"""Returns a checkpointed version of this DataFrame.
|
|
|
|
Checkpointing can be used to truncate the logical plan of this DataFrame, which is
|
|
especially useful in iterative algorithms where the plan may grow exponentially. It will be
|
|
saved to files inside the checkpoint directory set with `SparkContext.setCheckpointDir`.
|
|
|
|
Parameters
|
|
----------
|
|
eager : bool
|
|
Whether to checkpoint this DataFrame immediately
|
|
|
|
Returns
|
|
-------
|
|
DataFrame
|
|
|
|
Examples
|
|
--------
|
|
>>> kdf = ps.DataFrame({"a": ["a", "b", "c"]})
|
|
>>> kdf
|
|
a
|
|
0 a
|
|
1 b
|
|
2 c
|
|
>>> new_kdf = kdf.spark.checkpoint() # doctest: +SKIP
|
|
>>> new_kdf # doctest: +SKIP
|
|
a
|
|
0 a
|
|
1 b
|
|
2 c
|
|
"""
|
|
from pyspark.pandas.frame import DataFrame
|
|
|
|
internal = self._kdf._internal.resolved_copy
|
|
checkpointed_sdf = internal.spark_frame.checkpoint(eager)
|
|
return DataFrame(internal.with_new_sdf(checkpointed_sdf))
|
|
|
|
def local_checkpoint(self, eager: bool = True) -> "ps.DataFrame":
|
|
"""Returns a locally checkpointed version of this DataFrame.
|
|
|
|
Checkpointing can be used to truncate the logical plan of this DataFrame, which is
|
|
especially useful in iterative algorithms where the plan may grow exponentially. Local
|
|
checkpoints are stored in the executors using the caching subsystem and therefore they are
|
|
not reliable.
|
|
|
|
Parameters
|
|
----------
|
|
eager : bool
|
|
Whether to locally checkpoint this DataFrame immediately
|
|
|
|
Returns
|
|
-------
|
|
DataFrame
|
|
|
|
Examples
|
|
--------
|
|
>>> kdf = ps.DataFrame({"a": ["a", "b", "c"]})
|
|
>>> kdf
|
|
a
|
|
0 a
|
|
1 b
|
|
2 c
|
|
>>> new_kdf = kdf.spark.local_checkpoint()
|
|
>>> new_kdf
|
|
a
|
|
0 a
|
|
1 b
|
|
2 c
|
|
"""
|
|
from pyspark.pandas.frame import DataFrame
|
|
|
|
internal = self._kdf._internal.resolved_copy
|
|
checkpointed_sdf = internal.spark_frame.localCheckpoint(eager)
|
|
return DataFrame(internal.with_new_sdf(checkpointed_sdf))
|
|
|
|
@property
|
|
def analyzed(self) -> "ps.DataFrame":
|
|
"""
|
|
Returns a new DataFrame with the analyzed Spark DataFrame.
|
|
|
|
After multiple operations, the underlying Spark plan could grow huge
|
|
and make the Spark planner take a long time to finish the planning.
|
|
|
|
This function is for the workaround to avoid it.
|
|
|
|
.. note:: After analyzed, operations between the analyzed DataFrame and the original one
|
|
will **NOT** work without setting a config `compute.ops_on_diff_frames` to `True`.
|
|
|
|
Returns
|
|
-------
|
|
DataFrame
|
|
|
|
Examples
|
|
--------
|
|
>>> df = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, columns=["a", "b"])
|
|
>>> df
|
|
a b
|
|
0 1 4
|
|
1 2 5
|
|
2 3 6
|
|
|
|
The analyzed one should return the same value.
|
|
|
|
>>> df.spark.analyzed
|
|
a b
|
|
0 1 4
|
|
1 2 5
|
|
2 3 6
|
|
|
|
However, it won't work with the same anchor Series.
|
|
|
|
>>> df + df.spark.analyzed
|
|
Traceback (most recent call last):
|
|
...
|
|
ValueError: ... enable 'compute.ops_on_diff_frames' option.
|
|
|
|
>>> with ps.option_context('compute.ops_on_diff_frames', True):
|
|
... (df + df.spark.analyzed).sort_index()
|
|
a b
|
|
0 2 8
|
|
1 4 10
|
|
2 6 12
|
|
"""
|
|
from pyspark.pandas.frame import DataFrame
|
|
|
|
return DataFrame(self._kdf._internal.resolved_copy)
|
|
|
|
|
|
class CachedSparkFrameMethods(SparkFrameMethods):
|
|
"""Spark related features for cached DataFrame. This is usually created via
|
|
`df.spark.cache()`."""
|
|
|
|
def __init__(self, frame: "CachedDataFrame"):
|
|
super().__init__(frame)
|
|
|
|
@property
|
|
def storage_level(self) -> StorageLevel:
|
|
"""
|
|
Return the storage level of this cache.
|
|
|
|
Examples
|
|
--------
|
|
>>> import pyspark.pandas as ps
|
|
>>> import pyspark
|
|
>>> df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)],
|
|
... columns=['dogs', 'cats'])
|
|
>>> df
|
|
dogs cats
|
|
0 0.2 0.3
|
|
1 0.0 0.6
|
|
2 0.6 0.0
|
|
3 0.2 0.1
|
|
|
|
>>> with df.spark.cache() as cached_df:
|
|
... print(cached_df.spark.storage_level)
|
|
...
|
|
Disk Memory Deserialized 1x Replicated
|
|
|
|
Set the StorageLevel to `MEMORY_ONLY`.
|
|
|
|
>>> with df.spark.persist(pyspark.StorageLevel.MEMORY_ONLY) as cached_df:
|
|
... print(cached_df.spark.storage_level)
|
|
...
|
|
Memory Serialized 1x Replicated
|
|
"""
|
|
return self._kdf._cached.storageLevel
|
|
|
|
def unpersist(self) -> None:
|
|
"""
|
|
The `unpersist` function is used to uncache the pandas-on-Spark DataFrame when it
|
|
is not used with `with` statement.
|
|
|
|
Returns
|
|
-------
|
|
None
|
|
|
|
Examples
|
|
--------
|
|
>>> df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)],
|
|
... columns=['dogs', 'cats'])
|
|
>>> df = df.spark.cache()
|
|
|
|
To uncache the dataframe, use `unpersist` function
|
|
|
|
>>> df.spark.unpersist()
|
|
"""
|
|
if self._kdf._cached.is_cached:
|
|
self._kdf._cached.unpersist()
|
|
|
|
|
|
def _test():
|
|
import os
|
|
import doctest
|
|
import shutil
|
|
import sys
|
|
import tempfile
|
|
import uuid
|
|
import numpy
|
|
import pandas
|
|
from pyspark.sql import SparkSession
|
|
import pyspark.pandas.spark.accessors
|
|
|
|
os.chdir(os.environ["SPARK_HOME"])
|
|
|
|
globs = pyspark.pandas.spark.accessors.__dict__.copy()
|
|
globs["np"] = numpy
|
|
globs["pd"] = pandas
|
|
globs["ps"] = pyspark.pandas
|
|
spark = (
|
|
SparkSession.builder.master("local[4]")
|
|
.appName("pyspark.pandas.spark.accessors tests")
|
|
.getOrCreate()
|
|
)
|
|
|
|
db_name = "db%s" % str(uuid.uuid4()).replace("-", "")
|
|
spark.sql("CREATE DATABASE %s" % db_name)
|
|
globs["db"] = db_name
|
|
|
|
path = tempfile.mkdtemp()
|
|
globs["path"] = path
|
|
|
|
(failure_count, test_count) = doctest.testmod(
|
|
pyspark.pandas.spark.accessors,
|
|
globs=globs,
|
|
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
|
|
)
|
|
|
|
shutil.rmtree(path, ignore_errors=True)
|
|
spark.sql("DROP DATABASE IF EXISTS %s CASCADE" % db_name)
|
|
spark.stop()
|
|
if failure_count:
|
|
sys.exit(-1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
_test()
|