[SPARK-2951] [PySpark] support unpickle array.array for Python 2.6
Pyrolite cannot unpickle an array.array that was pickled by Python 2.6; this patch fixes it by extending Pyrolite. There is a bug in Pyrolite when unpickling arrays of float/double; this patch works around it by reversing the endianness for float/double. This workaround should be removed once Pyrolite has a new release that fixes the issue. I have sent a PR to Pyrolite to fix it: https://github.com/irmen/Pyrolite/pull/11 Author: Davies Liu <davies.liu@gmail.com> Closes #2365 from davies/pickle and squashes the following commits: f44f771 [Davies Liu] enable tests about array 3908f5c [Davies Liu] Merge branch 'master' into pickle c77c87b [Davies Liu] cleanup debugging code 60e4e2f [Davies Liu] support unpickle array.array for Python 2.6
This commit is contained in:
parent
fdb302f49c
commit
da33acb8b6
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.spark.api.python
|
||||
|
||||
import java.nio.ByteOrder
|
||||
|
||||
import scala.collection.JavaConversions._
|
||||
import scala.util.Failure
|
||||
import scala.util.Try
|
||||
|
@ -28,6 +30,55 @@ import org.apache.spark.rdd.RDD
|
|||
|
||||
/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
|
||||
private[python] object SerDeUtil extends Logging {
|
||||
// Unpickle array.array generated by Python 2.6
|
||||
// Unpickles array.array values generated by Python 2.6, which Pyrolite's own
// constructor cannot handle (2.6 pickles arrays as (typecode, data-string)).
class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor {
  // Machine-code table mirrors CPython's arraymodule.c descriptor table:
  //   {'c', sizeof(char)}, {'b'/'B', sizeof(char)}, {'u', sizeof(Py_UNICODE)},
  //   {'h'/'H', sizeof(short)}, {'i'/'I', sizeof(int)}, {'l'/'L', sizeof(long)},
  //   {'f', sizeof(float)}, {'d', sizeof(double)}
  // TODO: support Py_UNICODE with 2 bytes
  // FIXME: unpickling an array of float/double is wrong in Pyrolite, so we
  // reverse the machine code for float/double here to work around it.
  // This should be removed once Pyrolite ships a fix
  // (see https://github.com/irmen/Pyrolite/pull/11).
  val machineCodes: Map[Char, Int] = if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
    Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9,
      'L' -> 11, 'l' -> 13, 'f' -> 14, 'd' -> 16, 'u' -> 21
    )
  } else {
    Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
      'L' -> 10, 'l' -> 12, 'f' -> 15, 'd' -> 17, 'u' -> 20
    )
  }

  /**
   * Reconstructs an array.array from its pickled constructor arguments.
   *
   * Python 2.6 pickles an array as either (typecode) for an empty array or
   * (typecode, data-string); anything else is delegated to Pyrolite.
   */
  override def construct(args: Array[Object]): Object = {
    if (args.length == 1) {
      // Empty array: normalize to the two-argument form with an empty payload.
      construct(args ++ Array(""))
    } else if (args.length == 2 && args(1).isInstanceOf[String]) {
      val typecode = args(0).asInstanceOf[String].charAt(0)
      // The raw bytes of the array arrive as a latin-1 string; ISO-8859-1
      // maps each char back to its byte value losslessly.
      val data: String = args(1).asInstanceOf[String]
      machineCodes.get(typecode) match {
        case Some(code) =>
          construct(typecode, code, data.getBytes("ISO-8859-1"))
        case None =>
          // Unknown typecode (e.g. 'q'): defer to Pyrolite rather than
          // throwing NoSuchElementException from the map lookup.
          super.construct(args)
      }
    } else {
      super.construct(args)
    }
  }
}
|
||||
|
||||
/**
 * Registers our Python-2.6-compatible constructor with Pyrolite's global
 * unpickler registry for the "array.array" class. Must be called once
 * before unpickling data sent from Python workers.
 */
def initialize(): Unit = {
  Unpickler.registerConstructor("array", "array", new ArrayConstructor())
}
|
||||
|
||||
private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = {
|
||||
val pickle = new Pickler
|
||||
|
|
|
@ -214,6 +214,7 @@ class SparkContext(object):
|
|||
SparkContext._gateway = gateway or launch_gateway()
|
||||
SparkContext._jvm = SparkContext._gateway.jvm
|
||||
SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
|
||||
SparkContext._jvm.SerDeUtil.initialize()
|
||||
|
||||
if instance:
|
||||
if (SparkContext._active_spark_context and
|
||||
|
|
|
@ -956,8 +956,6 @@ class TestOutputFormat(PySparkTestCase):
|
|||
conf=input_conf).collect())
|
||||
self.assertEqual(new_dataset, data)
|
||||
|
||||
@unittest.skipIf(sys.version_info[:2] <= (2, 6) or python_implementation() == "PyPy",
|
||||
"Skipped on 2.6 and PyPy until SPARK-2951 is fixed")
|
||||
def test_newhadoop_with_array(self):
|
||||
basepath = self.tempdir.name
|
||||
# use custom ArrayWritable types and converters to handle arrays
|
||||
|
|
Loading…
Reference in a new issue