spark-instrumented-optimizer/python/pyspark/sql/tests/test_dataframe.py

[SPARK-26032][PYTHON] Break large sql/tests.py files into smaller files ## What changes were proposed in this pull request? This is the official first attempt to break huge single `tests.py` file - I did it locally before few times and gave up for some reasons. Now, currently it really makes the unittests super hard to read and difficult to check. To me, it even bothers me to to scroll down the big file. It's one single 7000 lines file! This is not only readability issue. Since one big test takes most of tests time, the tests don't run in parallel fully - although it will costs to start and stop the context. We could pick up one example and follow. Given my investigation, the current style looks closer to NumPy structure and looks easier to follow. Please see https://github.com/numpy/numpy/tree/master/numpy. Basically this PR proposes to break down `pyspark/sql/tests.py` into ...: ```bash pyspark ... ├── sql ... │   ├── tests # Includes all tests broken down from 'pyspark/sql/tests.py' │ │  │ # Each matchs to module in 'pyspark/sql'. Additionally, some logical group can │ │  │ # be added. For instance, 'test_arrow.py', 'test_datasources.py' ... │   │   ├── __init__.py │   │   ├── test_appsubmit.py │   │   ├── test_arrow.py │   │   ├── test_catalog.py │   │   ├── test_column.py │   │   ├── test_conf.py │   │   ├── test_context.py │   │   ├── test_dataframe.py │   │   ├── test_datasources.py │   │   ├── test_functions.py │   │   ├── test_group.py │   │   ├── test_pandas_udf.py │   │   ├── test_pandas_udf_grouped_agg.py │   │   ├── test_pandas_udf_grouped_map.py │   │   ├── test_pandas_udf_scalar.py │   │   ├── test_pandas_udf_window.py │   │   ├── test_readwriter.py │   │   ├── test_serde.py │   │   ├── test_session.py │   │   ├── test_streaming.py │   │   ├── test_types.py │   │   ├── test_udf.py │   │   └── test_utils.py ... ├── testing # Includes testing utils that can be used in unittests. │   ├── __init__.py │   └── sqlutils.py ... 
``` ## How was this patch tested? Existing tests should cover. `cd python` and `./run-tests-with-coverage`. Manually checked they are actually being ran. Each test (not officially) can be ran via: ``` SPARK_TESTING=1 ./bin/pyspark pyspark.sql.tests.test_pandas_udf_scalar ``` Note that if you're using Mac and Python 3, you might have to `OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES`. Closes #23021 from HyukjinKwon/SPARK-25344. Authored-by: hyukjinkwon <gurwls223@apache.org> Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-11-14 01:51:11 -05:00
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import pydoc
import shutil
import tempfile
import time
import unittest
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
from pyspark.sql.utils import AnalysisException, IllegalArgumentException
from pyspark.testing.sqlutils import ReusedSQLTestCase, SQLTestUtils, have_pyarrow, have_pandas, \
    pandas_requirement_message, pyarrow_requirement_message
from pyspark.testing.utils import QuietTest


class DataFrameTests(ReusedSQLTestCase):

    def test_range(self):
        self.assertEqual(self.spark.range(1, 1).count(), 0)
        self.assertEqual(self.spark.range(1, 0, -1).count(), 1)
        self.assertEqual(self.spark.range(0, 1 << 40, 1 << 39).count(), 2)
        self.assertEqual(self.spark.range(-2).count(), 0)
        self.assertEqual(self.spark.range(3).count(), 3)

    def test_duplicated_column_names(self):
        df = self.spark.createDataFrame([(1, 2)], ["c", "c"])
        row = df.select('*').first()
        self.assertEqual(1, row[0])
        self.assertEqual(2, row[1])
        self.assertEqual("Row(c=1, c=2)", str(row))
        # Cannot access columns
        self.assertRaises(AnalysisException, lambda: df.select(df[0]).first())
        self.assertRaises(AnalysisException, lambda: df.select(df.c).first())
        self.assertRaises(AnalysisException, lambda: df.select(df["c"]).first())

    def test_freqItems(self):
        vals = [Row(a=1, b=-2.0) if i % 2 == 0 else Row(a=i, b=i * 1.0) for i in range(100)]
        df = self.sc.parallelize(vals).toDF()
        items = df.stat.freqItems(("a", "b"), 0.4).collect()[0]
        self.assertTrue(1 in items[0])
        self.assertTrue(-2.0 in items[1])

    def test_help_command(self):
        # Regression test for SPARK-5464
        rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
        df = self.spark.read.json(rdd)
        # render_doc() reproduces the help() exception without printing output
        pydoc.render_doc(df)
        pydoc.render_doc(df.foo)
        pydoc.render_doc(df.take(1))

    def test_dropna(self):
        schema = StructType([
            StructField("name", StringType(), True),
            StructField("age", IntegerType(), True),
            StructField("height", DoubleType(), True)])

        # shouldn't drop a non-null row
        self.assertEqual(self.spark.createDataFrame(
            [(u'Alice', 50, 80.1)], schema).dropna().count(),
            1)

        # dropping rows with a single null value
        self.assertEqual(self.spark.createDataFrame(
            [(u'Alice', None, 80.1)], schema).dropna().count(),
            0)
        self.assertEqual(self.spark.createDataFrame(
            [(u'Alice', None, 80.1)], schema).dropna(how='any').count(),
            0)

        # if how = 'all', only drop rows if all values are null
        self.assertEqual(self.spark.createDataFrame(
            [(u'Alice', None, 80.1)], schema).dropna(how='all').count(),
            1)
        self.assertEqual(self.spark.createDataFrame(
            [(None, None, None)], schema).dropna(how='all').count(),
            0)

        # how and subset
        self.assertEqual(self.spark.createDataFrame(
            [(u'Alice', 50, None)], schema).dropna(how='any', subset=['name', 'age']).count(),
            1)
        self.assertEqual(self.spark.createDataFrame(
            [(u'Alice', None, None)], schema).dropna(how='any', subset=['name', 'age']).count(),
            0)

        # threshold
        self.assertEqual(self.spark.createDataFrame(
            [(u'Alice', None, 80.1)], schema).dropna(thresh=2).count(),
            1)
        self.assertEqual(self.spark.createDataFrame(
            [(u'Alice', None, None)], schema).dropna(thresh=2).count(),
            0)

        # threshold and subset
        self.assertEqual(self.spark.createDataFrame(
            [(u'Alice', 50, None)], schema).dropna(thresh=2, subset=['name', 'age']).count(),
            1)
        self.assertEqual(self.spark.createDataFrame(
            [(u'Alice', None, 180.9)], schema).dropna(thresh=2, subset=['name', 'age']).count(),
            0)

        # thresh should take precedence over how
        self.assertEqual(self.spark.createDataFrame(
            [(u'Alice', 50, None)], schema).dropna(
                how='any', thresh=2, subset=['name', 'age']).count(),
            1)

    def test_fillna(self):
        schema = StructType([
            StructField("name", StringType(), True),
            StructField("age", IntegerType(), True),
            StructField("height", DoubleType(), True),
            StructField("spy", BooleanType(), True)])

        # fillna shouldn't change non-null values
        row = self.spark.createDataFrame([(u'Alice', 10, 80.1, True)], schema).fillna(50).first()
        self.assertEqual(row.age, 10)

        # fillna with int
        row = self.spark.createDataFrame([(u'Alice', None, None, None)], schema).fillna(50).first()
        self.assertEqual(row.age, 50)
        self.assertEqual(row.height, 50.0)

        # fillna with double
        row = self.spark.createDataFrame(
            [(u'Alice', None, None, None)], schema).fillna(50.1).first()
        self.assertEqual(row.age, 50)
        self.assertEqual(row.height, 50.1)

        # fillna with bool
        row = self.spark.createDataFrame(
            [(u'Alice', None, None, None)], schema).fillna(True).first()
        self.assertEqual(row.age, None)
        self.assertEqual(row.spy, True)

        # fillna with string
        row = self.spark.createDataFrame([(None, None, None, None)], schema).fillna("hello").first()
        self.assertEqual(row.name, u"hello")
        self.assertEqual(row.age, None)

        # fillna with subset specified for numeric cols
        row = self.spark.createDataFrame(
            [(None, None, None, None)], schema).fillna(50, subset=['name', 'age']).first()
        self.assertEqual(row.name, None)
        self.assertEqual(row.age, 50)
        self.assertEqual(row.height, None)
        self.assertEqual(row.spy, None)

        # fillna with subset specified for string cols
        row = self.spark.createDataFrame(
            [(None, None, None, None)], schema).fillna("haha", subset=['name', 'age']).first()
        self.assertEqual(row.name, "haha")
        self.assertEqual(row.age, None)
        self.assertEqual(row.height, None)
        self.assertEqual(row.spy, None)

        # fillna with subset specified for bool cols
        row = self.spark.createDataFrame(
            [(None, None, None, None)], schema).fillna(True, subset=['name', 'spy']).first()
        self.assertEqual(row.name, None)
        self.assertEqual(row.age, None)
        self.assertEqual(row.height, None)
        self.assertEqual(row.spy, True)

        # fillna with dictionary for boolean types
        row = self.spark.createDataFrame([Row(a=None), Row(a=True)]).fillna({"a": True}).first()
        self.assertEqual(row.a, True)

    def test_repartitionByRange_dataframe(self):
        schema = StructType([
            StructField("name", StringType(), True),
            StructField("age", IntegerType(), True),
            StructField("height", DoubleType(), True)])

        df1 = self.spark.createDataFrame(
            [(u'Bob', 27, 66.0), (u'Alice', 10, 10.0), (u'Bob', 10, 66.0)], schema)
        df2 = self.spark.createDataFrame(
            [(u'Alice', 10, 10.0), (u'Bob', 10, 66.0), (u'Bob', 27, 66.0)], schema)

        # test repartitionByRange(numPartitions, *cols)
        df3 = df1.repartitionByRange(2, "name", "age")
        self.assertEqual(df3.rdd.getNumPartitions(), 2)
        self.assertEqual(df3.rdd.first(), df2.rdd.first())
        self.assertEqual(df3.rdd.take(3), df2.rdd.take(3))

        # test repartitionByRange(numPartitions, *cols)
        df4 = df1.repartitionByRange(3, "name", "age")
        self.assertEqual(df4.rdd.getNumPartitions(), 3)
        self.assertEqual(df4.rdd.first(), df2.rdd.first())
        self.assertEqual(df4.rdd.take(3), df2.rdd.take(3))

        # test repartitionByRange(*cols)
        df5 = df1.repartitionByRange("name", "age")
        self.assertEqual(df5.rdd.first(), df2.rdd.first())
        self.assertEqual(df5.rdd.take(3), df2.rdd.take(3))

    def test_replace(self):
        schema = StructType([
            StructField("name", StringType(), True),
            StructField("age", IntegerType(), True),
            StructField("height", DoubleType(), True)])

        # replace with int
        row = self.spark.createDataFrame([(u'Alice', 10, 10.0)], schema).replace(10, 20).first()
        self.assertEqual(row.age, 20)
        self.assertEqual(row.height, 20.0)

        # replace with double
        row = self.spark.createDataFrame(
            [(u'Alice', 80, 80.0)], schema).replace(80.0, 82.1).first()
        self.assertEqual(row.age, 82)
        self.assertEqual(row.height, 82.1)

        # replace with string
        row = self.spark.createDataFrame(
            [(u'Alice', 10, 80.1)], schema).replace(u'Alice', u'Ann').first()
        self.assertEqual(row.name, u"Ann")
        self.assertEqual(row.age, 10)

        # replace with subset specified by a string of a column name w/ actual change
        row = self.spark.createDataFrame(
            [(u'Alice', 10, 80.1)], schema).replace(10, 20, subset='age').first()
        self.assertEqual(row.age, 20)

        # replace with subset specified by a string of a column name w/o actual change
        row = self.spark.createDataFrame(
            [(u'Alice', 10, 80.1)], schema).replace(10, 20, subset='height').first()
        self.assertEqual(row.age, 10)

        # replace with subset specified with one column replaced, another column not in subset
        # stays unchanged.
        row = self.spark.createDataFrame(
            [(u'Alice', 10, 10.0)], schema).replace(10, 20, subset=['name', 'age']).first()
        self.assertEqual(row.name, u'Alice')
        self.assertEqual(row.age, 20)
        self.assertEqual(row.height, 10.0)

        # replace with subset specified but no column will be replaced
        row = self.spark.createDataFrame(
            [(u'Alice', 10, None)], schema).replace(10, 20, subset=['name', 'height']).first()
        self.assertEqual(row.name, u'Alice')
        self.assertEqual(row.age, 10)
        self.assertEqual(row.height, None)

        # replace with lists
        row = self.spark.createDataFrame(
            [(u'Alice', 10, 80.1)], schema).replace([u'Alice'], [u'Ann']).first()
        self.assertTupleEqual(row, (u'Ann', 10, 80.1))

        # replace with dict
        row = self.spark.createDataFrame(
            [(u'Alice', 10, 80.1)], schema).replace({10: 11}).first()
        self.assertTupleEqual(row, (u'Alice', 11, 80.1))

        # test backward compatibility with dummy value
        dummy_value = 1
        row = self.spark.createDataFrame(
            [(u'Alice', 10, 80.1)], schema).replace({'Alice': 'Bob'}, dummy_value).first()
        self.assertTupleEqual(row, (u'Bob', 10, 80.1))

        # test dict with mixed numerics
        row = self.spark.createDataFrame(
            [(u'Alice', 10, 80.1)], schema).replace({10: -10, 80.1: 90.5}).first()
        self.assertTupleEqual(row, (u'Alice', -10, 90.5))

        # replace with tuples
        row = self.spark.createDataFrame(
            [(u'Alice', 10, 80.1)], schema).replace((u'Alice', ), (u'Bob', )).first()
        self.assertTupleEqual(row, (u'Bob', 10, 80.1))

        # replace multiple columns
        row = self.spark.createDataFrame(
            [(u'Alice', 10, 80.0)], schema).replace((10, 80.0), (20, 90)).first()
        self.assertTupleEqual(row, (u'Alice', 20, 90.0))

        # test for mixed numerics
        row = self.spark.createDataFrame(
            [(u'Alice', 10, 80.0)], schema).replace((10, 80), (20, 90.5)).first()
        self.assertTupleEqual(row, (u'Alice', 20, 90.5))

        row = self.spark.createDataFrame(
            [(u'Alice', 10, 80.0)], schema).replace({10: 20, 80: 90.5}).first()
        self.assertTupleEqual(row, (u'Alice', 20, 90.5))

        # replace with boolean
        row = (self
               .spark.createDataFrame([(u'Alice', 10, 80.0)], schema)
               .selectExpr("name = 'Bob'", 'age <= 15')
               .replace(False, True).first())
        self.assertTupleEqual(row, (True, True))

        # replace string with None and then drop None rows
        row = self.spark.createDataFrame(
            [(u'Alice', 10, 80.0)], schema).replace(u'Alice', None).dropna()
        self.assertEqual(row.count(), 0)

        # replace with number and None
        row = self.spark.createDataFrame(
            [(u'Alice', 10, 80.0)], schema).replace([10, 80], [20, None]).first()
        self.assertTupleEqual(row, (u'Alice', 20, None))

        # should fail if subset is not list, tuple or None
        with self.assertRaises(ValueError):
            self.spark.createDataFrame(
                [(u'Alice', 10, 80.1)], schema).replace({10: 11}, subset=1).first()

        # should fail if to_replace and value have different length
        with self.assertRaises(ValueError):
            self.spark.createDataFrame(
                [(u'Alice', 10, 80.1)], schema).replace(["Alice", "Bob"], ["Eve"]).first()

        # should fail when an unexpected type is received
        with self.assertRaises(ValueError):
            from datetime import datetime
            self.spark.createDataFrame(
                [(u'Alice', 10, 80.1)], schema).replace(datetime.now(), datetime.now()).first()

        # should fail if provided mixed type replacements
        with self.assertRaises(ValueError):
            self.spark.createDataFrame(
                [(u'Alice', 10, 80.1)], schema).replace(["Alice", 10], ["Eve", 20]).first()

        with self.assertRaises(ValueError):
            self.spark.createDataFrame(
                [(u'Alice', 10, 80.1)], schema).replace({u"Alice": u"Bob", 10: 20}).first()

        with self.assertRaisesRegexp(
                TypeError,
                'value argument is required when to_replace is not a dictionary.'):
            self.spark.createDataFrame(
                [(u'Alice', 10, 80.0)], schema).replace(["Alice", "Bob"]).first()

    def test_with_column_with_existing_name(self):
        keys = self.df.withColumn("key", self.df.key).select("key").collect()
        self.assertEqual([r.key for r in keys], list(range(100)))

    # regression test for SPARK-10417
    def test_column_iterator(self):

        def foo():
            for x in self.df.key:
                break

        self.assertRaises(TypeError, foo)

    def test_generic_hints(self):
        from pyspark.sql import DataFrame

        df1 = self.spark.range(10e10).toDF("id")
        df2 = self.spark.range(10e10).toDF("id")

        self.assertIsInstance(df1.hint("broadcast"), DataFrame)
        self.assertIsInstance(df1.hint("broadcast", []), DataFrame)

        # Dummy rules
        self.assertIsInstance(df1.hint("broadcast", "foo", "bar"), DataFrame)
        self.assertIsInstance(df1.hint("broadcast", ["foo", "bar"]), DataFrame)

        plan = df1.join(df2.hint("broadcast"), "id")._jdf.queryExecution().executedPlan()
        self.assertEqual(1, plan.toString().count("BroadcastHashJoin"))

    # add tests for SPARK-23647 (test more types for hint)
    def test_extended_hint_types(self):
        from pyspark.sql import DataFrame

        df = self.spark.range(10e10).toDF("id")
        such_a_nice_list = ["itworks1", "itworks2", "itworks3"]
        hinted_df = df.hint("my awesome hint", 1.2345, "what", such_a_nice_list)
        logical_plan = hinted_df._jdf.queryExecution().logical()

        self.assertEqual(1, logical_plan.toString().count("1.2345"))
        self.assertEqual(1, logical_plan.toString().count("what"))
        self.assertEqual(3, logical_plan.toString().count("itworks"))

    def test_sample(self):
        self.assertRaisesRegexp(
            TypeError,
            "should be a bool, float and number",
            lambda: self.spark.range(1).sample())

        self.assertRaises(
            TypeError,
            lambda: self.spark.range(1).sample("a"))

        self.assertRaises(
            TypeError,
            lambda: self.spark.range(1).sample(seed="abc"))

        self.assertRaises(
            IllegalArgumentException,
            lambda: self.spark.range(1).sample(-1.0))

    def test_toDF_with_schema_string(self):
        data = [Row(key=i, value=str(i)) for i in range(100)]
        rdd = self.sc.parallelize(data, 5)

        df = rdd.toDF("key: int, value: string")
        self.assertEqual(df.schema.simpleString(), "struct<key:int,value:string>")
        self.assertEqual(df.collect(), data)

        # different but compatible field types can be used.
        df = rdd.toDF("key: string, value: string")
        self.assertEqual(df.schema.simpleString(), "struct<key:string,value:string>")
        self.assertEqual(df.collect(), [Row(key=str(i), value=str(i)) for i in range(100)])

        # field names can differ.
        df = rdd.toDF(" a: int, b: string ")
        self.assertEqual(df.schema.simpleString(), "struct<a:int,b:string>")
        self.assertEqual(df.collect(), data)

        # number of fields must match.
        self.assertRaisesRegexp(Exception, "Length of object",
                                lambda: rdd.toDF("key: int").collect())

        # field types mismatch will cause exception at runtime.
        self.assertRaisesRegexp(Exception, "FloatType can not accept",
                                lambda: rdd.toDF("key: float, value: string").collect())

        # flat schema values will be wrapped into row.
        df = rdd.map(lambda row: row.key).toDF("int")
        self.assertEqual(df.schema.simpleString(), "struct<value:int>")
        self.assertEqual(df.collect(), [Row(key=i) for i in range(100)])

        # users can use DataType directly instead of data type string.
        df = rdd.map(lambda row: row.key).toDF(IntegerType())
        self.assertEqual(df.schema.simpleString(), "struct<value:int>")
        self.assertEqual(df.collect(), [Row(key=i) for i in range(100)])

    def test_join_without_on(self):
        df1 = self.spark.range(1).toDF("a")
        df2 = self.spark.range(1).toDF("b")

        with self.sql_conf({"spark.sql.crossJoin.enabled": False}):
            self.assertRaises(AnalysisException, lambda: df1.join(df2, how="inner").collect())

        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
            actual = df1.join(df2, how="inner").collect()
            expected = [Row(a=0, b=0)]
            self.assertEqual(actual, expected)

    # Regression test for invalid join methods when on is None, SPARK-14761
    def test_invalid_join_method(self):
        df1 = self.spark.createDataFrame([("Alice", 5), ("Bob", 8)], ["name", "age"])
        df2 = self.spark.createDataFrame([("Alice", 80), ("Bob", 90)], ["name", "height"])
        self.assertRaises(IllegalArgumentException, lambda: df1.join(df2, how="invalid-join-type"))

    # Cartesian products require cross join syntax
    def test_require_cross(self):
        df1 = self.spark.createDataFrame([(1, "1")], ("key", "value"))
        df2 = self.spark.createDataFrame([(1, "1")], ("key", "value"))
[SPARK-28621][SQL] Make spark.sql.crossJoin.enabled default value true ### What changes were proposed in this pull request? Make `spark.sql.crossJoin.enabled` default value true ### Why are the changes needed? For implicit cross join, we can set up a watchdog to cancel it if running for a long time. When "spark.sql.crossJoin.enabled" is false, because `CheckCartesianProducts` is implemented in logical plan stage, it may generate some mismatching error which may confuse end user: * it's done in logical phase, so we may fail queries that can be executed via broadcast join, which is very fast. * if we move the check to the physical phase, then a query may success at the beginning, and begin to fail when the table size gets larger (other people insert data to the table). This can be quite confusing. * the CROSS JOIN syntax doesn't work well if join reorder happens. * some non-equi-join will generate plan using cartesian product, but `CheckCartesianProducts` do not detect it and raise error. So that in order to address this in simpler way, we can turn off showing this cross-join error by default. For reference, I list some cases raising mismatching error here: Providing: ``` spark.range(2).createOrReplaceTempView("sm1") // can be broadcast spark.range(50000000).createOrReplaceTempView("bg1") // cannot be broadcast spark.range(60000000).createOrReplaceTempView("bg2") // cannot be broadcast ``` 1) Some join could be convert to broadcast nested loop join, but CheckCartesianProducts raise error. e.g. ``` select sm1.id, bg1.id from bg1 join sm1 where sm1.id < bg1.id ``` 2) Some join will run by CartesianJoin but CheckCartesianProducts DO NOT raise error. e.g. ``` select bg1.id, bg2.id from bg1 join bg2 where bg1.id < bg2.id ``` ### Does this PR introduce any user-facing change? ### How was this patch tested? Closes #25520 from WeichenXu123/SPARK-28621. Authored-by: WeichenXu <weichen.xu@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-08-27 09:53:37 -04:00
        with self.sql_conf({"spark.sql.crossJoin.enabled": False}):
            # joins without conditions require cross join syntax
            self.assertRaises(AnalysisException, lambda: df1.join(df2).collect())
# works with crossJoin
self.assertEqual(1, df1.crossJoin(df2).count())

    def test_cache(self):
        spark = self.spark
        with self.tempView("tab1", "tab2"):
            spark.createDataFrame([(2, 2), (3, 3)]).createOrReplaceTempView("tab1")
            spark.createDataFrame([(2, 2), (3, 3)]).createOrReplaceTempView("tab2")
            self.assertFalse(spark.catalog.isCached("tab1"))
            self.assertFalse(spark.catalog.isCached("tab2"))
            spark.catalog.cacheTable("tab1")
            self.assertTrue(spark.catalog.isCached("tab1"))
            self.assertFalse(spark.catalog.isCached("tab2"))
            spark.catalog.cacheTable("tab2")
            spark.catalog.uncacheTable("tab1")
            self.assertFalse(spark.catalog.isCached("tab1"))
            self.assertTrue(spark.catalog.isCached("tab2"))
            spark.catalog.clearCache()
            self.assertFalse(spark.catalog.isCached("tab1"))
            self.assertFalse(spark.catalog.isCached("tab2"))
            # Catalog operations on a missing table must raise AnalysisException.
            self.assertRaisesRegex(
                AnalysisException,
                "does_not_exist",
                lambda: spark.catalog.isCached("does_not_exist"))
            self.assertRaisesRegex(
                AnalysisException,
                "does_not_exist",
                lambda: spark.catalog.cacheTable("does_not_exist"))
            self.assertRaisesRegex(
                AnalysisException,
                "does_not_exist",
                lambda: spark.catalog.uncacheTable("does_not_exist"))

    def _to_pandas(self):
        from datetime import datetime, date
        schema = StructType().add("a", IntegerType()).add("b", StringType())\
                             .add("c", BooleanType()).add("d", FloatType())\
                             .add("dt", DateType()).add("ts", TimestampType())
        data = [
            (1, "foo", True, 3.0, date(1969, 1, 1), datetime(1969, 1, 1, 1, 1, 1)),
            (2, "foo", True, 5.0, None, None),
            (3, "bar", False, -1.0, date(2012, 3, 3), datetime(2012, 3, 3, 3, 3, 3)),
            (4, "bar", False, 6.0, date(2100, 4, 4), datetime(2100, 4, 4, 4, 4, 4)),
        ]
        df = self.spark.createDataFrame(data, schema)
        return df.toPandas()

    @unittest.skipIf(not have_pandas, pandas_requirement_message)
    def test_to_pandas(self):
        import numpy as np
        pdf = self._to_pandas()
        types = pdf.dtypes
        self.assertEqual(types.iloc[0], np.int32)
        self.assertEqual(types.iloc[1], object)
        self.assertEqual(types.iloc[2], bool)
        self.assertEqual(types.iloc[3], np.float32)
        self.assertEqual(types.iloc[4], object)  # datetime.date
        self.assertEqual(types.iloc[5], 'datetime64[ns]')

    @unittest.skipIf(not have_pandas, pandas_requirement_message)
    def test_to_pandas_with_duplicated_column_names(self):
        import numpy as np

        sql = "select 1 v, 1 v"
        for arrowEnabled in [False, True]:
            with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrowEnabled}):
                df = self.spark.sql(sql)
                pdf = df.toPandas()
                types = pdf.dtypes
                self.assertEqual(types.iloc[0], np.int32)
                self.assertEqual(types.iloc[1], np.int32)

    @unittest.skipIf(not have_pandas, pandas_requirement_message)
    def test_to_pandas_on_cross_join(self):
        import numpy as np

        sql = """
        select t1.*, t2.* from (
          select explode(sequence(1, 3)) v
        ) t1 left join (
          select explode(sequence(1, 3)) v
        ) t2
        """
        for arrowEnabled in [False, True]:
            with self.sql_conf({"spark.sql.crossJoin.enabled": True,
                                "spark.sql.execution.arrow.pyspark.enabled": arrowEnabled}):
                df = self.spark.sql(sql)
                pdf = df.toPandas()
                types = pdf.dtypes
                self.assertEqual(types.iloc[0], np.int32)
                self.assertEqual(types.iloc[1], np.int32)

    @unittest.skipIf(have_pandas, "Required Pandas was found.")
    def test_to_pandas_required_pandas_not_found(self):
        with QuietTest(self.sc):
            with self.assertRaisesRegex(ImportError, 'Pandas >= .* must be installed'):
                self._to_pandas()

    @unittest.skipIf(not have_pandas, pandas_requirement_message)
    def test_to_pandas_avoid_astype(self):
        import numpy as np
        schema = StructType().add("a", IntegerType()).add("b", StringType())\
                             .add("c", IntegerType())
        data = [(1, "foo", 16777220), (None, "bar", None)]
        df = self.spark.createDataFrame(data, schema)
        types = df.toPandas().dtypes
        self.assertEqual(types.iloc[0], np.float64)  # doesn't convert to np.int32 due to NaN value.
        self.assertEqual(types.iloc[1], object)
        self.assertEqual(types.iloc[2], np.float64)

    @unittest.skipIf(not have_pandas, pandas_requirement_message)
    def test_to_pandas_from_empty_dataframe(self):
        with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):
            # SPARK-29188 test that toPandas() on an empty dataframe has the correct dtypes
            import numpy as np
            sql = """
            SELECT CAST(1 AS TINYINT) AS tinyint,
            CAST(1 AS SMALLINT) AS smallint,
            CAST(1 AS INT) AS int,
            CAST(1 AS BIGINT) AS bigint,
            CAST(0 AS FLOAT) AS float,
            CAST(0 AS DOUBLE) AS double,
            CAST(1 AS BOOLEAN) AS boolean,
            CAST('foo' AS STRING) AS string,
            CAST('2019-01-01' AS TIMESTAMP) AS timestamp
            """
            dtypes_when_nonempty_df = self.spark.sql(sql).toPandas().dtypes
            dtypes_when_empty_df = self.spark.sql(sql).filter("False").toPandas().dtypes
            self.assertTrue(np.all(dtypes_when_empty_df == dtypes_when_nonempty_df))

    @unittest.skipIf(not have_pandas, pandas_requirement_message)
    def test_to_pandas_from_null_dataframe(self):
        with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):
            # SPARK-29188 test that toPandas() on a dataframe with only nulls has correct dtypes
            import numpy as np
            sql = """
            SELECT CAST(NULL AS TINYINT) AS tinyint,
            CAST(NULL AS SMALLINT) AS smallint,
            CAST(NULL AS INT) AS int,
            CAST(NULL AS BIGINT) AS bigint,
            CAST(NULL AS FLOAT) AS float,
            CAST(NULL AS DOUBLE) AS double,
            CAST(NULL AS BOOLEAN) AS boolean,
            CAST(NULL AS STRING) AS string,
            CAST(NULL AS TIMESTAMP) AS timestamp
            """
            pdf = self.spark.sql(sql).toPandas()
            types = pdf.dtypes
            self.assertEqual(types.iloc[0], np.float64)
            self.assertEqual(types.iloc[1], np.float64)
            self.assertEqual(types.iloc[2], np.float64)
            self.assertEqual(types.iloc[3], np.float64)
            self.assertEqual(types.iloc[4], np.float32)
            self.assertEqual(types.iloc[5], np.float64)
            self.assertEqual(types.iloc[6], object)
            self.assertEqual(types.iloc[7], object)
            self.assertTrue(np.can_cast(np.datetime64, types.iloc[8]))

    @unittest.skipIf(not have_pandas, pandas_requirement_message)
    def test_to_pandas_from_mixed_dataframe(self):
        with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):
            # SPARK-29188 test that toPandas() on a dataframe with some nulls has correct dtypes
            import numpy as np
            sql = """
            SELECT CAST(col1 AS TINYINT) AS tinyint,
            CAST(col2 AS SMALLINT) AS smallint,
            CAST(col3 AS INT) AS int,
            CAST(col4 AS BIGINT) AS bigint,
            CAST(col5 AS FLOAT) AS float,
            CAST(col6 AS DOUBLE) AS double,
            CAST(col7 AS BOOLEAN) AS boolean,
            CAST(col8 AS STRING) AS string,
            timestamp_seconds(col9) AS timestamp
            FROM VALUES (1, 1, 1, 1, 1, 1, 1, 1, 1),
                        (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)
            """
            pdf_with_some_nulls = self.spark.sql(sql).toPandas()
            pdf_with_only_nulls = self.spark.sql(sql).filter('tinyint is null').toPandas()
            self.assertTrue(np.all(pdf_with_only_nulls.dtypes == pdf_with_some_nulls.dtypes))

    def test_create_dataframe_from_array_of_long(self):
        import array
        data = [Row(longarray=array.array('l', [-9223372036854775808, 0, 9223372036854775807]))]
        df = self.spark.createDataFrame(data)
        self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))

    @unittest.skipIf(not have_pandas, pandas_requirement_message)
    def test_create_dataframe_from_pandas_with_timestamp(self):
        import pandas as pd
        from datetime import datetime
        pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)],
                            "d": [pd.Timestamp.now().date()]}, columns=["d", "ts"])
        # test types are inferred correctly without specifying schema
        df = self.spark.createDataFrame(pdf)
        self.assertTrue(isinstance(df.schema['ts'].dataType, TimestampType))
        self.assertTrue(isinstance(df.schema['d'].dataType, DateType))
        # test with schema will accept pdf as input
        df = self.spark.createDataFrame(pdf, schema="d date, ts timestamp")
        self.assertTrue(isinstance(df.schema['ts'].dataType, TimestampType))
        self.assertTrue(isinstance(df.schema['d'].dataType, DateType))

    @unittest.skipIf(have_pandas, "Required Pandas was found.")
    def test_create_dataframe_required_pandas_not_found(self):
        with QuietTest(self.sc):
            with self.assertRaisesRegex(
                    ImportError,
                    "(Pandas >= .* must be installed|No module named '?pandas'?)"):
                import pandas as pd
                from datetime import datetime
                pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)],
                                    "d": [pd.Timestamp.now().date()]})
                self.spark.createDataFrame(pdf)

    # Regression test for SPARK-23360
    @unittest.skipIf(not have_pandas, pandas_requirement_message)
    def test_create_dataframe_from_pandas_with_dst(self):
        import pandas as pd
        from pandas.testing import assert_frame_equal
        from datetime import datetime

        pdf = pd.DataFrame({'time': [datetime(2015, 10, 31, 22, 30)]})

        df = self.spark.createDataFrame(pdf)
        assert_frame_equal(pdf, df.toPandas())
        orig_env_tz = os.environ.get('TZ', None)
        try:
            tz = 'America/Los_Angeles'
            os.environ['TZ'] = tz
            time.tzset()
            with self.sql_conf({'spark.sql.session.timeZone': tz}):
                df = self.spark.createDataFrame(pdf)
                assert_frame_equal(pdf, df.toPandas())
        finally:
            del os.environ['TZ']
            if orig_env_tz is not None:
                os.environ['TZ'] = orig_env_tz
            time.tzset()

    def test_repr_behaviors(self):
        import re
        pattern = re.compile(r'^ *\|', re.MULTILINE)
        df = self.spark.createDataFrame([(1, "1"), (22222, "22222")], ("key", "value"))

        # test when eager evaluation is enabled and _repr_html_ will not be called
        with self.sql_conf({"spark.sql.repl.eagerEval.enabled": True}):
            expected1 = """+-----+-----+
                ||  key|value|
                |+-----+-----+
                ||    1|    1|
                ||22222|22222|
                |+-----+-----+
                |"""
            self.assertEquals(re.sub(pattern, '', expected1), df.__repr__())
            with self.sql_conf({"spark.sql.repl.eagerEval.truncate": 3}):
                expected2 = """+---+-----+
                    ||key|value|
                    |+---+-----+
                    ||  1|    1|
                    ||222|  222|
                    |+---+-----+
                    |"""
                self.assertEquals(re.sub(pattern, '', expected2), df.__repr__())
                with self.sql_conf({"spark.sql.repl.eagerEval.maxNumRows": 1}):
                    expected3 = """+---+-----+
                        ||key|value|
                        |+---+-----+
                        ||  1|    1|
                        |+---+-----+
                        |only showing top 1 row
                        |"""
                    self.assertEquals(re.sub(pattern, '', expected3), df.__repr__())

        # test when eager evaluation is enabled and _repr_html_ will be called
        with self.sql_conf({"spark.sql.repl.eagerEval.enabled": True}):
            expected1 = """<table border='1'>
                |<tr><th>key</th><th>value</th></tr>
                |<tr><td>1</td><td>1</td></tr>
                |<tr><td>22222</td><td>22222</td></tr>
                |</table>
                |"""
            self.assertEquals(re.sub(pattern, '', expected1), df._repr_html_())
            with self.sql_conf({"spark.sql.repl.eagerEval.truncate": 3}):
                expected2 = """<table border='1'>
                    |<tr><th>key</th><th>value</th></tr>
                    |<tr><td>1</td><td>1</td></tr>
                    |<tr><td>222</td><td>222</td></tr>
                    |</table>
                    |"""
                self.assertEquals(re.sub(pattern, '', expected2), df._repr_html_())
                with self.sql_conf({"spark.sql.repl.eagerEval.maxNumRows": 1}):
                    expected3 = """<table border='1'>
                        |<tr><th>key</th><th>value</th></tr>
                        |<tr><td>1</td><td>1</td></tr>
                        |</table>
                        |only showing top 1 row
                        |"""
                    self.assertEquals(re.sub(pattern, '', expected3), df._repr_html_())

        # test when eager evaluation is disabled and _repr_html_ will be called
        with self.sql_conf({"spark.sql.repl.eagerEval.enabled": False}):
            expected = "DataFrame[key: bigint, value: string]"
            self.assertEquals(None, df._repr_html_())
            self.assertEquals(expected, df.__repr__())
            with self.sql_conf({"spark.sql.repl.eagerEval.truncate": 3}):
                self.assertEquals(None, df._repr_html_())
                self.assertEquals(expected, df.__repr__())
                with self.sql_conf({"spark.sql.repl.eagerEval.maxNumRows": 1}):
                    self.assertEquals(None, df._repr_html_())
                    self.assertEquals(expected, df.__repr__())
[SPARK-23961][SPARK-27548][PYTHON] Fix error when toLocalIterator goes out of scope and properly raise errors from worker

## What changes were proposed in this pull request?

This fixes an error when a PySpark local iterator, for both RDD and DataFrames, goes out of scope and the connection is closed before fully consuming the iterator. The error occurs on the JVM in the serving thread, when Python closes the local socket while the JVM is writing to it. This usually happens when there is enough data to fill the socket read buffer, causing the write call to block.

Additionally, this fixes a problem when an error occurs in the Python worker and the collect job is cancelled with an exception. Previously, the Python driver was never notified of the error, so the user could get a partial result (iteration until the error) and the application would continue. With this change, an error in the worker is sent to the Python iterator and is then raised.

The change here introduces a protocol for PySpark local iterators that works as follows:

1) The local socket connection is made when the iterator is created
2) When iterating, Python first sends a request for partition data as a non-zero integer
3) While the JVM local iterator over partitions has next, it triggers a job to collect the next partition
4) The JVM sends a nonzero response to indicate it has the next partition to send
5) The next partition is sent to Python and read by the PySpark deserializer
6) After sending the entire partition, an `END_OF_DATA_SECTION` is sent to Python, which stops the deserializer and allows another request to be made
7) When the JVM gets a request from Python but has already consumed its local iterator, it will send a zero response to Python and both will close the socket cleanly
8) If an error occurs in the worker, a negative response is sent to Python followed by the error message. Python will then raise a RuntimeError with the message, stopping iteration.
9) When the PySpark local iterator is garbage-collected, it will read any remaining data from the current partition (this is data that has already been collected) and send a request of zero to tell the JVM to stop collection jobs and close the connection.

Steps 1, 3, 5, 6 are the same as before. Step 8 was completely missing before because errors in the worker were never communicated back to Python. The other steps add synchronization to allow for a clean closing of the socket, with a small trade-off in performance for each partition. This is mainly because the JVM does not start collecting partition data until it receives a request to do so, where before it would eagerly write all data until the socket receive buffer was full.

## How was this patch tested?

Added new unit tests for DataFrame and RDD `toLocalIterator` and tested not fully consuming the iterator. Manual tests with Python 2.7 and 3.6.

Closes #24070 from BryanCutler/pyspark-toLocalIterator-clean-stop-SPARK-23961.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2019-05-07 17:47:39 -04:00
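The request/response protocol described in the commit message above can be illustrated with a small in-process model. This is only a sketch under stated assumptions: `FakeJvmServer`, `local_iterator`, and the `fail_at` parameter are hypothetical names invented for illustration, no socket or JVM is involved, and the `END_OF_DATA_SECTION` object here is a stand-in marker rather than the real PySpark constant. It is not the code PySpark uses.

```python
# Toy model of the local-iterator protocol; all names are hypothetical.
END_OF_DATA_SECTION = object()  # stand-in marker, not the real PySpark constant


class FakeJvmServer:
    """Stand-in for the JVM serving thread (illustration only)."""

    def __init__(self, partitions, fail_at=None):
        self._partitions = iter(partitions)
        self._fail_at = fail_at  # index of the partition whose job "fails"
        self._served = 0
        self.closed = False

    def handle_request(self, request):
        """Return (status, payload): 1 = partition follows, 0 = done, -1 = error."""
        if request == 0:  # Python asked to stop (step 9)
            self.closed = True
            return 0, []
        if self._served == self._fail_at:  # worker error (step 8)
            self.closed = True
            return -1, ["worker failed"]
        part = next(self._partitions, None)
        if part is None:  # no partitions left (step 7)
            self.closed = True
            return 0, []
        self._served += 1
        # Steps 4 to 6: nonzero status, the rows, then the end marker.
        return 1, list(part) + [END_OF_DATA_SECTION]


def local_iterator(server):
    """Generator playing the Python side of the protocol."""
    try:
        while True:
            status, payload = server.handle_request(1)  # step 2
            if status == 0:
                return
            if status < 0:
                raise RuntimeError(payload[0])
            for item in payload:
                if item is END_OF_DATA_SECTION:
                    break
                yield item
    finally:
        # Runs on early close or garbage collection as well as normal exit.
        if not server.closed:
            server.handle_request(0)
```

In this model, closing the generator early (for example with `it.close()`, or by letting it be garbage-collected) triggers the `finally` clause, which sends the zero request so the server side can stop cleanly.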

    def test_to_local_iterator(self):
        df = self.spark.range(8, numPartitions=4)
        expected = df.collect()
        it = df.toLocalIterator()
        self.assertEqual(expected, list(it))

        # Test DataFrame with empty partition
        df = self.spark.range(3, numPartitions=4)
        it = df.toLocalIterator()
        expected = df.collect()
        self.assertEqual(expected, list(it))
[SPARK-27659][PYTHON] Allow PySpark to prefetch during toLocalIterator

### What changes were proposed in this pull request?

This PR allows Python toLocalIterator to prefetch the next partition while the first partition is being collected. The PR also adds a demo micro benchmark in the examples directory; we may wish to keep this or not.

### Why are the changes needed?

In https://issues.apache.org/jira/browse/SPARK-23961 / 5e79ae3b40b76e3473288830ab958fc4834dcb33 we changed PySpark to only pull one partition at a time. This is memory efficient, but if partitions take time to compute this can mean we're spending more time blocking.

### Does this PR introduce any user-facing change?

A new param is added to toLocalIterator.

### How was this patch tested?

A new unit test inside of `test_rdd.py` checks the time at which the elements are evaluated. Another test, checking that the results remain the same, is added to `test_dataframe.py`. I also ran a micro benchmark in the examples directory, `prefetch.py`, which shows an improvement of ~40% in this specific use case.

> 19/08/16 17:11:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
> Running timers:
>
> [Stage 32:> (0 + 1) / 1]
> Results:
>
> Prefetch time:
>
> 100.228110831
>
> Regular time:
>
> 188.341721614

Closes #25515 from holdenk/SPARK-27659-allow-pyspark-tolocalitr-to-prefetch.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
2019-09-20 12:59:31 -04:00
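The idea of prefetching, computing the next partition while the current one is being consumed, can be sketched in plain Python. This is an illustration of the general technique only, assuming a hypothetical `compute_partition(i)` callable that returns the rows of partition `i`; `prefetching_iterator` is likewise a made-up name, and the sketch does not reflect how Spark implements `prefetchPartitions`.

```python
from concurrent.futures import ThreadPoolExecutor


def prefetching_iterator(compute_partition, num_partitions):
    """Yield rows partition by partition, computing one partition ahead.

    `compute_partition` is a hypothetical callable taking a partition index
    and returning a list of rows.
    """
    if num_partitions == 0:
        return
    with ThreadPoolExecutor(max_workers=1) as pool:
        pending = pool.submit(compute_partition, 0)
        for i in range(num_partitions):
            rows = pending.result()  # block until the current partition is ready
            if i + 1 < num_partitions:
                # Start the next partition before the caller consumes this one.
                pending = pool.submit(compute_partition, i + 1)
            for row in rows:
                yield row
```

The trade-off mirrors the one in the commit message: at most one extra partition is held in memory in exchange for less time spent blocked between partitions.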

    def test_to_local_iterator_prefetch(self):
        df = self.spark.range(8, numPartitions=4)
        expected = df.collect()
        it = df.toLocalIterator(prefetchPartitions=True)
        self.assertEqual(expected, list(it))

    def test_to_local_iterator_not_fully_consumed(self):
        # SPARK-23961: toLocalIterator throws exception when not fully consumed
        # Create a DataFrame large enough so that write to socket will eventually block
        df = self.spark.range(1 << 20, numPartitions=2)
        it = df.toLocalIterator()
        self.assertEqual(df.take(1)[0], next(it))
        with QuietTest(self.sc):
            it = None  # remove iterator from scope, socket is closed when cleaned up
            # Make sure normal df operations still work
            result = []
            for i, row in enumerate(df.toLocalIterator()):
                result.append(row)
                if i == 7:
                    break
            self.assertEqual(df.take(8), result)
[SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'semanticHash' methods in Dataset

### What changes were proposed in this pull request?

This PR added two DeveloperApis to the Dataset[T] class. Both methods just expose lower-level methods on the Dataset[T] class.

### Why are the changes needed?

They are useful for checking whether two dataframes are the same when implementing dataframe caching in Python, and also for getting a unique ID. It's easier to use if we wrap the lower-level APIs.

### Does this PR introduce any user-facing change?

```
scala> val df1 = Seq((1,2),(4,5)).toDF("col1", "col2")
df1: org.apache.spark.sql.DataFrame = [col1: int, col2: int]

scala> val df2 = Seq((1,2),(4,5)).toDF("col1", "col2")
df2: org.apache.spark.sql.DataFrame = [col1: int, col2: int]

scala> val df3 = Seq((0,2),(4,5)).toDF("col1", "col2")
df3: org.apache.spark.sql.DataFrame = [col1: int, col2: int]

scala> val df4 = Seq((0,2),(4,5)).toDF("col0", "col2")
df4: org.apache.spark.sql.DataFrame = [col0: int, col2: int]

scala> df1.semanticHash
res0: Int = 594427822

scala> df2.semanticHash
res1: Int = 594427822

scala> df1.sameSemantics(df2)
res2: Boolean = true

scala> df1.sameSemantics(df3)
res3: Boolean = false

scala> df3.semanticHash
res4: Int = -1592702048

scala> df4.semanticHash
res5: Int = -1592702048

scala> df4.sameSemantics(df3)
res6: Boolean = true
```

### How was this patch tested?

Unit test in Scala and doctest in Python.

Note: comments are copied from the corresponding lower-level APIs.

Note: There are some issues to be fixed that would improve the hash collision rate: https://github.com/apache/spark/pull/27565#discussion_r379881028

Closes #27565 from liangz1/df-same-result.

Authored-by: Liang Zhang <liang.zhang@databricks.com>
Signed-off-by: WeichenXu <weichen.xu@databricks.com>
2020-02-17 20:22:26 -05:00

    def test_same_semantics_error(self):
        with QuietTest(self.sc):
            with self.assertRaisesRegexp(ValueError, "should be of DataFrame.*int"):
                self.spark.range(10).sameSemantics(1)

    def test_input_files(self):
        tpath = tempfile.mkdtemp()
        shutil.rmtree(tpath)
        try:
            self.spark.range(1, 100, 1, 10).write.parquet(tpath)
            # read parquet file and get the input files list
            input_files_list = self.spark.read.parquet(tpath).inputFiles()

            # input files list should contain 10 entries
            self.assertEquals(len(input_files_list), 10)
            # all file paths in list must contain tpath
            for file_path in input_files_list:
                self.assertTrue(tpath in file_path)
        finally:
            shutil.rmtree(tpath)


class QueryExecutionListenerTests(unittest.TestCase, SQLTestUtils):
    # These tests are separate because it uses 'spark.sql.queryExecutionListeners' which is
    # static and immutable. This can't be set or unset, for example, via `spark.conf`.

    @classmethod
    def setUpClass(cls):
        import glob
        from pyspark.find_spark_home import _find_spark_home

        SPARK_HOME = _find_spark_home()
        filename_pattern = (
            "sql/core/target/scala-*/test-classes/org/apache/spark/sql/"
            "TestQueryExecutionListener.class")
        cls.has_listener = bool(glob.glob(os.path.join(SPARK_HOME, filename_pattern)))

        if cls.has_listener:
            # Note that 'spark.sql.queryExecutionListeners' is a static immutable configuration.
            cls.spark = SparkSession.builder \
                .master("local[4]") \
                .appName(cls.__name__) \
                .config(
                    "spark.sql.queryExecutionListeners",
                    "org.apache.spark.sql.TestQueryExecutionListener") \
                .getOrCreate()

    def setUp(self):
        if not self.has_listener:
            raise self.skipTest(
                "'org.apache.spark.sql.TestQueryExecutionListener' is not "
                "available. Will skip the related tests.")

    @classmethod
    def tearDownClass(cls):
        if hasattr(cls, "spark"):
            cls.spark.stop()

    def tearDown(self):
        self.spark._jvm.OnSuccessCall.clear()

    def test_query_execution_listener_on_collect(self):
        self.assertFalse(
            self.spark._jvm.OnSuccessCall.isCalled(),
            "The callback from the query execution listener should not be called before 'collect'")
        self.spark.sql("SELECT * FROM range(1)").collect()
        self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty(10000)
        self.assertTrue(
            self.spark._jvm.OnSuccessCall.isCalled(),
            "The callback from the query execution listener should be called after 'collect'")

    @unittest.skipIf(
        not have_pandas or not have_pyarrow,
        pandas_requirement_message or pyarrow_requirement_message)
    def test_query_execution_listener_on_collect_with_arrow(self):
[SPARK-27834][SQL][R][PYTHON] Make separate PySpark/SparkR vectorization configurations

## What changes were proposed in this pull request?

`spark.sql.execution.arrow.enabled` was added when we added the PySpark Arrow optimization. Later, in the current master, the SparkR Arrow optimization was added, and it is controlled by the same configuration, `spark.sql.execution.arrow.enabled`.

There appear to be two issues with this:

1. `spark.sql.execution.arrow.enabled` in PySpark was added in 2.3.0, whereas the SparkR optimization was added in 3.0.0. The stability is different, so it is problematic when we change the default value for only one of the two optimizations first.
2. Suppose users want to share a JVM between PySpark and SparkR. They are currently forced to use the optimization for both or neither if the configuration is set globally.

This PR proposes two separate configuration groups for PySpark and SparkR about Arrow optimization:

- Deprecate `spark.sql.execution.arrow.enabled`
- Add `spark.sql.execution.arrow.pyspark.enabled` (fallback to `spark.sql.execution.arrow.enabled`)
- Add `spark.sql.execution.arrow.sparkr.enabled`
- Deprecate `spark.sql.execution.arrow.fallback.enabled`
- Add `spark.sql.execution.arrow.pyspark.fallback.enabled` (fallback to `spark.sql.execution.arrow.fallback.enabled`)

Note that `spark.sql.execution.arrow.maxRecordsPerBatch` is used on the JVM side for both.

Note that `spark.sql.execution.arrow.fallback.enabled` was added due to a behaviour change. We don't need it in SparkR; the SparkR side has the automatic fallback.

## How was this patch tested?

Manually tested and some unittests were added.

Closes #24700 from HyukjinKwon/separate-sparkr-arrow.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-02 21:01:37 -04:00
        with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": True}):
            self.assertFalse(
                self.spark._jvm.OnSuccessCall.isCalled(),
                "The callback from the query execution listener should not be "
                "called before 'toPandas'")
            self.spark.sql("SELECT * FROM range(1)").toPandas()
            self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty(10000)
            self.assertTrue(
                self.spark._jvm.OnSuccessCall.isCalled(),
                "The callback from the query execution listener should be called after 'toPandas'")


if __name__ == "__main__":
    from pyspark.sql.tests.test_dataframe import *

    try:
        import xmlrunner
[SPARK-28130][PYTHON] Print pretty messages for skipped tests when xmlrunner is available in PySpark

## What changes were proposed in this pull request?

Currently, the pretty skipped-message mechanism added by https://github.com/apache/spark/commit/f7435bec6a9348cfbbe26b13c230c08545d16067 does not appear to work when xmlrunner is installed.

This PR fixes two things:

1. When `xmlrunner` is installed, it seems `xmlrunner` does not respect the `verbosity` level in unittests (default is level 1). So the output looks as below:

```
Running tests...
----------------------------------------------------------------------
SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS
----------------------------------------------------------------------
```

So it is not caught by our message detection mechanism.

2. If we manually set the `verbosity` level for `xmlrunner`, it prints messages as below:

```
test_mixed_udf (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... SKIP (0.000s)
test_mixed_udf_and_sql (pyspark.sql.tests.test_pandas_udf_scalar.ScalarPandasUDFTests) ... SKIP (0.000s)
...
```

This is different on our Jenkins machine:

```
test_createDataFrame_column_name_encoding (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped 'Pandas >= 0.23.2 must be installed; however, it was not found.'
test_createDataFrame_does_not_modify_input (pyspark.sql.tests.test_arrow.ArrowTests) ... skipped 'Pandas >= 0.23.2 must be installed; however, it was not found.'
...
```

Note that the last `SKIP` is different. This PR fixes the regular expression to catch the `SKIP` case as well.

## How was this patch tested?

Manually tested.

**Before:**

```
Starting test(python2.7): pyspark....
Finished test(python2.7): pyspark.... (0s)
...
Tests passed in 562 seconds
========================================================================
...
```

**After:**

```
Starting test(python2.7): pyspark....
Finished test(python2.7): pyspark.... (48s) ... 93 tests were skipped
...
Tests passed in 560 seconds
Skipped tests pyspark.... with python2.7:
pyspark...(...) ... SKIP (0.000s)
...
========================================================================
...
```

Closes #24927 from HyukjinKwon/SPARK-28130.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-06-23 20:58:17 -04:00
testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
[SPARK-26032][PYTHON] Break large sql/tests.py files into smaller files ## What changes were proposed in this pull request? This is the official first attempt to break huge single `tests.py` file - I did it locally before few times and gave up for some reasons. Now, currently it really makes the unittests super hard to read and difficult to check. To me, it even bothers me to to scroll down the big file. It's one single 7000 lines file! This is not only readability issue. Since one big test takes most of tests time, the tests don't run in parallel fully - although it will costs to start and stop the context. We could pick up one example and follow. Given my investigation, the current style looks closer to NumPy structure and looks easier to follow. Please see https://github.com/numpy/numpy/tree/master/numpy. Basically this PR proposes to break down `pyspark/sql/tests.py` into ...: ```bash pyspark ... ├── sql ... │   ├── tests # Includes all tests broken down from 'pyspark/sql/tests.py' │ │  │ # Each matchs to module in 'pyspark/sql'. Additionally, some logical group can │ │  │ # be added. For instance, 'test_arrow.py', 'test_datasources.py' ... │   │   ├── __init__.py │   │   ├── test_appsubmit.py │   │   ├── test_arrow.py │   │   ├── test_catalog.py │   │   ├── test_column.py │   │   ├── test_conf.py │   │   ├── test_context.py │   │   ├── test_dataframe.py │   │   ├── test_datasources.py │   │   ├── test_functions.py │   │   ├── test_group.py │   │   ├── test_pandas_udf.py │   │   ├── test_pandas_udf_grouped_agg.py │   │   ├── test_pandas_udf_grouped_map.py │   │   ├── test_pandas_udf_scalar.py │   │   ├── test_pandas_udf_window.py │   │   ├── test_readwriter.py │   │   ├── test_serde.py │   │   ├── test_session.py │   │   ├── test_streaming.py │   │   ├── test_types.py │   │   ├── test_udf.py │   │   └── test_utils.py ... ├── testing # Includes testing utils that can be used in unittests. │   ├── __init__.py │   └── sqlutils.py ... 
``` ## How was this patch tested? Existing tests should cover. `cd python` and `./run-tests-with-coverage`. Manually checked they are actually being ran. Each test (not officially) can be ran via: ``` SPARK_TESTING=1 ./bin/pyspark pyspark.sql.tests.test_pandas_udf_scalar ``` Note that if you're using Mac and Python 3, you might have to `OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES`. Closes #23021 from HyukjinKwon/SPARK-25344. Authored-by: hyukjinkwon <gurwls223@apache.org> Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-11-14 01:51:11 -05:00
except ImportError:
2018-11-14 23:30:52 -05:00
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)