[SPARK-36464][CORE] Fix Underlying Size Variable Initialization in ChunkedByteBufferOutputStream for Writing Over 2GB Data

### What changes were proposed in this pull request?
The `size` method of `ChunkedByteBufferOutputStream` returns a `Long` value; however, the underlying `_size` variable is initialized as `Int`.
That causes an overflow and returns a negative size when over 2GB data is written into `ChunkedByteBufferOutputStream`

This PR proposes to change the underlying `_size` variable from `Int` to `Long` at the initialization

### Why are the changes needed?
Be cause the `size` method of `ChunkedByteBufferOutputStream` incorrectly returns a negative value when over 2GB data is written.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Passed existing tests
```
build/sbt "core/testOnly *ChunkedByteBufferOutputStreamSuite"
```
Also added a new unit test
```
build/sbt "core/testOnly *ChunkedByteBufferOutputStreamSuite – -z SPARK-36464"
```

Closes #33690 from kazuyukitanimura/SPARK-36464.

Authored-by: Kazuyuki Tanimura <ktanimura@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Kazuyuki Tanimura 2021-08-10 10:29:54 -07:00 committed by Dongjoon Hyun
parent 186815be1c
commit c888bad6a1
2 changed files with 12 additions and 1 deletions

View file

@ -48,7 +48,7 @@ private[spark] class ChunkedByteBufferOutputStream(
* This can also never be 0.
*/
private[this] var position = chunkSize
private[this] var _size = 0
private[this] var _size = 0L
private[this] var closed: Boolean = false
def size: Long = _size
@ -120,4 +120,5 @@ private[spark] class ChunkedByteBufferOutputStream(
new ChunkedByteBuffer(ret)
}
}
}

View file

@ -119,4 +119,14 @@ class ChunkedByteBufferOutputStreamSuite extends SparkFunSuite {
assert(arrays(1).toSeq === ref.slice(10, 20))
assert(arrays(2).toSeq === ref.slice(20, 30))
}
test("SPARK-36464: size returns correct positive number even with over 2GB data") {
val ref = new Array[Byte](1024 * 1024 * 1024)
val o = new ChunkedByteBufferOutputStream(1024 * 1024, ByteBuffer.allocate)
o.write(ref)
o.write(ref)
o.close()
assert(o.size > 0L) // make sure it is not overflowing
assert(o.size == ref.length.toLong * 2)
}
}