6b912e4179
### What changes were proposed in this pull request?

There are still names related to Koalas in tests and function names. This PR renames them to fit pandas-on-Spark:

- `kdf` -> `psdf`
- `kser` -> `psser`
- `kidx` -> `psidx`
- `kmidx` -> `psmidx`
- `to_koalas()` -> `to_pandas_on_spark()`

### Why are the changes needed?

The name Koalas is no longer used in PySpark.

### Does this PR introduce _any_ user-facing change?

The `to_koalas()` function is renamed to `to_pandas_on_spark()`.

### How was this patch tested?

Tested manually in a local environment. After changing the related names, I checked them one by one.

Closes #32516 from itholic/SPARK-35364.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
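A minimal sketch of the renamed conversion, assuming a running `SparkSession` (the example data here is made up for illustration):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A plain Spark DataFrame with hypothetical example data.
sdf = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

# Previously: psdf = sdf.to_koalas()
psdf = sdf.to_pandas_on_spark()  # returns a pyspark.pandas.DataFrame
print(type(psdf))
```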
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import shutil
import tempfile
from contextlib import contextmanager

import pandas as pd
import numpy as np

from pyspark import pandas as ps
from pyspark.testing.pandasutils import PandasOnSparkTestCase, TestUtils

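# Collapses the indentation of the triple-quoted CSV fixtures below, so that the
# text written to disk matches what a real CSV file would contain.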
def normalize_text(s):
    return "\n".join(map(str.strip, s.strip().split("\n")))


class CsvTest(PandasOnSparkTestCase, TestUtils):
    def setUp(self):
        self.tmp_dir = tempfile.mkdtemp(prefix=CsvTest.__name__)

    def tearDown(self):
        shutil.rmtree(self.tmp_dir, ignore_errors=True)

    @property
    def csv_text(self):
        return normalize_text(
            """
            name,amount
            Alice,100
            Bob,-200
            Charlie,300
            Dennis,400
            Edith,-500
            Frank,600
            Alice,200
            Frank,-200
            Bob,600
            Alice,400
            Frank,200
            Alice,300
            Edith,600
            """
        )

    @property
    def csv_text_2(self):
        return normalize_text(
            """
            A,B
            item1,1
            item2,1,2
            item3,1,2,3,4
            item4,1
            """
        )

    @property
    def csv_text_with_comments(self):
        return normalize_text(
            """
            # header
            %s
            # comment
            Alice,400
            Edith,600
            # footer
            """
            % self.csv_text
        )

    @property
    def tab_delimited_csv_text(self):
        return normalize_text(
            """
            name\tamount
            Alice\t100
            Bob\t-200
            Charlie\t300
            """
        )

    @property
    def q_quoted_csv_text(self):
        return normalize_text(
            """
            QnameQ,QamountQ
            QA,liceQ,Q100Q
            QB,obQ,Q-200Q
            QC,harlieQ,Q300Q
            """
        )
    @property
    def e_escaped_csv_text(self):
        return normalize_text(
            """
            name,amount
            "AE"lice",100
            "BE"ob",-200
            "CE"harlie",300
            """
        )

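    # Writes the given CSV text to a temporary file and yields the file's path.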
    @contextmanager
    def csv_file(self, csv):
        with self.temp_file() as tmp:
            with open(tmp, "w") as f:
                f.write(csv)
            yield tmp

    def test_read_csv(self):
        with self.csv_file(self.csv_text) as fn:
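            # Read the same file with both pandas and pandas-on-Spark and check
            # that the results match for the given combination of arguments.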
            def check(header="infer", names=None, usecols=None, index_col=None):
                expected = pd.read_csv(
                    fn, header=header, names=names, usecols=usecols, index_col=index_col
                )
                actual = ps.read_csv(
                    fn, header=header, names=names, usecols=usecols, index_col=index_col
                )
                self.assert_eq(expected, actual, almost=True)

            check()
            check(header=0)
            check(header=None)
            check(names=["n", "a"])
            check(names=[("x", "n"), ("y", "a")])
            check(names=[10, 20])
            check(header=0, names=["n", "a"])
            check(usecols=[1])
            check(usecols=[1, 0])
            check(usecols=["amount"])
            check(usecols=["amount", "name"])
            check(usecols=[])
            check(usecols=[1, 1])
            check(usecols=["amount", "amount"])
            check(header=None, usecols=[1])
            check(names=["n", "a"], usecols=["a"])
            check(header=None, names=["n", "a"], usecols=["a"])
            check(index_col=["amount"])
            check(header=None, index_col=[1])
            check(names=["n", "a"], index_col=["a"])

            # check with pyspark patch.
            expected = pd.read_csv(fn)
            actual = ps.read_csv(fn)
            self.assert_eq(expected, actual, almost=True)

            self.assertRaisesRegex(
                ValueError, "non-unique", lambda: ps.read_csv(fn, names=["n", "n"])
            )
            self.assertRaisesRegex(
                ValueError,
                "does not match the number.*3",
                lambda: ps.read_csv(fn, names=["n", "a", "b"]),
            )
            self.assertRaisesRegex(
                ValueError,
                "does not match the number.*3",
                lambda: ps.read_csv(fn, header=0, names=["n", "a", "b"]),
            )
            self.assertRaisesRegex(
                ValueError, "Usecols do not match.*3", lambda: ps.read_csv(fn, usecols=[1, 3])
            )
            self.assertRaisesRegex(
                ValueError,
                "Usecols do not match.*col",
                lambda: ps.read_csv(fn, usecols=["amount", "col"]),
            )
            self.assertRaisesRegex(
                ValueError, "Unknown header argument 1", lambda: ps.read_csv(fn, header="1")
            )
            expected_error_message = (
                "'usecols' must either be list-like of all strings, "
                "all unicode, all integers or a callable."
            )
            self.assertRaisesRegex(
                ValueError, expected_error_message, lambda: ps.read_csv(fn, usecols=[1, "amount"])
            )

            # check with index_col
            expected = pd.read_csv(fn).set_index("name")
            actual = ps.read_csv(fn, index_col="name")
            self.assert_eq(expected, actual, almost=True)
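    # pandas-on-Spark additionally accepts a Spark SQL DDL schema string for
    # `names`, which also sets the column types.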
    def test_read_with_spark_schema(self):
        with self.csv_file(self.csv_text_2) as fn:
            actual = ps.read_csv(fn, names="A string, B string, C long, D long, E long")
            expected = pd.read_csv(fn, names=["A", "B", "C", "D", "E"])
            self.assert_eq(expected, actual)

    def test_read_csv_with_comment(self):
        with self.csv_file(self.csv_text_with_comments) as fn:
            expected = pd.read_csv(fn, comment="#")
            actual = ps.read_csv(fn, comment="#")
            self.assert_eq(expected, actual, almost=True)

            self.assertRaisesRegex(
                ValueError,
                "Only length-1 comment characters supported",
                lambda: ps.read_csv(fn, comment="").show(),
            )
            self.assertRaisesRegex(
                ValueError,
                "Only length-1 comment characters supported",
                lambda: ps.read_csv(fn, comment="##").show(),
            )
            self.assertRaisesRegex(
                ValueError,
                "Only length-1 comment characters supported",
                lambda: ps.read_csv(fn, comment=1),
            )
            self.assertRaisesRegex(
                ValueError,
                "Only length-1 comment characters supported",
                lambda: ps.read_csv(fn, comment=[1]),
            )

    def test_read_csv_with_limit(self):
        with self.csv_file(self.csv_text_with_comments) as fn:
            expected = pd.read_csv(fn, comment="#", nrows=2)
            actual = ps.read_csv(fn, comment="#", nrows=2)
            self.assert_eq(expected, actual, almost=True)

    def test_read_csv_with_sep(self):
        with self.csv_file(self.tab_delimited_csv_text) as fn:
            expected = pd.read_csv(fn, sep="\t")
            actual = ps.read_csv(fn, sep="\t")
            self.assert_eq(expected, actual, almost=True)

    def test_read_csv_with_squeeze(self):
        with self.csv_file(self.csv_text) as fn:
            expected = pd.read_csv(fn, squeeze=True, usecols=["name"])
            actual = ps.read_csv(fn, squeeze=True, usecols=["name"])
            self.assert_eq(expected, actual, almost=True)

            expected = pd.read_csv(fn, squeeze=True, usecols=["name", "amount"])
            actual = ps.read_csv(fn, squeeze=True, usecols=["name", "amount"])
            self.assert_eq(expected, actual, almost=True)

            expected = pd.read_csv(fn, squeeze=True, usecols=["name", "amount"], index_col=["name"])
            actual = ps.read_csv(fn, squeeze=True, usecols=["name", "amount"], index_col=["name"])
            self.assert_eq(expected, actual, almost=True)

    def test_read_csv_with_mangle_dupe_cols(self):
        self.assertRaisesRegex(
            ValueError, "mangle_dupe_cols", lambda: ps.read_csv("path", mangle_dupe_cols=False)
        )

    def test_read_csv_with_parse_dates(self):
        self.assertRaisesRegex(
            ValueError, "parse_dates", lambda: ps.read_csv("path", parse_dates=True)
        )

    def test_read_csv_with_dtype(self):
        with self.csv_file(self.csv_text) as fn:
            self.assert_eq(ps.read_csv(fn), pd.read_csv(fn), almost=True)
            self.assert_eq(ps.read_csv(fn, dtype=str), pd.read_csv(fn, dtype=str))
            self.assert_eq(
                ps.read_csv(fn, dtype={"amount": "int64"}),
                pd.read_csv(fn, dtype={"amount": "int64"}),
            )

    def test_read_csv_with_quotechar(self):
        with self.csv_file(self.q_quoted_csv_text) as fn:
            self.assert_eq(
                ps.read_csv(fn, quotechar="Q"), pd.read_csv(fn, quotechar="Q"), almost=True
            )
    def test_read_csv_with_escapechar(self):
        with self.csv_file(self.e_escaped_csv_text) as fn:
            self.assert_eq(
                ps.read_csv(fn, escapechar="E"), pd.read_csv(fn, escapechar="E"), almost=True
            )

            # The Spark-specific `escape` option takes precedence over `escapechar`.
            self.assert_eq(
                ps.read_csv(fn, escapechar="ABC", escape="E"),
                pd.read_csv(fn, escapechar="E"),
                almost=True,
            )

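    # Note: unlike pandas, pandas-on-Spark's to_csv() does not write the index,
    # so the pandas expectations below pass index=False.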
    def test_to_csv(self):
        pdf = pd.DataFrame({"aa": [1, 2, 3], "bb": [4, 5, 6]}, index=[0, 1, 3])
        psdf = ps.DataFrame(pdf)

        self.assert_eq(psdf.to_csv(), pdf.to_csv(index=False))
        self.assert_eq(psdf.to_csv(columns=["aa"]), pdf.to_csv(columns=["aa"], index=False))
        self.assert_eq(psdf.aa.to_csv(), pdf.aa.to_csv(index=False, header=True))

        pdf = pd.DataFrame({"a": [1, np.nan, 3], "b": ["one", "two", None]}, index=[0, 1, 3])
        psdf = ps.from_pandas(pdf)

        self.assert_eq(psdf.to_csv(na_rep="null"), pdf.to_csv(na_rep="null", index=False))
        self.assert_eq(
            psdf.a.to_csv(na_rep="null"), pdf.a.to_csv(na_rep="null", index=False, header=True)
        )

        self.assertRaises(KeyError, lambda: psdf.to_csv(columns=["ab"]))

        pdf = pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [4.0, 5.0, 6.0]}, index=[0, 1, 3])
        psdf = ps.from_pandas(pdf)

        self.assert_eq(psdf.to_csv(), pdf.to_csv(index=False))
        self.assert_eq(psdf.to_csv(header=False), pdf.to_csv(header=False, index=False))
        self.assert_eq(psdf.to_csv(), pdf.to_csv(index=False))

        # non-string names
        pdf = pd.DataFrame({10: [1, 2, 3], 20: [4, 5, 6]}, index=[0, 1, 3])
        psdf = ps.DataFrame(pdf)

        self.assert_eq(psdf.to_csv(), pdf.to_csv(index=False))
        self.assert_eq(psdf.to_csv(columns=[10]), pdf.to_csv(columns=[10], index=False))

        self.assertRaises(TypeError, lambda: psdf.to_csv(columns=10))

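    # When given a path, to_csv() writes a directory of Spark "part-*" files
    # rather than a single file; read the first part file and compare it.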
    def _check_output(self, dir, expected):
        output_paths = [path for path in os.listdir(dir) if path.startswith("part-")]
        assert len(output_paths) > 0
        output_path = "%s/%s" % (dir, output_paths[0])
        with open(output_path) as f:
            self.assertEqual(f.read(), expected)

    def test_to_csv_with_path(self):
        pdf = pd.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
        psdf = ps.DataFrame(pdf)

        tmp_dir = "{}/tmp1".format(self.tmp_dir)

        psdf.to_csv(tmp_dir, num_files=1)
        self._check_output(tmp_dir, pdf.to_csv(index=False))

        tmp_dir = "{}/tmp2".format(self.tmp_dir)

        self.assertRaises(KeyError, lambda: psdf.to_csv(tmp_dir, columns=["c"], num_files=1))

        # non-string names
        pdf = pd.DataFrame({10: [1, 2, 3], 20: ["a", "b", "c"]})
        psdf = ps.DataFrame(pdf)

        tmp_dir = "{}/tmp3".format(self.tmp_dir)

        psdf.to_csv(tmp_dir, num_files=1)
        self._check_output(tmp_dir, pdf.to_csv(index=False))

        tmp_dir = "{}/tmp4".format(self.tmp_dir)

        psdf.to_csv(tmp_dir, columns=[10], num_files=1)
        self._check_output(tmp_dir, pdf.to_csv(columns=[10], index=False))

        tmp_dir = "{}/tmp5".format(self.tmp_dir)

        self.assertRaises(TypeError, lambda: psdf.to_csv(tmp_dir, columns=10, num_files=1))

    def test_to_csv_with_path_and_basic_options(self):
        pdf = pd.DataFrame({"aa": [1, 2, 3], "bb": ["a", "b", "c"]})
        psdf = ps.DataFrame(pdf)

        psdf.to_csv(self.tmp_dir, num_files=1, sep="|", header=False, columns=["aa"])
        expected = pdf.to_csv(index=False, sep="|", header=False, columns=["aa"])

        self._check_output(self.tmp_dir, expected)

    def test_to_csv_with_path_and_basic_options_multiindex_columns(self):
        pdf = pd.DataFrame({("x", "a"): [1, 2, 3], ("y", "b"): ["a", "b", "c"]})
        psdf = ps.DataFrame(pdf)

        with self.assertRaises(ValueError):
            psdf.to_csv(self.tmp_dir, num_files=1, sep="|", columns=[("x", "a")])

        psdf.to_csv(self.tmp_dir, num_files=1, sep="|", header=["a"], columns=[("x", "a")])
        pdf.columns = ["a", "b"]
        expected = pdf.to_csv(index=False, sep="|", columns=["a"])

        self._check_output(self.tmp_dir, expected)

    def test_to_csv_with_path_and_pyspark_options(self):
        pdf = pd.DataFrame({"a": [1, 2, 3, None], "b": ["a", "b", "c", None]})
        psdf = ps.DataFrame(pdf)

        psdf.to_csv(self.tmp_dir, nullValue="null", num_files=1)
        expected = pdf.to_csv(index=False, na_rep="null")

        self._check_output(self.tmp_dir, expected)

    def test_to_csv_with_partition_cols(self):
        pdf = pd.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
        psdf = ps.DataFrame(pdf)

        psdf.to_csv(self.tmp_dir, partition_cols="b", num_files=1)
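        # Spark writes one Hive-style "b=<value>" subdirectory per partition value;
        # each should contain the matching rows with the partition column dropped.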
        partition_paths = [path for path in os.listdir(self.tmp_dir) if path.startswith("b=")]
        assert len(partition_paths) > 0
        for partition_path in partition_paths:
            column, value = partition_path.split("=")
            expected = pdf[pdf[column] == value].drop("b", axis=1).to_csv(index=False)

            output_paths = [
                path
                for path in os.listdir("%s/%s" % (self.tmp_dir, partition_path))
                if path.startswith("part-")
            ]
            assert len(output_paths) > 0
            output_path = "%s/%s/%s" % (self.tmp_dir, partition_path, output_paths[0])
            with open(output_path) as f:
                self.assertEqual(f.read(), expected)

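# Allow running this file directly; uses the XML test runner (writing reports to
# target/test-reports) when xmlrunner is installed, else the default runner.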
if __name__ == "__main__":
    import unittest
    from pyspark.pandas.tests.test_csv import *  # noqa: F401

    try:
        import xmlrunner  # type: ignore[import]

        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)