# spark-instrumented-optimizer/python/pyspark/tests/test_appsubmit.py

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import re
import shutil
import subprocess
import tempfile
import unittest
import zipfile


class SparkSubmitTests(unittest.TestCase):
def setUp(self):
self.programDir = tempfile.mkdtemp()
tmp_dir = tempfile.gettempdir()
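        # Point java.io.tmpdir for both the driver and executor JVMs at the
        # system temp dir so scratch files from test runs land in one place.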
self.sparkSubmit = [
os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit"),
"--conf", "spark.driver.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
"--conf", "spark.executor.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
        ]

    def tearDown(self):
        shutil.rmtree(self.programDir)

    def createTempFile(self, name, content, dir=None):
"""
Create a temp file with the given name and content and return its path.
Strips leading spaces from content up to the first '|' in each line.
"""
pattern = re.compile(r'^ *\|', re.MULTILINE)
content = re.sub(pattern, '', content.strip())
if dir is None:
path = os.path.join(self.programDir, name)
else:
os.makedirs(os.path.join(self.programDir, dir))
path = os.path.join(self.programDir, dir, name)
with open(path, "w") as f:
f.write(content)
        return path

    def createFileInZip(self, name, content, ext=".zip", dir=None, zip_name=None):
"""
Create a zip archive containing a file with the given content and return its path.
Strips leading spaces from content up to the first '|' in each line.
"""
pattern = re.compile(r'^ *\|', re.MULTILINE)
content = re.sub(pattern, '', content.strip())
if dir is None:
path = os.path.join(self.programDir, name + ext)
else:
path = os.path.join(self.programDir, dir, zip_name + ext)
        with zipfile.ZipFile(path, 'w') as zip_file:
            zip_file.writestr(name, content)
        return path

    def create_spark_package(self, artifact_name):
group_id, artifact_id, version = artifact_name.split(":")
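        # Build a minimal local Maven repository entry: a POM plus a "jar"
        # (really a zip containing a single Python module) laid out under
        # <group_id>/<artifact_id>/<version>/ so that --packages can resolve
        # the artifact from a file: repository URL.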
self.createTempFile("%s-%s.pom" % (artifact_id, version), ("""
|<?xml version="1.0" encoding="UTF-8"?>
|<project xmlns="http://maven.apache.org/POM/4.0.0"
| xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
| xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
| http://maven.apache.org/xsd/maven-4.0.0.xsd">
| <modelVersion>4.0.0</modelVersion>
| <groupId>%s</groupId>
| <artifactId>%s</artifactId>
| <version>%s</version>
|</project>
""" % (group_id, artifact_id, version)).lstrip(),
os.path.join(group_id, artifact_id, version))
self.createFileInZip("%s.py" % artifact_id, """
|def myfunc(x):
| return x + 1
""", ".jar", os.path.join(group_id, artifact_id, version),
"%s-%s" % (artifact_id, version))
def test_single_script(self):
"""Submit and test a single script file"""
script = self.createTempFile("test.py", """
|from pyspark import SparkContext
|
|sc = SparkContext()
|print(sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect())
""")
proc = subprocess.Popen(self.sparkSubmit + [script], stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 4, 6]", out.decode('utf-8'))
def test_script_with_local_functions(self):
"""Submit and test a single script file calling a global function"""
script = self.createTempFile("test.py", """
|from pyspark import SparkContext
|
|def foo(x):
| return x * 3
|
|sc = SparkContext()
|print(sc.parallelize([1, 2, 3]).map(foo).collect())
""")
proc = subprocess.Popen(self.sparkSubmit + [script], stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[3, 6, 9]", out.decode('utf-8'))
def test_module_dependency(self):
"""Submit and test a script with a dependency on another module"""
script = self.createTempFile("test.py", """
|from pyspark import SparkContext
|from mylib import myfunc
|
|sc = SparkContext()
|print(sc.parallelize([1, 2, 3]).map(myfunc).collect())
""")
zip = self.createFileInZip("mylib.py", """
|def myfunc(x):
| return x + 1
""")
proc = subprocess.Popen(self.sparkSubmit + ["--py-files", zip, script],
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 3, 4]", out.decode('utf-8'))
def test_module_dependency_on_cluster(self):
"""Submit and test a script with a dependency on another module on a cluster"""
script = self.createTempFile("test.py", """
|from pyspark import SparkContext
|from mylib import myfunc
|
|sc = SparkContext()
|print(sc.parallelize([1, 2, 3]).map(myfunc).collect())
""")
zip = self.createFileInZip("mylib.py", """
|def myfunc(x):
| return x + 1
""")
proc = subprocess.Popen(self.sparkSubmit + ["--py-files", zip, "--master",
"local-cluster[1,1,1024]", script],
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 3, 4]", out.decode('utf-8'))
def test_package_dependency(self):
"""Submit and test a script with a dependency on a Spark Package"""
script = self.createTempFile("test.py", """
|from pyspark import SparkContext
|from mylib import myfunc
|
|sc = SparkContext()
|print(sc.parallelize([1, 2, 3]).map(myfunc).collect())
""")
self.create_spark_package("a:mylib:0.1")
proc = subprocess.Popen(
self.sparkSubmit + ["--packages", "a:mylib:0.1", "--repositories",
"file:" + self.programDir, script],
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 3, 4]", out.decode('utf-8'))
def test_package_dependency_on_cluster(self):
"""Submit and test a script with a dependency on a Spark Package on a cluster"""
script = self.createTempFile("test.py", """
|from pyspark import SparkContext
|from mylib import myfunc
|
|sc = SparkContext()
|print(sc.parallelize([1, 2, 3]).map(myfunc).collect())
""")
self.create_spark_package("a:mylib:0.1")
proc = subprocess.Popen(
self.sparkSubmit + ["--packages", "a:mylib:0.1", "--repositories",
"file:" + self.programDir, "--master", "local-cluster[1,1,1024]",
script],
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 3, 4]", out.decode('utf-8'))
def test_single_script_on_cluster(self):
"""Submit and test a single script on a cluster"""
script = self.createTempFile("test.py", """
|from pyspark import SparkContext
|
|def foo(x):
| return x * 2
|
|sc = SparkContext()
|print(sc.parallelize([1, 2, 3]).map(foo).collect())
""")
# this will fail if you have different spark.executor.memory
# in conf/spark-defaults.conf
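        # (local-cluster workers here are capped at 1024 MB each)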
proc = subprocess.Popen(
self.sparkSubmit + ["--master", "local-cluster[1,1,1024]", script],
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 4, 6]", out.decode('utf-8'))
def test_user_configuration(self):
"""Make sure user configuration is respected (SPARK-19307)"""
script = self.createTempFile("test.py", """
|from pyspark import SparkConf, SparkContext
|
|conf = SparkConf().set("spark.test_config", "1")
|sc = SparkContext(conf = conf)
|try:
| if sc._conf.get("spark.test_config") != "1":
| raise Exception("Cannot find spark.test_config in SparkContext's conf.")
|finally:
| sc.stop()
""")
proc = subprocess.Popen(
self.sparkSubmit + ["--master", "local", script],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
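        # stderr is merged into stdout so any traceback shows up in the
        # assertion message below if spark-submit fails.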
out, err = proc.communicate()
        self.assertEqual(0, proc.returncode,
                         msg="Process failed with error:\n {0}".format(out.decode('utf-8')))


if __name__ == "__main__":
from pyspark.tests.test_appsubmit import *
try:
import xmlrunner
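        # xmlrunner writes JUnit-style XML reports under target/test-reports;
        # verbosity=2 keeps per-test (and skipped-test) messages on the console.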
testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)