[SPARK-12222] [CORE] Deserialize RoaringBitmap using Kryo serializer throw Buffer underflow exception
Jira: https://issues.apache.org/jira/browse/SPARK-12222 Deserialize RoaringBitmap using Kryo serializer throw Buffer underflow exception: ``` com.esotericsoftware.kryo.KryoException: Buffer underflow. at com.esotericsoftware.kryo.io.Input.require(Input.java:156) at com.esotericsoftware.kryo.io.Input.skip(Input.java:131) at com.esotericsoftware.kryo.io.Input.skip(Input.java:264) ``` This is caused by a bug of kryo's `Input.skip(long count)`(https://github.com/EsotericSoftware/kryo/issues/119) and we call this method in `KryoInputDataInputBridge`. Instead of upgrade kryo's version, this pr bypass the kryo's `Input.skip(long count)` by directly call another `skip` method in kryo's Input.java(https://github.com/EsotericSoftware/kryo/blob/kryo-2.21/src/com/esotericsoftware/kryo/io/Input.java#L124), i.e. write the bug-fixed version of `Input.skip(long count)` in KryoInputDataInputBridge's `skipBytes` method. more detail link to https://github.com/apache/spark/pull/9748#issuecomment-162860246 Author: Fei Wang <wangfei1@huawei.com> Closes #10213 from scwf/patch-1.
This commit is contained in:
parent
a0046e379b
commit
3934562d34
|
@ -400,7 +400,15 @@ private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends Dat
|
||||||
override def readUTF(): String = input.readString() // readString in kryo does utf8
|
override def readUTF(): String = input.readString() // readString in kryo does utf8
|
||||||
override def readInt(): Int = input.readInt()
|
override def readInt(): Int = input.readInt()
|
||||||
override def readUnsignedShort(): Int = input.readShortUnsigned()
|
override def readUnsignedShort(): Int = input.readShortUnsigned()
|
||||||
override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt
|
override def skipBytes(n: Int): Int = {
|
||||||
|
var remaining: Long = n
|
||||||
|
while (remaining > 0) {
|
||||||
|
val skip = Math.min(Integer.MAX_VALUE, remaining).asInstanceOf[Int]
|
||||||
|
input.skip(skip)
|
||||||
|
remaining -= skip
|
||||||
|
}
|
||||||
|
n
|
||||||
|
}
|
||||||
override def readFully(b: Array[Byte]): Unit = input.read(b)
|
override def readFully(b: Array[Byte]): Unit = input.read(b)
|
||||||
override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len)
|
override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len)
|
||||||
override def readLine(): String = throw new UnsupportedOperationException("readLine")
|
override def readLine(): String = throw new UnsupportedOperationException("readLine")
|
||||||
|
|
|
@ -17,17 +17,21 @@
|
||||||
|
|
||||||
package org.apache.spark.serializer
|
package org.apache.spark.serializer
|
||||||
|
|
||||||
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
|
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileOutputStream, FileInputStream}
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
|
|
||||||
import com.esotericsoftware.kryo.Kryo
|
import com.esotericsoftware.kryo.Kryo
|
||||||
|
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
|
||||||
|
|
||||||
|
import org.roaringbitmap.RoaringBitmap
|
||||||
|
|
||||||
import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
|
import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
|
||||||
import org.apache.spark.scheduler.HighlyCompressedMapStatus
|
import org.apache.spark.scheduler.HighlyCompressedMapStatus
|
||||||
import org.apache.spark.serializer.KryoTest._
|
import org.apache.spark.serializer.KryoTest._
|
||||||
|
import org.apache.spark.util.Utils
|
||||||
import org.apache.spark.storage.BlockManagerId
|
import org.apache.spark.storage.BlockManagerId
|
||||||
|
|
||||||
class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
|
class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
|
||||||
|
@ -350,6 +354,28 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
|
||||||
assert(thrown.getMessage.contains(kryoBufferMaxProperty))
|
assert(thrown.getMessage.contains(kryoBufferMaxProperty))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("SPARK-12222: deserialize RoaringBitmap throw Buffer underflow exception") {
|
||||||
|
val dir = Utils.createTempDir()
|
||||||
|
val tmpfile = dir.toString + "/RoaringBitmap"
|
||||||
|
val outStream = new FileOutputStream(tmpfile)
|
||||||
|
val output = new KryoOutput(outStream)
|
||||||
|
val bitmap = new RoaringBitmap
|
||||||
|
bitmap.add(1)
|
||||||
|
bitmap.add(3)
|
||||||
|
bitmap.add(5)
|
||||||
|
bitmap.serialize(new KryoOutputDataOutputBridge(output))
|
||||||
|
output.flush()
|
||||||
|
output.close()
|
||||||
|
|
||||||
|
val inStream = new FileInputStream(tmpfile)
|
||||||
|
val input = new KryoInput(inStream)
|
||||||
|
val ret = new RoaringBitmap
|
||||||
|
ret.deserialize(new KryoInputDataInputBridge(input))
|
||||||
|
input.close()
|
||||||
|
assert(ret == bitmap)
|
||||||
|
Utils.deleteRecursively(dir)
|
||||||
|
}
|
||||||
|
|
||||||
test("getAutoReset") {
|
test("getAutoReset") {
|
||||||
val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance]
|
val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance]
|
||||||
assert(ser.getAutoReset)
|
assert(ser.getAutoReset)
|
||||||
|
|
Loading…
Reference in a new issue