# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # from distutils.version import LooseVersion import unittest import glob import os import numpy as np import pandas as pd import pyarrow as pa from pyspark import pandas as ps from pyspark.testing.pandasutils import PandasOnSparkTestCase, TestUtils class DataFrameSparkIOTest(PandasOnSparkTestCase, TestUtils): """Test cases for big data I/O using Spark.""" @property def test_column_order(self): return ["i32", "i64", "f", "bhello"] @property def test_pdf(self): pdf = pd.DataFrame( { "i32": np.arange(20, dtype=np.int32) % 3, "i64": np.arange(20, dtype=np.int64) % 5, "f": np.arange(20, dtype=np.float64), "bhello": np.random.choice(["hello", "yo", "people"], size=20).astype("O"), }, columns=self.test_column_order, index=np.random.rand(20), ) return pdf def test_parquet_read(self): with self.temp_dir() as tmp: data = self.test_pdf self.spark.createDataFrame(data, "i32 int, i64 long, f double, bhello string").coalesce( 1 ).write.parquet(tmp, mode="overwrite") def check(columns, expected): if LooseVersion("0.21.1") <= LooseVersion(pd.__version__): expected = pd.read_parquet(tmp, columns=columns) actual = ps.read_parquet(tmp, columns=columns) self.assertPandasEqual(expected, actual.to_pandas()) check(None, data) check(["i32", "i64"], data[["i32", "i64"]]) check(["i64", "i32"], data[["i64", "i32"]]) if LooseVersion(pa.__version__) < LooseVersion("1.0.0"): # TODO: `pd.read_parquet()` changed the behavior due to PyArrow 1.0.0. # We might want to adjust the behavior. Let's see how pandas handles it. check(("i32", "i64"), data[["i32", "i64"]]) check(["a", "b", "i32", "i64"], data[["i32", "i64"]]) check([], pd.DataFrame([])) check(["a"], pd.DataFrame([])) check("i32", pd.DataFrame([])) check("float", data[["f"]]) # check with pyspark patch. if LooseVersion("0.21.1") <= LooseVersion(pd.__version__): expected = pd.read_parquet(tmp) else: expected = data actual = ps.read_parquet(tmp) self.assertPandasEqual(expected, actual.to_pandas()) # When index columns are known pdf = self.test_pdf expected = ps.DataFrame(pdf) expected_idx = expected.set_index("bhello")[["f", "i32", "i64"]] actual_idx = ps.read_parquet(tmp, index_col="bhello")[["f", "i32", "i64"]] self.assert_eq( actual_idx.sort_values(by="f").to_spark().toPandas(), expected_idx.sort_values(by="f").to_spark().toPandas(), ) def test_parquet_read_with_pandas_metadata(self): with self.temp_dir() as tmp: expected1 = self.test_pdf path1 = "{}/file1.parquet".format(tmp) expected1.to_parquet(path1) self.assert_eq(ps.read_parquet(path1, pandas_metadata=True), expected1) expected2 = expected1.reset_index() path2 = "{}/file2.parquet".format(tmp) expected2.to_parquet(path2) self.assert_eq(ps.read_parquet(path2, pandas_metadata=True), expected2) expected3 = expected2.set_index("index", append=True) path3 = "{}/file3.parquet".format(tmp) expected3.to_parquet(path3) self.assert_eq(ps.read_parquet(path3, pandas_metadata=True), expected3) def test_parquet_write(self): with self.temp_dir() as tmp: pdf = self.test_pdf expected = ps.DataFrame(pdf) # Write out partitioned by one column expected.to_parquet(tmp, mode="overwrite", partition_cols="i32") # Reset column order, as once the data is written out, Spark rearranges partition # columns to appear first. actual = ps.read_parquet(tmp) self.assertFalse((actual.columns == self.test_column_order).all()) actual = actual[self.test_column_order] self.assert_eq( actual.sort_values(by="f").to_spark().toPandas(), expected.sort_values(by="f").to_spark().toPandas(), ) # Write out partitioned by two columns expected.to_parquet(tmp, mode="overwrite", partition_cols=["i32", "bhello"]) # Reset column order, as once the data is written out, Spark rearranges partition # columns to appear first. actual = ps.read_parquet(tmp) self.assertFalse((actual.columns == self.test_column_order).all()) actual = actual[self.test_column_order] self.assert_eq( actual.sort_values(by="f").to_spark().toPandas(), expected.sort_values(by="f").to_spark().toPandas(), ) def test_table(self): with self.table("test_table"): pdf = self.test_pdf expected = ps.DataFrame(pdf) # Write out partitioned by one column expected.spark.to_table("test_table", mode="overwrite", partition_cols="i32") # Reset column order, as once the data is written out, Spark rearranges partition # columns to appear first. actual = ps.read_table("test_table") self.assertFalse((actual.columns == self.test_column_order).all()) actual = actual[self.test_column_order] self.assert_eq( actual.sort_values(by="f").to_spark().toPandas(), expected.sort_values(by="f").to_spark().toPandas(), ) # Write out partitioned by two columns expected.to_table("test_table", mode="overwrite", partition_cols=["i32", "bhello"]) # Reset column order, as once the data is written out, Spark rearranges partition # columns to appear first. actual = ps.read_table("test_table") self.assertFalse((actual.columns == self.test_column_order).all()) actual = actual[self.test_column_order] self.assert_eq( actual.sort_values(by="f").to_spark().toPandas(), expected.sort_values(by="f").to_spark().toPandas(), ) # When index columns are known expected_idx = expected.set_index("bhello")[["f", "i32", "i64"]] actual_idx = ps.read_table("test_table", index_col="bhello")[["f", "i32", "i64"]] self.assert_eq( actual_idx.sort_values(by="f").to_spark().toPandas(), expected_idx.sort_values(by="f").to_spark().toPandas(), ) expected_idx = expected.set_index(["bhello"])[["f", "i32", "i64"]] actual_idx = ps.read_table("test_table", index_col=["bhello"])[["f", "i32", "i64"]] self.assert_eq( actual_idx.sort_values(by="f").to_spark().toPandas(), expected_idx.sort_values(by="f").to_spark().toPandas(), ) expected_idx = expected.set_index(["i32", "bhello"])[["f", "i64"]] actual_idx = ps.read_table("test_table", index_col=["i32", "bhello"])[["f", "i64"]] self.assert_eq( actual_idx.sort_values(by="f").to_spark().toPandas(), expected_idx.sort_values(by="f").to_spark().toPandas(), ) def test_spark_io(self): with self.temp_dir() as tmp: pdf = self.test_pdf expected = ps.DataFrame(pdf) # Write out partitioned by one column expected.to_spark_io(tmp, format="json", mode="overwrite", partition_cols="i32") # Reset column order, as once the data is written out, Spark rearranges partition # columns to appear first. actual = ps.read_spark_io(tmp, format="json") self.assertFalse((actual.columns == self.test_column_order).all()) actual = actual[self.test_column_order] self.assert_eq( actual.sort_values(by="f").to_spark().toPandas(), expected.sort_values(by="f").to_spark().toPandas(), ) # Write out partitioned by two columns expected.to_spark_io( tmp, format="json", mode="overwrite", partition_cols=["i32", "bhello"] ) # Reset column order, as once the data is written out, Spark rearranges partition # columns to appear first. actual = ps.read_spark_io(path=tmp, format="json") self.assertFalse((actual.columns == self.test_column_order).all()) actual = actual[self.test_column_order] self.assert_eq( actual.sort_values(by="f").to_spark().toPandas(), expected.sort_values(by="f").to_spark().toPandas(), ) # When index columns are known pdf = self.test_pdf expected = ps.DataFrame(pdf) col_order = ["f", "i32", "i64"] expected_idx = expected.set_index("bhello")[col_order] actual_idx = ps.read_spark_io(tmp, format="json", index_col="bhello")[col_order] self.assert_eq( actual_idx.sort_values(by="f").to_spark().toPandas(), expected_idx.sort_values(by="f").to_spark().toPandas(), ) @unittest.skip("openpyxl") def test_read_excel(self): with self.temp_dir() as tmp: path1 = "{}/file1.xlsx".format(tmp) self.test_pdf[["i32"]].to_excel(path1) self.assert_eq(ps.read_excel(open(path1, "rb")), pd.read_excel(open(path1, "rb"))) self.assert_eq( ps.read_excel(open(path1, "rb"), index_col=0), pd.read_excel(open(path1, "rb"), index_col=0), ) self.assert_eq( ps.read_excel(open(path1, "rb"), index_col=0, squeeze=True), pd.read_excel(open(path1, "rb"), index_col=0, squeeze=True), ) self.assert_eq(ps.read_excel(path1), pd.read_excel(path1)) self.assert_eq(ps.read_excel(path1, index_col=0), pd.read_excel(path1, index_col=0)) self.assert_eq( ps.read_excel(path1, index_col=0, squeeze=True), pd.read_excel(path1, index_col=0, squeeze=True), ) self.assert_eq(ps.read_excel(tmp), pd.read_excel(path1)) path2 = "{}/file2.xlsx".format(tmp) self.test_pdf[["i32"]].to_excel(path2) self.assert_eq( ps.read_excel(tmp, index_col=0).sort_index(), pd.concat( [pd.read_excel(path1, index_col=0), pd.read_excel(path2, index_col=0)] ).sort_index(), ) self.assert_eq( ps.read_excel(tmp, index_col=0, squeeze=True).sort_index(), pd.concat( [ pd.read_excel(path1, index_col=0, squeeze=True), pd.read_excel(path2, index_col=0, squeeze=True), ] ).sort_index(), ) with self.temp_dir() as tmp: path1 = "{}/file1.xlsx".format(tmp) with pd.ExcelWriter(path1) as writer: self.test_pdf.to_excel(writer, sheet_name="Sheet_name_1") self.test_pdf[["i32"]].to_excel(writer, sheet_name="Sheet_name_2") sheet_names = [["Sheet_name_1", "Sheet_name_2"], None] pdfs1 = pd.read_excel(open(path1, "rb"), sheet_name=None, index_col=0) pdfs1_squeezed = pd.read_excel( open(path1, "rb"), sheet_name=None, index_col=0, squeeze=True ) for sheet_name in sheet_names: psdfs = ps.read_excel(open(path1, "rb"), sheet_name=sheet_name, index_col=0) self.assert_eq(psdfs["Sheet_name_1"], pdfs1["Sheet_name_1"]) self.assert_eq(psdfs["Sheet_name_2"], pdfs1["Sheet_name_2"]) psdfs = ps.read_excel( open(path1, "rb"), sheet_name=sheet_name, index_col=0, squeeze=True ) self.assert_eq(psdfs["Sheet_name_1"], pdfs1_squeezed["Sheet_name_1"]) self.assert_eq(psdfs["Sheet_name_2"], pdfs1_squeezed["Sheet_name_2"]) self.assert_eq( ps.read_excel(tmp, index_col=0, sheet_name="Sheet_name_2"), pdfs1["Sheet_name_2"], ) for sheet_name in sheet_names: psdfs = ps.read_excel(tmp, sheet_name=sheet_name, index_col=0) self.assert_eq(psdfs["Sheet_name_1"], pdfs1["Sheet_name_1"]) self.assert_eq(psdfs["Sheet_name_2"], pdfs1["Sheet_name_2"]) psdfs = ps.read_excel(tmp, sheet_name=sheet_name, index_col=0, squeeze=True) self.assert_eq(psdfs["Sheet_name_1"], pdfs1_squeezed["Sheet_name_1"]) self.assert_eq(psdfs["Sheet_name_2"], pdfs1_squeezed["Sheet_name_2"]) path2 = "{}/file2.xlsx".format(tmp) with pd.ExcelWriter(path2) as writer: self.test_pdf.to_excel(writer, sheet_name="Sheet_name_1") self.test_pdf[["i32"]].to_excel(writer, sheet_name="Sheet_name_2") pdfs2 = pd.read_excel(path2, sheet_name=None, index_col=0) pdfs2_squeezed = pd.read_excel(path2, sheet_name=None, index_col=0, squeeze=True) self.assert_eq( ps.read_excel(tmp, sheet_name="Sheet_name_2", index_col=0).sort_index(), pd.concat([pdfs1["Sheet_name_2"], pdfs2["Sheet_name_2"]]).sort_index(), ) self.assert_eq( ps.read_excel( tmp, sheet_name="Sheet_name_2", index_col=0, squeeze=True ).sort_index(), pd.concat( [pdfs1_squeezed["Sheet_name_2"], pdfs2_squeezed["Sheet_name_2"]] ).sort_index(), ) for sheet_name in sheet_names: psdfs = ps.read_excel(tmp, sheet_name=sheet_name, index_col=0) self.assert_eq( psdfs["Sheet_name_1"].sort_index(), pd.concat([pdfs1["Sheet_name_1"], pdfs2["Sheet_name_1"]]).sort_index(), ) self.assert_eq( psdfs["Sheet_name_2"].sort_index(), pd.concat([pdfs1["Sheet_name_2"], pdfs2["Sheet_name_2"]]).sort_index(), ) psdfs = ps.read_excel(tmp, sheet_name=sheet_name, index_col=0, squeeze=True) self.assert_eq( psdfs["Sheet_name_1"].sort_index(), pd.concat( [pdfs1_squeezed["Sheet_name_1"], pdfs2_squeezed["Sheet_name_1"]] ).sort_index(), ) self.assert_eq( psdfs["Sheet_name_2"].sort_index(), pd.concat( [pdfs1_squeezed["Sheet_name_2"], pdfs2_squeezed["Sheet_name_2"]] ).sort_index(), ) def test_read_orc(self): with self.temp_dir() as tmp: path = "{}/file1.orc".format(tmp) data = self.test_pdf self.spark.createDataFrame(data, "i32 int, i64 long, f double, bhello string").coalesce( 1 ).write.orc(path, mode="overwrite") # `spark.write.orc` create a directory contains distributed orc files. # But pandas only can read from file, not directory. Therefore, we need orc file path. orc_file_path = glob.glob(os.path.join(path, "*.orc"))[0] expected = data.reset_index()[data.columns] actual = ps.read_orc(path) self.assertPandasEqual(expected, actual.to_pandas()) # columns columns = ["i32", "i64"] expected = data.reset_index()[columns] actual = ps.read_orc(path, columns=columns) self.assertPandasEqual(expected, actual.to_pandas()) # index_col expected = data.set_index("i32") actual = ps.read_orc(path, index_col="i32") self.assert_eq(actual, expected) expected = data.set_index(["i32", "f"]) actual = ps.read_orc(path, index_col=["i32", "f"]) self.assert_eq(actual, expected) # index_col with columns expected = data.set_index("i32")[["i64", "bhello"]] actual = ps.read_orc(path, index_col=["i32"], columns=["i64", "bhello"]) self.assert_eq(actual, expected) expected = data.set_index(["i32", "f"])[["bhello", "i64"]] actual = ps.read_orc(path, index_col=["i32", "f"], columns=["bhello", "i64"]) self.assert_eq(actual, expected) msg = "Unknown column name 'i'" with self.assertRaises(ValueError, msg=msg): ps.read_orc(path, columns="i32") msg = "Unknown column name 'i34'" with self.assertRaises(ValueError, msg=msg): ps.read_orc(path, columns=["i34", "i64"]) def test_orc_write(self): with self.temp_dir() as tmp: pdf = self.test_pdf expected = ps.DataFrame(pdf) # Write out partitioned by one column expected.to_orc(tmp, mode="overwrite", partition_cols="i32") # Reset column order, as once the data is written out, Spark rearranges partition # columns to appear first. actual = ps.read_orc(tmp) self.assertFalse((actual.columns == self.test_column_order).all()) actual = actual[self.test_column_order] self.assert_eq( actual.sort_values(by="f").to_spark().toPandas(), expected.sort_values(by="f").to_spark().toPandas(), ) # Write out partitioned by two columns expected.to_orc(tmp, mode="overwrite", partition_cols=["i32", "bhello"]) # Reset column order, as once the data is written out, Spark rearranges partition # columns to appear first. actual = ps.read_orc(tmp) self.assertFalse((actual.columns == self.test_column_order).all()) actual = actual[self.test_column_order] self.assert_eq( actual.sort_values(by="f").to_spark().toPandas(), expected.sort_values(by="f").to_spark().toPandas(), ) if __name__ == "__main__": from pyspark.pandas.tests.test_dataframe_spark_io import * # noqa: F401 try: import xmlrunner # type: ignore[import] testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) except ImportError: testRunner = None unittest.main(testRunner=testRunner, verbosity=2)