diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 62d445f3d7..cb2ac5ea16 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -400,7 +400,15 @@ private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends Dat override def readUTF(): String = input.readString() // readString in kryo does utf8 override def readInt(): Int = input.readInt() override def readUnsignedShort(): Int = input.readShortUnsigned() - override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt + override def skipBytes(n: Int): Int = { + var remaining: Long = n + while (remaining > 0) { + val skip = Math.min(Integer.MAX_VALUE, remaining).asInstanceOf[Int] + input.skip(skip) + remaining -= skip + } + n + } override def readFully(b: Array[Byte]): Unit = input.read(b) override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len) override def readLine(): String = throw new UnsupportedOperationException("readLine") diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index f81fe31131..9fcc22b608 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -17,17 +17,21 @@ package org.apache.spark.serializer -import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileOutputStream, FileInputStream} import scala.collection.JavaConverters._ import scala.collection.mutable import scala.reflect.ClassTag import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} + +import org.roaringbitmap.RoaringBitmap import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite} import org.apache.spark.scheduler.HighlyCompressedMapStatus import org.apache.spark.serializer.KryoTest._ +import org.apache.spark.util.Utils import org.apache.spark.storage.BlockManagerId class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { @@ -350,6 +354,28 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { assert(thrown.getMessage.contains(kryoBufferMaxProperty)) } + test("SPARK-12222: deserialize RoaringBitmap throw Buffer underflow exception") { + val dir = Utils.createTempDir() + val tmpfile = dir.toString + "/RoaringBitmap" + val outStream = new FileOutputStream(tmpfile) + val output = new KryoOutput(outStream) + val bitmap = new RoaringBitmap + bitmap.add(1) + bitmap.add(3) + bitmap.add(5) + bitmap.serialize(new KryoOutputDataOutputBridge(output)) + output.flush() + output.close() + + val inStream = new FileInputStream(tmpfile) + val input = new KryoInput(inStream) + val ret = new RoaringBitmap + ret.deserialize(new KryoInputDataInputBridge(input)) + input.close() + assert(ret == bitmap) + Utils.deleteRecursively(dir) + } + test("getAutoReset") { val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance] assert(ser.getAutoReset)