[SPARK-8354] [SQL] Fix off-by-factor-of-8 error when allocating scratch space in UnsafeFixedWidthAggregationMap
UnsafeFixedWidthAggregationMap contains an off-by-factor-of-8 error when allocating row conversion scratch space: we take a size requirement, measured in bytes, then allocate a long array of that size. This means that we end up allocating 8x too much conversion space. This patch fixes this by allocating a `byte[]` array instead. This doesn't impose any new limitations on the maximum sizes of UnsafeRows, since UnsafeRowConverter already used integers when calculating the size requirements for rows. Author: Josh Rosen <joshrosen@databricks.com> Closes #6809 from JoshRosen/sql-bytes-vs-words-fix and squashes the following commits: 6520339 [Josh Rosen] Updates to reflect fact that UnsafeRow max size is constrained by max byte[] size
This commit is contained in:
parent
cb7ada1196
commit
ea7fd2ff64
|
@ -39,7 +39,7 @@ public final class UnsafeFixedWidthAggregationMap {
|
|||
* An empty aggregation buffer, encoded in UnsafeRow format. When inserting a new key into the
|
||||
* map, we copy this buffer and use it as the value.
|
||||
*/
|
||||
private final long[] emptyAggregationBuffer;
|
||||
private final byte[] emptyAggregationBuffer;
|
||||
|
||||
private final StructType aggregationBufferSchema;
|
||||
|
||||
|
@ -63,10 +63,10 @@ public final class UnsafeFixedWidthAggregationMap {
|
|||
/**
|
||||
* Scratch space that is used when encoding grouping keys into UnsafeRow format.
|
||||
*
|
||||
* By default, this is a 1MB array, but it will grow as necessary in case larger keys are
|
||||
* By default, this is a 8 kb array, but it will grow as necessary in case larger keys are
|
||||
* encountered.
|
||||
*/
|
||||
private long[] groupingKeyConversionScratchSpace = new long[1024 / 8];
|
||||
private byte[] groupingKeyConversionScratchSpace = new byte[1024 * 8];
|
||||
|
||||
private final boolean enablePerfMetrics;
|
||||
|
||||
|
@ -123,13 +123,13 @@ public final class UnsafeFixedWidthAggregationMap {
|
|||
}
|
||||
|
||||
/**
|
||||
* Convert a Java object row into an UnsafeRow, allocating it into a new long array.
|
||||
* Convert a Java object row into an UnsafeRow, allocating it into a new byte array.
|
||||
*/
|
||||
private static long[] convertToUnsafeRow(InternalRow javaRow, StructType schema) {
|
||||
private static byte[] convertToUnsafeRow(InternalRow javaRow, StructType schema) {
|
||||
final UnsafeRowConverter converter = new UnsafeRowConverter(schema);
|
||||
final long[] unsafeRow = new long[converter.getSizeRequirement(javaRow)];
|
||||
final long writtenLength =
|
||||
converter.writeRow(javaRow, unsafeRow, PlatformDependent.LONG_ARRAY_OFFSET);
|
||||
final byte[] unsafeRow = new byte[converter.getSizeRequirement(javaRow)];
|
||||
final int writtenLength =
|
||||
converter.writeRow(javaRow, unsafeRow, PlatformDependent.BYTE_ARRAY_OFFSET);
|
||||
assert (writtenLength == unsafeRow.length): "Size requirement calculation was wrong!";
|
||||
return unsafeRow;
|
||||
}
|
||||
|
@ -143,34 +143,34 @@ public final class UnsafeFixedWidthAggregationMap {
|
|||
// Make sure that the buffer is large enough to hold the key. If it's not, grow it:
|
||||
if (groupingKeySize > groupingKeyConversionScratchSpace.length) {
|
||||
// This new array will be initially zero, so there's no need to zero it out here
|
||||
groupingKeyConversionScratchSpace = new long[groupingKeySize];
|
||||
groupingKeyConversionScratchSpace = new byte[groupingKeySize];
|
||||
} else {
|
||||
// Zero out the buffer that's used to hold the current row. This is necessary in order
|
||||
// to ensure that rows hash properly, since garbage data from the previous row could
|
||||
// otherwise end up as padding in this row. As a performance optimization, we only zero out
|
||||
// the portion of the buffer that we'll actually write to.
|
||||
Arrays.fill(groupingKeyConversionScratchSpace, 0, groupingKeySize, 0);
|
||||
Arrays.fill(groupingKeyConversionScratchSpace, 0, groupingKeySize, (byte) 0);
|
||||
}
|
||||
final long actualGroupingKeySize = groupingKeyToUnsafeRowConverter.writeRow(
|
||||
final int actualGroupingKeySize = groupingKeyToUnsafeRowConverter.writeRow(
|
||||
groupingKey,
|
||||
groupingKeyConversionScratchSpace,
|
||||
PlatformDependent.LONG_ARRAY_OFFSET);
|
||||
PlatformDependent.BYTE_ARRAY_OFFSET);
|
||||
assert (groupingKeySize == actualGroupingKeySize) : "Size requirement calculation was wrong!";
|
||||
|
||||
// Probe our map using the serialized key
|
||||
final BytesToBytesMap.Location loc = map.lookup(
|
||||
groupingKeyConversionScratchSpace,
|
||||
PlatformDependent.LONG_ARRAY_OFFSET,
|
||||
PlatformDependent.BYTE_ARRAY_OFFSET,
|
||||
groupingKeySize);
|
||||
if (!loc.isDefined()) {
|
||||
// This is the first time that we've seen this grouping key, so we'll insert a copy of the
|
||||
// empty aggregation buffer into the map:
|
||||
loc.putNewKey(
|
||||
groupingKeyConversionScratchSpace,
|
||||
PlatformDependent.LONG_ARRAY_OFFSET,
|
||||
PlatformDependent.BYTE_ARRAY_OFFSET,
|
||||
groupingKeySize,
|
||||
emptyAggregationBuffer,
|
||||
PlatformDependent.LONG_ARRAY_OFFSET,
|
||||
PlatformDependent.BYTE_ARRAY_OFFSET,
|
||||
emptyAggregationBuffer.length
|
||||
);
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ class UnsafeRowConverter(fieldTypes: Array[DataType]) {
|
|||
* @param baseOffset the base offset of the destination address
|
||||
* @return the number of bytes written. This should be equal to `getSizeRequirement(row)`.
|
||||
*/
|
||||
def writeRow(row: InternalRow, baseObject: Object, baseOffset: Long): Long = {
|
||||
def writeRow(row: InternalRow, baseObject: Object, baseOffset: Long): Int = {
|
||||
unsafeRow.pointTo(baseObject, baseOffset, writers.length, null)
|
||||
var fieldNumber = 0
|
||||
var appendCursor: Int = fixedLengthSize
|
||||
|
|
Loading…
Reference in a new issue