b8740a1d1e
### What changes were proposed in this pull request?
This PR proposes applying `black` to the pandas API on Spark code, to improve static analysis. Executing `./dev/reformat-python` in the Spark home directory reformats all pandas API on Spark code according to the static analysis rules.
### Why are the changes needed?
This reduces the cost of static analysis during development. It has been used continuously for about a year in the Koalas project and its convenience has been proven.
### Does this PR introduce _any_ user-facing change?
No, it's dev-only.
### How was this patch tested?
Manually reformatted the pandas API on Spark code by running `./dev/reformat-python`, and checked that `./dev/lint-python` passes.
Closes #32779 from itholic/SPARK-35499.
Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
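As a rough, hypothetical illustration of the kind of rewrite `black` applies (not taken from this diff; the exact configuration, such as line length, lives in the dev tooling and may differ):

```python
# Before (hand-formatted):
def to_table(self, name, format=None, mode='overwrite',
             partition_cols=None, index_col=None, **options):
    ...

# After running ./dev/reformat-python (black): string quotes are normalized to double
# quotes, and a signature that no longer fits on one line is exploded one parameter
# per line with a trailing comma.
def to_table(
    self,
    name,
    format=None,
    mode="overwrite",
    partition_cols=None,
    index_col=None,
    **options,
):
    ...
```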
1273 lines
42 KiB
Python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Spark related features. Usually, the features here are missing in pandas
but Spark has them.
"""
from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, Callable, List, Optional, Union, cast

from pyspark import StorageLevel
from pyspark.sql import Column, DataFrame as SparkDataFrame
from pyspark.sql.types import DataType, StructType

if TYPE_CHECKING:
    from pyspark.sql._typing import OptionalPrimitiveType  # noqa: F401 (SPARK-34943)
    from pyspark._typing import PrimitiveType  # noqa: F401 (SPARK-34943)

    import pyspark.pandas as ps  # noqa: F401 (SPARK-34943)
    from pyspark.pandas.base import IndexOpsMixin  # noqa: F401 (SPARK-34943)
    from pyspark.pandas.frame import CachedDataFrame  # noqa: F401 (SPARK-34943)


class SparkIndexOpsMethods(metaclass=ABCMeta):
    """Spark related features. Usually, the features here are missing in pandas
    but Spark has them."""

    def __init__(self, data: "IndexOpsMixin"):
        self._data = data

    @property
    def data_type(self) -> DataType:
        """Returns the data type as defined by Spark, as a Spark DataType object."""
        return self._data._internal.spark_type_for(self._data._column_label)

    @property
    def nullable(self) -> bool:
        """Returns the nullability as defined by Spark."""
        return self._data._internal.spark_column_nullable_for(self._data._column_label)

    @property
    def column(self) -> Column:
        """
        Spark Column object representing the Series/Index.

        .. note:: This Spark Column object is strictly bound to its base DataFrame the Series/Index
            was derived from.
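
        A minimal illustration (the printed ``Column`` representation below is assumed and may
        vary across Spark versions, hence the skipped doctest):

        >>> psser = ps.Series([1, 2, 3], name="x")
        >>> psser.spark.column  # doctest: +SKIP
        Column<'x'>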
        """
        return self._data._internal.spark_column_for(self._data._column_label)

    def transform(self, func: Callable[[Column], Column]) -> Union["ps.Series", "ps.Index"]:
        """
        Applies a function that takes and returns a Spark column. It allows you to natively
        apply a Spark function and column APIs with the Spark column internally used
        in the Series or Index. The output length of the Spark column should be the same as
        the input's.

        .. note:: It requires the input and output to have the same length; therefore,
            aggregate Spark functions such as ``count`` do not work.

        Parameters
        ----------
        func : function
            Function to use for transforming the data by using Spark columns.

        Returns
        -------
        Series or Index

        Raises
        ------
        ValueError : If the output from the function is not a Spark column.

        Examples
        --------
        >>> from pyspark.sql.functions import log
        >>> df = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, columns=["a", "b"])
        >>> df
           a  b
        0  1  4
        1  2  5
        2  3  6

        >>> df.a.spark.transform(lambda c: log(c))
        0    0.000000
        1    0.693147
        2    1.098612
        Name: a, dtype: float64

        >>> df.index.spark.transform(lambda c: c + 10)
        Int64Index([10, 11, 12], dtype='int64')

        >>> df.a.spark.transform(lambda c: c + df.b.spark.column)
        0    5
        1    7
        2    9
        Name: a, dtype: int64
        """
        from pyspark.pandas import MultiIndex

        if isinstance(self._data, MultiIndex):
            raise NotImplementedError("MultiIndex does not support spark.transform yet.")
        output = func(self._data.spark.column)
        if not isinstance(output, Column):
            raise ValueError(
                "The output of the function [%s] should be of a "
                "pyspark.sql.Column; however, got [%s]." % (func, type(output))
            )
        new_ser = self._data._with_new_scol(scol=output)
        # Trigger the resolution so it throws an exception if anything goes wrong
        # within the function, for example,
        # `df1.a.spark.transform(lambda _: F.col("non-existent"))`.
        new_ser._internal.to_internal_spark_frame
        return new_ser

    @property
    @abstractmethod
    def analyzed(self) -> Union["ps.Series", "ps.Index"]:
        pass

class SparkSeriesMethods(SparkIndexOpsMethods):
    def apply(self, func: Callable[[Column], Column]) -> "ps.Series":
        """
        Applies a function that takes and returns a Spark column. It allows you to natively
        apply a Spark function and column APIs with the Spark column internally used
        in the Series or Index.

        .. note:: It forces the index to be lost and ends up using the default index. It is
            preferred to use :meth:`Series.spark.transform` or :meth:`DataFrame.spark.apply`
            with `index_col` specified.

        .. note:: It does not require the input and output to have the same length.
            However, it requires creating a new DataFrame internally, which requires setting
            `compute.ops_on_diff_frames` to compute even against the same origin
            DataFrame and is expensive, whereas :meth:`Series.spark.transform` does not
            require it.

        Parameters
        ----------
        func : function
            Function to apply against the data by using Spark columns.

        Returns
        -------
        Series

        Raises
        ------
        ValueError : If the output from the function is not a Spark column.

        Examples
        --------
        >>> from pyspark import pandas as ps
        >>> from pyspark.sql.functions import count, lit
        >>> df = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, columns=["a", "b"])
        >>> df
           a  b
        0  1  4
        1  2  5
        2  3  6

        >>> df.a.spark.apply(lambda c: count(c))
        0    3
        Name: a, dtype: int64

        >>> df.a.spark.apply(lambda c: c + df.b.spark.column)
        0    5
        1    7
        2    9
        Name: a, dtype: int64
        """
        from pyspark.pandas.frame import DataFrame
        from pyspark.pandas.series import Series, first_series
        from pyspark.pandas.internal import HIDDEN_COLUMNS

        output = func(self._data.spark.column)
        if not isinstance(output, Column):
            raise ValueError(
                "The output of the function [%s] should be of a "
                "pyspark.sql.Column; however, got [%s]." % (func, type(output))
            )
        assert isinstance(self._data, Series)

        sdf = self._data._internal.spark_frame.drop(*HIDDEN_COLUMNS).select(output)
        # Lose index.
        return first_series(DataFrame(sdf)).rename(self._data.name)

    @property
    def analyzed(self) -> "ps.Series":
        """
        Returns a new Series with the analyzed Spark DataFrame.

        After multiple operations, the underlying Spark plan could grow huge
        and make the Spark planner take a long time to finish the planning.

        This function is a workaround to avoid it.

        .. note:: Once analyzed, operations between the analyzed Series and the original one
            will **NOT** work without setting a config `compute.ops_on_diff_frames` to `True`.

        Returns
        -------
        Series

        Examples
        --------
        >>> ser = ps.Series([1, 2, 3])
        >>> ser
        0    1
        1    2
        2    3
        dtype: int64

        The analyzed one should return the same value.

        >>> ser.spark.analyzed
        0    1
        1    2
        2    3
        dtype: int64

        However, it won't work with the same anchor Series.

        >>> ser + ser.spark.analyzed
        Traceback (most recent call last):
        ...
        ValueError: ... enable 'compute.ops_on_diff_frames' option.

        >>> with ps.option_context('compute.ops_on_diff_frames', True):
        ...     (ser + ser.spark.analyzed).sort_index()
        0    2
        1    4
        2    6
        dtype: int64
        """
        from pyspark.pandas.frame import DataFrame
        from pyspark.pandas.series import first_series

        return first_series(DataFrame(self._data._internal.resolved_copy))

class SparkIndexMethods(SparkIndexOpsMethods):
    @property
    def analyzed(self) -> "ps.Index":
        """
        Returns a new Index with the analyzed Spark DataFrame.

        After multiple operations, the underlying Spark plan could grow huge
        and make the Spark planner take a long time to finish the planning.

        This function is a workaround to avoid it.

        .. note:: Once analyzed, operations between the analyzed Index and the original one
            will **NOT** work without setting a config `compute.ops_on_diff_frames` to `True`.

        Returns
        -------
        Index

        Examples
        --------
        >>> idx = ps.Index([1, 2, 3])
        >>> idx
        Int64Index([1, 2, 3], dtype='int64')

        The analyzed one should return the same value.

        >>> idx.spark.analyzed
        Int64Index([1, 2, 3], dtype='int64')

        However, it won't work with the same anchor Index.

        >>> idx + idx.spark.analyzed
        Traceback (most recent call last):
        ...
        ValueError: ... enable 'compute.ops_on_diff_frames' option.

        >>> with ps.option_context('compute.ops_on_diff_frames', True):
        ...     (idx + idx.spark.analyzed).sort_values()
        Int64Index([2, 4, 6], dtype='int64')
        """
        from pyspark.pandas.frame import DataFrame

        return DataFrame(self._data._internal.resolved_copy).index

class SparkFrameMethods(object):
    """Spark related features. Usually, the features here are missing in pandas
    but Spark has them."""

    def __init__(self, frame: "ps.DataFrame"):
        self._psdf = frame

    def schema(self, index_col: Optional[Union[str, List[str]]] = None) -> StructType:
        """
        Returns the underlying Spark schema.

        Returns
        -------
        pyspark.sql.types.StructType
            The underlying Spark schema.

        Parameters
        ----------
        index_col: str or list of str, optional, default: None
            Column names to be used in Spark to represent pandas-on-Spark's index. The index name
            in pandas-on-Spark is ignored. By default, the index is always lost.

        Examples
        --------
        >>> df = ps.DataFrame({'a': list('abc'),
        ...                    'b': list(range(1, 4)),
        ...                    'c': np.arange(3, 6).astype('i1'),
        ...                    'd': np.arange(4.0, 7.0, dtype='float64'),
        ...                    'e': [True, False, True],
        ...                    'f': pd.date_range('20130101', periods=3)},
        ...                   columns=['a', 'b', 'c', 'd', 'e', 'f'])
        >>> df.spark.schema().simpleString()
        'struct<a:string,b:bigint,c:tinyint,d:double,e:boolean,f:timestamp>'
        >>> df.spark.schema(index_col='index').simpleString()
        'struct<index:bigint,a:string,b:bigint,c:tinyint,d:double,e:boolean,f:timestamp>'
        """
        return self.frame(index_col).schema

    def print_schema(self, index_col: Optional[Union[str, List[str]]] = None) -> None:
        """
        Prints out the underlying Spark schema in the tree format.

        Parameters
        ----------
        index_col: str or list of str, optional, default: None
            Column names to be used in Spark to represent pandas-on-Spark's index. The index name
            in pandas-on-Spark is ignored. By default, the index is always lost.

        Returns
        -------
        None

        Examples
        --------
        >>> df = ps.DataFrame({'a': list('abc'),
        ...                    'b': list(range(1, 4)),
        ...                    'c': np.arange(3, 6).astype('i1'),
        ...                    'd': np.arange(4.0, 7.0, dtype='float64'),
        ...                    'e': [True, False, True],
        ...                    'f': pd.date_range('20130101', periods=3)},
        ...                   columns=['a', 'b', 'c', 'd', 'e', 'f'])
        >>> df.spark.print_schema()  # doctest: +NORMALIZE_WHITESPACE
        root
        |-- a: string (nullable = false)
        |-- b: long (nullable = false)
        |-- c: byte (nullable = false)
        |-- d: double (nullable = false)
        |-- e: boolean (nullable = false)
        |-- f: timestamp (nullable = false)
        >>> df.spark.print_schema(index_col='index')  # doctest: +NORMALIZE_WHITESPACE
        root
        |-- index: long (nullable = false)
        |-- a: string (nullable = false)
        |-- b: long (nullable = false)
        |-- c: byte (nullable = false)
        |-- d: double (nullable = false)
        |-- e: boolean (nullable = false)
        |-- f: timestamp (nullable = false)
        """
        self.frame(index_col).printSchema()

    def frame(self, index_col: Optional[Union[str, List[str]]] = None) -> SparkDataFrame:
        """
        Return the current DataFrame as a Spark DataFrame. :meth:`DataFrame.spark.frame` is an
        alias of :meth:`DataFrame.to_spark`.

        Parameters
        ----------
        index_col: str or list of str, optional, default: None
            Column names to be used in Spark to represent pandas-on-Spark's index. The index name
            in pandas-on-Spark is ignored. By default, the index is always lost.

        See Also
        --------
        DataFrame.to_spark
        DataFrame.to_pandas_on_spark
        DataFrame.spark.frame

        Examples
        --------
        By default, this method loses the index as below.

        >>> df = ps.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6], 'c': [7, 8, 9]})
        >>> df.to_spark().show()  # doctest: +NORMALIZE_WHITESPACE
        +---+---+---+
        |  a|  b|  c|
        +---+---+---+
        |  1|  4|  7|
        |  2|  5|  8|
        |  3|  6|  9|
        +---+---+---+

        >>> df = ps.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6], 'c': [7, 8, 9]})
        >>> df.spark.frame().show()  # doctest: +NORMALIZE_WHITESPACE
        +---+---+---+
        |  a|  b|  c|
        +---+---+---+
        |  1|  4|  7|
        |  2|  5|  8|
        |  3|  6|  9|
        +---+---+---+

        If `index_col` is set, it keeps the index column as specified.

        >>> df.to_spark(index_col="index").show()  # doctest: +NORMALIZE_WHITESPACE
        +-----+---+---+---+
        |index|  a|  b|  c|
        +-----+---+---+---+
        |    0|  1|  4|  7|
        |    1|  2|  5|  8|
        |    2|  3|  6|  9|
        +-----+---+---+---+

        Keeping the index column is useful when you want to call some Spark APIs and
        convert it back to a pandas-on-Spark DataFrame without creating a default index, which
        can affect performance.

        >>> spark_df = df.to_spark(index_col="index")
        >>> spark_df = spark_df.filter("a == 2")
        >>> spark_df.to_pandas_on_spark(index_col="index")  # doctest: +NORMALIZE_WHITESPACE
               a  b  c
        index
        1      2  5  8

        In case of multi-index, specify a list to `index_col`.

        >>> new_df = df.set_index("a", append=True)
        >>> new_spark_df = new_df.to_spark(index_col=["index_1", "index_2"])
        >>> new_spark_df.show()  # doctest: +NORMALIZE_WHITESPACE
        +-------+-------+---+---+
        |index_1|index_2|  b|  c|
        +-------+-------+---+---+
        |      0|      1|  4|  7|
        |      1|      2|  5|  8|
        |      2|      3|  6|  9|
        +-------+-------+---+---+

        Likewise, it can be converted back to a pandas-on-Spark DataFrame.

        >>> new_spark_df.to_pandas_on_spark(
        ...     index_col=["index_1", "index_2"])  # doctest: +NORMALIZE_WHITESPACE
                         b  c
        index_1 index_2
        0       1        4  7
        1       2        5  8
        2       3        6  9
        """
        from pyspark.pandas.utils import name_like_string

        psdf = self._psdf

        data_column_names = []
        data_columns = []
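        # Rename the data Spark columns after their pandas-on-Spark column labels
        # (falling back to the positional index when a label is None) so the returned
        # Spark DataFrame exposes human-readable column names.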
        for i, (label, spark_column, column_name) in enumerate(
            zip(
                psdf._internal.column_labels,
                psdf._internal.data_spark_columns,
                psdf._internal.data_spark_column_names,
            )
        ):
            name = str(i) if label is None else name_like_string(label)
            data_column_names.append(name)
            if column_name != name:
                spark_column = spark_column.alias(name)
            data_columns.append(spark_column)

        if index_col is None:
            return psdf._internal.spark_frame.select(data_columns)
        else:
            if isinstance(index_col, str):
                index_col = [index_col]

            old_index_scols = psdf._internal.index_spark_columns

            if len(index_col) != len(old_index_scols):
                raise ValueError(
                    "length of index columns is %s; however, the length of the given "
                    "'index_col' is %s." % (len(old_index_scols), len(index_col))
                )

            if any(col in data_column_names for col in index_col):
                raise ValueError("'index_col' cannot be overlapped with other columns.")

            new_index_scols = [
                index_scol.alias(col) for index_scol, col in zip(old_index_scols, index_col)
            ]
            return psdf._internal.spark_frame.select(new_index_scols + data_columns)

    def cache(self) -> "CachedDataFrame":
        """
        Yields and caches the current DataFrame.

        The pandas-on-Spark DataFrame is yielded as a protected resource and its corresponding
        data is cached, which gets uncached once execution leaves the context.

        If you want to specify the StorageLevel manually, use :meth:`DataFrame.spark.persist`.

        See Also
        --------
        DataFrame.spark.persist

        Examples
        --------
        >>> df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)],
        ...                   columns=['dogs', 'cats'])
        >>> df
           dogs  cats
        0   0.2   0.3
        1   0.0   0.6
        2   0.6   0.0
        3   0.2   0.1

        >>> with df.spark.cache() as cached_df:
        ...     print(cached_df.count())
        ...
        dogs    4
        cats    4
        dtype: int64

        >>> df = df.spark.cache()
        >>> df.to_pandas().mean(axis=1)
        0    0.25
        1    0.30
        2    0.30
        3    0.15
        dtype: float64

        To uncache the DataFrame, use the `unpersist` function.

        >>> df.spark.unpersist()
        """
        from pyspark.pandas.frame import CachedDataFrame

        self._psdf._update_internal_frame(
            self._psdf._internal.resolved_copy, requires_same_anchor=False
        )
        return CachedDataFrame(self._psdf._internal)

    def persist(
        self, storage_level: StorageLevel = StorageLevel.MEMORY_AND_DISK
    ) -> "CachedDataFrame":
        """
        Yields and caches the current DataFrame with a specific StorageLevel.
        If a StorageLevel is not given, the `MEMORY_AND_DISK` level is used by default,
        as in PySpark.

        The pandas-on-Spark DataFrame is yielded as a protected resource and its corresponding
        data is cached, which gets uncached once execution leaves the context.

        See Also
        --------
        DataFrame.spark.cache

        Examples
        --------
        >>> import pyspark
        >>> df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)],
        ...                   columns=['dogs', 'cats'])
        >>> df
           dogs  cats
        0   0.2   0.3
        1   0.0   0.6
        2   0.6   0.0
        3   0.2   0.1

        Set the StorageLevel to `MEMORY_ONLY`.

        >>> with df.spark.persist(pyspark.StorageLevel.MEMORY_ONLY) as cached_df:
        ...     print(cached_df.spark.storage_level)
        ...     print(cached_df.count())
        ...
        Memory Serialized 1x Replicated
        dogs    4
        cats    4
        dtype: int64

        Set the StorageLevel to `DISK_ONLY`.

        >>> with df.spark.persist(pyspark.StorageLevel.DISK_ONLY) as cached_df:
        ...     print(cached_df.spark.storage_level)
        ...     print(cached_df.count())
        ...
        Disk Serialized 1x Replicated
        dogs    4
        cats    4
        dtype: int64

        If a StorageLevel is not given, it uses `MEMORY_AND_DISK` by default.

        >>> with df.spark.persist() as cached_df:
        ...     print(cached_df.spark.storage_level)
        ...     print(cached_df.count())
        ...
        Disk Memory Serialized 1x Replicated
        dogs    4
        cats    4
        dtype: int64

        >>> df = df.spark.persist()
        >>> df.to_pandas().mean(axis=1)
        0    0.25
        1    0.30
        2    0.30
        3    0.15
        dtype: float64

        To uncache the DataFrame, use the `unpersist` function.

        >>> df.spark.unpersist()
        """
        from pyspark.pandas.frame import CachedDataFrame

        self._psdf._update_internal_frame(
            self._psdf._internal.resolved_copy, requires_same_anchor=False
        )
        return CachedDataFrame(self._psdf._internal, storage_level=storage_level)

    def hint(self, name: str, *parameters: "PrimitiveType") -> "ps.DataFrame":
        """
        Specifies some hint on the current DataFrame.

        Parameters
        ----------
        name : A name of the hint.
        parameters : Optional parameters.

        Returns
        -------
        ret : DataFrame with the hint.

        See Also
        --------
        broadcast : Marks a DataFrame as small enough for use in broadcast joins.

        Examples
        --------
        >>> df1 = ps.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'],
        ...                     'value': [1, 2, 3, 5]},
        ...                    columns=['lkey', 'value']).set_index('lkey')
        >>> df2 = ps.DataFrame({'rkey': ['foo', 'bar', 'baz', 'foo'],
        ...                     'value': [5, 6, 7, 8]},
        ...                    columns=['rkey', 'value']).set_index('rkey')
        >>> merged = df1.merge(df2.spark.hint("broadcast"), left_index=True, right_index=True)
        >>> merged.spark.explain()  # doctest: +ELLIPSIS
        == Physical Plan ==
        ...
        ...BroadcastHashJoin...
        ...
        """
        from pyspark.pandas.frame import DataFrame

        internal = self._psdf._internal.resolved_copy
        return DataFrame(internal.with_new_sdf(internal.spark_frame.hint(name, *parameters)))

    def to_table(
        self,
        name: str,
        format: Optional[str] = None,
        mode: str = "overwrite",
        partition_cols: Optional[Union[str, List[str]]] = None,
        index_col: Optional[Union[str, List[str]]] = None,
        **options: "OptionalPrimitiveType",
    ) -> None:
        """
        Write the DataFrame into a Spark table. :meth:`DataFrame.spark.to_table`
        is an alias of :meth:`DataFrame.to_table`.

        Parameters
        ----------
        name : str, required
            Table name in Spark.
        format : string, optional
            Specifies the output data source format. Some common ones are:

            - 'delta'
            - 'parquet'
            - 'orc'
            - 'json'
            - 'csv'

        mode : str {'append', 'overwrite', 'ignore', 'error', 'errorifexists'}, default
            'overwrite'. Specifies the behavior of the save operation when the table
            already exists.

            - 'append': Append the new data to existing data.
            - 'overwrite': Overwrite existing data.
            - 'ignore': Silently ignore this operation if data already exists.
            - 'error' or 'errorifexists': Throw an exception if data already exists.

        partition_cols : str or list of str, optional, default None
            Names of partitioning columns.
        index_col: str or list of str, optional, default: None
            Column names to be used in Spark to represent pandas-on-Spark's index. The index name
            in pandas-on-Spark is ignored. By default, the index is always lost.
        options
            Additional options passed directly to Spark.

        Returns
        -------
        None

        See Also
        --------
        read_table
        DataFrame.to_spark_io
        DataFrame.spark.to_spark_io
        DataFrame.to_parquet

        Examples
        --------
        >>> df = ps.DataFrame(dict(
        ...    date=list(pd.date_range('2012-1-1 12:00:00', periods=3, freq='M')),
        ...    country=['KR', 'US', 'JP'],
        ...    code=[1, 2 ,3]), columns=['date', 'country', 'code'])
        >>> df
                         date country  code
        0 2012-01-31 12:00:00      KR     1
        1 2012-02-29 12:00:00      US     2
        2 2012-03-31 12:00:00      JP     3

        >>> df.to_table('%s.my_table' % db, partition_cols='date')
        """
        if "options" in options and isinstance(options.get("options"), dict) and len(options) == 1:
            options = options.get("options")  # type: ignore

        self._psdf.spark.frame(index_col=index_col).write.saveAsTable(
            name=name, format=format, mode=mode, partitionBy=partition_cols, **options
        )

    def to_spark_io(
        self,
        path: Optional[str] = None,
        format: Optional[str] = None,
        mode: str = "overwrite",
        partition_cols: Optional[Union[str, List[str]]] = None,
        index_col: Optional[Union[str, List[str]]] = None,
        **options: "OptionalPrimitiveType",
    ) -> None:
        """Write the DataFrame out to a Spark data source. :meth:`DataFrame.spark.to_spark_io`
        is an alias of :meth:`DataFrame.to_spark_io`.

        Parameters
        ----------
        path : string, optional
            Path to the data source.
        format : string, optional
            Specifies the output data source format. Some common ones are:

            - 'delta'
            - 'parquet'
            - 'orc'
            - 'json'
            - 'csv'
        mode : str {'append', 'overwrite', 'ignore', 'error', 'errorifexists'}, default
            'overwrite'. Specifies the behavior of the save operation when data already exists.

            - 'append': Append the new data to existing data.
            - 'overwrite': Overwrite existing data.
            - 'ignore': Silently ignore this operation if data already exists.
            - 'error' or 'errorifexists': Throw an exception if data already exists.
        partition_cols : str or list of str, optional
            Names of partitioning columns.
        index_col: str or list of str, optional, default: None
            Column names to be used in Spark to represent pandas-on-Spark's index. The index name
            in pandas-on-Spark is ignored. By default, the index is always lost.
        options : dict
            All other options passed directly into Spark's data source.

        Returns
        -------
        None

        See Also
        --------
        read_spark_io
        DataFrame.to_delta
        DataFrame.to_parquet
        DataFrame.to_table
        DataFrame.to_spark_io
        DataFrame.spark.to_spark_io

        Examples
        --------
        >>> df = ps.DataFrame(dict(
        ...    date=list(pd.date_range('2012-1-1 12:00:00', periods=3, freq='M')),
        ...    country=['KR', 'US', 'JP'],
        ...    code=[1, 2 ,3]), columns=['date', 'country', 'code'])
        >>> df
                         date country  code
        0 2012-01-31 12:00:00      KR     1
        1 2012-02-29 12:00:00      US     2
        2 2012-03-31 12:00:00      JP     3

        >>> df.to_spark_io(path='%s/to_spark_io/foo.json' % path, format='json')
        """
        if "options" in options and isinstance(options.get("options"), dict) and len(options) == 1:
            options = options.get("options")  # type: ignore

        self._psdf.spark.frame(index_col=index_col).write.save(
            path=path, format=format, mode=mode, partitionBy=partition_cols, **options
        )

    def explain(self, extended: Optional[bool] = None, mode: Optional[str] = None) -> None:
        """
        Prints the underlying (logical and physical) Spark plans to the console for debugging
        purposes.

        Parameters
        ----------
        extended : boolean, default ``False``.
            If ``False``, prints only the physical plan.
        mode : string, default ``None``.
            The expected output format of plans.

        Returns
        -------
        None

        Examples
        --------
        >>> df = ps.DataFrame({'id': range(10)})
        >>> df.spark.explain()  # doctest: +ELLIPSIS
        == Physical Plan ==
        ...

        >>> df.spark.explain(True)  # doctest: +ELLIPSIS
        == Parsed Logical Plan ==
        ...
        == Analyzed Logical Plan ==
        ...
        == Optimized Logical Plan ==
        ...
        == Physical Plan ==
        ...

        >>> df.spark.explain("extended")  # doctest: +ELLIPSIS
        == Parsed Logical Plan ==
        ...
        == Analyzed Logical Plan ==
        ...
        == Optimized Logical Plan ==
        ...
        == Physical Plan ==
        ...

        >>> df.spark.explain(mode="extended")  # doctest: +ELLIPSIS
        == Parsed Logical Plan ==
        ...
        == Analyzed Logical Plan ==
        ...
        == Optimized Logical Plan ==
        ...
        == Physical Plan ==
        ...
        """
        self._psdf._internal.to_internal_spark_frame.explain(extended, mode)

    def apply(
        self,
        func: Callable[[SparkDataFrame], SparkDataFrame],
        index_col: Optional[Union[str, List[str]]] = None,
    ) -> "ps.DataFrame":
        """
        Applies a function that takes and returns a Spark DataFrame. It allows you to natively
        apply a Spark function and DataFrame APIs against the Spark DataFrame internally used
        by this pandas-on-Spark DataFrame.

        .. note:: Set `index_col` and keep the column named as such in the output Spark
            DataFrame to avoid using the default index and the resulting performance penalty.
            If you omit `index_col`, it will use the default index, which is potentially
            expensive in general.

        .. note:: It will lose column labels. This is a synonym of
            ``func(psdf.to_spark(index_col)).to_pandas_on_spark(index_col)``.

        Parameters
        ----------
        func : function
            Function to apply against the data by using a Spark DataFrame.

        Returns
        -------
        DataFrame

        Raises
        ------
        ValueError : If the output from the function is not a Spark DataFrame.

        Examples
        --------
        >>> psdf = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, columns=["a", "b"])
        >>> psdf
           a  b
        0  1  4
        1  2  5
        2  3  6

        >>> psdf.spark.apply(
        ...     lambda sdf: sdf.selectExpr("a + b as c", "index"), index_col="index")
        ... # doctest: +NORMALIZE_WHITESPACE
               c
        index
        0      5
        1      7
        2      9

        The case below ends up with using the default index, which should be avoided
        if possible.

        >>> psdf.spark.apply(lambda sdf: sdf.groupby("a").count().sort("a"))
           a  count
        0  1      1
        1  2      1
        2  3      1
        """
        output = func(self.frame(index_col))
        if not isinstance(output, SparkDataFrame):
            raise ValueError(
                "The output of the function [%s] should be of a "
                "pyspark.sql.DataFrame; however, got [%s]." % (func, type(output))
            )
        psdf = output.to_pandas_on_spark(index_col)  # type: ignore
        return cast("ps.DataFrame", psdf)

    def repartition(self, num_partitions: int) -> "ps.DataFrame":
        """
        Returns a new DataFrame partitioned into the given number of partitions. The
        resulting DataFrame is hash partitioned.

        Parameters
        ----------
        num_partitions : int
            The target number of partitions.

        Returns
        -------
        DataFrame

        Examples
        --------
        >>> psdf = ps.DataFrame({"age": [5, 5, 2, 2],
        ...                      "name": ["Bob", "Bob", "Alice", "Alice"]}).set_index("age")
        >>> psdf.sort_index()  # doctest: +NORMALIZE_WHITESPACE
              name
        age
        2    Alice
        2    Alice
        5      Bob
        5      Bob
        >>> new_psdf = psdf.spark.repartition(7)
        >>> new_psdf.to_spark().rdd.getNumPartitions()
        7
        >>> new_psdf.sort_index()  # doctest: +NORMALIZE_WHITESPACE
              name
        age
        2    Alice
        2    Alice
        5      Bob
        5      Bob
        """
        from pyspark.pandas.frame import DataFrame

        internal = self._psdf._internal.resolved_copy
        repartitioned_sdf = internal.spark_frame.repartition(num_partitions)
        return DataFrame(internal.with_new_sdf(repartitioned_sdf))

    def coalesce(self, num_partitions: int) -> "ps.DataFrame":
        """
        Returns a new DataFrame that has exactly `num_partitions` partitions.

        .. note:: This operation results in a narrow dependency, e.g. if you go from 1000
            partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new
            partitions will claim 10 of the current partitions. If a larger number of partitions is
            requested, it will stay at the current number of partitions. However, if you're doing a
            drastic coalesce, e.g. to num_partitions = 1, this may result in your computation taking
            place on fewer nodes than you like (e.g. one node in the case of num_partitions = 1). To
            avoid this, you can call repartition(). This will add a shuffle step, but means the
            current upstream partitions will be executed in parallel (per whatever the current
            partitioning is).

        Parameters
        ----------
        num_partitions : int
            The target number of partitions.

        Returns
        -------
        DataFrame

        Examples
        --------
        >>> psdf = ps.DataFrame({"age": [5, 5, 2, 2],
        ...                      "name": ["Bob", "Bob", "Alice", "Alice"]}).set_index("age")
        >>> psdf.sort_index()  # doctest: +NORMALIZE_WHITESPACE
              name
        age
        2    Alice
        2    Alice
        5      Bob
        5      Bob
        >>> new_psdf = psdf.spark.coalesce(1)
        >>> new_psdf.to_spark().rdd.getNumPartitions()
        1
        >>> new_psdf.sort_index()  # doctest: +NORMALIZE_WHITESPACE
              name
        age
        2    Alice
        2    Alice
        5      Bob
        5      Bob
        """
        from pyspark.pandas.frame import DataFrame

        internal = self._psdf._internal.resolved_copy
        coalesced_sdf = internal.spark_frame.coalesce(num_partitions)
        return DataFrame(internal.with_new_sdf(coalesced_sdf))

    def checkpoint(self, eager: bool = True) -> "ps.DataFrame":
        """Returns a checkpointed version of this DataFrame.

        Checkpointing can be used to truncate the logical plan of this DataFrame, which is
        especially useful in iterative algorithms where the plan may grow exponentially. It will be
        saved to files inside the checkpoint directory set with `SparkContext.setCheckpointDir`.

        Parameters
        ----------
        eager : bool
            Whether to checkpoint this DataFrame immediately

        Returns
        -------
        DataFrame

        Examples
        --------
        >>> psdf = ps.DataFrame({"a": ["a", "b", "c"]})
        >>> psdf
           a
        0  a
        1  b
        2  c
        >>> new_psdf = psdf.spark.checkpoint()  # doctest: +SKIP
        >>> new_psdf  # doctest: +SKIP
           a
        0  a
        1  b
        2  c
        """
        from pyspark.pandas.frame import DataFrame

        internal = self._psdf._internal.resolved_copy
        checkpointed_sdf = internal.spark_frame.checkpoint(eager)
        return DataFrame(internal.with_new_sdf(checkpointed_sdf))

    def local_checkpoint(self, eager: bool = True) -> "ps.DataFrame":
        """Returns a locally checkpointed version of this DataFrame.

        Checkpointing can be used to truncate the logical plan of this DataFrame, which is
        especially useful in iterative algorithms where the plan may grow exponentially. Local
        checkpoints are stored in the executors using the caching subsystem and therefore they are
        not reliable.

        Parameters
        ----------
        eager : bool
            Whether to locally checkpoint this DataFrame immediately

        Returns
        -------
        DataFrame

        Examples
        --------
        >>> psdf = ps.DataFrame({"a": ["a", "b", "c"]})
        >>> psdf
           a
        0  a
        1  b
        2  c
        >>> new_psdf = psdf.spark.local_checkpoint()
        >>> new_psdf
           a
        0  a
        1  b
        2  c
        """
        from pyspark.pandas.frame import DataFrame

        internal = self._psdf._internal.resolved_copy
        checkpointed_sdf = internal.spark_frame.localCheckpoint(eager)
        return DataFrame(internal.with_new_sdf(checkpointed_sdf))

    @property
    def analyzed(self) -> "ps.DataFrame":
        """
        Returns a new DataFrame with the analyzed Spark DataFrame.

        After multiple operations, the underlying Spark plan could grow huge
        and make the Spark planner take a long time to finish the planning.

        This function is a workaround to avoid it.

        .. note:: Once analyzed, operations between the analyzed DataFrame and the original one
            will **NOT** work without setting a config `compute.ops_on_diff_frames` to `True`.

        Returns
        -------
        DataFrame

        Examples
        --------
        >>> df = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, columns=["a", "b"])
        >>> df
           a  b
        0  1  4
        1  2  5
        2  3  6

        The analyzed one should return the same value.

        >>> df.spark.analyzed
           a  b
        0  1  4
        1  2  5
        2  3  6

        However, it won't work with the same anchor DataFrame.

        >>> df + df.spark.analyzed
        Traceback (most recent call last):
        ...
        ValueError: ... enable 'compute.ops_on_diff_frames' option.

        >>> with ps.option_context('compute.ops_on_diff_frames', True):
        ...     (df + df.spark.analyzed).sort_index()
           a   b
        0  2   8
        1  4  10
        2  6  12
        """
        from pyspark.pandas.frame import DataFrame

        return DataFrame(self._psdf._internal.resolved_copy)

class CachedSparkFrameMethods(SparkFrameMethods):
    """Spark related features for cached DataFrame. This is usually created via
    `df.spark.cache()`."""

    def __init__(self, frame: "CachedDataFrame"):
        super().__init__(frame)

    @property
    def storage_level(self) -> StorageLevel:
        """
        Return the storage level of this cache.

        Examples
        --------
        >>> import pyspark.pandas as ps
        >>> import pyspark
        >>> df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)],
        ...                   columns=['dogs', 'cats'])
        >>> df
           dogs  cats
        0   0.2   0.3
        1   0.0   0.6
        2   0.6   0.0
        3   0.2   0.1

        >>> with df.spark.cache() as cached_df:
        ...     print(cached_df.spark.storage_level)
        ...
        Disk Memory Deserialized 1x Replicated

        Set the StorageLevel to `MEMORY_ONLY`.

        >>> with df.spark.persist(pyspark.StorageLevel.MEMORY_ONLY) as cached_df:
        ...     print(cached_df.spark.storage_level)
        ...
        Memory Serialized 1x Replicated
        """
        return self._psdf._cached.storageLevel

    def unpersist(self) -> None:
        """
        The `unpersist` function is used to uncache the pandas-on-Spark DataFrame when it
        is not used with a `with` statement.

        Returns
        -------
        None

        Examples
        --------
        >>> df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)],
        ...                   columns=['dogs', 'cats'])
        >>> df = df.spark.cache()

        To uncache the DataFrame, use the `unpersist` function.

        >>> df.spark.unpersist()
        """
        if self._psdf._cached.is_cached:
            self._psdf._cached.unpersist()

def _test() -> None:
    import os
    import doctest
    import shutil
    import sys
    import tempfile
    import uuid
    import numpy
    import pandas
    from pyspark.sql import SparkSession
    import pyspark.pandas.spark.accessors

    os.chdir(os.environ["SPARK_HOME"])

    globs = pyspark.pandas.spark.accessors.__dict__.copy()
    globs["np"] = numpy
    globs["pd"] = pandas
    globs["ps"] = pyspark.pandas
    spark = (
        SparkSession.builder.master("local[4]")
        .appName("pyspark.pandas.spark.accessors tests")
        .getOrCreate()
    )

    # Temporary database and directory used by the doctests that write tables and files.
    db_name = "db%s" % str(uuid.uuid4()).replace("-", "")
    spark.sql("CREATE DATABASE %s" % db_name)
    globs["db"] = db_name

    path = tempfile.mkdtemp()
    globs["path"] = path

    (failure_count, test_count) = doctest.testmod(
        pyspark.pandas.spark.accessors,
        globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
    )

    shutil.rmtree(path, ignore_errors=True)
    spark.sql("DROP DATABASE IF EXISTS %s CASCADE" % db_name)
    spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()