[PySpark] [SPARK-2954] [SPARK-2948] [SPARK-2910] [SPARK-2101] Python 2.6 Fixes

- Modify python/run-tests to test with Python 2.6.
- Use unittest2 when running on Python 2.6.
- Fix issue with namedtuple.
- Skip TestOutputFormat.test_newhadoop on Python 2.6 until SPARK-2951 is fixed.
- Fix MLlib _deserialize_double on Python 2.6.

Closes #1868.  Closes #1042.

Author: Josh Rosen <joshrosen@apache.org>

Closes #1874 from JoshRosen/python2.6 and squashes the following commits:

983d259 [Josh Rosen] [SPARK-2954] Fix MLlib _deserialize_double on Python 2.6.
5d18fd7 [Josh Rosen] [SPARK-2948] [SPARK-2910] [SPARK-2101] Python 2.6 fixes
Josh Rosen committed on 2014-08-11 11:54:09 -07:00
parent ba28a8fcbc
commit db06a81fb7

5 changed files with 36 additions and 7 deletions

python/pyspark/mllib/_common.py

@@ -16,6 +16,7 @@
 #
 
 import struct
+import sys
 import numpy
 from numpy import ndarray, float64, int64, int32, array_equal, array
 from pyspark import SparkContext, RDD
@@ -78,6 +79,14 @@ DENSE_MATRIX_MAGIC = 3
 LABELED_POINT_MAGIC = 4
 
+# Workaround for SPARK-2954: before Python 2.7, struct.unpack couldn't unpack bytearray()s.
+if sys.version_info[:2] <= (2, 6):
+    def _unpack(fmt, string):
+        return struct.unpack(fmt, buffer(string))
+else:
+    _unpack = struct.unpack
+
+
 def _deserialize_numpy_array(shape, ba, offset, dtype=float64):
     """
     Deserialize a numpy array of the given type from an offset in
@@ -191,7 +200,7 @@ def _deserialize_double(ba, offset=0):
         raise TypeError("_deserialize_double called on a %s; wanted bytearray" % type(ba))
     if len(ba) - offset != 8:
         raise TypeError("_deserialize_double called on a %d-byte array; wanted 8 bytes." % nb)
-    return struct.unpack("d", ba[offset:])[0]
+    return _unpack("d", ba[offset:])[0]
 
 
 def _deserialize_double_vector(ba, offset=0):
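Note: a minimal sketch (not part of the commit) of the SPARK-2954 incompatibility that the _unpack shim above works around. On Python 2.6, struct.unpack() cannot consume a bytearray directly, so the shim wraps it in a buffer() first:

    import struct
    import sys

    data = bytearray(struct.pack("d", 3.14))  # 8 bytes encoding a double

    if sys.version_info[:2] <= (2, 6):
        # struct.unpack("d", data) fails on 2.6; buffer() makes it readable.
        value = struct.unpack("d", buffer(data))[0]
    else:
        value = struct.unpack("d", data)[0]

    print(value)  # 3.14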

python/pyspark/mllib/tests.py

@@ -19,8 +19,13 @@
 Fuller unit tests for Python MLlib.
 """
+import sys
 
 from numpy import array, array_equal
-import unittest
+
+if sys.version_info[:2] <= (2, 6):
+    import unittest2 as unittest
+else:
+    import unittest
 
 from pyspark.mllib._common import _convert_vector, _serialize_double_vector, \
     _deserialize_double_vector, _dot, _squared_distance
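Note: the conditional import above (mirrored in python/pyspark/tests.py below) leans on the unittest2 package, which backports Python 2.7's unittest features, including the skipIf decorator this commit uses, to 2.6. A hedged sketch of the pattern; the test class itself is hypothetical, not from the commit:

    import sys

    if sys.version_info[:2] <= (2, 6):
        import unittest2 as unittest  # requires unittest2 to be installed on 2.6
    else:
        import unittest


    class ExampleTest(unittest.TestCase):  # hypothetical example
        @unittest.skipIf(sys.version_info[:2] <= (2, 6), "example 2.6-only skip")
        def test_membership(self):
            self.assertIn(1, [1, 2, 3])  # assertIn arrived in 2.7/unittest2


    if __name__ == "__main__":
        unittest.main()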

python/pyspark/serializers.py

@@ -314,8 +314,8 @@ def _hijack_namedtuple():
     _old_namedtuple = _copy_func(collections.namedtuple)
 
-    def namedtuple(name, fields, verbose=False, rename=False):
-        cls = _old_namedtuple(name, fields, verbose, rename)
+    def namedtuple(*args, **kwargs):
+        cls = _old_namedtuple(*args, **kwargs)
         return _hack_namedtuple(cls)
 
     # replace namedtuple with new one
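Note: the switch to *args/**kwargs matters because Python 2.6's collections.namedtuple has no rename parameter (it was added in 2.7), so the old wrapper's four-positional-argument call to _old_namedtuple fails there. A simplified sketch of the wrapping idea, with _hack_namedtuple stubbed out:

    import collections

    _old_namedtuple = collections.namedtuple  # the real code copies it via _copy_func


    def namedtuple(*args, **kwargs):
        # Forward exactly what the caller passed, so this works against both
        # the 2.6 and 2.7 signatures of collections.namedtuple.
        cls = _old_namedtuple(*args, **kwargs)
        return cls  # the real wrapper returns _hack_namedtuple(cls)


    Point = namedtuple("Point", ["x", "y"])
    print(Point(1, 2))  # Point(x=1, y=2)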

python/pyspark/tests.py

@@ -29,9 +29,14 @@ import subprocess
 import sys
 import tempfile
 import time
-import unittest
 import zipfile
 
+if sys.version_info[:2] <= (2, 6):
+    import unittest2 as unittest
+else:
+    import unittest
+
+
 from pyspark.context import SparkContext
 from pyspark.files import SparkFiles
 from pyspark.serializers import read_int
@@ -605,6 +610,7 @@ class TestOutputFormat(PySparkTestCase):
                                       conf=input_conf).collect())
         self.assertEqual(old_dataset, dict_data)
 
+    @unittest.skipIf(sys.version_info[:2] <= (2, 6), "Skipped on 2.6 until SPARK-2951 is fixed")
     def test_newhadoop(self):
         basepath = self.tempdir.name
         # use custom ArrayWritable types and converters to handle arrays
@@ -905,8 +911,9 @@ class TestSparkSubmit(unittest.TestCase):
         pattern = re.compile(r'^ *\|', re.MULTILINE)
         content = re.sub(pattern, '', content.strip())
         path = os.path.join(self.programDir, name + ".zip")
-        with zipfile.ZipFile(path, 'w') as zip:
-            zip.writestr(name, content)
+        zip = zipfile.ZipFile(path, 'w')
+        zip.writestr(name, content)
+        zip.close()
         return path
 
     def test_single_script(self):
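Note: the zipfile change above is another 2.6 fix: zipfile.ZipFile only became usable as a context manager in Python 2.7, so a with statement raises on 2.6. A minimal version-safe sketch (the path is hypothetical):

    import zipfile

    path = "example.zip"  # hypothetical; the test builds its own temp path
    zf = zipfile.ZipFile(path, 'w')
    try:
        zf.writestr("hello.txt", "hello")
    finally:
        zf.close()  # the commit calls close() directly; try/finally adds safety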

python/run-tests

@@ -48,6 +48,14 @@ function run_test() {
 
 echo "Running PySpark tests. Output is in python/unit-tests.log."
 
+# Try to test with Python 2.6, since that's the minimum version that we support:
+if [ $(which python2.6) ]; then
+    export PYSPARK_PYTHON="python2.6"
+fi
+
+echo "Testing with Python version:"
+$PYSPARK_PYTHON --version
+
 run_test "pyspark/rdd.py"
 run_test "pyspark/context.py"
 run_test "pyspark/conf.py"