[SPARK-24126][PYSPARK] Use build-specific temp directory for pyspark tests.

This avoids polluting and leaving garbage behind in /tmp, and allows the
usual build tools to clean up any leftover files.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21198 from vanzin/SPARK-24126.
Authored by Marcelo Vanzin on 2018-05-07 13:00:18 +08:00; committed by hyukjinkwon
parent f38ea00e83
commit a634d66ce7
4 changed files with 54 additions and 18 deletions
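In brief, the patch routes both Python-side and JVM-side temp files into a per-build directory: the TMPDIR environment variable redirects Python's tempfile module, while -Djava.io.tmpdir, passed through spark-submit --conf for the driver and executor, redirects the JVMs. Below is a minimal standalone sketch of that mechanism; the relative target/ path and the script name my_app.py are illustrative only (the patch itself creates target/<uuid> next to the test runner script), and it assumes SPARK_HOME is set.

import os
import subprocess
import uuid

# Per-build temp directory (illustrative path).
tmp_dir = os.path.join("target", str(uuid.uuid4()))
os.makedirs(tmp_dir)

# Python side: child processes inheriting TMPDIR place tempfile output here.
env = dict(os.environ)
env["TMPDIR"] = tmp_dir

# JVM side: point driver and executor temp directories at the same location.
submit = [
    os.path.join(os.environ.get("SPARK_HOME", "."), "bin", "spark-submit"),
    "--conf", "spark.driver.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
    "--conf", "spark.executor.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
    "my_app.py",  # hypothetical script
]
subprocess.Popen(submit, env=env, stdout=subprocess.PIPE).communicate()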

@@ -3092,8 +3092,8 @@ class HiveSparkSubmitTests(SparkSubmitTests):
|print(hive_context.sql("show databases").collect())
""")
proc = subprocess.Popen(
[self.sparkSubmit, "--master", "local-cluster[1,1,1024]",
"--driver-class-path", hive_site_dir, script],
self.sparkSubmit + ["--master", "local-cluster[1,1,1024]",
"--driver-class-path", hive_site_dir, script],
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)

@@ -63,7 +63,7 @@ class PySparkStreamingTestCase(unittest.TestCase):
class_name = cls.__name__
conf = SparkConf().set("spark.default.parallelism", 1)
cls.sc = SparkContext(appName=class_name, conf=conf)
cls.sc.setCheckpointDir("/tmp")
cls.sc.setCheckpointDir(tempfile.mkdtemp())
@classmethod
def tearDownClass(cls):
@@ -1549,7 +1549,9 @@ if __name__ == "__main__":
kinesis_jar_present = True
jars = "%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, kinesis_asl_assembly_jar)
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
jars_args = "--jars %s" % jars
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])
testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
StreamingListenerTests]
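For context on the streaming change above: the test runner (see the run-tests.py hunks below) now pre-populates PYSPARK_SUBMIT_ARGS with the temp-dir --conf options, so the streaming suite prepends its --jars to the existing value instead of overwriting it. A rough sketch of the merge, with placeholder jar names and an assumed pre-set value:

import os

# Assume the runner already exported something like this (placeholder path):
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--conf spark.driver.extraJavaOptions=-Djava.io.tmpdir=target/1234 pyspark-shell")

jars = "kafka-assembly.jar,flume-assembly.jar,kinesis-asl-assembly.jar"  # placeholders
existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join(["--jars %s" % jars, existing_args])
# Result (roughly): "--jars kafka-assembly.jar,... --conf spark.driver.extraJavaOptions=-Djava.io.tmpdir=target/1234 pyspark-shell"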

@@ -1951,7 +1951,12 @@ class SparkSubmitTests(unittest.TestCase):
def setUp(self):
self.programDir = tempfile.mkdtemp()
self.sparkSubmit = os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit")
tmp_dir = tempfile.gettempdir()
self.sparkSubmit = [
os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit"),
"--conf", "spark.driver.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
"--conf", "spark.executor.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
]
def tearDown(self):
shutil.rmtree(self.programDir)
@@ -2017,7 +2022,7 @@ class SparkSubmitTests(unittest.TestCase):
|sc = SparkContext()
|print(sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect())
""")
proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE)
proc = subprocess.Popen(self.sparkSubmit + [script], stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 4, 6]", out.decode('utf-8'))
@@ -2033,7 +2038,7 @@ class SparkSubmitTests(unittest.TestCase):
|sc = SparkContext()
|print(sc.parallelize([1, 2, 3]).map(foo).collect())
""")
proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE)
proc = subprocess.Popen(self.sparkSubmit + [script], stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[3, 6, 9]", out.decode('utf-8'))
@@ -2051,7 +2056,7 @@ class SparkSubmitTests(unittest.TestCase):
|def myfunc(x):
| return x + 1
""")
proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, script],
proc = subprocess.Popen(self.sparkSubmit + ["--py-files", zip, script],
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
@@ -2070,7 +2075,7 @@ class SparkSubmitTests(unittest.TestCase):
|def myfunc(x):
| return x + 1
""")
proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, "--master",
proc = subprocess.Popen(self.sparkSubmit + ["--py-files", zip, "--master",
"local-cluster[1,1,1024]", script],
stdout=subprocess.PIPE)
out, err = proc.communicate()
@@ -2087,8 +2092,10 @@ class SparkSubmitTests(unittest.TestCase):
|print(sc.parallelize([1, 2, 3]).map(myfunc).collect())
""")
self.create_spark_package("a:mylib:0.1")
proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories",
"file:" + self.programDir, script], stdout=subprocess.PIPE)
proc = subprocess.Popen(
self.sparkSubmit + ["--packages", "a:mylib:0.1", "--repositories",
"file:" + self.programDir, script],
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 3, 4]", out.decode('utf-8'))
@@ -2103,9 +2110,11 @@ class SparkSubmitTests(unittest.TestCase):
|print(sc.parallelize([1, 2, 3]).map(myfunc).collect())
""")
self.create_spark_package("a:mylib:0.1")
proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories",
"file:" + self.programDir, "--master",
"local-cluster[1,1,1024]", script], stdout=subprocess.PIPE)
proc = subprocess.Popen(
self.sparkSubmit + ["--packages", "a:mylib:0.1", "--repositories",
"file:" + self.programDir, "--master", "local-cluster[1,1,1024]",
script],
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 3, 4]", out.decode('utf-8'))
@@ -2124,7 +2133,7 @@ class SparkSubmitTests(unittest.TestCase):
# this will fail if you have different spark.executor.memory
# in conf/spark-defaults.conf
proc = subprocess.Popen(
[self.sparkSubmit, "--master", "local-cluster[1,1,1024]", script],
self.sparkSubmit + ["--master", "local-cluster[1,1,1024]", script],
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
@@ -2144,7 +2153,7 @@ class SparkSubmitTests(unittest.TestCase):
| sc.stop()
""")
proc = subprocess.Popen(
[self.sparkSubmit, "--master", "local", script],
self.sparkSubmit + ["--master", "local", script],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
out, err = proc.communicate()

@@ -22,11 +22,13 @@ import logging
from optparse import OptionParser
import os
import re
import shutil
import subprocess
import sys
import tempfile
from threading import Thread, Lock
import time
import uuid
if sys.version < '3':
import Queue
else:
@@ -68,7 +70,7 @@ else:
raise Exception("Cannot find assembly build directory, please build Spark first.")
def run_individual_python_test(test_name, pyspark_python):
def run_individual_python_test(target_dir, test_name, pyspark_python):
env = dict(os.environ)
env.update({
'SPARK_DIST_CLASSPATH': SPARK_DIST_CLASSPATH,
@@ -77,6 +79,23 @@ def run_individual_python_test(test_name, pyspark_python):
'PYSPARK_PYTHON': which(pyspark_python),
'PYSPARK_DRIVER_PYTHON': which(pyspark_python)
})
# Create a unique temp directory under 'target/' for each run. The TMPDIR variable is
# recognized by the tempfile module to override the default system temp directory.
tmp_dir = os.path.join(target_dir, str(uuid.uuid4()))
while os.path.isdir(tmp_dir):
tmp_dir = os.path.join(target_dir, str(uuid.uuid4()))
os.mkdir(tmp_dir)
env["TMPDIR"] = tmp_dir
# Also override the JVM's temp directory by setting driver and executor options.
spark_args = [
"--conf", "spark.driver.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
"--conf", "spark.executor.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
"pyspark-shell"
]
env["PYSPARK_SUBMIT_ARGS"] = " ".join(spark_args)
LOGGER.info("Starting test(%s): %s", pyspark_python, test_name)
start_time = time.time()
try:
@@ -84,6 +103,7 @@ def run_individual_python_test(test_name, pyspark_python):
retcode = subprocess.Popen(
[os.path.join(SPARK_HOME, "bin/pyspark"), test_name],
stderr=per_test_output, stdout=per_test_output, env=env).wait()
shutil.rmtree(tmp_dir, ignore_errors=True)
except:
LOGGER.exception("Got exception while running %s with %s", test_name, pyspark_python)
# Here, we use os._exit() instead of sys.exit() in order to force Python to exit even if
@@ -238,6 +258,11 @@ def main():
priority = 100
task_queue.put((priority, (python_exec, test_goal)))
# Create the target directory before starting tasks to avoid races.
target_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'target'))
if not os.path.isdir(target_dir):
os.mkdir(target_dir)
def process_queue(task_queue):
while True:
try:
@@ -245,7 +270,7 @@
except Queue.Empty:
break
try:
run_individual_python_test(test_goal, python_exec)
run_individual_python_test(target_dir, test_goal, python_exec)
finally:
task_queue.task_done()