[SPARK-24044][PYTHON] Explicitly print out skipped tests from unittest module

## What changes were proposed in this pull request?

This PR proposes to remove the duplicated dependency-checking logic and to explicitly print out the tests skipped by the unittest module.

For example:

```
Skipped tests in pyspark.sql.tests with pypy:
    test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
    test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
...

Skipped tests in pyspark.sql.tests with python3:
    test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'
    test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'
...
```

Currently, this output is not printed to the console. It is better to print the skipped tests to the console explicitly.
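
For reference, this output comes straight from unittest: running a test case with `verbosity=2` makes the runner print one `... skipped '<reason>'` line per skipped test. A minimal, self-contained sketch (the test name and skip reason below are illustrative, not taken from the real test suite):

```python
import unittest


class SkipReportingExample(unittest.TestCase):
    # Hypothetical stand-in for the real PyArrow/Pandas availability checks.
    @unittest.skip("PyArrow >= 0.8.0 must be installed; however, it was not found.")
    def test_createDataFrame_column_name_encoding(self):
        pass


if __name__ == "__main__":
    # verbosity=2 makes the text runner print a "... skipped '<reason>'" line
    # for each skipped test, which is the output surfaced by this PR.
    unittest.main(verbosity=2)
```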

## How was this patch tested?

Manually tested. Also, fortunately, Jenkins provides a good environment to verify the skipped-test output.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21107 from HyukjinKwon/skipped-tests-print.
Authored by hyukjinkwon on 2018-04-26 15:11:42 -07:00; committed by Bryan Cutler
parent 4f1e38649e
commit f7435bec6a
6 changed files with 98 additions and 104 deletions

python/pyspark/ml/tests.py

@@ -2136,17 +2136,23 @@ class ImageReaderTest2(PySparkTestCase):
     @classmethod
     def setUpClass(cls):
         super(ImageReaderTest2, cls).setUpClass()
+        cls.hive_available = True
         # Note that here we enable Hive's support.
         cls.spark = None
         try:
             cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
         except py4j.protocol.Py4JError:
-            cls.tearDownClass()
-            raise unittest.SkipTest("Hive is not available")
+            cls.hive_available = False
         except TypeError:
-            cls.tearDownClass()
-            raise unittest.SkipTest("Hive is not available")
-        cls.spark = HiveContext._createForTesting(cls.sc)
+            cls.hive_available = False
+        if cls.hive_available:
+            cls.spark = HiveContext._createForTesting(cls.sc)

+    def setUp(self):
+        if not self.hive_available:
+            self.skipTest("Hive is not available.")
+
     @classmethod
     def tearDownClass(cls):

@@ -2662,6 +2668,6 @@ class EstimatorTest(unittest.TestCase):
 if __name__ == "__main__":
     from pyspark.ml.tests import *
     if xmlrunner:
-        unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
+        unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'), verbosity=2)
     else:
-        unittest.main()
+        unittest.main(verbosity=2)
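
The hunk above (and the similar changes to the SQL tests below) replaces a `unittest.SkipTest` raised from `setUpClass` with a flag that `setUp` checks, presumably so that each test is reported as its own `test_... ... skipped '...'` line that the new parsing in run-tests.py can pick up, instead of the class being skipped in a single line. A rough, self-contained sketch of the pattern, with a hypothetical `hive_available` flag standing in for the real Hive probe:

```python
import unittest


class DeferredSkipExample(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        # The old code raised unittest.SkipTest("Hive is not available") here,
        # which skips the whole class at once; the new code only records a flag.
        # This hypothetical flag stands in for the real HiveConf probe.
        cls.hive_available = False

    def setUp(self):
        # Skipping per test makes every test show up as its own
        # "test_... ... skipped 'Hive is not available.'" line at verbosity=2.
        if not self.hive_available:
            self.skipTest("Hive is not available.")

    def test_example(self):
        self.assertTrue(True)


if __name__ == "__main__":
    unittest.main(verbosity=2)
```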

python/pyspark/mllib/tests.py

@@ -1767,9 +1767,9 @@ if __name__ == "__main__":
     if not _have_scipy:
         print("NOTE: Skipping SciPy tests as it does not seem to be installed")
     if xmlrunner:
-        unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
+        unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'), verbosity=2)
     else:
-        unittest.main()
+        unittest.main(verbosity=2)
     if not _have_scipy:
         print("NOTE: SciPy tests were skipped as it does not seem to be installed")
     sc.stop()

python/pyspark/sql/tests.py

@@ -3096,23 +3096,28 @@ class QueryExecutionListenerTests(unittest.TestCase, SQLTestUtils):
         filename_pattern = (
             "sql/core/target/scala-*/test-classes/org/apache/spark/sql/"
             "TestQueryExecutionListener.class")
-        if not glob.glob(os.path.join(SPARK_HOME, filename_pattern)):
-            raise unittest.SkipTest(
+        cls.has_listener = bool(glob.glob(os.path.join(SPARK_HOME, filename_pattern)))
+
+        if cls.has_listener:
+            # Note that 'spark.sql.queryExecutionListeners' is a static immutable configuration.
+            cls.spark = SparkSession.builder \
+                .master("local[4]") \
+                .appName(cls.__name__) \
+                .config(
+                    "spark.sql.queryExecutionListeners",
+                    "org.apache.spark.sql.TestQueryExecutionListener") \
+                .getOrCreate()
+
+    def setUp(self):
+        if not self.has_listener:
+            raise self.skipTest(
                 "'org.apache.spark.sql.TestQueryExecutionListener' is not "
                 "available. Will skip the related tests.")
-
-        # Note that 'spark.sql.queryExecutionListeners' is a static immutable configuration.
-        cls.spark = SparkSession.builder \
-            .master("local[4]") \
-            .appName(cls.__name__) \
-            .config(
-                "spark.sql.queryExecutionListeners",
-                "org.apache.spark.sql.TestQueryExecutionListener") \
-            .getOrCreate()

     @classmethod
     def tearDownClass(cls):
-        cls.spark.stop()
+        if hasattr(cls, "spark"):
+            cls.spark.stop()

     def tearDown(self):
         self.spark._jvm.OnSuccessCall.clear()

@@ -3196,18 +3201,22 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
     def setUpClass(cls):
         ReusedPySparkTestCase.setUpClass()
         cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
+        cls.hive_available = True
         try:
             cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
         except py4j.protocol.Py4JError:
-            cls.tearDownClass()
-            raise unittest.SkipTest("Hive is not available")
+            cls.hive_available = False
         except TypeError:
-            cls.tearDownClass()
-            raise unittest.SkipTest("Hive is not available")
+            cls.hive_available = False
         os.unlink(cls.tempdir.name)
-        cls.spark = HiveContext._createForTesting(cls.sc)
-        cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
-        cls.df = cls.sc.parallelize(cls.testData).toDF()
+        if cls.hive_available:
+            cls.spark = HiveContext._createForTesting(cls.sc)
+            cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
+            cls.df = cls.sc.parallelize(cls.testData).toDF()

+    def setUp(self):
+        if not self.hive_available:
+            self.skipTest("Hive is not available.")
+
     @classmethod
     def tearDownClass(cls):

@@ -5316,6 +5325,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
 if __name__ == "__main__":
     from pyspark.sql.tests import *
     if xmlrunner:
-        unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
+        unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'), verbosity=2)
     else:
-        unittest.main()
+        unittest.main(verbosity=2)

python/pyspark/streaming/tests.py

@@ -1590,11 +1590,11 @@ if __name__ == "__main__":
         sys.stderr.write("[Running %s]\n" % (testcase))
         tests = unittest.TestLoader().loadTestsFromTestCase(testcase)
         if xmlrunner:
-            result = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=3).run(tests)
+            result = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2).run(tests)
             if not result.wasSuccessful():
                 failed = True
         else:
-            result = unittest.TextTestRunner(verbosity=3).run(tests)
+            result = unittest.TextTestRunner(verbosity=2).run(tests)
             if not result.wasSuccessful():
                 failed = True
     sys.exit(failed)

python/pyspark/tests.py

@@ -2353,15 +2353,7 @@ class NumPyTests(PySparkTestCase):
 if __name__ == "__main__":
     from pyspark.tests import *
-    if not _have_scipy:
-        print("NOTE: Skipping SciPy tests as it does not seem to be installed")
-    if not _have_numpy:
-        print("NOTE: Skipping NumPy tests as it does not seem to be installed")
     if xmlrunner:
-        unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
+        unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'), verbosity=2)
     else:
-        unittest.main()
-    if not _have_scipy:
-        print("NOTE: SciPy tests were skipped as it does not seem to be installed")
-    if not _have_numpy:
-        print("NOTE: NumPy tests were skipped as it does not seem to be installed")
+        unittest.main(verbosity=2)

python/run-tests.py

@@ -32,6 +32,7 @@ if sys.version < '3':
 else:
     import queue as Queue
 from distutils.version import LooseVersion
+from multiprocessing import Manager

 # Append `SPARK_HOME/dev` to the Python path so that we can import the sparktestsupport module

@@ -50,6 +51,7 @@ def print_red(text):
     print('\033[31m' + text + '\033[0m')

+SKIPPED_TESTS = Manager().dict()
 LOG_FILE = os.path.join(SPARK_HOME, "python/unit-tests.log")
 FAILURE_REPORTING_LOCK = Lock()
 LOGGER = logging.getLogger()

@@ -109,8 +111,34 @@ def run_individual_python_test(test_name, pyspark_python):
             # this code is invoked from a thread other than the main thread.
             os._exit(-1)
     else:
-        per_test_output.close()
-        LOGGER.info("Finished test(%s): %s (%is)", pyspark_python, test_name, duration)
+        skipped_counts = 0
+        try:
+            per_test_output.seek(0)
+            # Here expects skipped test output from unittest when verbosity level is
+            # 2 (or --verbose option is enabled).
+            decoded_lines = map(lambda line: line.decode(), iter(per_test_output))
+            skipped_tests = list(filter(
+                lambda line: re.search('test_.* \(pyspark\..*\) ... skipped ', line),
+                decoded_lines))
+            skipped_counts = len(skipped_tests)
+            if skipped_counts > 0:
+                key = (pyspark_python, test_name)
+                SKIPPED_TESTS[key] = skipped_tests
+            per_test_output.close()
+        except:
+            import traceback
+            print_red("\nGot an exception while trying to store "
+                      "skipped test output:\n%s" % traceback.format_exc())
+            # Here, we use os._exit() instead of sys.exit() in order to force Python to exit even if
+            # this code is invoked from a thread other than the main thread.
+            os._exit(-1)
+        if skipped_counts != 0:
+            LOGGER.info(
+                "Finished test(%s): %s (%is) ... %s tests were skipped", pyspark_python, test_name,
+                duration, skipped_counts)
+        else:
+            LOGGER.info(
+                "Finished test(%s): %s (%is)", pyspark_python, test_name, duration)


 def get_default_python_executables():

@@ -152,65 +180,17 @@ def parse_opts():
     return opts


-def _check_dependencies(python_exec, modules_to_test):
-    if "COVERAGE_PROCESS_START" in os.environ:
-        # Make sure if coverage is installed.
-        try:
-            subprocess_check_output(
-                [python_exec, "-c", "import coverage"],
-                stderr=open(os.devnull, 'w'))
-        except:
-            print_red("Coverage is not installed in Python executable '%s' "
-                      "but 'COVERAGE_PROCESS_START' environment variable is set, "
-                      "exiting." % python_exec)
-            sys.exit(-1)
-
-    # If we should test 'pyspark-sql', it checks if PyArrow and Pandas are installed and
-    # explicitly prints out. See SPARK-23300.
-    if pyspark_sql in modules_to_test:
-        # TODO(HyukjinKwon): Relocate and deduplicate these version specifications.
-        minimum_pyarrow_version = '0.8.0'
-        minimum_pandas_version = '0.19.2'
-
-        try:
-            pyarrow_version = subprocess_check_output(
-                [python_exec, "-c", "import pyarrow; print(pyarrow.__version__)"],
-                universal_newlines=True,
-                stderr=open(os.devnull, 'w')).strip()
-            if LooseVersion(pyarrow_version) >= LooseVersion(minimum_pyarrow_version):
-                LOGGER.info("Will test PyArrow related features against Python executable "
-                            "'%s' in '%s' module." % (python_exec, pyspark_sql.name))
-            else:
-                LOGGER.warning(
-                    "Will skip PyArrow related features against Python executable "
-                    "'%s' in '%s' module. PyArrow >= %s is required; however, PyArrow "
-                    "%s was found." % (
-                        python_exec, pyspark_sql.name, minimum_pyarrow_version, pyarrow_version))
-        except:
-            LOGGER.warning(
-                "Will skip PyArrow related features against Python executable "
-                "'%s' in '%s' module. PyArrow >= %s is required; however, PyArrow "
-                "was not found." % (python_exec, pyspark_sql.name, minimum_pyarrow_version))
-
-        try:
-            pandas_version = subprocess_check_output(
-                [python_exec, "-c", "import pandas; print(pandas.__version__)"],
-                universal_newlines=True,
-                stderr=open(os.devnull, 'w')).strip()
-            if LooseVersion(pandas_version) >= LooseVersion(minimum_pandas_version):
-                LOGGER.info("Will test Pandas related features against Python executable "
-                            "'%s' in '%s' module." % (python_exec, pyspark_sql.name))
-            else:
-                LOGGER.warning(
-                    "Will skip Pandas related features against Python executable "
-                    "'%s' in '%s' module. Pandas >= %s is required; however, Pandas "
-                    "%s was found." % (
-                        python_exec, pyspark_sql.name, minimum_pandas_version, pandas_version))
-        except:
-            LOGGER.warning(
-                "Will skip Pandas related features against Python executable "
-                "'%s' in '%s' module. Pandas >= %s is required; however, Pandas "
-                "was not found." % (python_exec, pyspark_sql.name, minimum_pandas_version))
+def _check_coverage(python_exec):
+    # Make sure if coverage is installed.
+    try:
+        subprocess_check_output(
+            [python_exec, "-c", "import coverage"],
+            stderr=open(os.devnull, 'w'))
+    except:
+        print_red("Coverage is not installed in Python executable '%s' "
+                  "but 'COVERAGE_PROCESS_START' environment variable is set, "
+                  "exiting." % python_exec)
+        sys.exit(-1)


 def main():

@@ -237,9 +217,10 @@ def main():
     task_queue = Queue.PriorityQueue()
     for python_exec in python_execs:
-        # Check if the python executable has proper dependencies installed to run tests
-        # for given modules properly.
-        _check_dependencies(python_exec, modules_to_test)
+        # Check if the python executable has coverage installed when 'COVERAGE_PROCESS_START'
+        # environmental variable is set.
+        if "COVERAGE_PROCESS_START" in os.environ:
+            _check_coverage(python_exec)

         python_implementation = subprocess_check_output(
             [python_exec, "-c", "import platform; print(platform.python_implementation())"],

@@ -281,6 +262,12 @@
     total_duration = time.time() - start_time
     LOGGER.info("Tests passed in %i seconds", total_duration)

+    for key, lines in sorted(SKIPPED_TESTS.items()):
+        pyspark_python, test_name = key
+        LOGGER.info("\nSkipped tests in %s with %s:" % (test_name, pyspark_python))
+        for line in lines:
+            LOGGER.info("    %s" % line.rstrip())
+

 if __name__ == "__main__":
     main()
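
Pulling the run-tests.py pieces together, here is a condensed sketch of how the skipped lines could be collected from the verbose unittest output and summarized at the end. The helper name `collect_skipped` and the sample output lines are mine, for illustration only; the regex and the report format mirror the patch.

```python
import re
from multiprocessing import Manager


def collect_skipped(per_test_lines, python_exec, test_name, skipped_store):
    """Store the '... skipped ...' lines found in verbose unittest output."""
    skipped = [
        line for line in per_test_lines
        if re.search(r'test_.* \(pyspark\..*\) ... skipped ', line)
    ]
    if skipped:
        skipped_store[(python_exec, test_name)] = skipped
    return len(skipped)


if __name__ == "__main__":
    # Manager().dict() gives a dict that the per-test worker threads can share,
    # mirroring SKIPPED_TESTS in the patch.
    skipped_tests = Manager().dict()
    sample_output = [
        "test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) "
        "... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'\n",
        "test_udf (pyspark.sql.tests.SQLTests) ... ok\n",
    ]
    collect_skipped(sample_output, "python3", "pyspark.sql.tests", skipped_tests)
    for (python_exec, test_name), lines in sorted(skipped_tests.items()):
        print("\nSkipped tests in %s with %s:" % (test_name, python_exec))
        for line in lines:
            print("    %s" % line.rstrip())
```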