spark-instrumented-optimizer/python/pyspark/tests.py

228 lines
8.3 KiB
Python
Raw Normal View History

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
2013-01-16 22:15:14 -05:00
"""
Unit tests for PySpark; additional tests are implemented as doctests in
individual modules.
"""
from fileinput import input
from glob import glob
2013-01-16 22:15:14 -05:00
import os
import shutil
import sys
2013-01-16 22:15:14 -05:00
from tempfile import NamedTemporaryFile
import time
import unittest
from pyspark.context import SparkContext
from pyspark.files import SparkFiles
from pyspark.java_gateway import SPARK_HOME
from pyspark.serializers import read_int
2013-01-16 22:15:14 -05:00
class PySparkTestCase(unittest.TestCase):
2013-01-16 22:15:14 -05:00
def setUp(self):
self._old_sys_path = list(sys.path)
class_name = self.__class__.__name__
self.sc = SparkContext('local[4]', class_name , batchSize=2)
2013-01-16 22:15:14 -05:00
def tearDown(self):
self.sc.stop()
sys.path = self._old_sys_path
# To avoid Akka rebinding to the same port, since it doesn't unbind
# immediately on shutdown
self.sc._jvm.System.clearProperty("spark.driver.port")
class TestCheckpoint(PySparkTestCase):
def setUp(self):
PySparkTestCase.setUp(self)
self.checkpointDir = NamedTemporaryFile(delete=False)
os.unlink(self.checkpointDir.name)
self.sc.setCheckpointDir(self.checkpointDir.name)
def tearDown(self):
PySparkTestCase.tearDown(self)
shutil.rmtree(self.checkpointDir.name)
2013-01-16 22:15:14 -05:00
def test_basic_checkpointing(self):
parCollection = self.sc.parallelize([1, 2, 3, 4])
flatMappedRDD = parCollection.flatMap(lambda x: range(1, x + 1))
self.assertFalse(flatMappedRDD.isCheckpointed())
2013-08-14 18:12:12 -04:00
self.assertTrue(flatMappedRDD.getCheckpointFile() is None)
2013-01-16 22:15:14 -05:00
flatMappedRDD.checkpoint()
result = flatMappedRDD.collect()
time.sleep(1) # 1 second
self.assertTrue(flatMappedRDD.isCheckpointed())
self.assertEqual(flatMappedRDD.collect(), result)
self.assertEqual("file:" + self.checkpointDir.name,
os.path.dirname(os.path.dirname(flatMappedRDD.getCheckpointFile())))
2013-01-16 22:15:14 -05:00
def test_checkpoint_and_restore(self):
parCollection = self.sc.parallelize([1, 2, 3, 4])
flatMappedRDD = parCollection.flatMap(lambda x: [x])
self.assertFalse(flatMappedRDD.isCheckpointed())
2013-08-14 18:12:12 -04:00
self.assertTrue(flatMappedRDD.getCheckpointFile() is None)
flatMappedRDD.checkpoint()
flatMappedRDD.count() # forces a checkpoint to be computed
time.sleep(1) # 1 second
2013-08-14 18:12:12 -04:00
self.assertTrue(flatMappedRDD.getCheckpointFile() is not None)
recovered = self.sc._checkpointFile(flatMappedRDD.getCheckpointFile(),
flatMappedRDD._jrdd_deserializer)
self.assertEquals([1, 2, 3, 4], recovered.collect())
2013-01-16 22:15:14 -05:00
class TestAddFile(PySparkTestCase):
def test_add_py_file(self):
# To ensure that we're actually testing addPyFile's effects, check that
# this job fails due to `userlibrary` not being on the Python path:
def func(x):
from userlibrary import UserClass
return UserClass().hello()
self.assertRaises(Exception,
self.sc.parallelize(range(2)).map(func).first)
# Add the file, so the job should now succeed:
path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py")
self.sc.addPyFile(path)
res = self.sc.parallelize(range(2)).map(func).first()
self.assertEqual("Hello World!", res)
def test_add_file_locally(self):
path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
self.sc.addFile(path)
download_path = SparkFiles.get("hello.txt")
self.assertNotEqual(path, download_path)
with open(download_path) as test_file:
self.assertEquals("Hello World!\n", test_file.readline())
def test_add_py_file_locally(self):
# To ensure that we're actually testing addPyFile's effects, check that
# this fails due to `userlibrary` not being on the Python path:
def func():
from userlibrary import UserClass
self.assertRaises(ImportError, func)
path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py")
self.sc.addFile(path)
from userlibrary import UserClass
self.assertEqual("Hello World!", UserClass().hello())
def test_add_egg_file_locally(self):
# To ensure that we're actually testing addPyFile's effects, check that
# this fails due to `userlibrary` not being on the Python path:
def func():
from userlib import UserClass
self.assertRaises(ImportError, func)
path = os.path.join(SPARK_HOME, "python/test_support/userlib-0.1-py2.7.egg")
self.sc.addPyFile(path)
from userlib import UserClass
self.assertEqual("Hello World from inside a package!", UserClass().hello())
class TestRDDFunctions(PySparkTestCase):
def test_save_as_textfile_with_unicode(self):
# Regression test for SPARK-970
x = u"\u00A1Hola, mundo!"
data = self.sc.parallelize([x])
tempFile = NamedTemporaryFile(delete=True)
tempFile.close()
data.saveAsTextFile(tempFile.name)
raw_contents = ''.join(input(glob(tempFile.name + "/part-0000*")))
self.assertEqual(x, unicode(raw_contents.strip(), "utf-8"))
def test_transforming_cartesian_result(self):
# Regression test for SPARK-1034
rdd1 = self.sc.parallelize([1, 2])
rdd2 = self.sc.parallelize([3, 4])
cart = rdd1.cartesian(rdd2)
result = cart.map(lambda (x, y): x + y).collect()
def test_cartesian_on_textfile(self):
# Regression test for
path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
a = self.sc.textFile(path)
result = a.cartesian(a).collect()
(x, y) = result[0]
self.assertEqual("Hello World!", x.strip())
self.assertEqual("Hello World!", y.strip())
2013-02-01 03:25:19 -05:00
class TestIO(PySparkTestCase):
def test_stdout_redirection(self):
import subprocess
def func(x):
subprocess.check_call('ls', shell=True)
self.sc.parallelize([1]).foreach(func)
class TestDaemon(unittest.TestCase):
def connect(self, port):
from socket import socket, AF_INET, SOCK_STREAM
sock = socket(AF_INET, SOCK_STREAM)
sock.connect(('127.0.0.1', port))
# send a split index of -1 to shutdown the worker
sock.send("\xFF\xFF\xFF\xFF")
sock.close()
return True
def do_termination_test(self, terminator):
from subprocess import Popen, PIPE
from errno import ECONNREFUSED
# start daemon
daemon_path = os.path.join(os.path.dirname(__file__), "daemon.py")
daemon = Popen([sys.executable, daemon_path], stdin=PIPE, stdout=PIPE)
# read the port number
port = read_int(daemon.stdout)
# daemon should accept connections
self.assertTrue(self.connect(port))
# request shutdown
terminator(daemon)
time.sleep(1)
# daemon should no longer accept connections
2013-08-14 18:12:12 -04:00
try:
self.connect(port)
2013-08-14 18:12:12 -04:00
except EnvironmentError as exception:
self.assertEqual(exception.errno, ECONNREFUSED)
else:
self.fail("Expected EnvironmentError to be raised")
def test_termination_stdin(self):
"""Ensure that daemon and workers terminate when stdin is closed."""
self.do_termination_test(lambda daemon: daemon.stdin.close())
def test_termination_sigterm(self):
"""Ensure that daemon and workers terminate on SIGTERM."""
from signal import SIGTERM
self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
2013-01-16 22:15:14 -05:00
if __name__ == "__main__":
unittest.main()