[SPARK-16123] Avoid NegativeArraySizeException while reserving additional capacity in VectorizedColumnReader

## What changes were proposed in this pull request?

This patch fixes an integer-overflow bug in the vectorized Parquet reader: both the off-heap and on-heap variants of `ColumnVector.reserve()` doubled the requested capacity with `int` arithmetic (`requiredCapacity * 2`), which can overflow to a negative value and surface as a `NegativeArraySizeException` while reserving additional capacity during reads. The fix hoists the doubling into `ColumnVector` itself, performs it in `long` arithmetic, caps the result at `MAX_CAPACITY`, and throws an informative `RuntimeException` (pointing at `SQLConf.PARQUET_VECTORIZED_READER_ENABLED` as a workaround) when even the requested capacity cannot be accommodated.
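
To make the failure mode concrete, here is a minimal standalone sketch (illustrative, not Spark code) of the arithmetic involved: doubling a large `int` capacity wraps around to a negative value, which is what can end up as a negative array size, while the patched approach doubles in `long`, caps at the maximum allowed capacity, and only then casts back to `int`.

```java
public class CapacityOverflowSketch {
  public static void main(String[] args) {
    int requiredCapacity = 1_500_000_000;   // ~1.5 billion elements requested

    // Old behaviour: the doubling is done in int arithmetic and wraps around.
    int overflowed = requiredCapacity * 2;  // -1294967296, later used as an allocation size
    System.out.println("int doubling:  " + overflowed);

    // Patched behaviour: double in long arithmetic, cap at the maximum capacity,
    // and only then cast back to int, so the result can never be negative.
    int maxCapacity = Integer.MAX_VALUE;
    int newCapacity = (int) Math.min(maxCapacity, requiredCapacity * 2L);
    System.out.println("long doubling: " + newCapacity);  // 2147483647
  }
}
```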

## How was this patch tested?

Manual tests, plus a new regression test added to `ColumnarBatchSuite` ("exceeding maximum capacity should throw an error") that exercises both the on-heap and off-heap code paths.

Author: Sameer Agarwal <sameer@databricks.com>

Closes #13832 from sameeragarwal/negative-array.
Authored by Sameer Agarwal on 2016-06-23 18:21:41 -07:00; committed by Herman van Hovell
parent 264bc63623
commit cc71d4fa37
4 changed files with 47 additions and 14 deletions

ColumnVector.java

```diff
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.vectorized;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.io.api.Binary;
@@ -27,6 +28,7 @@ import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -277,11 +279,25 @@ public abstract class ColumnVector implements AutoCloseable {
    */
   public abstract void close();
 
-  /*
+  public void reserve(int requiredCapacity) {
+    if (requiredCapacity > capacity) {
+      int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
+      if (requiredCapacity <= newCapacity) {
+        reserveInternal(newCapacity);
+      } else {
+        throw new RuntimeException("Cannot reserve more than " + newCapacity +
+          " bytes in the vectorized reader (requested = " + requiredCapacity + " bytes). As a " +
+          "workaround, you can disable the vectorized reader by setting "
+          + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() + " to false.");
+      }
+    }
+  }
+
+  /**
    * Ensures that there is enough storage to store capacity elements. That is, the put() APIs
    * must work for all rowIds < capacity.
    */
-  public abstract void reserve(int capacity);
+  protected abstract void reserveInternal(int capacity);
 
   /**
    * Returns the number of nulls in this column.
@@ -846,6 +862,12 @@ public abstract class ColumnVector implements AutoCloseable {
    */
   protected int capacity;
 
+  /**
+   * Upper limit for the maximum capacity for this column.
+   */
+  @VisibleForTesting
+  protected int MAX_CAPACITY = Integer.MAX_VALUE;
+
   /**
    * Data type for this column.
    */
```
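
To make the new control flow easier to follow, here is a simplified, hypothetical sketch of the pattern the patch introduces (the class and field names mirror the diff, but this is not Spark's actual implementation): the base class owns the capped doubling and the error path in `reserve()`, while memory-mode-specific subclasses only implement the slow-path `reserveInternal()` reallocation.

```java
// Simplified sketch of the reserve()/reserveInternal() split introduced by the patch.
abstract class SketchColumnVector {
  protected int capacity;
  protected int MAX_CAPACITY = Integer.MAX_VALUE;

  public void reserve(int requiredCapacity) {
    if (requiredCapacity > capacity) {
      // Double in long arithmetic and cap, so the cast back to int never goes negative.
      int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
      if (requiredCapacity <= newCapacity) {
        reserveInternal(newCapacity);
      } else {
        throw new RuntimeException("Cannot reserve more than " + newCapacity
            + " bytes (requested = " + requiredCapacity + " bytes)");
      }
    }
  }

  // Subclasses (on-heap vs. off-heap) only implement the actual reallocation.
  protected abstract void reserveInternal(int newCapacity);
}

class SketchOnHeapVector extends SketchColumnVector {
  private byte[] data = new byte[0];

  @Override
  protected void reserveInternal(int newCapacity) {
    data = java.util.Arrays.copyOf(data, newCapacity);
    capacity = newCapacity;
  }
}
```

With this split, the on-heap and off-heap vectors no longer carry their own doubling logic, so there is exactly one place where the overflow check has to be right.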

OffHeapColumnVector.java

```diff
@@ -422,13 +422,9 @@ public final class OffHeapColumnVector extends ColumnVector {
     array.byteArrayOffset = 0;
   }
 
-  @Override
-  public void reserve(int requiredCapacity) {
-    if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2);
-  }
-
   // Split out the slow path.
-  private void reserveInternal(int newCapacity) {
+  @Override
+  protected void reserveInternal(int newCapacity) {
     if (this.resultArray != null) {
       this.lengthData =
           Platform.reallocateMemory(lengthData, elementsAppended * 4, newCapacity * 4);
```

OnHeapColumnVector.java

```diff
@@ -392,13 +392,9 @@ public final class OnHeapColumnVector extends ColumnVector {
     return result;
   }
 
-  @Override
-  public void reserve(int requiredCapacity) {
-    if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2);
-  }
-
   // Split this function out since it is the slow path.
-  private void reserveInternal(int newCapacity) {
+  @Override
+  protected void reserveInternal(int newCapacity) {
     if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) {
       int[] newLengths = new int[newCapacity];
       int[] newOffsets = new int[newCapacity];
```

ColumnarBatchSuite.scala

```diff
@@ -787,4 +787,23 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("exceeding maximum capacity should throw an error") {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode =>
+      val column = ColumnVector.allocate(1, ByteType, memMode)
+      column.MAX_CAPACITY = 15
+      column.appendBytes(5, 0.toByte)
+      // Successfully allocate twice the requested capacity
+      assert(column.capacity == 10)
+      column.appendBytes(10, 0.toByte)
+      // Allocated capacity doesn't exceed MAX_CAPACITY
+      assert(column.capacity == 15)
+      val ex = intercept[RuntimeException] {
+        // Over-allocating beyond MAX_CAPACITY throws an exception
+        column.appendBytes(10, 0.toByte)
+      }
+      assert(ex.getMessage.contains(s"Cannot reserve more than ${column.MAX_CAPACITY} bytes in " +
+        s"the vectorized reader"))
+    }
+  }
 }
```
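
As a worked trace of the assertions in this test, the following sketch (illustrative only; it assumes, as the test does, that `appendBytes(n, ...)` ends up reserving `elementsAppended + n` slots through the new `reserve()` logic) shows how the capped doubling yields capacities 10, then 15, and finally the exception once the request exceeds `MAX_CAPACITY = 15`.

```java
public class ReserveTraceSketch {
  // Mirrors the capped-doubling rule from ColumnVector.reserve() in the patch.
  static int reserveOrThrow(int requiredCapacity, int capacity, int maxCapacity) {
    if (requiredCapacity <= capacity) return capacity;
    int newCapacity = (int) Math.min(maxCapacity, requiredCapacity * 2L);
    if (requiredCapacity <= newCapacity) return newCapacity;
    throw new RuntimeException("Cannot reserve more than " + newCapacity
        + " bytes (requested = " + requiredCapacity + " bytes)");
  }

  public static void main(String[] args) {
    int maxCapacity = 15;
    int capacity = 1;                                       // column allocated with capacity 1
    capacity = reserveOrThrow(5, capacity, maxCapacity);    // appendBytes(5): min(15, 10) = 10
    System.out.println(capacity);                           // 10
    capacity = reserveOrThrow(15, capacity, maxCapacity);   // appendBytes(10): min(15, 30) = 15
    System.out.println(capacity);                           // 15
    capacity = reserveOrThrow(25, capacity, maxCapacity);   // appendBytes(10) again: 25 > 15, throws
  }
}
```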