spark-instrumented-optimizer/python/pyspark/pandas/tests/test_spark_functions.py

55 lines
2.1 KiB
Python
Raw Normal View History

[SPARK-35344][PYTHON] Support creating a Column of numpy literals in pandas API on Spark ### What changes were proposed in this pull request? The PR is proposed to support creating a Column of numpy literal value in pandas-on-Spark. It consists of three changes mainly: - Enable the `lit` function defined in `pyspark.pandas.spark.functions` to support numpy literals input. ```py >>> from pyspark.pandas.spark import functions as SF >>> SF.lit(np.int64(1)) Column<'CAST(1 AS BIGINT)'> >>> SF.lit(np.int32(1)) Column<'CAST(1 AS INT)'> >>> SF.lit(np.int8(1)) Column<'CAST(1 AS TINYINT)'> >>> SF.lit(np.byte(1)) Column<'CAST(1 AS TINYINT)'> >>> SF.lit(np.float32(1)) Column<'CAST(1.0 AS FLOAT)'> ``` - Substitute `F.lit` by `SF.lit`, that is, use `lit` function defined in `pyspark.pandas.spark.functions` rather than `lit` function defined in `pyspark.sql.functions` to allow creating columns out of numpy literals. - Enable numpy literals input in `isin` method Non-goal: - Some pandas-on-Spark APIs use PySpark column-related APIs internally, and these column-related APIs don't support numpy literals, thus numpy literals are disallowed as input (e.g. `to_replace` parameter in `replace` API). This PR doesn't aim to adjust all of them. This PR adjusts `isin` only, because the PR is inspired by that (as https://github.com/databricks/koalas/issues/2161). - To complete mappings between all kinds of numpy literals and Spark data types should be a followup task. ### Why are the changes needed? Spark (`lit` function defined in `pyspark.sql.functions`) doesn't support creating a Column out of numpy literal value. So `lit` function defined in `pyspark.pandas.spark.functions` is adjusted in order to support that in pandas-on-Spark. ### Does this PR introduce _any_ user-facing change? Yes. Before: ```py >>> a = ps.DataFrame({'source': [1,2,3,4,5]}) >>> a.source.isin([np.int64(1), np.int64(2)]) Traceback (most recent call last): ... AttributeError: 'numpy.int64' object has no attribute '_get_object_id' ``` After: ```py >>> a = ps.DataFrame({'source': [1,2,3,4,5]}) >>> a.source.isin([np.int64(1), np.int64(2)]) 0 True 1 True 2 False 3 False 4 False Name: source, dtype: bool ``` ### How was this patch tested? Unit tests. Closes #32955 from xinrong-databricks/datatypeops_literal. Authored-by: Xinrong Meng <xinrong.meng@databricks.com> Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
2021-06-28 22:03:42 -04:00
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import numpy as np
from pyspark.pandas.spark import functions as SF
from pyspark.pandas.utils import spark_column_equals
from pyspark.sql import functions as F
from pyspark.sql.types import (
ByteType,
FloatType,
IntegerType,
LongType,
)
from pyspark.testing.pandasutils import PandasOnSparkTestCase
class SparkFunctionsTests(PandasOnSparkTestCase):
def test_lit(self):
self.assertTrue(spark_column_equals(SF.lit(np.int64(1)), F.lit(1).astype(LongType())))
self.assertTrue(spark_column_equals(SF.lit(np.int32(1)), F.lit(1).astype(IntegerType())))
self.assertTrue(spark_column_equals(SF.lit(np.int8(1)), F.lit(1).astype(ByteType())))
self.assertTrue(spark_column_equals(SF.lit(np.byte(1)), F.lit(1).astype(ByteType())))
self.assertTrue(
spark_column_equals(SF.lit(np.float32(1)), F.lit(float(1)).astype(FloatType()))
)
self.assertTrue(spark_column_equals(SF.lit(1), F.lit(1)))
if __name__ == "__main__":
import unittest
from pyspark.pandas.tests.test_spark_functions import * # noqa: F401
try:
import xmlrunner # type: ignore[import]
testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)