[SPARK-35040][PYTHON] Remove Spark-version related codes from test codes
### What changes were proposed in this pull request?

Removes PySpark-version-dependent code from the pyspark.pandas tests.

### Why are the changes needed?

Several places in the tests check the PySpark version and switch logic accordingly, but those checks are no longer necessary, so we should remove them. We will do the same thing again after we finish porting the tests.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #32300 from xinrong-databricks/port.rmv_spark_version_chk_in_tests.

Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
This commit is contained in:
parent
6ab00488d0
commit
4fcbf59079
|
@ -22,7 +22,6 @@ from datetime import datetime
|
|||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import pyspark
|
||||
|
||||
import pyspark.pandas as ps
|
||||
from pyspark.pandas.exceptions import PandasNotImplementedError
|
||||
|
@ -280,12 +279,7 @@ class IndexesTest(PandasOnSparkTestCase, TestUtils):
|
|||
pidx.names = ["renamed_number", None]
|
||||
kidx.names = ["renamed_number", None]
|
||||
self.assertEqual(kidx.names, pidx.names)
|
||||
if LooseVersion(pyspark.__version__) < LooseVersion("2.4"):
|
||||
# PySpark < 2.4 does not support struct type with arrow enabled.
|
||||
with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
|
||||
self.assert_eq(kidx, pidx)
|
||||
else:
|
||||
self.assert_eq(kidx, pidx)
|
||||
self.assert_eq(kidx, pidx)
|
||||
|
||||
with self.assertRaises(PandasNotImplementedError):
|
||||
kidx.name
|
||||
|
@ -1401,11 +1395,7 @@ class IndexesTest(PandasOnSparkTestCase, TestUtils):
|
|||
|
||||
self.assert_eq(kidx.asof("2014-01-01"), pidx.asof("2014-01-01"))
|
||||
self.assert_eq(kidx.asof("2014-01-02"), pidx.asof("2014-01-02"))
|
||||
if LooseVersion(pyspark.__version__) >= LooseVersion("3.0"):
|
||||
self.assert_eq(repr(kidx.asof("1999-01-02")), repr(pidx.asof("1999-01-02")))
|
||||
else:
|
||||
# FIXME: self.assert_eq(repr(kidx.asof("1999-01-02")), repr(pidx.asof("1999-01-02")))
|
||||
pass
|
||||
self.assert_eq(repr(kidx.asof("1999-01-02")), repr(pidx.asof("1999-01-02")))
|
||||
|
||||
# Decreasing values
|
||||
pidx = pd.Index(["2014-01-03", "2014-01-02", "2013-12-31"])
|
||||
|
@ -1427,11 +1417,7 @@ class IndexesTest(PandasOnSparkTestCase, TestUtils):
|
|||
self.assert_eq(kidx.asof("2014-01-01"), pd.Timestamp("2014-01-02 00:00:00"))
|
||||
self.assert_eq(kidx.asof("2014-01-02"), pd.Timestamp("2014-01-02 00:00:00"))
|
||||
self.assert_eq(kidx.asof("1999-01-02"), pd.Timestamp("2013-12-31 00:00:00"))
|
||||
if LooseVersion(pyspark.__version__) >= LooseVersion("3.0"):
|
||||
self.assert_eq(repr(kidx.asof("2015-01-02")), repr(pd.NaT))
|
||||
else:
|
||||
# FIXME: self.assert_eq(repr(kidx.asof("2015-01-02")), repr(pd.NaT))
|
||||
pass
|
||||
self.assert_eq(repr(kidx.asof("2015-01-02")), repr(pd.NaT))
|
||||
|
||||
# Not increasing, neither decreasing (ValueError)
|
||||
kidx = ps.Index(["2013-12-31", "2015-01-02", "2014-01-03"])
|
||||
|
@ -2249,13 +2235,7 @@ class IndexesTest(PandasOnSparkTestCase, TestUtils):
|
|||
kmidx = ps.from_pandas(pmidx)
|
||||
|
||||
self.assert_eq(kidx.tolist(), pidx.tolist())
|
||||
|
||||
if LooseVersion(pyspark.__version__) < LooseVersion("2.4"):
|
||||
# PySpark < 2.4 does not support struct type with arrow enabled.
|
||||
with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
|
||||
self.assert_eq(kmidx.tolist(), pmidx.tolist())
|
||||
else:
|
||||
self.assert_eq(kidx.tolist(), pidx.tolist())
|
||||
self.assert_eq(kmidx.tolist(), pmidx.tolist())
|
||||
|
||||
def test_index_ops(self):
|
||||
pidx = pd.Index([1, 2, 3, 4, 5])
|
||||
|
|
|
@ -25,7 +25,6 @@ from io import StringIO
|
|||
import numpy as np
|
||||
import pandas as pd
|
||||
from pandas.tseries.offsets import DateOffset
|
||||
import pyspark
|
||||
from pyspark import StorageLevel
|
||||
from pyspark.ml.linalg import SparseVector
|
||||
from pyspark.sql import functions as F
|
||||
|
@ -566,11 +565,7 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
|
|||
pdf = pd.DataFrame({"a": pd.Series([], dtype="i1"), "b": pd.Series([], dtype="str")})
|
||||
|
||||
kdf = ps.from_pandas(pdf)
|
||||
if LooseVersion(pyspark.__version__) >= LooseVersion("2.4"):
|
||||
self.assert_eq(kdf, pdf)
|
||||
else:
|
||||
with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
|
||||
self.assert_eq(kdf, pdf)
|
||||
self.assert_eq(kdf, pdf)
|
||||
|
||||
with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
|
||||
kdf = ps.from_pandas(pdf)
|
||||
|
@ -602,11 +597,7 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
|
|||
)
|
||||
|
||||
kdf = ps.from_pandas(pdf)
|
||||
if LooseVersion(pyspark.__version__) >= LooseVersion("2.4"):
|
||||
self.assert_eq(kdf, pdf)
|
||||
else:
|
||||
with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
|
||||
self.assert_eq(kdf, pdf)
|
||||
self.assert_eq(kdf, pdf)
|
||||
|
||||
with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
|
||||
kdf = ps.from_pandas(pdf)
|
||||
|
@ -2991,10 +2982,6 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
|
|||
self.assert_eq(ktable.index, ptable.index)
|
||||
self.assert_eq(repr(ktable.index), repr(ptable.index))
|
||||
|
||||
@unittest.skipIf(
|
||||
LooseVersion(pyspark.__version__) < LooseVersion("2.4"),
|
||||
"stack won't work properly with PySpark<2.4",
|
||||
)
|
||||
def test_stack(self):
|
||||
pdf_single_level_cols = pd.DataFrame(
|
||||
[[0, 1], [2, 3]], index=["cat", "dog"], columns=["weight", "height"]
|
||||
|
@ -3236,22 +3223,13 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
|
|||
self.assert_eq(pdf.cumprod().sum(), kdf.cumprod().sum(), almost=True)
|
||||
|
||||
def test_cumprod(self):
|
||||
if LooseVersion(pyspark.__version__) >= LooseVersion("2.4"):
|
||||
pdf = pd.DataFrame(
|
||||
[[2.0, 1.0, 1], [5, None, 2], [1.0, -1.0, -3], [2.0, 0, 4], [4.0, 9.0, 5]],
|
||||
columns=list("ABC"),
|
||||
index=np.random.rand(5),
|
||||
)
|
||||
kdf = ps.from_pandas(pdf)
|
||||
self._test_cumprod(pdf, kdf)
|
||||
else:
|
||||
pdf = pd.DataFrame(
|
||||
[[2, 1, 1], [5, 1, 2], [1, -1, -3], [2, 0, 4], [4, 9, 5]],
|
||||
columns=list("ABC"),
|
||||
index=np.random.rand(5),
|
||||
)
|
||||
kdf = ps.from_pandas(pdf)
|
||||
self._test_cumprod(pdf, kdf)
|
||||
pdf = pd.DataFrame(
|
||||
[[2.0, 1.0, 1], [5, None, 2], [1.0, -1.0, -3], [2.0, 0, 4], [4.0, 9.0, 5]],
|
||||
columns=list("ABC"),
|
||||
index=np.random.rand(5),
|
||||
)
|
||||
kdf = ps.from_pandas(pdf)
|
||||
self._test_cumprod(pdf, kdf)
|
||||
|
||||
def test_cumprod_multiindex_columns(self):
|
||||
arrays = [np.array(["A", "A", "B", "B"]), np.array(["one", "two", "one", "two"])]
|
||||
|
@ -4726,13 +4704,8 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
|
|||
sparse_vector = SparseVector(len(sparse_values), sparse_values)
|
||||
pdf = pd.DataFrame({"a": [sparse_vector], "b": [10]})
|
||||
|
||||
if LooseVersion(pyspark.__version__) < LooseVersion("2.4"):
|
||||
with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
|
||||
kdf = ps.from_pandas(pdf)
|
||||
self.assert_eq(kdf, pdf)
|
||||
else:
|
||||
kdf = ps.from_pandas(pdf)
|
||||
self.assert_eq(kdf, pdf)
|
||||
kdf = ps.from_pandas(pdf)
|
||||
self.assert_eq(kdf, pdf)
|
||||
|
||||
def test_eval(self):
|
||||
pdf = pd.DataFrame({"A": range(1, 6), "B": range(10, 0, -2)})
|
||||
|
@ -5162,10 +5135,6 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
|
|||
self.assert_eq(p_name, k_name)
|
||||
self.assert_eq(p_items, k_items)
|
||||
|
||||
@unittest.skipIf(
|
||||
LooseVersion(pyspark.__version__) < LooseVersion("3.0"),
|
||||
"tail won't work properly with PySpark<3.0",
|
||||
)
|
||||
def test_tail(self):
|
||||
pdf = pd.DataFrame({"x": range(1000)})
|
||||
kdf = ps.from_pandas(pdf)
|
||||
|
@ -5185,10 +5154,6 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
|
|||
with self.assertRaisesRegex(TypeError, "bad operand type for unary -: 'str'"):
|
||||
kdf.tail("10")
|
||||
|
||||
@unittest.skipIf(
|
||||
LooseVersion(pyspark.__version__) < LooseVersion("3.0"),
|
||||
"last_valid_index won't work properly with PySpark<3.0",
|
||||
)
|
||||
def test_last_valid_index(self):
|
||||
pdf = pd.DataFrame(
|
||||
{"a": [1, 2, 3, None], "b": [1.0, 2.0, 3.0, None], "c": [100, 200, 400, None]},
|
||||
|
|
|
@ -23,7 +23,6 @@ import os
|
|||
import numpy as np
|
||||
import pandas as pd
|
||||
import pyarrow as pa
|
||||
import pyspark
|
||||
|
||||
from pyspark import pandas as ps
|
||||
from pyspark.testing.pandasutils import PandasOnSparkTestCase, TestUtils
|
||||
|
@ -96,10 +95,6 @@ class DataFrameSparkIOTest(PandasOnSparkTestCase, TestUtils):
|
|||
expected_idx.sort_values(by="f").to_spark().toPandas(),
|
||||
)
|
||||
|
||||
@unittest.skipIf(
|
||||
LooseVersion(pyspark.__version__) < LooseVersion("3.0.0"),
|
||||
"The test only works with Spark>=3.0",
|
||||
)
|
||||
def test_parquet_read_with_pandas_metadata(self):
|
||||
with self.temp_dir() as tmp:
|
||||
expected1 = self.test_pdf
|
||||
|
@ -263,35 +258,32 @@ class DataFrameSparkIOTest(PandasOnSparkTestCase, TestUtils):
|
|||
pd.read_excel(open(path1, "rb"), index_col=0, squeeze=True),
|
||||
)
|
||||
|
||||
if LooseVersion(pyspark.__version__) >= LooseVersion("3.0.0"):
|
||||
self.assert_eq(ps.read_excel(path1), pd.read_excel(path1))
|
||||
self.assert_eq(ps.read_excel(path1, index_col=0), pd.read_excel(path1, index_col=0))
|
||||
self.assert_eq(
|
||||
ps.read_excel(path1, index_col=0, squeeze=True),
|
||||
pd.read_excel(path1, index_col=0, squeeze=True),
|
||||
)
|
||||
self.assert_eq(ps.read_excel(path1), pd.read_excel(path1))
|
||||
self.assert_eq(ps.read_excel(path1, index_col=0), pd.read_excel(path1, index_col=0))
|
||||
self.assert_eq(
|
||||
ps.read_excel(path1, index_col=0, squeeze=True),
|
||||
pd.read_excel(path1, index_col=0, squeeze=True),
|
||||
)
|
||||
|
||||
self.assert_eq(ps.read_excel(tmp), pd.read_excel(path1))
|
||||
self.assert_eq(ps.read_excel(tmp), pd.read_excel(path1))
|
||||
|
||||
path2 = "{}/file2.xlsx".format(tmp)
|
||||
self.test_pdf[["i32"]].to_excel(path2)
|
||||
self.assert_eq(
|
||||
ps.read_excel(tmp, index_col=0).sort_index(),
|
||||
pd.concat(
|
||||
[pd.read_excel(path1, index_col=0), pd.read_excel(path2, index_col=0)]
|
||||
).sort_index(),
|
||||
)
|
||||
self.assert_eq(
|
||||
ps.read_excel(tmp, index_col=0, squeeze=True).sort_index(),
|
||||
pd.concat(
|
||||
[
|
||||
pd.read_excel(path1, index_col=0, squeeze=True),
|
||||
pd.read_excel(path2, index_col=0, squeeze=True),
|
||||
]
|
||||
).sort_index(),
|
||||
)
|
||||
else:
|
||||
self.assertRaises(ValueError, lambda: ps.read_excel(tmp))
|
||||
path2 = "{}/file2.xlsx".format(tmp)
|
||||
self.test_pdf[["i32"]].to_excel(path2)
|
||||
self.assert_eq(
|
||||
ps.read_excel(tmp, index_col=0).sort_index(),
|
||||
pd.concat(
|
||||
[pd.read_excel(path1, index_col=0), pd.read_excel(path2, index_col=0)]
|
||||
).sort_index(),
|
||||
)
|
||||
self.assert_eq(
|
||||
ps.read_excel(tmp, index_col=0, squeeze=True).sort_index(),
|
||||
pd.concat(
|
||||
[
|
||||
pd.read_excel(path1, index_col=0, squeeze=True),
|
||||
pd.read_excel(path2, index_col=0, squeeze=True),
|
||||
]
|
||||
).sort_index(),
|
||||
)
|
||||
|
||||
with self.temp_dir() as tmp:
|
||||
path1 = "{}/file1.xlsx".format(tmp)
|
||||
|
@ -317,69 +309,66 @@ class DataFrameSparkIOTest(PandasOnSparkTestCase, TestUtils):
|
|||
self.assert_eq(kdfs["Sheet_name_1"], pdfs1_squeezed["Sheet_name_1"])
|
||||
self.assert_eq(kdfs["Sheet_name_2"], pdfs1_squeezed["Sheet_name_2"])
|
||||
|
||||
if LooseVersion(pyspark.__version__) >= LooseVersion("3.0.0"):
|
||||
self.assert_eq(
|
||||
ps.read_excel(tmp, index_col=0, sheet_name="Sheet_name_2"),
|
||||
pdfs1["Sheet_name_2"],
|
||||
)
|
||||
|
||||
for sheet_name in sheet_names:
|
||||
kdfs = ps.read_excel(tmp, sheet_name=sheet_name, index_col=0)
|
||||
self.assert_eq(kdfs["Sheet_name_1"], pdfs1["Sheet_name_1"])
|
||||
self.assert_eq(kdfs["Sheet_name_2"], pdfs1["Sheet_name_2"])
|
||||
|
||||
kdfs = ps.read_excel(tmp, sheet_name=sheet_name, index_col=0, squeeze=True)
|
||||
self.assert_eq(kdfs["Sheet_name_1"], pdfs1_squeezed["Sheet_name_1"])
|
||||
self.assert_eq(kdfs["Sheet_name_2"], pdfs1_squeezed["Sheet_name_2"])
|
||||
|
||||
path2 = "{}/file2.xlsx".format(tmp)
|
||||
with pd.ExcelWriter(path2) as writer:
|
||||
self.test_pdf.to_excel(writer, sheet_name="Sheet_name_1")
|
||||
self.test_pdf[["i32"]].to_excel(writer, sheet_name="Sheet_name_2")
|
||||
|
||||
pdfs2 = pd.read_excel(path2, sheet_name=None, index_col=0)
|
||||
pdfs2_squeezed = pd.read_excel(path2, sheet_name=None, index_col=0, squeeze=True)
|
||||
|
||||
self.assert_eq(
|
||||
ps.read_excel(tmp, sheet_name="Sheet_name_2", index_col=0).sort_index(),
|
||||
pd.concat([pdfs1["Sheet_name_2"], pdfs2["Sheet_name_2"]]).sort_index(),
|
||||
)
|
||||
self.assert_eq(
|
||||
ps.read_excel(
|
||||
tmp, sheet_name="Sheet_name_2", index_col=0, squeeze=True
|
||||
).sort_index(),
|
||||
pd.concat(
|
||||
[pdfs1_squeezed["Sheet_name_2"], pdfs2_squeezed["Sheet_name_2"]]
|
||||
).sort_index(),
|
||||
)
|
||||
|
||||
for sheet_name in sheet_names:
|
||||
kdfs = ps.read_excel(tmp, sheet_name=sheet_name, index_col=0)
|
||||
self.assert_eq(
|
||||
ps.read_excel(tmp, index_col=0, sheet_name="Sheet_name_2"),
|
||||
pdfs1["Sheet_name_2"],
|
||||
kdfs["Sheet_name_1"].sort_index(),
|
||||
pd.concat([pdfs1["Sheet_name_1"], pdfs2["Sheet_name_1"]]).sort_index(),
|
||||
)
|
||||
|
||||
for sheet_name in sheet_names:
|
||||
kdfs = ps.read_excel(tmp, sheet_name=sheet_name, index_col=0)
|
||||
self.assert_eq(kdfs["Sheet_name_1"], pdfs1["Sheet_name_1"])
|
||||
self.assert_eq(kdfs["Sheet_name_2"], pdfs1["Sheet_name_2"])
|
||||
|
||||
kdfs = ps.read_excel(tmp, sheet_name=sheet_name, index_col=0, squeeze=True)
|
||||
self.assert_eq(kdfs["Sheet_name_1"], pdfs1_squeezed["Sheet_name_1"])
|
||||
self.assert_eq(kdfs["Sheet_name_2"], pdfs1_squeezed["Sheet_name_2"])
|
||||
|
||||
path2 = "{}/file2.xlsx".format(tmp)
|
||||
with pd.ExcelWriter(path2) as writer:
|
||||
self.test_pdf.to_excel(writer, sheet_name="Sheet_name_1")
|
||||
self.test_pdf[["i32"]].to_excel(writer, sheet_name="Sheet_name_2")
|
||||
|
||||
pdfs2 = pd.read_excel(path2, sheet_name=None, index_col=0)
|
||||
pdfs2_squeezed = pd.read_excel(path2, sheet_name=None, index_col=0, squeeze=True)
|
||||
|
||||
self.assert_eq(
|
||||
ps.read_excel(tmp, sheet_name="Sheet_name_2", index_col=0).sort_index(),
|
||||
kdfs["Sheet_name_2"].sort_index(),
|
||||
pd.concat([pdfs1["Sheet_name_2"], pdfs2["Sheet_name_2"]]).sort_index(),
|
||||
)
|
||||
|
||||
kdfs = ps.read_excel(tmp, sheet_name=sheet_name, index_col=0, squeeze=True)
|
||||
self.assert_eq(
|
||||
ps.read_excel(
|
||||
tmp, sheet_name="Sheet_name_2", index_col=0, squeeze=True
|
||||
kdfs["Sheet_name_1"].sort_index(),
|
||||
pd.concat(
|
||||
[pdfs1_squeezed["Sheet_name_1"], pdfs2_squeezed["Sheet_name_1"]]
|
||||
).sort_index(),
|
||||
)
|
||||
self.assert_eq(
|
||||
kdfs["Sheet_name_2"].sort_index(),
|
||||
pd.concat(
|
||||
[pdfs1_squeezed["Sheet_name_2"], pdfs2_squeezed["Sheet_name_2"]]
|
||||
).sort_index(),
|
||||
)
|
||||
|
||||
for sheet_name in sheet_names:
|
||||
kdfs = ps.read_excel(tmp, sheet_name=sheet_name, index_col=0)
|
||||
self.assert_eq(
|
||||
kdfs["Sheet_name_1"].sort_index(),
|
||||
pd.concat([pdfs1["Sheet_name_1"], pdfs2["Sheet_name_1"]]).sort_index(),
|
||||
)
|
||||
self.assert_eq(
|
||||
kdfs["Sheet_name_2"].sort_index(),
|
||||
pd.concat([pdfs1["Sheet_name_2"], pdfs2["Sheet_name_2"]]).sort_index(),
|
||||
)
|
||||
|
||||
kdfs = ps.read_excel(tmp, sheet_name=sheet_name, index_col=0, squeeze=True)
|
||||
self.assert_eq(
|
||||
kdfs["Sheet_name_1"].sort_index(),
|
||||
pd.concat(
|
||||
[pdfs1_squeezed["Sheet_name_1"], pdfs2_squeezed["Sheet_name_1"]]
|
||||
).sort_index(),
|
||||
)
|
||||
self.assert_eq(
|
||||
kdfs["Sheet_name_2"].sort_index(),
|
||||
pd.concat(
|
||||
[pdfs1_squeezed["Sheet_name_2"], pdfs2_squeezed["Sheet_name_2"]]
|
||||
).sort_index(),
|
||||
)
|
||||
else:
|
||||
self.assertRaises(ValueError, lambda: ps.read_excel(tmp))
|
||||
|
||||
def test_read_orc(self):
|
||||
with self.temp_dir() as tmp:
|
||||
path = "{}/file1.orc".format(tmp)
|
||||
|
|
|
@ -15,11 +15,9 @@
|
|||
# limitations under the License.
|
||||
#
|
||||
|
||||
from distutils.version import LooseVersion
|
||||
import os
|
||||
|
||||
import pandas as pd
|
||||
import pyspark
|
||||
|
||||
from pyspark import pandas as ps
|
||||
from pyspark.testing.pandasutils import PandasOnSparkTestCase, TestUtils
|
||||
|
@ -43,10 +41,7 @@ class SparkFrameMethodsTest(PandasOnSparkTestCase, SQLTestUtils, TestUtils):
|
|||
kdf1 = ps.from_pandas(pdf1)
|
||||
kdf2 = ps.from_pandas(pdf2)
|
||||
|
||||
if LooseVersion(pyspark.__version__) >= LooseVersion("3.0"):
|
||||
hints = ["broadcast", "merge", "shuffle_hash", "shuffle_replicate_nl"]
|
||||
else:
|
||||
hints = ["broadcast"]
|
||||
hints = ["broadcast", "merge", "shuffle_hash", "shuffle_replicate_nl"]
|
||||
|
||||
for hint in hints:
|
||||
self.assert_eq(
|
||||
|
|
|
@ -22,8 +22,6 @@ import unittest
|
|||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
import pyspark
|
||||
|
||||
from pyspark import pandas as ps
|
||||
from pyspark.pandas.config import set_option, reset_option
|
||||
from pyspark.pandas.frame import DataFrame
|
||||
|
@ -1550,10 +1548,7 @@ class OpsOnDiffFramesEnabledTest(PandasOnSparkTestCase, SQLTestUtils):
|
|||
kser1 = ps.from_pandas(pser1)
|
||||
kser2 = ps.from_pandas(pser2)
|
||||
|
||||
if LooseVersion(pyspark.__version__) < LooseVersion("2.4"):
|
||||
self.assertRaises(ValueError, lambda: kser1.repeat(kser2))
|
||||
else:
|
||||
self.assert_eq(kser1.repeat(kser2).sort_index(), pser1.repeat(pser2).sort_index())
|
||||
self.assert_eq(kser1.repeat(kser2).sort_index(), pser1.repeat(pser2).sort_index())
|
||||
|
||||
def test_series_ops(self):
|
||||
pser1 = pd.Series([1, 2, 3, 4, 5, 6, 7], name="x", index=[11, 12, 13, 14, 15, 16, 17])
|
||||
|
|
|
@ -15,10 +15,7 @@
|
|||
# limitations under the License.
|
||||
#
|
||||
|
||||
from distutils.version import LooseVersion
|
||||
|
||||
import numpy as np
|
||||
import pyspark
|
||||
|
||||
from pyspark import pandas as ps
|
||||
from pyspark.pandas.config import set_option, reset_option, option_context
|
||||
|
@ -82,26 +79,25 @@ class ReprTest(PandasOnSparkTestCase):
|
|||
kser = ps.range(ReprTest.max_display_count + 1).id.rename()
|
||||
self.assert_eq(repr(kser), repr(kser.to_pandas()))
|
||||
|
||||
if LooseVersion(pyspark.__version__) >= LooseVersion("2.4"):
|
||||
kser = ps.MultiIndex.from_tuples(
|
||||
[(100 * i, i) for i in range(ReprTest.max_display_count)]
|
||||
).to_series()
|
||||
self.assertTrue("Showing only the first" not in repr(kser))
|
||||
self.assert_eq(repr(kser), repr(kser.to_pandas()))
|
||||
kser = ps.MultiIndex.from_tuples(
|
||||
[(100 * i, i) for i in range(ReprTest.max_display_count)]
|
||||
).to_series()
|
||||
self.assertTrue("Showing only the first" not in repr(kser))
|
||||
self.assert_eq(repr(kser), repr(kser.to_pandas()))
|
||||
|
||||
kser = ps.MultiIndex.from_tuples(
|
||||
[(100 * i, i) for i in range(ReprTest.max_display_count + 1)]
|
||||
).to_series()
|
||||
self.assertTrue("Showing only the first" in repr(kser))
|
||||
self.assertTrue(
|
||||
repr(kser).startswith(repr(kser.to_pandas().head(ReprTest.max_display_count)))
|
||||
)
|
||||
|
||||
with option_context("display.max_rows", None):
|
||||
kser = ps.MultiIndex.from_tuples(
|
||||
[(100 * i, i) for i in range(ReprTest.max_display_count + 1)]
|
||||
).to_series()
|
||||
self.assertTrue("Showing only the first" in repr(kser))
|
||||
self.assertTrue(
|
||||
repr(kser).startswith(repr(kser.to_pandas().head(ReprTest.max_display_count)))
|
||||
)
|
||||
|
||||
with option_context("display.max_rows", None):
|
||||
kser = ps.MultiIndex.from_tuples(
|
||||
[(100 * i, i) for i in range(ReprTest.max_display_count + 1)]
|
||||
).to_series()
|
||||
self.assert_eq(repr(kser), repr(kser.to_pandas()))
|
||||
self.assert_eq(repr(kser), repr(kser.to_pandas()))
|
||||
|
||||
def test_repr_indexes(self):
|
||||
kidx = ps.range(ReprTest.max_display_count).index
|
||||
|
|
|
@ -21,11 +21,10 @@ from distutils.version import LooseVersion
|
|||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import pyspark
|
||||
|
||||
from pyspark import pandas as ps
|
||||
from pyspark.pandas.utils import name_like_string
|
||||
from pyspark.testing.pandasutils import PandasOnSparkTestCase, SPARK_CONF_ARROW_ENABLED
|
||||
from pyspark.testing.pandasutils import PandasOnSparkTestCase
|
||||
|
||||
|
||||
class ReshapeTest(PandasOnSparkTestCase):
|
||||
|
@ -111,41 +110,23 @@ class ReshapeTest(PandasOnSparkTestCase):
|
|||
)
|
||||
kdf = ps.from_pandas(pdf)
|
||||
|
||||
if LooseVersion(pyspark.__version__) >= LooseVersion("2.4"):
|
||||
self.assert_eq(ps.get_dummies(kdf), pd.get_dummies(pdf, dtype=np.int8))
|
||||
self.assert_eq(ps.get_dummies(kdf.d), pd.get_dummies(pdf.d, dtype=np.int8))
|
||||
self.assert_eq(ps.get_dummies(kdf.dt), pd.get_dummies(pdf.dt, dtype=np.int8))
|
||||
else:
|
||||
with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
|
||||
self.assert_eq(ps.get_dummies(kdf), pd.get_dummies(pdf, dtype=np.int8))
|
||||
self.assert_eq(ps.get_dummies(kdf.d), pd.get_dummies(pdf.d, dtype=np.int8))
|
||||
self.assert_eq(ps.get_dummies(kdf.dt), pd.get_dummies(pdf.dt, dtype=np.int8))
|
||||
self.assert_eq(ps.get_dummies(kdf), pd.get_dummies(pdf, dtype=np.int8))
|
||||
self.assert_eq(ps.get_dummies(kdf.d), pd.get_dummies(pdf.d, dtype=np.int8))
|
||||
self.assert_eq(ps.get_dummies(kdf.dt), pd.get_dummies(pdf.dt, dtype=np.int8))
|
||||
|
||||
def test_get_dummies_boolean(self):
|
||||
pdf = pd.DataFrame({"b": [True, False, True]})
|
||||
kdf = ps.from_pandas(pdf)
|
||||
|
||||
if LooseVersion(pyspark.__version__) >= LooseVersion("2.4"):
|
||||
self.assert_eq(ps.get_dummies(kdf), pd.get_dummies(pdf, dtype=np.int8))
|
||||
self.assert_eq(ps.get_dummies(kdf.b), pd.get_dummies(pdf.b, dtype=np.int8))
|
||||
else:
|
||||
with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
|
||||
self.assert_eq(ps.get_dummies(kdf), pd.get_dummies(pdf, dtype=np.int8))
|
||||
self.assert_eq(ps.get_dummies(kdf.b), pd.get_dummies(pdf.b, dtype=np.int8))
|
||||
self.assert_eq(ps.get_dummies(kdf), pd.get_dummies(pdf, dtype=np.int8))
|
||||
self.assert_eq(ps.get_dummies(kdf.b), pd.get_dummies(pdf.b, dtype=np.int8))
|
||||
|
||||
def test_get_dummies_decimal(self):
|
||||
pdf = pd.DataFrame({"d": [Decimal(1.0), Decimal(2.0), Decimal(1)]})
|
||||
kdf = ps.from_pandas(pdf)
|
||||
|
||||
if LooseVersion(pyspark.__version__) >= LooseVersion("2.4"):
|
||||
self.assert_eq(ps.get_dummies(kdf), pd.get_dummies(pdf, dtype=np.int8))
|
||||
self.assert_eq(ps.get_dummies(kdf.d), pd.get_dummies(pdf.d, dtype=np.int8), almost=True)
|
||||
else:
|
||||
with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
|
||||
self.assert_eq(ps.get_dummies(kdf), pd.get_dummies(pdf, dtype=np.int8))
|
||||
self.assert_eq(
|
||||
ps.get_dummies(kdf.d), pd.get_dummies(pdf.d, dtype=np.int8), almost=True
|
||||
)
|
||||
self.assert_eq(ps.get_dummies(kdf), pd.get_dummies(pdf, dtype=np.int8))
|
||||
self.assert_eq(ps.get_dummies(kdf.d), pd.get_dummies(pdf.d, dtype=np.int8), almost=True)
|
||||
|
||||
def test_get_dummies_kwargs(self):
|
||||
# pser = pd.Series([1, 1, 1, 2, 2, 1, 3, 4], dtype='category')
|
||||
|
|
|
@ -24,7 +24,6 @@ from datetime import datetime, timedelta
|
|||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import pyspark
|
||||
from pyspark.ml.linalg import SparseVector
|
||||
from pyspark.sql import functions as F
|
||||
|
||||
|
@ -147,11 +146,7 @@ class SeriesTest(PandasOnSparkTestCase, SQLTestUtils):
|
|||
self.assert_eq(ps.from_pandas(pser_a), pser_a)
|
||||
|
||||
kser_b = ps.from_pandas(pser_b)
|
||||
if LooseVersion(pyspark.__version__) >= LooseVersion("2.4"):
|
||||
self.assert_eq(kser_b, pser_b)
|
||||
else:
|
||||
with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
|
||||
self.assert_eq(kser_b, pser_b)
|
||||
self.assert_eq(kser_b, pser_b)
|
||||
|
||||
with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
|
||||
self.assert_eq(ps.from_pandas(pser_a), pser_a)
|
||||
|
@ -164,11 +159,7 @@ class SeriesTest(PandasOnSparkTestCase, SQLTestUtils):
|
|||
self.assert_eq(ps.from_pandas(pser_a), pser_a)
|
||||
|
||||
kser_b = ps.from_pandas(pser_b)
|
||||
if LooseVersion(pyspark.__version__) >= LooseVersion("2.4"):
|
||||
self.assert_eq(kser_b, pser_b)
|
||||
else:
|
||||
with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
|
||||
self.assert_eq(kser_b, pser_b)
|
||||
self.assert_eq(kser_b, pser_b)
|
||||
|
||||
with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
|
||||
self.assert_eq(ps.from_pandas(pser_a), pser_a)
|
||||
|
@ -629,7 +620,7 @@ class SeriesTest(PandasOnSparkTestCase, SQLTestUtils):
|
|||
self.assertEqual(ps.Series(range(100)).nunique(approx=True), 103)
|
||||
self.assertEqual(ps.Series(range(100)).nunique(approx=True, rsd=0.01), 100)
|
||||
|
||||
def _test_value_counts(self):
|
||||
def test_value_counts(self):
|
||||
# this is also containing test for Index & MultiIndex
|
||||
pser = pd.Series(
|
||||
[1, 2, 1, 3, 3, np.nan, 1, 4, 2, np.nan, 3, np.nan, 3, 1, 3],
|
||||
|
@ -857,17 +848,6 @@ class SeriesTest(PandasOnSparkTestCase, SQLTestUtils):
|
|||
almost=True,
|
||||
)
|
||||
|
||||
def test_value_counts(self):
|
||||
if LooseVersion(pyspark.__version__) < LooseVersion("2.4"):
|
||||
with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
|
||||
self._test_value_counts()
|
||||
self.assertRaises(
|
||||
RuntimeError,
|
||||
lambda: ps.MultiIndex.from_tuples([("x", "a"), ("x", "b")]).value_counts(),
|
||||
)
|
||||
else:
|
||||
self._test_value_counts()
|
||||
|
||||
def test_nsmallest(self):
|
||||
sample_lst = [1, 2, 3, 4, np.nan, 6]
|
||||
pser = pd.Series(sample_lst, name="x")
|
||||
|
@ -1892,14 +1872,8 @@ class SeriesTest(PandasOnSparkTestCase, SQLTestUtils):
|
|||
sparse_values = {0: 0.1, 1: 1.1}
|
||||
sparse_vector = SparseVector(len(sparse_values), sparse_values)
|
||||
pser = pd.Series([sparse_vector])
|
||||
|
||||
if LooseVersion(pyspark.__version__) < LooseVersion("2.4"):
|
||||
with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False}):
|
||||
kser = ps.from_pandas(pser)
|
||||
self.assert_eq(kser, pser)
|
||||
else:
|
||||
kser = ps.from_pandas(pser)
|
||||
self.assert_eq(kser, pser)
|
||||
kser = ps.from_pandas(pser)
|
||||
self.assert_eq(kser, pser)
|
||||
|
||||
def test_repeat(self):
|
||||
pser = pd.Series(["a", "b", "c"], name="0", index=np.random.rand(3))
|
||||
|
@ -1914,10 +1888,7 @@ class SeriesTest(PandasOnSparkTestCase, SQLTestUtils):
|
|||
pdf = pd.DataFrame({"a": ["a", "b", "c"], "rep": [10, 20, 30]}, index=np.random.rand(3))
|
||||
kdf = ps.from_pandas(pdf)
|
||||
|
||||
if LooseVersion(pyspark.__version__) < LooseVersion("2.4"):
|
||||
self.assertRaises(ValueError, lambda: kdf.a.repeat(kdf.rep))
|
||||
else:
|
||||
self.assert_eq(kdf.a.repeat(kdf.rep).sort_index(), pdf.a.repeat(pdf.rep).sort_index())
|
||||
self.assert_eq(kdf.a.repeat(kdf.rep).sort_index(), pdf.a.repeat(pdf.rep).sort_index())
|
||||
|
||||
def test_take(self):
|
||||
pser = pd.Series([100, 200, 300, 400, 500], name="Koalas")
|
||||
|
@ -2408,10 +2379,6 @@ class SeriesTest(PandasOnSparkTestCase, SQLTestUtils):
|
|||
self.assert_eq((kdf["b"] * 10).dot(kdf), (pdf["b"] * 10).dot(pdf))
|
||||
self.assert_eq((kdf["b"] * 10).dot(kdf + 1), (pdf["b"] * 10).dot(pdf + 1))
|
||||
|
||||
@unittest.skipIf(
|
||||
LooseVersion(pyspark.__version__) < LooseVersion("3.0"),
|
||||
"tail won't work properly with PySpark<3.0",
|
||||
)
|
||||
def test_tail(self):
|
||||
pser = pd.Series(range(1000), name="Koalas")
|
||||
kser = ps.from_pandas(pser)
|
||||
|
@ -2509,10 +2476,6 @@ class SeriesTest(PandasOnSparkTestCase, SQLTestUtils):
|
|||
kser = ps.from_pandas(pser)
|
||||
self.assert_eq(pser.hasnans, kser.hasnans)
|
||||
|
||||
@unittest.skipIf(
|
||||
LooseVersion(pyspark.__version__) < LooseVersion("3.0"),
|
||||
"last_valid_index won't work properly with PySpark<3.0",
|
||||
)
|
||||
def test_last_valid_index(self):
|
||||
pser = pd.Series([250, 1.5, 320, 1, 0.3, None, None, None, None])
|
||||
kser = ps.from_pandas(pser)
|
||||
|
|
Loading…
Reference in a new issue