[SPARK-35806][PYTHON] Mapping the `mode` argument to pandas in DataFrame.to_csv
### What changes were proposed in this pull request?
`DataFrame.to_csv` has a `mode` argument in both pandas and the pandas API on Spark.
However, pandas accepts the strings "w", "w+", "a" and "a+", whereas pandas-on-Spark accepts "append", "overwrite", "ignore", "error" or "errorifexists".
This PR maps the pandas-style strings to the corresponding Spark save modes, while `mode` still accepts the existing values ("append", "overwrite", "ignore", "error" or "errorifexists") as well.
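The mapping itself is small; a minimal sketch of the intended semantics (the real implementation is the `validate_mode` helper in the diff below, and the dict name here is purely illustrative):

```python
# pandas-style write modes and the Spark save modes they map to.
# '+' only adds read access in Python file modes, which is meaningless
# when we are only writing, so 'w+' and 'a+' collapse to 'w' and 'a'.
PANDAS_TO_SPARK_MODE = {
    "w": "overwrite",
    "w+": "overwrite",
    "a": "append",
    "a+": "append",
}
```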
### Why are the changes needed?
APIs in pandas-on-Spark should follow the behavior of pandas so that existing pandas code does not break.
### Does this PR introduce _any_ user-facing change?
`DataFrame.to_csv` can now accept "w", "w+", "a" and "a+" as well, the same as pandas.
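For example (an illustrative session; the DataFrame contents and output path are hypothetical):

```python
import pyspark.pandas as ps

psdf = ps.DataFrame({"a": [1, 2, 3]})

# pandas-style modes now work:
psdf.to_csv("/tmp/example_csv", mode="w")    # equivalent to mode="overwrite"
psdf.to_csv("/tmp/example_csv", mode="a+")   # equivalent to mode="append"

# the existing Spark-style modes are still accepted:
psdf.to_csv("/tmp/example_csv", mode="overwrite")
```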
### How was this patch tested?
Added a unit test, and manually wrote files using the newly accepted strings.
Closes #33414 from itholic/SPARK-35806.
Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 2f42afc53a)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
`python/pyspark/pandas/generic.py`:

```diff
@@ -74,6 +74,7 @@ from pyspark.pandas.utils import (
     sql_conf,
     validate_arguments_and_invoke_function,
     validate_axis,
+    validate_mode,
     SPARK_CONF_ARROW_ENABLED,
 )
```
```diff
@@ -650,7 +651,7 @@ class Frame(object, metaclass=ABCMeta):
         date_format: Optional[str] = None,
         escapechar: Optional[str] = None,
         num_files: Optional[int] = None,
-        mode: str = "overwrite",
+        mode: str = "w",
         partition_cols: Optional[Union[str, List[str]]] = None,
         index_col: Optional[Union[str, List[str]]] = None,
         **options: Any
```
```diff
@@ -688,14 +689,16 @@ class Frame(object, metaclass=ABCMeta):
             when appropriate.
         num_files : the number of files to be written in `path` directory when
             this is a path.
-        mode : str {'append', 'overwrite', 'ignore', 'error', 'errorifexists'},
-            default 'overwrite'. Specifies the behavior of the save operation when the
-            destination exists already.
+        mode : str
+            Python write mode, default 'w'.
 
-            - 'append': Append the new data to existing data.
-            - 'overwrite': Overwrite existing data.
-            - 'ignore': Silently ignore this operation if data already exists.
-            - 'error' or 'errorifexists': Throw an exception if data already exists.
+            .. note:: mode can accept the strings for Spark writing mode.
+                Such as 'append', 'overwrite', 'ignore', 'error', 'errorifexists'.
+
+                - 'append' (equivalent to 'a'): Append the new data to existing data.
+                - 'overwrite' (equivalent to 'w'): Overwrite existing data.
+                - 'ignore': Silently ignore this operation if data already exists.
+                - 'error' or 'errorifexists': Throw an exception if data already exists.
 
         partition_cols : str or list of str, optional, default None
             Names of partitioning columns
```
```diff
@@ -867,6 +870,7 @@ class Frame(object, metaclass=ABCMeta):
             )
             sdf = sdf.repartition(num_files)
 
+        mode = validate_mode(mode)
         builder = sdf.write.mode(mode)
         if partition_cols is not None:
             builder.partitionBy(partition_cols)
```
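In other words, the write path normalizes the mode before handing it to Spark's `DataFrameWriter`. A simplified standalone sketch of that step (the Spark DataFrame contents and output path are illustrative):

```python
from pyspark.sql import SparkSession
from pyspark.pandas.utils import validate_mode

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(1,), (2,)], ["a"])

mode = validate_mode("w")                     # "w" is normalized to "overwrite"
sdf.write.mode(mode).csv("/tmp/example_csv")  # DataFrameWriter only understands Spark save modes
```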
`python/pyspark/pandas/tests/test_utils.py`:

```diff
@@ -21,6 +21,7 @@ from pyspark.pandas.utils import (
     lazy_property,
     validate_arguments_and_invoke_function,
     validate_bool_kwarg,
+    validate_mode,
 )
 from pyspark.testing.pandasutils import PandasOnSparkTestCase
 from pyspark.testing.sqlutils import SQLTestUtils
```
```diff
@@ -82,6 +83,15 @@ class UtilsTest(PandasOnSparkTestCase, SQLTestUtils):
         ):
             validate_bool_kwarg(pandas_on_spark, "pandas_on_spark")
 
+    def test_validate_mode(self):
+        self.assert_eq(validate_mode("a"), "append")
+        self.assert_eq(validate_mode("w"), "overwrite")
+        self.assert_eq(validate_mode("a+"), "append")
+        self.assert_eq(validate_mode("w+"), "overwrite")
+
+        with self.assertRaises(ValueError):
+            validate_mode("r")
+
+
 class TestClassForLazyProp:
     def __init__(self):
```
`python/pyspark/pandas/utils.py`:

```diff
@@ -754,6 +754,34 @@ def validate_how(how: str) -> str:
     return how
 
 
+def validate_mode(mode: str) -> str:
+    """Check that the given mode for writing is valid."""
+    if mode in ("w", "w+"):
+        # 'w' in pandas equals 'overwrite' in Spark.
+        # '+' is meaningless for writing methods, but pandas just passes it through as 'w'.
+        mode = "overwrite"
+    if mode in ("a", "a+"):
+        # 'a' in pandas equals 'append' in Spark.
+        # '+' is meaningless for writing methods, but pandas just passes it through as 'a'.
+        mode = "append"
+    if mode not in (
+        "w",
+        "a",
+        "w+",
+        "a+",
+        "overwrite",
+        "append",
+        "ignore",
+        "error",
+        "errorifexists",
+    ):
+        raise ValueError(
+            "The 'mode' parameter has to be amongst the following values: "
+            "['w', 'a', 'w+', 'a+', 'overwrite', 'append', 'ignore', 'error', 'errorifexists']"
+        )
+    return mode
+
+
 @overload
 def verify_temp_column_name(df: SparkDataFrame, column_name_or_label: str) -> str:
     ...
```
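A quick sanity check of the new helper (mirrors the unit test above):

```python
from pyspark.pandas.utils import validate_mode

assert validate_mode("w+") == "overwrite"   # pandas-style modes are normalized
assert validate_mode("append") == "append"  # Spark save modes pass through unchanged
```

Note that `validate_mode` only rewrites the pandas-style spellings and leaves the Spark save modes untouched, so existing pandas-on-Spark code keeps working without modification.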