[SPARK-25786][CORE] If the ByteBuffer.hasArray is false , it will throw UnsupportedOperationException for Kryo

## What changes were proposed in this pull request?
`deserialize` for kryo,  the type of input parameter is ByteBuffer, if it is not backed by an accessible byte array. it will throw `UnsupportedOperationException`

Exception Info:
```
java.lang.UnsupportedOperationException was thrown.
java.lang.UnsupportedOperationException
    at java.nio.ByteBuffer.array(ByteBuffer.java:994)
    at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:362)
```

## How was this patch tested?

Added a unit test

Closes #22779 from 10110346/InputStreamKryo.

Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
This commit is contained in:
liuxian 2018-11-24 09:10:15 -06:00 committed by Sean Owen
parent de84899204
commit 7f5f7a967d
2 changed files with 25 additions and 3 deletions

View file

@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils}
import org.apache.spark.util.{BoundedPriorityQueue, ByteBufferInputStream, SerializableConfiguration, SerializableJobConf, Utils}
import org.apache.spark.util.collection.CompactBuffer
/**
@ -417,7 +417,12 @@ private[spark] class KryoSerializerInstance(
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
val kryo = borrowKryo()
try {
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
if (bytes.hasArray) {
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
} else {
input.setBuffer(new Array[Byte](4096))
input.setInputStream(new ByteBufferInputStream(bytes))
}
kryo.readClassAndObject(input).asInstanceOf[T]
} finally {
releaseKryo(kryo)
@ -429,7 +434,12 @@ private[spark] class KryoSerializerInstance(
val oldClassLoader = kryo.getClassLoader
try {
kryo.setClassLoader(loader)
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
if (bytes.hasArray) {
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
} else {
input.setBuffer(new Array[Byte](4096))
input.setInputStream(new ByteBufferInputStream(bytes))
}
kryo.readClassAndObject(input).asInstanceOf[T]
} finally {
kryo.setClassLoader(oldClassLoader)

View file

@ -18,6 +18,7 @@
package org.apache.spark.serializer
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileInputStream, FileOutputStream}
import java.nio.ByteBuffer
import java.util.concurrent.Executors
import scala.collection.JavaConverters._
@ -551,6 +552,17 @@ class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSpar
deserializationStream.close()
assert(serInstance.deserialize[Any](helloHello) === ((hello, hello)))
}
test("SPARK-25786: ByteBuffer.array -- UnsupportedOperationException") {
val serInstance = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance]
val obj = "UnsupportedOperationException"
val serObj = serInstance.serialize(obj)
val byteBuffer = ByteBuffer.allocateDirect(serObj.array().length)
byteBuffer.put(serObj.array())
byteBuffer.flip()
assert(serInstance.deserialize[Any](serObj) === (obj))
assert(serInstance.deserialize[Any](byteBuffer) === (obj))
}
}
class ClassLoaderTestingObject