[SPARK-16962][CORE][SQL] Fix misaligned record accesses for SPARC architectures

## What changes were proposed in this pull request?

Made record length offsets uniform throughout various areas of Spark core and unsafe: record lengths are now read and written through the new UnsafeAlignedOffset helper, which uses a 4-byte int on platforms that support unaligned memory access and an 8-byte long otherwise, so that records stay 8-byte aligned on SPARC.

## How was this patch tested?

This change only affects SPARC architectures; it was also tested on x86 architectures to check for regressions.

Author: sumansomasundar <suman.somasundar@oracle.com>

Closes #14762 from sumansomasundar/master.
Authored by sumansomasundar on 2016-10-04 10:31:56 +01:00, committed by Sean Owen
parent 8e8de0073d
commit 7d51608835
6 changed files with 144 additions and 46 deletions

common/unsafe/src/main/java/org/apache/spark/unsafe/UnsafeAlignedOffset.java (new file)

@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.unsafe;
/**
 * Class to make record length offsets uniform throughout various areas
 * of Apache Spark core and unsafe. The SPARC platform requires this
 * because using a 4-byte Int for record lengths causes the entire record
 * of 8-byte items to become misaligned by 4 bytes. Using an 8-byte long
 * for record length keeps things 8-byte aligned.
 */
public class UnsafeAlignedOffset {
private static final int UAO_SIZE = Platform.unaligned() ? 4 : 8;
public static int getUaoSize() {
return UAO_SIZE;
}
public static int getSize(Object object, long offset) {
switch (UAO_SIZE) {
case 4:
return Platform.getInt(object, offset);
case 8:
return (int)Platform.getLong(object, offset);
default:
throw new AssertionError("Illegal UAO_SIZE");
}
}
public static void putSize(Object object, long offset, int value) {
switch (UAO_SIZE) {
case 4:
Platform.putInt(object, offset, value);
break;
case 8:
Platform.putLong(object, offset, value);
break;
default:
throw new AssertionError("Illegal UAO_SIZE");
}
}
}
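A minimal usage sketch of the new helper (not part of the patch; the class name UaoExample is illustrative, everything else comes from the code above): write a record length prefix with putSize, then skip uaoSize bytes to reach the record body.

```java
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.UnsafeAlignedOffset;

public class UaoExample {
  public static void main(String[] args) {
    byte[] page = new byte[64];                      // stand-in for a memory page
    long offset = Platform.BYTE_ARRAY_OFFSET;        // base offset of a heap byte[]
    int uaoSize = UnsafeAlignedOffset.getUaoSize();  // 4 where unaligned access is safe, else 8

    UnsafeAlignedOffset.putSize(page, offset, 42);   // record length prefix
    long recordStart = offset + uaoSize;             // an 8-byte prefix keeps 8-byte items aligned

    System.out.println("length = " + UnsafeAlignedOffset.getSize(page, offset)
        + ", prefix bytes = " + (recordStart - offset));
  }
}
```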

common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java

@@ -40,6 +40,7 @@ public class ByteArrayMethods {
     }
   }
+  private static final boolean unaligned = Platform.unaligned();
/**
* Optimized byte array equality check for byte arrays.
* @return true if the arrays are equal, false otherwise
@@ -47,17 +48,33 @@
   public static boolean arrayEquals(
       Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
     int i = 0;
-    while (i <= length - 8) {
-      if (Platform.getLong(leftBase, leftOffset + i) !=
-        Platform.getLong(rightBase, rightOffset + i)) {
-          return false;
+    // check if stars align and we can get both offsets to be aligned
+    if ((leftOffset % 8) == (rightOffset % 8)) {
+      while ((leftOffset + i) % 8 != 0 && i < length) {
+        if (Platform.getByte(leftBase, leftOffset + i) !=
+            Platform.getByte(rightBase, rightOffset + i)) {
+          return false;
+        }
+        i += 1;
       }
-      i += 8;
     }
+    // for architectures that support unaligned accesses, chew it up 8 bytes at a time
+    if (unaligned || (((leftOffset + i) % 8 == 0) && ((rightOffset + i) % 8 == 0))) {
+      while (i <= length - 8) {
+        if (Platform.getLong(leftBase, leftOffset + i) !=
+            Platform.getLong(rightBase, rightOffset + i)) {
+          return false;
+        }
+        i += 8;
+      }
+    }
+    // this will finish off the unaligned comparisons, or do the entire aligned
+    // comparison, whichever is needed.
     while (i < length) {
       if (Platform.getByte(leftBase, leftOffset + i) !=
-        Platform.getByte(rightBase, rightOffset + i)) {
-          return false;
+          Platform.getByte(rightBase, rightOffset + i)) {
+        return false;
       }
       i += 1;
     }
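The rewritten loop compares single bytes until both cursors hit an 8-byte boundary, then longs, then a byte tail; the long fast path applies on aligned-only platforms when the two offsets share the same remainder modulo 8. A hypothetical caller (ArrayEqualsExample is not in the patch; the API calls are the ones shown above):

```java
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;

public class ArrayEqualsExample {
  public static void main(String[] args) {
    byte[] a = "xxhello world!".getBytes();
    byte[] b = "__hello world!".getBytes();
    // Compare the 12 bytes starting at index 2 of each array. Both offsets have the
    // same remainder mod 8, so after a short byte prologue the comparison can proceed
    // 8 bytes at a time even on platforms without unaligned access.
    boolean eq = ByteArrayMethods.arrayEquals(
        a, Platform.BYTE_ARRAY_OFFSET + 2,
        b, Platform.BYTE_ARRAY_OFFSET + 2,
        12);
    System.out.println(eq); // true: both regions contain "hello world!"
  }
}
```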

core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

@@ -35,6 +35,7 @@ import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
@@ -273,8 +274,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
         currentPage = dataPages.get(nextIdx);
         pageBaseObject = currentPage.getBaseObject();
         offsetInPage = currentPage.getBaseOffset();
-        recordsInPage = Platform.getInt(pageBaseObject, offsetInPage);
-        offsetInPage += 4;
+        recordsInPage = UnsafeAlignedOffset.getSize(pageBaseObject, offsetInPage);
+        offsetInPage += UnsafeAlignedOffset.getUaoSize();
} else {
currentPage = null;
if (reader != null) {
@@ -321,10 +322,10 @@
       }
       numRecords--;
       if (currentPage != null) {
-        int totalLength = Platform.getInt(pageBaseObject, offsetInPage);
+        int totalLength = UnsafeAlignedOffset.getSize(pageBaseObject, offsetInPage);
         loc.with(currentPage, offsetInPage);
         // [total size] [key size] [key] [value] [pointer to next]
-        offsetInPage += 4 + totalLength + 8;
+        offsetInPage += UnsafeAlignedOffset.getUaoSize() + totalLength + 8;
recordsInPage --;
return loc;
} else {
@@ -367,14 +368,15 @@
       Object base = block.getBaseObject();
       long offset = block.getBaseOffset();
-      int numRecords = Platform.getInt(base, offset);
-      offset += 4;
+      int numRecords = UnsafeAlignedOffset.getSize(base, offset);
+      int uaoSize = UnsafeAlignedOffset.getUaoSize();
+      offset += uaoSize;
       final UnsafeSorterSpillWriter writer =
         new UnsafeSorterSpillWriter(blockManager, 32 * 1024, writeMetrics, numRecords);
       while (numRecords > 0) {
-        int length = Platform.getInt(base, offset);
-        writer.write(base, offset + 4, length, 0);
-        offset += 4 + length + 8;
+        int length = UnsafeAlignedOffset.getSize(base, offset);
+        writer.write(base, offset + uaoSize, length, 0);
+        offset += uaoSize + length + 8;
numRecords--;
}
writer.close();
@@ -530,13 +532,14 @@
     private void updateAddressesAndSizes(final Object base, long offset) {
       baseObject = base;
-      final int totalLength = Platform.getInt(base, offset);
-      offset += 4;
-      keyLength = Platform.getInt(base, offset);
-      offset += 4;
+      final int totalLength = UnsafeAlignedOffset.getSize(base, offset);
+      int uaoSize = UnsafeAlignedOffset.getUaoSize();
+      offset += uaoSize;
+      keyLength = UnsafeAlignedOffset.getSize(base, offset);
+      offset += uaoSize;
       keyOffset = offset;
       valueOffset = offset + keyLength;
-      valueLength = totalLength - keyLength - 4;
+      valueLength = totalLength - keyLength - uaoSize;
}
private Location with(int pos, int keyHashcode, boolean isDefined) {
@@ -565,10 +568,11 @@
       this.isDefined = true;
       this.memoryPage = null;
       baseObject = base;
-      keyOffset = offset + 4;
-      keyLength = Platform.getInt(base, offset);
-      valueOffset = offset + 4 + keyLength;
-      valueLength = length - 4 - keyLength;
+      int uaoSize = UnsafeAlignedOffset.getUaoSize();
+      keyOffset = offset + uaoSize;
+      keyLength = UnsafeAlignedOffset.getSize(base, offset);
+      valueOffset = offset + uaoSize + keyLength;
+      valueLength = length - uaoSize - keyLength;
return this;
}
@@ -699,9 +703,10 @@
       // the key address instead of storing the absolute address of the value, the key and value
       // must be stored in the same memory page.
       // (8 byte key length) (key) (value) (8 byte pointer to next value)
-      final long recordLength = 8 + klen + vlen + 8;
+      int uaoSize = UnsafeAlignedOffset.getUaoSize();
+      final long recordLength = (2 * uaoSize) + klen + vlen + 8;
       if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
-        if (!acquireNewPage(recordLength + 4L)) {
+        if (!acquireNewPage(recordLength + uaoSize)) {
return false;
}
}
@@ -710,9 +715,9 @@
       final Object base = currentPage.getBaseObject();
       long offset = currentPage.getBaseOffset() + pageCursor;
       final long recordOffset = offset;
-      Platform.putInt(base, offset, klen + vlen + 4);
-      Platform.putInt(base, offset + 4, klen);
-      offset += 8;
+      UnsafeAlignedOffset.putSize(base, offset, klen + vlen + uaoSize);
+      UnsafeAlignedOffset.putSize(base, offset + uaoSize, klen);
+      offset += (2 * uaoSize);
Platform.copyMemory(kbase, koff, base, offset, klen);
offset += klen;
Platform.copyMemory(vbase, voff, base, offset, vlen);
@@ -722,7 +727,7 @@
       // --- Update bookkeeping data structures ----------------------------------------------------
       offset = currentPage.getBaseOffset();
-      Platform.putInt(base, offset, Platform.getInt(base, offset) + 1);
+      UnsafeAlignedOffset.putSize(base, offset, UnsafeAlignedOffset.getSize(base, offset) + 1);
pageCursor += recordLength;
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
currentPage, recordOffset);
@@ -757,8 +762,8 @@
         return false;
       }
       dataPages.add(currentPage);
-      Platform.putInt(currentPage.getBaseObject(), currentPage.getBaseOffset(), 0);
-      pageCursor = 4;
+      UnsafeAlignedOffset.putSize(currentPage.getBaseObject(), currentPage.getBaseOffset(), 0);
+      pageCursor = UnsafeAlignedOffset.getUaoSize();
return true;
}
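All of the map hunks above follow from one record layout, [total size] [key size] [key] [value] [8-byte pointer to next], where the two size fields are now uaoSize bytes each instead of 4. A self-contained sketch of that arithmetic on a heap byte[] standing in for a page (the class RecordLayoutSketch is hypothetical, and the hash-chain pointer is left unwritten):

```java
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.UnsafeAlignedOffset;

public class RecordLayoutSketch {
  public static void main(String[] args) {
    int uaoSize = UnsafeAlignedOffset.getUaoSize();
    byte[] key = {1, 2, 3};
    byte[] value = {9, 8, 7, 6};

    // Build one record: [total size][key size][key][value][8-byte next pointer]
    byte[] page = new byte[2 * uaoSize + key.length + value.length + 8];
    long base = Platform.BYTE_ARRAY_OFFSET;
    UnsafeAlignedOffset.putSize(page, base, key.length + value.length + uaoSize); // total size
    UnsafeAlignedOffset.putSize(page, base + uaoSize, key.length);                // key size
    Platform.copyMemory(key, Platform.BYTE_ARRAY_OFFSET, page, base + 2L * uaoSize, key.length);
    Platform.copyMemory(value, Platform.BYTE_ARRAY_OFFSET, page,
        base + 2L * uaoSize + key.length, value.length);

    // Read it back the way Location.updateAddressesAndSizes does.
    int totalLength = UnsafeAlignedOffset.getSize(page, base);
    int keyLength = UnsafeAlignedOffset.getSize(page, base + uaoSize);
    int valueLength = totalLength - keyLength - uaoSize;
    System.out.println("key=" + keyLength + " value=" + valueLength); // key=3 value=4
  }
}
```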

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

@@ -34,6 +34,7 @@ import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.TaskCompletionListener;
@@ -392,14 +393,15 @@
     }
     growPointerArrayIfNecessary();
+    int uaoSize = UnsafeAlignedOffset.getUaoSize();
     // Need 4 bytes to store the record length.
-    final int required = length + 4;
+    final int required = length + uaoSize;
     acquireNewPageIfNecessary(required);
     final Object base = currentPage.getBaseObject();
     final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
-    Platform.putInt(base, pageCursor, length);
-    pageCursor += 4;
+    UnsafeAlignedOffset.putSize(base, pageCursor, length);
+    pageCursor += uaoSize;
Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
pageCursor += length;
inMemSorter.insertRecord(recordAddress, prefix, prefixIsNull);
@@ -418,15 +420,16 @@
       throws IOException {
     growPointerArrayIfNecessary();
-    final int required = keyLen + valueLen + 4 + 4;
+    int uaoSize = UnsafeAlignedOffset.getUaoSize();
+    final int required = keyLen + valueLen + (2 * uaoSize);
     acquireNewPageIfNecessary(required);
     final Object base = currentPage.getBaseObject();
     final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
-    Platform.putInt(base, pageCursor, keyLen + valueLen + 4);
-    pageCursor += 4;
-    Platform.putInt(base, pageCursor, keyLen);
-    pageCursor += 4;
+    UnsafeAlignedOffset.putSize(base, pageCursor, keyLen + valueLen + uaoSize);
+    pageCursor += uaoSize;
+    UnsafeAlignedOffset.putSize(base, pageCursor, keyLen);
+    pageCursor += uaoSize;
Platform.copyMemory(keyBase, keyOffset, base, pageCursor, keyLen);
pageCursor += keyLen;
Platform.copyMemory(valueBase, valueOffset, base, pageCursor, valueLen);
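The sorter hunks change only the header-space accounting. A sketch of the two record formats' space requirements, with hypothetical helper names mirroring insertRecord and insertKVRecord above:

```java
import org.apache.spark.unsafe.UnsafeAlignedOffset;

public class SorterSpaceSketch {
  // insertRecord format:   [record length][record bytes]
  static int requiredForRecord(int length) {
    return length + UnsafeAlignedOffset.getUaoSize();
  }
  // insertKVRecord format: [total length][key length][key][value]
  static int requiredForKVRecord(int keyLen, int valueLen) {
    return keyLen + valueLen + 2 * UnsafeAlignedOffset.getUaoSize();
  }
  public static void main(String[] args) {
    System.out.println(requiredForRecord(16));      // 20 where uaoSize == 4, 24 where uaoSize == 8
    System.out.println(requiredForKVRecord(8, 8));  // 24 where uaoSize == 4, 32 where uaoSize == 8
  }
}
```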

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java

@@ -25,6 +25,7 @@ import org.apache.avro.reflect.Nullable;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.collection.Sorter;
@@ -56,11 +57,14 @@ public final class UnsafeInMemorySorter {
     @Override
     public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) {
       final int prefixComparisonResult = prefixComparator.compare(r1.keyPrefix, r2.keyPrefix);
+      int uaoSize = UnsafeAlignedOffset.getUaoSize();
       if (prefixComparisonResult == 0) {
         final Object baseObject1 = memoryManager.getPage(r1.recordPointer);
-        final long baseOffset1 = memoryManager.getOffsetInPage(r1.recordPointer) + 4; // skip length
+        // skip length
+        final long baseOffset1 = memoryManager.getOffsetInPage(r1.recordPointer) + uaoSize;
         final Object baseObject2 = memoryManager.getPage(r2.recordPointer);
-        final long baseOffset2 = memoryManager.getOffsetInPage(r2.recordPointer) + 4; // skip length
+        // skip length
+        final long baseOffset2 = memoryManager.getOffsetInPage(r2.recordPointer) + uaoSize;
return recordComparator.compare(baseObject1, baseOffset1, baseObject2, baseOffset2);
} else {
return prefixComparisonResult;
@@ -282,9 +286,11 @@
       // This pointer points to a 4-byte record length, followed by the record's bytes
       final long recordPointer = array.get(offset + position);
       currentPageNumber = TaskMemoryManager.decodePageNumber(recordPointer);
+      int uaoSize = UnsafeAlignedOffset.getUaoSize();
       baseObject = memoryManager.getPage(recordPointer);
-      baseOffset = memoryManager.getOffsetInPage(recordPointer) + 4; // Skip over record length
-      recordLength = Platform.getInt(baseObject, baseOffset - 4);
+      // Skip over record length
+      baseOffset = memoryManager.getOffsetInPage(recordPointer) + uaoSize;
+      recordLength = UnsafeAlignedOffset.getSize(baseObject, baseOffset - uaoSize);
keyPrefix = array.get(offset + position + 1);
position += 2;
}

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala

@@ -23,6 +23,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.columnar.{ColumnBuilder, NativeColumnBuilder}
 import org.apache.spark.sql.types.AtomicType
+import org.apache.spark.unsafe.Platform
/**
* A stackable trait that builds optionally compressed byte buffer for a column. Memory layout of
@@ -61,8 +62,12 @@
     super.initialize(initialSize, columnName, useCompression)
   }

+  // The various compression schemes, while saving memory use, cause all of the data within
+  // the row to become unaligned, thus causing crashes. Until a way of fixing the compression
+  // is found to also allow aligned accesses, this must be disabled for SPARC.
   protected def isWorthCompressing(encoder: Encoder[T]) = {
-    encoder.compressionRatio < 0.8
+    CompressibleColumnBuilder.unaligned && encoder.compressionRatio < 0.8
   }
private def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
@@ -103,3 +108,7 @@ private[columnar] trait CompressibleColumnBuilder[T <: AtomicType]
       encoder.compress(nonNullBuffer, compressedBuffer)
   }
 }
+
+private[columnar] object CompressibleColumnBuilder {
+  val unaligned = Platform.unaligned()
+}
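The gating predicate added to isWorthCompressing reduces to a check on Platform.unaligned(); a hypothetical Java rendering of the same logic (the real change is the Scala one-liner above, and CompressionGateSketch is illustrative only):

```java
import org.apache.spark.unsafe.Platform;

public class CompressionGateSketch {
  // Compression is only "worth it" when the platform tolerates unaligned reads
  // AND the encoder actually saves space.
  static boolean isWorthCompressing(double compressionRatio) {
    return Platform.unaligned() && compressionRatio < 0.8;
  }
  public static void main(String[] args) {
    System.out.println(isWorthCompressing(0.5)); // true where unaligned access is supported
    System.out.println(isWorthCompressing(0.9)); // false everywhere
  }
}
```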