[SPARK-21534][SQL][PYSPARK] PickleException when creating dataframe from python row with empty bytearray

## What changes were proposed in this pull request?

A `PickleException` is thrown when creating a DataFrame from a Python row that contains an empty bytearray:

    spark.createDataFrame(spark.sql("select unhex('') as xx").rdd.map(lambda x: {"abc": x.xx})).show()

    net.razorvine.pickle.PickleException: invalid pickle data for bytearray; expected 1 or 2 args, got 0
    	at net.razorvine.pickle.objects.ByteArrayConstructor.construct(ByteArrayConstructor.java
        ...

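Pyrolite's `ByteArrayConstructor` does not handle an empty byte array pickled by Python 3, which arrives with zero constructor arguments. This patch registers a subclass that returns an empty byte array in that case and otherwise delegates to the original implementation.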

## How was this patch tested?

Added an empty-bytearray row to `test_BinaryType_serialization` in the PySpark SQL tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19085 from viirya/SPARK-21534.
This commit is contained in:
Liang-Chi Hsieh 2017-08-31 12:55:38 +09:00 committed by hyukjinkwon
parent 4482ff23ad
commit ecf437a648
2 changed files with 17 additions and 1 deletions

View file

@ -35,6 +35,16 @@ import org.apache.spark.rdd.RDD
/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
private[spark] object SerDeUtil extends Logging {
/**
 * Pickle constructor for Python byte arrays that also accepts an empty argument list.
 *
 * Python 3 pickles an empty bytearray with no constructor arguments, which the Pyrolite
 * base class rejects ("expected 1 or 2 args, got 0"). That case is answered here with an
 * empty byte array; every other argument list is passed to the base class untouched.
 */
class ByteArrayConstructor extends net.razorvine.pickle.objects.ByteArrayConstructor {
  /**
   * @param args constructor arguments read from the pickle stream
   * @return an empty `Array[Byte]` when `args` has no elements, otherwise whatever the
   *         base class builds from `args`
   */
  override def construct(args: Array[Object]): Object =
    if (args.length > 0) super.construct(args) else Array.emptyByteArray
}
// Unpickle array.array generated by Python 2.6
class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor {
// /* Description of types */
@ -108,6 +118,10 @@ private[spark] object SerDeUtil extends Logging {
// Register the custom pickle constructors at most once: the `initialized` flag is both
// checked and set inside this synchronized block.
synchronized{
if (!initialized) {
Unpickler.registerConstructor("array", "array", new ArrayConstructor())
// Each (module, name) pair below is routed to ByteArrayConstructor; a separate instance
// is registered per pair.
// NOTE(review): "__builtin__" / "builtins" look like the Python 2 / Python 3 names of the
// builtins module, and "_codecs"."encode" like the callable Python 3 uses when pickling
// bytes -- confirm against the Python pickle documentation.
Unpickler.registerConstructor("__builtin__", "bytearray", new ByteArrayConstructor())
Unpickler.registerConstructor("builtins", "bytearray", new ByteArrayConstructor())
Unpickler.registerConstructor("__builtin__", "bytes", new ByteArrayConstructor())
Unpickler.registerConstructor("_codecs", "encode", new ByteArrayConstructor())
// Mark registration as done so later calls skip the block above.
initialized = true
}
}

View file

@ -2383,9 +2383,11 @@ class SQLTests(ReusedPySparkTestCase):
def test_BinaryType_serialization(self):
# Pyrolite version <= 4.9 could not serialize BinaryType with Python3 SPARK-17808
# The empty bytearray is a regression test for SPARK-21534.
schema = StructType([StructField('mybytes', BinaryType())])
data = [[bytearray(b'here is my data')],
[bytearray(b'and here is some more')]]
[bytearray(b'and here is some more')],
[bytearray(b'')]]
df = self.spark.createDataFrame(data, schema=schema)
df.collect()