From c888bad6a12b45f3eda8d898bdd90405985ee05c Mon Sep 17 00:00:00 2001
From: Kazuyuki Tanimura
Date: Tue, 10 Aug 2021 10:29:54 -0700
Subject: [PATCH] [SPARK-36464][CORE] Fix Underlying Size Variable
 Initialization in ChunkedByteBufferOutputStream for Writing Over 2GB Data
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### What changes were proposed in this pull request?
The `size` method of `ChunkedByteBufferOutputStream` returns a `Long` value; however, the underlying `_size` variable is initialized as an `Int`. That causes an overflow, and `size` returns a negative value once more than 2GB of data has been written into `ChunkedByteBufferOutputStream`.

This PR proposes to change the underlying `_size` variable from `Int` to `Long` at initialization.

### Why are the changes needed?
Because the `size` method of `ChunkedByteBufferOutputStream` incorrectly returns a negative value when more than 2GB of data is written.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Passed existing tests
```
build/sbt "core/testOnly *ChunkedByteBufferOutputStreamSuite"
```
Also added a new unit test
```
build/sbt "core/testOnly *ChunkedByteBufferOutputStreamSuite -- -z SPARK-36464"
```

Closes #33690 from kazuyukitanimura/SPARK-36464.

Authored-by: Kazuyuki Tanimura
Signed-off-by: Dongjoon Hyun
---
 .../spark/util/io/ChunkedByteBufferOutputStream.scala |  3 ++-
 .../util/io/ChunkedByteBufferOutputStreamSuite.scala  | 10 ++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStream.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStream.scala
index a625b32895..34d36655a6 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStream.scala
@@ -48,7 +48,7 @@ private[spark] class ChunkedByteBufferOutputStream(
    * This can also never be 0.
    */
   private[this] var position = chunkSize
-  private[this] var _size = 0
+  private[this] var _size = 0L
   private[this] var closed: Boolean = false
 
   def size: Long = _size
@@ -120,4 +120,5 @@ private[spark] class ChunkedByteBufferOutputStream(
       new ChunkedByteBuffer(ret)
     }
   }
+
 }
diff --git a/core/src/test/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStreamSuite.scala b/core/src/test/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStreamSuite.scala
index 8696174567..29443e275f 100644
--- a/core/src/test/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStreamSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStreamSuite.scala
@@ -119,4 +119,14 @@ class ChunkedByteBufferOutputStreamSuite extends SparkFunSuite {
     assert(arrays(1).toSeq === ref.slice(10, 20))
     assert(arrays(2).toSeq === ref.slice(20, 30))
   }
+
+  test("SPARK-36464: size returns correct positive number even with over 2GB data") {
+    val ref = new Array[Byte](1024 * 1024 * 1024)
+    val o = new ChunkedByteBufferOutputStream(1024 * 1024, ByteBuffer.allocate)
+    o.write(ref)
+    o.write(ref)
+    o.close()
+    assert(o.size > 0L) // make sure it is not overflowing
+    assert(o.size == ref.length.toLong * 2)
+  }
 }
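
For context, a minimal standalone Scala sketch of the failure mode this patch fixes (the object name is made up for illustration; it is not part of the Spark codebase): accumulating byte counts in an `Int` wraps past `Int.MaxValue` (2^31 - 1) once the total reaches 2GB, while a `Long` accumulator keeps counting correctly.
```
// Standalone sketch (not part of the patch): why `_size` must be a Long.
// Two writes of 1GiB each total 2^31 bytes, which overflows a 32-bit Int.
object SizeOverflowSketch {
  def main(args: Array[String]): Unit = {
    val oneGiB: Int = 1024 * 1024 * 1024 // 2^30, still representable as an Int

    var intSize: Int = 0                 // mirrors the old `private[this] var _size = 0`
    intSize += oneGiB
    intSize += oneGiB                    // 2^31 wraps around
    println(intSize)                     // prints -2147483648

    var longSize: Long = 0L              // mirrors the fixed `private[this] var _size = 0L`
    longSize += oneGiB
    longSize += oneGiB
    println(longSize)                    // prints 2147483648
  }
}
```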