"""
Unit tests for PySpark; additional tests are implemented as doctests in
individual modules.
"""
import os
import shutil
import sys
from tempfile import NamedTemporaryFile
import time
import unittest

from pyspark.context import SparkContext
from pyspark.files import SparkFiles
from pyspark.java_gateway import SPARK_HOME
from pyspark.serializers import read_int


class PySparkTestCase(unittest.TestCase):
    """Base class for PySpark tests: manages a local SparkContext per test."""

    def setUp(self):
        """Snapshot sys.path and start a fresh 4-core local SparkContext."""
        self._old_sys_path = sys.path[:]
        name = self.__class__.__name__
        self.sc = SparkContext('local[4]', name, batchSize=2)

    def tearDown(self):
        """Stop the context and undo setUp's sys.path modification."""
        self.sc.stop()
        sys.path = self._old_sys_path
        # To avoid Akka rebinding to the same port, since it doesn't unbind
        # immediately on shutdown
        self.sc._jvm.System.clearProperty("spark.driver.port")
class TestCheckpoint(PySparkTestCase):
    """Tests for RDD checkpointing through the Python API."""

    def setUp(self):
        PySparkTestCase.setUp(self)
        # Reserve a unique temp path, then delete the file so Spark can
        # create a directory at that path for checkpoint data.
        self.checkpointDir = NamedTemporaryFile(delete=False)
        os.unlink(self.checkpointDir.name)
        self.sc.setCheckpointDir(self.checkpointDir.name)

    def tearDown(self):
        PySparkTestCase.tearDown(self)
        shutil.rmtree(self.checkpointDir.name)

    def test_basic_checkpointing(self):
        """Checkpointing marks the RDD and preserves its contents."""
        parCollection = self.sc.parallelize([1, 2, 3, 4])
        flatMappedRDD = parCollection.flatMap(lambda x: range(1, x + 1))

        self.assertFalse(flatMappedRDD.isCheckpointed())
        self.assertIsNone(flatMappedRDD.getCheckpointFile())

        flatMappedRDD.checkpoint()
        result = flatMappedRDD.collect()
        time.sleep(1)  # 1 second; checkpointing completes asynchronously
        self.assertTrue(flatMappedRDD.isCheckpointed())
        # assertEqual, not the deprecated assertEquals alias
        self.assertEqual(flatMappedRDD.collect(), result)
        self.assertEqual(self.checkpointDir.name,
                         os.path.dirname(flatMappedRDD.getCheckpointFile()))

    def test_checkpoint_and_restore(self):
        """A checkpointed RDD can be reloaded from its checkpoint file."""
        parCollection = self.sc.parallelize([1, 2, 3, 4])
        flatMappedRDD = parCollection.flatMap(lambda x: [x])

        self.assertFalse(flatMappedRDD.isCheckpointed())
        self.assertIsNone(flatMappedRDD.getCheckpointFile())

        flatMappedRDD.checkpoint()
        flatMappedRDD.count()  # forces a checkpoint to be computed
        time.sleep(1)  # 1 second

        self.assertIsNotNone(flatMappedRDD.getCheckpointFile())
        recovered = self.sc._checkpointFile(flatMappedRDD.getCheckpointFile())
        # Fixed: assertEquals is a deprecated unittest alias (removed in
        # Python 3.12); use assertEqual.
        self.assertEqual([1, 2, 3, 4], recovered.collect())
class TestAddFile(PySparkTestCase):
    """Tests for SparkContext.addFile / addPyFile distribution semantics."""

    def test_add_py_file(self):
        # To ensure that we're actually testing addPyFile's effects, check that
        # this job fails due to `userlibrary` not being on the Python path:
        def func(x):
            from userlibrary import UserClass
            return UserClass().hello()
        self.assertRaises(Exception,
                          self.sc.parallelize(range(2)).map(func).first)
        # Add the file, so the job should now succeed:
        path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py")
        self.sc.addPyFile(path)
        res = self.sc.parallelize(range(2)).map(func).first()
        self.assertEqual("Hello World!", res)

    def test_add_file_locally(self):
        """addFile makes a local copy retrievable via SparkFiles.get."""
        path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
        self.sc.addFile(path)
        download_path = SparkFiles.get("hello.txt")
        self.assertNotEqual(path, download_path)
        with open(download_path) as test_file:
            # Fixed: assertEquals is a deprecated unittest alias (removed in
            # Python 3.12); use assertEqual.
            self.assertEqual("Hello World!\n", test_file.readline())

    def test_add_py_file_locally(self):
        # To ensure that we're actually testing addPyFile's effects, check that
        # this fails due to `userlibrary` not being on the Python path:
        def func():
            from userlibrary import UserClass
        self.assertRaises(ImportError, func)
        path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py")
        self.sc.addFile(path)
        from userlibrary import UserClass
        self.assertEqual("Hello World!", UserClass().hello())
class TestIO(PySparkTestCase):
    """Tests for worker-side I/O behavior."""

    def test_stdout_redirection(self):
        """A task shelling out to a subprocess must have a usable stdout."""
        import subprocess

        def run_ls(x):
            subprocess.check_call('ls', shell=True)

        self.sc.parallelize([1]).foreach(run_ls)
class TestDaemon(unittest.TestCase):
    """Tests that the worker daemon process shuts down cleanly."""

    def connect(self, port):
        """Connect to the daemon on localhost, poke a worker, return True."""
        from socket import socket, AF_INET, SOCK_STREAM
        s = socket(AF_INET, SOCK_STREAM)
        s.connect(('127.0.0.1', port))
        # send a split index of -1 to shutdown the worker
        s.send("\xFF\xFF\xFF\xFF")
        s.close()
        return True

    def do_termination_test(self, terminator):
        """Start daemon.py, kill it via `terminator`, check it stops listening."""
        from subprocess import Popen, PIPE
        from errno import ECONNREFUSED

        # Launch the daemon as a child process.
        script = os.path.join(os.path.dirname(__file__), "daemon.py")
        proc = Popen([sys.executable, script], stdin=PIPE, stdout=PIPE)

        # The daemon announces its listening port on stdout.
        port = read_int(proc.stdout)

        # While alive it must accept connections.
        self.assertTrue(self.connect(port))

        # Ask the caller-supplied terminator to shut it down, then give it
        # a moment to exit.
        terminator(proc)
        time.sleep(1)

        # Once dead, connecting must fail with ECONNREFUSED.
        with self.assertRaises(EnvironmentError) as trap:
            self.connect(port)
        self.assertEqual(trap.exception.errno, ECONNREFUSED)

    def test_termination_stdin(self):
        """Ensure that daemon and workers terminate when stdin is closed."""
        self.do_termination_test(lambda daemon: daemon.stdin.close())

    def test_termination_sigterm(self):
        """Ensure that daemon and workers terminate on SIGTERM."""
        from signal import SIGTERM
        self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
if __name__ == "__main__":
    # Discover and run every TestCase in this module when executed directly.
    unittest.main()