fdcad6ef48
Running run-tests.py with Python 2.6 causes the following error:

```
Running PySpark tests. Output is in python//Users/tomohiko/.jenkins/jobs/pyspark_test/workspace/python/unit-tests.log
Will test against the following Python executables: ['python2.6', 'python3.4', 'pypy']
Will test the following Python modules: ['pyspark-core', 'pyspark-ml', 'pyspark-mllib', 'pyspark-sql', 'pyspark-streaming']
Traceback (most recent call last):
  File "./python/run-tests.py", line 196, in <module>
    main()
  File "./python/run-tests.py", line 159, in main
    python_implementation = subprocess.check_output(
AttributeError: 'module' object has no attribute 'check_output'
...
```

The cause of this error is the use of the subprocess.check_output function, which only exists since Python 2.7 (ref. https://docs.python.org/2.7/library/subprocess.html#subprocess.check_output).

Author: cocoatomo <cocoatomo77@gmail.com>

Closes #7161 from cocoatomo/issues/8763-test-fails-py26 and squashes the following commits:

cf4f901 [cocoatomo] [SPARK-8763] backport process.check_output function from Python 2.7
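For context, here is a minimal sketch (not the project's code) of the compatibility-shim pattern this commit applies: use subprocess.check_output directly when the interpreter provides it (Python 2.7+), otherwise fall back to a local Popen-based substitute. The complete backport, including the CalledProcessError handling for non-zero exit codes, is in the script below.

```python
import subprocess
import sys

if sys.version_info >= (2, 7):
    # Python 2.7 and later ship subprocess.check_output natively.
    subprocess_check_output = subprocess.check_output
else:
    # Python 2.6 fallback: emulate check_output with Popen. This sketch omits
    # the error handling; see the full backport in run-tests.py below.
    def subprocess_check_output(*popenargs, **kwargs):
        process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
        output, _ = process.communicate()
        return output
```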
214 lines · 8 KiB · Python · Executable file
#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function
import logging
from optparse import OptionParser
import os
import re
import subprocess
import sys
import tempfile
from threading import Thread, Lock
import time
if sys.version < '3':
    import Queue
else:
    import queue as Queue
if sys.version_info >= (2, 7):
    subprocess_check_output = subprocess.check_output
else:
    # SPARK-8763
    # backported from subprocess module in Python 2.7
    def subprocess_check_output(*popenargs, **kwargs):
        if 'stdout' in kwargs:
            raise ValueError('stdout argument not allowed, it will be overridden.')
        process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
        output, unused_err = process.communicate()
        retcode = process.poll()
        if retcode:
            cmd = kwargs.get("args")
            if cmd is None:
                cmd = popenargs[0]
            raise subprocess.CalledProcessError(retcode, cmd, output=output)
        return output


# Append `SPARK_HOME/dev` to the Python path so that we can import the sparktestsupport module
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "../dev/"))


from sparktestsupport import SPARK_HOME  # noqa (suppress pep8 warnings)
from sparktestsupport.shellutils import which  # noqa
from sparktestsupport.modules import all_modules  # noqa


python_modules = dict((m.name, m) for m in all_modules if m.python_test_goals if m.name != 'root')


def print_red(text):
    print('\033[31m' + text + '\033[0m')


LOG_FILE = os.path.join(SPARK_HOME, "python/unit-tests.log")
FAILURE_REPORTING_LOCK = Lock()
LOGGER = logging.getLogger()


def run_individual_python_test(test_name, pyspark_python):
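    """Run a single test goal under bin/pyspark with the given Python executable.

    The test's output is captured in a temporary file; on failure it is appended
    to LOG_FILE and the whole run is aborted via os._exit().
    """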
    env = {'SPARK_TESTING': '1', 'PYSPARK_PYTHON': which(pyspark_python)}
    LOGGER.debug("Starting test(%s): %s", pyspark_python, test_name)
    start_time = time.time()
    try:
        per_test_output = tempfile.TemporaryFile()
        retcode = subprocess.Popen(
            [os.path.join(SPARK_HOME, "bin/pyspark"), test_name],
            stderr=per_test_output, stdout=per_test_output, env=env).wait()
    except:
        LOGGER.exception("Got exception while running %s with %s", test_name, pyspark_python)
        # Here, we use os._exit() instead of sys.exit() in order to force Python to exit even if
        # this code is invoked from a thread other than the main thread.
        os._exit(1)
    duration = time.time() - start_time
    # Exit on the first failure.
    if retcode != 0:
        try:
            with FAILURE_REPORTING_LOCK:
                with open(LOG_FILE, 'ab') as log_file:
                    per_test_output.seek(0)
                    log_file.writelines(per_test_output)
                per_test_output.seek(0)
                for line in per_test_output:
                    decoded_line = line.decode()
                    if not re.match('[0-9]+', decoded_line):
                        print(decoded_line, end='')
                per_test_output.close()
        except:
            LOGGER.exception("Got an exception while trying to print failed test output")
        finally:
            print_red("\nHad test failures in %s with %s; see logs." % (test_name, pyspark_python))
            # Here, we use os._exit() instead of sys.exit() in order to force Python to exit even if
            # this code is invoked from a thread other than the main thread.
            os._exit(-1)
    else:
        per_test_output.close()
        LOGGER.info("Finished test(%s): %s (%is)", pyspark_python, test_name, duration)


def get_default_python_executables():
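    """Return the Python executables to test against: the entries from the
    default list that are found on PATH, falling back to plain `python` when
    python2.6 is unavailable.
    """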
    python_execs = [x for x in ["python2.6", "python3.4", "pypy"] if which(x)]
    if "python2.6" not in python_execs:
        LOGGER.warning("Not testing against `python2.6` because it could not be found; falling"
                       " back to `python` instead")
        python_execs.insert(0, "python")
    return python_execs


def parse_opts():
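    """Parse command-line options, rejecting positional arguments and a
    parallelism value below 1.
    """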
    parser = OptionParser(
        prog="run-tests"
    )
    parser.add_option(
        "--python-executables", type="string", default=','.join(get_default_python_executables()),
        help="A comma-separated list of Python executables to test against (default: %default)"
    )
    parser.add_option(
        "--modules", type="string",
        default=",".join(sorted(python_modules.keys())),
        help="A comma-separated list of Python modules to test (default: %default)"
    )
    parser.add_option(
        "-p", "--parallelism", type="int", default=4,
        help="The number of suites to test in parallel (default %default)"
    )
    parser.add_option(
        "--verbose", action="store_true",
        help="Enable additional debug logging"
    )

    (opts, args) = parser.parse_args()
    if args:
        parser.error("Unsupported arguments: %s" % ' '.join(args))
    if opts.parallelism < 1:
        parser.error("Parallelism cannot be less than 1")
    return opts


def main():
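    """Build a queue of (python executable, test goal) tasks and run them in
    parallel worker threads, logging output to LOG_FILE.
    """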
    opts = parse_opts()
    if (opts.verbose):
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    logging.basicConfig(stream=sys.stdout, level=log_level, format="%(message)s")
    LOGGER.info("Running PySpark tests. Output is in python/%s", LOG_FILE)
    if os.path.exists(LOG_FILE):
        os.remove(LOG_FILE)
    python_execs = opts.python_executables.split(',')
    modules_to_test = []
    for module_name in opts.modules.split(','):
        if module_name in python_modules:
            modules_to_test.append(python_modules[module_name])
        else:
            print("Error: unrecognized module %s" % module_name)
            sys.exit(-1)
    LOGGER.info("Will test against the following Python executables: %s", python_execs)
    LOGGER.info("Will test the following Python modules: %s", [x.name for x in modules_to_test])

    task_queue = Queue.Queue()
    for python_exec in python_execs:
        python_implementation = subprocess_check_output(
            [python_exec, "-c", "import platform; print(platform.python_implementation())"],
            universal_newlines=True).strip()
        LOGGER.debug("%s python_implementation is %s", python_exec, python_implementation)
        LOGGER.debug("%s version is: %s", python_exec, subprocess_check_output(
            [python_exec, "--version"], stderr=subprocess.STDOUT, universal_newlines=True).strip())
        for module in modules_to_test:
            if python_implementation not in module.blacklisted_python_implementations:
                for test_goal in module.python_test_goals:
                    task_queue.put((python_exec, test_goal))

    def process_queue(task_queue):
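        """Worker loop: pull (python_exec, test_goal) pairs off the queue and
        run each one until the queue is empty.
        """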
        while True:
            try:
                (python_exec, test_goal) = task_queue.get_nowait()
            except Queue.Empty:
                break
            try:
                run_individual_python_test(test_goal, python_exec)
            finally:
                task_queue.task_done()

    start_time = time.time()
    for _ in range(opts.parallelism):
        worker = Thread(target=process_queue, args=(task_queue,))
        worker.daemon = True
        worker.start()
    try:
        task_queue.join()
    except (KeyboardInterrupt, SystemExit):
        print_red("Exiting due to interrupt")
        sys.exit(-1)
    total_duration = time.time() - start_time
    LOGGER.info("Tests passed in %i seconds", total_duration)


if __name__ == "__main__":
    main()