[SPARK-22033][CORE] BufferHolder, other size checks should account for the specific VM array size limitations

## What changes were proposed in this pull request?

Try to avoid allocating an array bigger than Integer.MAX_VALUE - 8, which is the actual max size on some JVMs, in several places

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #19266 from srowen/SPARK-22033.
This commit is contained in:
Sean Owen 2017-09-23 15:40:59 +01:00
parent 3920af7d1d
commit 50ada2a4d3
6 changed files with 32 additions and 19 deletions

View file

@ -39,7 +39,7 @@ public final class LongArray {
private final long length;
public LongArray(MemoryBlock memory) {
assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size > 4 billion elements";
assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size >= Integer.MAX_VALUE elements";
this.memory = memory;
this.baseObj = memory.getBaseObject();
this.baseOffset = memory.getBaseOffset();

View file

@ -30,11 +30,17 @@ public interface HashMapGrowthStrategy {
HashMapGrowthStrategy DOUBLING = new Doubling();
class Doubling implements HashMapGrowthStrategy {
// Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
// smaller. Be conservative and lower the cap a little.
private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
@Override
public int nextCapacity(int currentCapacity) {
assert (currentCapacity > 0);
int doubleCapacity = currentCapacity * 2;
// Guard against overflow
return (currentCapacity * 2 > 0) ? (currentCapacity * 2) : Integer.MAX_VALUE;
return (doubleCapacity > 0 && doubleCapacity <= ARRAY_MAX) ? doubleCapacity : ARRAY_MAX;
}
}

View file

@ -126,22 +126,22 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable
/** Increase our size to newSize and grow the backing array if needed. */
private def growToSize(newSize: Int): Unit = {
if (newSize < 0) {
throw new UnsupportedOperationException("Can't grow buffer past Int.MaxValue elements")
// Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
// smaller. Be conservative and lower the cap a little.
val arrayMax = Int.MaxValue - 8
if (newSize < 0 || newSize - 2 > arrayMax) {
throw new UnsupportedOperationException(s"Can't grow buffer past $arrayMax elements")
}
val capacity = if (otherElements != null) otherElements.length + 2 else 2
if (newSize > capacity) {
var newArrayLen = 8
var newArrayLen = 8L
while (newSize - 2 > newArrayLen) {
newArrayLen *= 2
if (newArrayLen == Int.MinValue) {
// Prevent overflow if we double from 2^30 to 2^31, which will become Int.MinValue.
// Note that we set the new array length to Int.MaxValue - 2 so that our capacity
// calculation above still gives a positive integer.
newArrayLen = Int.MaxValue - 2
}
if (newArrayLen > arrayMax) {
newArrayLen = arrayMax
}
val newArray = new Array[T](newArrayLen)
val newArray = new Array[T](newArrayLen.toInt)
if (otherElements != null) {
System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
}

View file

@ -25,7 +25,7 @@ import org.apache.spark.util.collection.WritablePartitionedPairCollection._
* Append-only buffer of key-value pairs, each with a corresponding partition ID, that keeps track
* of its estimated size in bytes.
*
* The buffer can support up to `1073741823 (2 ^ 30 - 1)` elements.
* The buffer can support up to 1073741819 elements.
*/
private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
extends WritablePartitionedPairCollection[K, V] with SizeTracker
@ -59,7 +59,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_CAPACITY} elements")
}
val newCapacity =
if (capacity * 2 < 0 || capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
if (capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
MAXIMUM_CAPACITY
} else {
capacity * 2
@ -96,5 +96,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
}
private object PartitionedPairBuffer {
val MAXIMUM_CAPACITY = Int.MaxValue / 2 // 2 ^ 30 - 1
// Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
// smaller. Be conservative and lower the cap a little.
val MAXIMUM_CAPACITY: Int = (Int.MaxValue - 8) / 2
}

View file

@ -35,6 +35,11 @@ import org.apache.spark.unsafe.Platform;
* if the fields of row are all fixed-length, as the size of result row is also fixed.
*/
public class BufferHolder {
// Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
// smaller. Be conservative and lower the cap a little.
private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
public byte[] buffer;
public int cursor = Platform.BYTE_ARRAY_OFFSET;
private final UnsafeRow row;
@ -61,15 +66,15 @@ public class BufferHolder {
* Grows the buffer by at least neededSize and points the row to the buffer.
*/
public void grow(int neededSize) {
if (neededSize > Integer.MAX_VALUE - totalSize()) {
if (neededSize > ARRAY_MAX - totalSize()) {
throw new UnsupportedOperationException(
"Cannot grow BufferHolder by size " + neededSize + " because the size after growing " +
"exceeds size limitation " + Integer.MAX_VALUE);
"exceeds size limitation " + ARRAY_MAX);
}
final int length = totalSize() + neededSize;
if (buffer.length < length) {
// This will not happen frequently, because the buffer is re-used.
int newLength = length < Integer.MAX_VALUE / 2 ? length * 2 : Integer.MAX_VALUE;
int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
final byte[] tmp = new byte[newLength];
Platform.copyMemory(
buffer,

View file

@ -559,7 +559,7 @@ public abstract class WritableColumnVector extends ColumnVector {
* Upper limit for the maximum capacity for this column.
*/
@VisibleForTesting
protected int MAX_CAPACITY = Integer.MAX_VALUE;
protected int MAX_CAPACITY = Integer.MAX_VALUE - 8;
/**
* Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.