Revert [SPARK-10399] [SPARK-23879] [SPARK-23762] [SPARK-25317]

## What changes were proposed in this pull request?

When running the TPC-DS benchmark on the 2.4 release, npoggi and winglungngai saw a performance regression of more than 10% on the following queries: q67, q24a and q24b. After applying the PR https://github.com/apache/spark/pull/22338, the performance regression still existed. When npoggi and winglungngai reverted the changes in https://github.com/apache/spark/pull/19222, they found the regression was resolved. Thus, this PR reverts the related changes to unblock the 2.4 release.

In a future release, we can continue the investigation and find the root cause of the regression.
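For background on what is being reverted: PR 19222 routed low-level reads through virtual accessors on a `MemoryBlock` class hierarchy, while the restored code passes an explicit base object and offset to static `Platform` accessors. Below is a minimal sketch of the two styles using stand-in names (`BlockLike`, `ByteArrayBlock`, `hashViaBlock`, and `hashViaBase` are hypothetical, not Spark APIs); the suspicion is that the extra indirection on hot read paths is related to the regression, though the root cause is still to be confirmed.

```java
// Illustrative sketch only -- stand-in types, not code from this commit.

// Hypothetical stand-in for the reverted MemoryBlock hierarchy: every read
// goes through a virtual call that the JIT must devirtualize to stay cheap.
interface BlockLike {
  byte getByte(long offset);
}

final class ByteArrayBlock implements BlockLike {
  private final byte[] array;
  ByteArrayBlock(byte[] array) { this.array = array; }
  @Override public byte getByte(long offset) { return array[(int) offset]; }
}

public class AccessStyles {
  // Reverted style: hash through the block abstraction
  // (compare HiveHasher.hashUnsafeBytesBlock in the first diff below).
  static int hashViaBlock(BlockLike mb, int lengthInBytes) {
    int result = 0;
    for (int i = 0; i < lengthInBytes; i++) {
      result = (result * 31) + (int) mb.getByte(i);
    }
    return result;
  }

  // Restored style: static access against an explicit base and offset
  // (compare Platform.getByte in HiveHasher.hashUnsafeBytes).
  static int hashViaBase(byte[] base, int offset, int lengthInBytes) {
    int result = 0;
    for (int i = 0; i < lengthInBytes; i++) {
      result = (result * 31) + (int) base[offset + i];
    }
    return result;
  }

  public static void main(String[] args) {
    byte[] data = "spark".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    // Both styles compute the same hash; they differ only in dispatch cost.
    System.out.println(hashViaBlock(new ByteArrayBlock(data), data.length));
    System.out.println(hashViaBase(data, 0, data.length));
  }
}
```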

## How was this patch tested?

The existing test cases.

Closes #22361 from gatorsmile/revertMemoryBlock.

Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Committed 2018-09-09 21:25:19 +08:00 by Wenchen Fan
commit 0b9ccd55c2 (parent 1cfda44825)
40 changed files with 377 additions and 1071 deletions

common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java

@@ -17,8 +17,7 @@
package org.apache.spark.sql.catalyst.expressions;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.Platform;
/**
* Simulates Hive's hashing function from Hive v1.2.1
@@ -39,21 +38,12 @@ public class HiveHasher {
return (int) ((input >>> 32) ^ input);
}
public static int hashUnsafeBytesBlock(MemoryBlock mb) {
long lengthInBytes = mb.size();
public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int result = 0;
for (long i = 0; i < lengthInBytes; i++) {
result = (result * 31) + (int) mb.getByte(i);
for (int i = 0; i < lengthInBytes; i++) {
result = (result * 31) + (int) Platform.getByte(base, offset + i);
}
return result;
}
public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes));
}
public static int hashUTF8String(UTF8String str) {
return hashUnsafeBytesBlock(str.getMemoryBlock());
}
}

common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java

@@ -18,7 +18,6 @@
package org.apache.spark.unsafe.array;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;
public class ByteArrayMethods {
@@ -53,25 +52,15 @@ public class ByteArrayMethods {
public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;
private static final boolean unaligned = Platform.unaligned();
/**
* MemoryBlock equality check for MemoryBlocks.
* @return true if the arrays are equal, false otherwise
*/
public static boolean arrayEqualsBlock(
MemoryBlock leftBase, long leftOffset, MemoryBlock rightBase, long rightOffset, long length) {
return arrayEquals(leftBase.getBaseObject(), leftBase.getBaseOffset() + leftOffset,
rightBase.getBaseObject(), rightBase.getBaseOffset() + rightOffset, length);
}
/**
* Optimized byte array equality check for byte arrays.
* @return true if the arrays are equal, false otherwise
*/
public static boolean arrayEquals(
Object leftBase, long leftOffset, Object rightBase, long rightOffset, long length) {
Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
int i = 0;
// check if starts align and we can get both offsets to be aligned
// check if stars align and we can get both offsets to be aligned
if ((leftOffset % 8) == (rightOffset % 8)) {
while ((leftOffset + i) % 8 != 0 && i < length) {
if (Platform.getByte(leftBase, leftOffset + i) !=

common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java

@@ -17,6 +17,7 @@
package org.apache.spark.unsafe.array;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;
/**
@@ -32,12 +33,16 @@ public final class LongArray {
private static final long WIDTH = 8;
private final MemoryBlock memory;
private final Object baseObj;
private final long baseOffset;
private final long length;
public LongArray(MemoryBlock memory) {
assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size >= Integer.MAX_VALUE elements";
this.memory = memory;
this.baseObj = memory.getBaseObject();
this.baseOffset = memory.getBaseOffset();
this.length = memory.size() / WIDTH;
}
@@ -46,11 +51,11 @@
}
public Object getBaseObject() {
return memory.getBaseObject();
return baseObj;
}
public long getBaseOffset() {
return memory.getBaseOffset();
return baseOffset;
}
/**
@@ -64,8 +69,8 @@
* Fill this all with 0L.
*/
public void zeroOut() {
for (long off = 0; off < length * WIDTH; off += WIDTH) {
memory.putLong(off, 0);
for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) {
Platform.putLong(baseObj, off, 0);
}
}
@@ -75,7 +80,7 @@
public void set(int index, long value) {
assert index >= 0 : "index (" + index + ") should >= 0";
assert index < length : "index (" + index + ") should < length (" + length + ")";
memory.putLong(index * WIDTH, value);
Platform.putLong(baseObj, baseOffset + index * WIDTH, value);
}
/**
@@ -84,6 +89,6 @@
public long get(int index) {
assert index >= 0 : "index (" + index + ") should >= 0";
assert index < length : "index (" + index + ") should < length (" + length + ")";
return memory.getLong(index * WIDTH);
return Platform.getLong(baseObj, baseOffset + index * WIDTH);
}
}

common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java

@@ -17,11 +17,7 @@
package org.apache.spark.unsafe.hash;
import com.google.common.primitives.Ints;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.types.UTF8String;
/**
* 32-bit Murmur3 hasher. This is based on Guava's Murmur3_32HashFunction.
@@ -53,74 +49,49 @@ public final class Murmur3_x86_32 {
}
public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
}
public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
int lengthInBytes = Ints.checkedCast(base.size());
assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)";
int h1 = hashBytesByIntBlock(base, lengthInBytes, seed);
return fmix(h1, lengthInBytes);
return hashUnsafeWords(base, offset, lengthInBytes, seed);
}
public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)";
int h1 = hashBytesByInt(base, offset, lengthInBytes, seed);
return fmix(h1, lengthInBytes);
}
public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) {
return hashUnsafeBytesBlock(base, Ints.checkedCast(base.size()), seed);
}
private static int hashUnsafeBytesBlock(MemoryBlock base, int lengthInBytes, int seed) {
public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
// This is not compatible with original and another implementations.
// But remain it for backward compatibility for the components existing before 2.3.
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int lengthAligned = lengthInBytes - lengthInBytes % 4;
int h1 = hashBytesByIntBlock(base, lengthAligned, seed);
long offset = base.getBaseOffset();
Object o = base.getBaseObject();
int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
for (int i = lengthAligned; i < lengthInBytes; i++) {
int halfWord = Platform.getByte(o, offset + i);
int halfWord = Platform.getByte(base, offset + i);
int k1 = mixK1(halfWord);
h1 = mixH1(h1, k1);
}
return fmix(h1, lengthInBytes);
}
public static int hashUTF8String(UTF8String str, int seed) {
return hashUnsafeBytesBlock(str.getMemoryBlock(), str.numBytes(), seed);
}
public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
}
public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) {
return hashUnsafeBytes2Block(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
}
public static int hashUnsafeBytes2Block(MemoryBlock base, int seed) {
// This is compatible with original and other implementations.
// This is compatible with original and another implementations.
// Use this method for new components after Spark 2.3.
int lengthInBytes = Ints.checkedCast(base.size());
assert (lengthInBytes >= 0) : "lengthInBytes cannot be negative";
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int lengthAligned = lengthInBytes - lengthInBytes % 4;
int h1 = hashBytesByIntBlock(base, lengthAligned, seed);
int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
int k1 = 0;
for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
k1 ^= (base.getByte(i) & 0xFF) << shift;
k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift;
}
h1 ^= mixK1(k1);
return fmix(h1, lengthInBytes);
}
private static int hashBytesByIntBlock(MemoryBlock base, int lengthInBytes, int seed) {
private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
assert (lengthInBytes % 4 == 0);
int h1 = seed;
for (int i = 0; i < lengthInBytes; i += 4) {
int halfWord = base.getInt(i);
int halfWord = Platform.getInt(base, offset + i);
int k1 = mixK1(halfWord);
h1 = mixH1(h1, k1);
}

common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java (deleted)

@@ -1,128 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.unsafe.memory;
import com.google.common.primitives.Ints;
import org.apache.spark.unsafe.Platform;
/**
* A consecutive block of memory with a byte array on Java heap.
*/
public final class ByteArrayMemoryBlock extends MemoryBlock {
private final byte[] array;
public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
super(obj, offset, size);
this.array = obj;
assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
"The sum of size " + size + " and offset " + offset + " should not be larger than " +
"the size of the given memory space " + (obj.length + Platform.BYTE_ARRAY_OFFSET);
}
public ByteArrayMemoryBlock(long length) {
this(new byte[Ints.checkedCast(length)], Platform.BYTE_ARRAY_OFFSET, length);
}
@Override
public MemoryBlock subBlock(long offset, long size) {
checkSubBlockRange(offset, size);
if (offset == 0 && size == this.size()) return this;
return new ByteArrayMemoryBlock(array, this.offset + offset, size);
}
public byte[] getByteArray() { return array; }
/**
* Creates a memory block pointing to the memory used by the byte array.
*/
public static ByteArrayMemoryBlock fromArray(final byte[] array) {
return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, array.length);
}
@Override
public int getInt(long offset) {
return Platform.getInt(array, this.offset + offset);
}
@Override
public void putInt(long offset, int value) {
Platform.putInt(array, this.offset + offset, value);
}
@Override
public boolean getBoolean(long offset) {
return Platform.getBoolean(array, this.offset + offset);
}
@Override
public void putBoolean(long offset, boolean value) {
Platform.putBoolean(array, this.offset + offset, value);
}
@Override
public byte getByte(long offset) {
return array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)];
}
@Override
public void putByte(long offset, byte value) {
array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)] = value;
}
@Override
public short getShort(long offset) {
return Platform.getShort(array, this.offset + offset);
}
@Override
public void putShort(long offset, short value) {
Platform.putShort(array, this.offset + offset, value);
}
@Override
public long getLong(long offset) {
return Platform.getLong(array, this.offset + offset);
}
@Override
public void putLong(long offset, long value) {
Platform.putLong(array, this.offset + offset, value);
}
@Override
public float getFloat(long offset) {
return Platform.getFloat(array, this.offset + offset);
}
@Override
public void putFloat(long offset, float value) {
Platform.putFloat(array, this.offset + offset, value);
}
@Override
public double getDouble(long offset) {
return Platform.getDouble(array, this.offset + offset);
}
@Override
public void putDouble(long offset, double value) {
Platform.putDouble(array, this.offset + offset, value);
}
}

common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java

@@ -23,6 +23,8 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import org.apache.spark.unsafe.Platform;
/**
* A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM long primitive array.
*/
@@ -56,7 +58,7 @@ public class HeapMemoryAllocator implements MemoryAllocator {
final long[] array = arrayReference.get();
if (array != null) {
assert (array.length * 8L >= size);
MemoryBlock memory = OnHeapMemoryBlock.fromArray(array, size);
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
@@ -68,7 +70,7 @@
}
}
long[] array = new long[numWords];
MemoryBlock memory = OnHeapMemoryBlock.fromArray(array, size);
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
@@ -77,13 +79,12 @@
@Override
public void free(MemoryBlock memory) {
assert(memory instanceof OnHeapMemoryBlock);
assert (memory.getBaseObject() != null) :
assert (memory.obj != null) :
"baseObject was null; are you trying to use the on-heap allocator to free off-heap memory?";
assert (memory.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
"page has already been freed";
assert ((memory.getPageNumber() == MemoryBlock.NO_PAGE_NUMBER)
|| (memory.getPageNumber() == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
|| (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
"TMM-allocated pages must first be freed via TMM.freePage(), not directly in allocator " +
"free()";
@@ -93,12 +94,12 @@ public class HeapMemoryAllocator implements MemoryAllocator {
}
// Mark the page as freed (so we can detect double-frees).
memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER);
memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
// As an additional layer of defense against use-after-free bugs, we mutate the
// MemoryBlock to null out its reference to the long[] array.
long[] array = ((OnHeapMemoryBlock)memory).getLongArray();
memory.resetObjAndOffset();
long[] array = (long[]) memory.obj;
memory.setObjAndOffset(null, 0);
long alignedSize = ((size + 7) / 8) * 8;
if (shouldPool(alignedSize)) {

common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java

@@ -38,7 +38,7 @@ public interface MemoryAllocator {
void free(MemoryBlock memory);
UnsafeMemoryAllocator UNSAFE = new UnsafeMemoryAllocator();
MemoryAllocator UNSAFE = new UnsafeMemoryAllocator();
HeapMemoryAllocator HEAP = new HeapMemoryAllocator();
MemoryAllocator HEAP = new HeapMemoryAllocator();
}

common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java

@@ -22,10 +22,10 @@ import javax.annotation.Nullable;
import org.apache.spark.unsafe.Platform;
/**
* A representation of a consecutive memory block in Spark. It defines the common interfaces
* for memory accessing and mutating.
* A consecutive block of memory, starting at a {@link MemoryLocation} with a fixed size.
*/
public abstract class MemoryBlock {
public class MemoryBlock extends MemoryLocation {
/** Special `pageNumber` value for pages which were not allocated by TaskMemoryManagers */
public static final int NO_PAGE_NUMBER = -1;
@@ -45,163 +45,38 @@
*/
public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
@Nullable
protected Object obj;
protected long offset;
protected long length;
private final long length;
/**
* Optional page number; used when this MemoryBlock represents a page allocated by a
* TaskMemoryManager. This field can be updated using setPageNumber method so that
* this can be modified by the TaskMemoryManager, which lives in a different package.
* TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager,
* which lives in a different package.
*/
private int pageNumber = NO_PAGE_NUMBER;
public int pageNumber = NO_PAGE_NUMBER;
protected MemoryBlock(@Nullable Object obj, long offset, long length) {
if (offset < 0 || length < 0) {
throw new IllegalArgumentException(
"Length " + length + " and offset " + offset + "must be non-negative");
}
this.obj = obj;
this.offset = offset;
public MemoryBlock(@Nullable Object obj, long offset, long length) {
super(obj, offset);
this.length = length;
}
protected MemoryBlock() {
this(null, 0, 0);
}
public final Object getBaseObject() {
return obj;
}
public final long getBaseOffset() {
return offset;
}
public void resetObjAndOffset() {
this.obj = null;
this.offset = 0;
}
/**
* Returns the size of the memory block.
*/
public final long size() {
public long size() {
return length;
}
public final void setPageNumber(int pageNum) {
pageNumber = pageNum;
}
public final int getPageNumber() {
return pageNumber;
/**
* Creates a memory block pointing to the memory used by the long array.
*/
public static MemoryBlock fromLongArray(final long[] array) {
return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
}
/**
* Fills the memory block with the specified byte value.
*/
public final void fill(byte value) {
public void fill(byte value) {
Platform.setMemory(obj, offset, length, value);
}
/**
* Instantiate MemoryBlock for given object type with new offset
*/
public static final MemoryBlock allocateFromObject(Object obj, long offset, long length) {
MemoryBlock mb = null;
if (obj instanceof byte[]) {
byte[] array = (byte[])obj;
mb = new ByteArrayMemoryBlock(array, offset, length);
} else if (obj instanceof long[]) {
long[] array = (long[])obj;
mb = new OnHeapMemoryBlock(array, offset, length);
} else if (obj == null) {
// we assume that to pass null pointer means off-heap
mb = new OffHeapMemoryBlock(offset, length);
} else {
throw new UnsupportedOperationException(
"Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now");
}
return mb;
}
/**
* Just instantiate the sub-block with the same type of MemoryBlock with the new size and relative
* offset from the original offset. The data is not copied.
* If parameters are invalid, an exception is thrown.
*/
public abstract MemoryBlock subBlock(long offset, long size);
protected void checkSubBlockRange(long offset, long size) {
if (offset < 0 || size < 0) {
throw new ArrayIndexOutOfBoundsException(
"Size " + size + " and offset " + offset + " must be non-negative");
}
if (offset + size > length) {
throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " +
offset + " should not be larger than the length " + length + " in the MemoryBlock");
}
}
/**
* getXXX/putXXX does not ensure guarantee behavior if the offset is invalid. e.g cause illegal
* memory access, throw an exception, or etc.
* getXXX/putXXX uses an index based on this.offset that includes the size of metadata such as
* JVM object header. The offset is 0-based and is expected as an logical offset in the memory
* block.
*/
public abstract int getInt(long offset);
public abstract void putInt(long offset, int value);
public abstract boolean getBoolean(long offset);
public abstract void putBoolean(long offset, boolean value);
public abstract byte getByte(long offset);
public abstract void putByte(long offset, byte value);
public abstract short getShort(long offset);
public abstract void putShort(long offset, short value);
public abstract long getLong(long offset);
public abstract void putLong(long offset, long value);
public abstract float getFloat(long offset);
public abstract void putFloat(long offset, float value);
public abstract double getDouble(long offset);
public abstract void putDouble(long offset, double value);
public static final void copyMemory(
MemoryBlock src, long srcOffset, MemoryBlock dst, long dstOffset, long length) {
assert(srcOffset + length <= src.length && dstOffset + length <= dst.length);
Platform.copyMemory(src.getBaseObject(), src.getBaseOffset() + srcOffset,
dst.getBaseObject(), dst.getBaseOffset() + dstOffset, length);
}
public static final void copyMemory(MemoryBlock src, MemoryBlock dst, long length) {
assert(length <= src.length && length <= dst.length);
Platform.copyMemory(src.getBaseObject(), src.getBaseOffset(),
dst.getBaseObject(), dst.getBaseOffset(), length);
}
public final void copyFrom(Object src, long srcOffset, long dstOffset, long length) {
assert(length <= this.length - srcOffset);
Platform.copyMemory(src, srcOffset, obj, offset + dstOffset, length);
}
public final void writeTo(long srcOffset, Object dst, long dstOffset, long length) {
assert(length <= this.length - srcOffset);
Platform.copyMemory(obj, offset + srcOffset, dst, dstOffset, length);
}
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilderSuite.scala → common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java

@@ -15,28 +15,40 @@
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.expressions.codegen
package org.apache.spark.unsafe.memory;
import org.apache.spark.SparkFunSuite
import org.apache.spark.unsafe.types.UTF8String
import javax.annotation.Nullable;
class UTF8StringBuilderSuite extends SparkFunSuite {
/**
* A memory location. Tracked either by a memory address (with off-heap allocation),
* or by an offset from a JVM object (in-heap allocation).
*/
public class MemoryLocation {
test("basic test") {
val sb = new UTF8StringBuilder()
assert(sb.build() === UTF8String.EMPTY_UTF8)
@Nullable
Object obj;
sb.append("")
assert(sb.build() === UTF8String.EMPTY_UTF8)
long offset;
sb.append("abcd")
assert(sb.build() === UTF8String.fromString("abcd"))
public MemoryLocation(@Nullable Object obj, long offset) {
this.obj = obj;
this.offset = offset;
}
sb.append(UTF8String.fromString("1234"))
assert(sb.build() === UTF8String.fromString("abcd1234"))
public MemoryLocation() {
this(null, 0);
}
// expect to grow an internal buffer
sb.append(UTF8String.fromString("efgijk567890"))
assert(sb.build() === UTF8String.fromString("abcd1234efgijk567890"))
public void setObjAndOffset(Object newObj, long newOffset) {
this.obj = newObj;
this.offset = newOffset;
}
public final Object getBaseObject() {
return obj;
}
public final long getBaseOffset() {
return offset;
}
}

common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java (deleted)

@@ -1,105 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.unsafe.memory;
import org.apache.spark.unsafe.Platform;
public class OffHeapMemoryBlock extends MemoryBlock {
public static final OffHeapMemoryBlock NULL = new OffHeapMemoryBlock(0, 0);
public OffHeapMemoryBlock(long address, long size) {
super(null, address, size);
}
@Override
public MemoryBlock subBlock(long offset, long size) {
checkSubBlockRange(offset, size);
if (offset == 0 && size == this.size()) return this;
return new OffHeapMemoryBlock(this.offset + offset, size);
}
@Override
public final int getInt(long offset) {
return Platform.getInt(null, this.offset + offset);
}
@Override
public final void putInt(long offset, int value) {
Platform.putInt(null, this.offset + offset, value);
}
@Override
public final boolean getBoolean(long offset) {
return Platform.getBoolean(null, this.offset + offset);
}
@Override
public final void putBoolean(long offset, boolean value) {
Platform.putBoolean(null, this.offset + offset, value);
}
@Override
public final byte getByte(long offset) {
return Platform.getByte(null, this.offset + offset);
}
@Override
public final void putByte(long offset, byte value) {
Platform.putByte(null, this.offset + offset, value);
}
@Override
public final short getShort(long offset) {
return Platform.getShort(null, this.offset + offset);
}
@Override
public final void putShort(long offset, short value) {
Platform.putShort(null, this.offset + offset, value);
}
@Override
public final long getLong(long offset) {
return Platform.getLong(null, this.offset + offset);
}
@Override
public final void putLong(long offset, long value) {
Platform.putLong(null, this.offset + offset, value);
}
@Override
public final float getFloat(long offset) {
return Platform.getFloat(null, this.offset + offset);
}
@Override
public final void putFloat(long offset, float value) {
Platform.putFloat(null, this.offset + offset, value);
}
@Override
public final double getDouble(long offset) {
return Platform.getDouble(null, this.offset + offset);
}
@Override
public final void putDouble(long offset, double value) {
Platform.putDouble(null, this.offset + offset, value);
}
}

common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java (deleted)

@@ -1,132 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.unsafe.memory;
import com.google.common.primitives.Ints;
import org.apache.spark.unsafe.Platform;
/**
* A consecutive block of memory with a long array on Java heap.
*/
public final class OnHeapMemoryBlock extends MemoryBlock {
private final long[] array;
public OnHeapMemoryBlock(long[] obj, long offset, long size) {
super(obj, offset, size);
this.array = obj;
assert(offset + size <= obj.length * 8L + Platform.LONG_ARRAY_OFFSET) :
"The sum of size " + size + " and offset " + offset + " should not be larger than " +
"the size of the given memory space " + (obj.length * 8L + Platform.LONG_ARRAY_OFFSET);
}
public OnHeapMemoryBlock(long size) {
this(new long[Ints.checkedCast((size + 7) / 8)], Platform.LONG_ARRAY_OFFSET, size);
}
@Override
public MemoryBlock subBlock(long offset, long size) {
checkSubBlockRange(offset, size);
if (offset == 0 && size == this.size()) return this;
return new OnHeapMemoryBlock(array, this.offset + offset, size);
}
public long[] getLongArray() { return array; }
/**
* Creates a memory block pointing to the memory used by the long array.
*/
public static OnHeapMemoryBlock fromArray(final long[] array) {
return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
}
public static OnHeapMemoryBlock fromArray(final long[] array, long size) {
return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
}
@Override
public int getInt(long offset) {
return Platform.getInt(array, this.offset + offset);
}
@Override
public void putInt(long offset, int value) {
Platform.putInt(array, this.offset + offset, value);
}
@Override
public boolean getBoolean(long offset) {
return Platform.getBoolean(array, this.offset + offset);
}
@Override
public void putBoolean(long offset, boolean value) {
Platform.putBoolean(array, this.offset + offset, value);
}
@Override
public byte getByte(long offset) {
return Platform.getByte(array, this.offset + offset);
}
@Override
public void putByte(long offset, byte value) {
Platform.putByte(array, this.offset + offset, value);
}
@Override
public short getShort(long offset) {
return Platform.getShort(array, this.offset + offset);
}
@Override
public void putShort(long offset, short value) {
Platform.putShort(array, this.offset + offset, value);
}
@Override
public long getLong(long offset) {
return Platform.getLong(array, this.offset + offset);
}
@Override
public void putLong(long offset, long value) {
Platform.putLong(array, this.offset + offset, value);
}
@Override
public float getFloat(long offset) {
return Platform.getFloat(array, this.offset + offset);
}
@Override
public void putFloat(long offset, float value) {
Platform.putFloat(array, this.offset + offset, value);
}
@Override
public double getDouble(long offset) {
return Platform.getDouble(array, this.offset + offset);
}
@Override
public void putDouble(long offset, double value) {
Platform.putDouble(array, this.offset + offset, value);
}
}

common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java

@@ -25,9 +25,9 @@ import org.apache.spark.unsafe.Platform;
public class UnsafeMemoryAllocator implements MemoryAllocator {
@Override
public OffHeapMemoryBlock allocate(long size) throws OutOfMemoryError {
public MemoryBlock allocate(long size) throws OutOfMemoryError {
long address = Platform.allocateMemory(size);
OffHeapMemoryBlock memory = new OffHeapMemoryBlock(address, size);
MemoryBlock memory = new MemoryBlock(null, address, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
@@ -36,25 +36,22 @@ public class UnsafeMemoryAllocator implements MemoryAllocator {
@Override
public void free(MemoryBlock memory) {
assert(memory instanceof OffHeapMemoryBlock) :
"UnsafeMemoryAllocator can only free OffHeapMemoryBlock.";
if (memory == OffHeapMemoryBlock.NULL) return;
assert (memory.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
assert (memory.obj == null) :
"baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?";
assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
"page has already been freed";
assert ((memory.getPageNumber() == MemoryBlock.NO_PAGE_NUMBER)
|| (memory.getPageNumber() == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
|| (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
"TMM-allocated pages must be freed via TMM.freePage(), not directly in allocator free()";
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
}
Platform.freeMemory(memory.offset);
// As an additional layer of defense against use-after-free bugs, we mutate the
// MemoryBlock to reset its pointer.
memory.resetObjAndOffset();
memory.offset = 0;
// Mark the page as freed (so we can detect double-frees).
memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER);
memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
}
}

common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java

@@ -34,8 +34,6 @@ import com.google.common.primitives.Ints;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
import org.apache.spark.unsafe.memory.MemoryBlock;
import static org.apache.spark.unsafe.Platform.*;
@@ -53,13 +51,12 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
// These are only updated by readExternal() or read()
@Nonnull
private MemoryBlock base;
// While numBytes has the same value as base.size(), to keep as int avoids cast from long to int
private Object base;
private long offset;
private int numBytes;
public MemoryBlock getMemoryBlock() { return base; }
public Object getBaseObject() { return base.getBaseObject(); }
public long getBaseOffset() { return base.getBaseOffset(); }
public Object getBaseObject() { return base; }
public long getBaseOffset() { return offset; }
/**
* A char in UTF-8 encoding can take 1-4 bytes depending on the first byte which
@@ -112,8 +109,7 @@
*/
public static UTF8String fromBytes(byte[] bytes) {
if (bytes != null) {
return new UTF8String(
new ByteArrayMemoryBlock(bytes, BYTE_ARRAY_OFFSET, bytes.length));
return new UTF8String(bytes, BYTE_ARRAY_OFFSET, bytes.length);
} else {
return null;
}
@@ -126,13 +122,19 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
*/
public static UTF8String fromBytes(byte[] bytes, int offset, int numBytes) {
if (bytes != null) {
return new UTF8String(
new ByteArrayMemoryBlock(bytes, BYTE_ARRAY_OFFSET + offset, numBytes));
return new UTF8String(bytes, BYTE_ARRAY_OFFSET + offset, numBytes);
} else {
return null;
}
}
/**
* Creates an UTF8String from given address (base and offset) and length.
*/
public static UTF8String fromAddress(Object base, long offset, int numBytes) {
return new UTF8String(base, offset, numBytes);
}
/**
* Creates an UTF8String from String.
*/
@@ -149,13 +151,16 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
return fromBytes(spaces);
}
public UTF8String(MemoryBlock base) {
protected UTF8String(Object base, long offset, int numBytes) {
this.base = base;
this.numBytes = Ints.checkedCast(base.size());
this.offset = offset;
this.numBytes = numBytes;
}
// for serialization
public UTF8String() {}
public UTF8String() {
this(null, 0, 0);
}
/**
* Writes the content of this string into a memory address, identified by an object and an offset.
@@ -163,7 +168,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
* bytes in this string.
*/
public void writeToMemory(Object target, long targetOffset) {
base.writeTo(0, target, targetOffset, numBytes);
Platform.copyMemory(base, offset, target, targetOffset, numBytes);
}
public void writeTo(ByteBuffer buffer) {
@@ -183,9 +188,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
*/
@Nonnull
public ByteBuffer getByteBuffer() {
long offset = base.getBaseOffset();
if (base instanceof ByteArrayMemoryBlock && offset >= BYTE_ARRAY_OFFSET) {
final byte[] bytes = ((ByteArrayMemoryBlock) base).getByteArray();
if (base instanceof byte[] && offset >= BYTE_ARRAY_OFFSET) {
final byte[] bytes = (byte[]) base;
// the offset includes an object header... this is only needed for unsafe copies
final long arrayOffset = offset - BYTE_ARRAY_OFFSET;
@@ -252,12 +256,12 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
long mask = 0;
if (IS_LITTLE_ENDIAN) {
if (numBytes >= 8) {
p = base.getLong(0);
p = Platform.getLong(base, offset);
} else if (numBytes > 4) {
p = base.getLong(0);
p = Platform.getLong(base, offset);
mask = (1L << (8 - numBytes) * 8) - 1;
} else if (numBytes > 0) {
p = (long) base.getInt(0);
p = (long) Platform.getInt(base, offset);
mask = (1L << (8 - numBytes) * 8) - 1;
} else {
p = 0;
@@ -266,12 +270,12 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
} else {
// byteOrder == ByteOrder.BIG_ENDIAN
if (numBytes >= 8) {
p = base.getLong(0);
p = Platform.getLong(base, offset);
} else if (numBytes > 4) {
p = base.getLong(0);
p = Platform.getLong(base, offset);
mask = (1L << (8 - numBytes) * 8) - 1;
} else if (numBytes > 0) {
p = ((long) base.getInt(0)) << 32;
p = ((long) Platform.getInt(base, offset)) << 32;
mask = (1L << (8 - numBytes) * 8) - 1;
} else {
p = 0;
@@ -286,13 +290,12 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
*/
public byte[] getBytes() {
// avoid copy if `base` is `byte[]`
long offset = base.getBaseOffset();
if (offset == BYTE_ARRAY_OFFSET && base instanceof ByteArrayMemoryBlock
&& (((ByteArrayMemoryBlock) base).getByteArray()).length == numBytes) {
return ((ByteArrayMemoryBlock) base).getByteArray();
if (offset == BYTE_ARRAY_OFFSET && base instanceof byte[]
&& ((byte[]) base).length == numBytes) {
return (byte[]) base;
} else {
byte[] bytes = new byte[numBytes];
base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, numBytes);
copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, numBytes);
return bytes;
}
}
@@ -322,7 +325,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
if (i > j) {
byte[] bytes = new byte[i - j];
base.writeTo(j, bytes, BYTE_ARRAY_OFFSET, i - j);
copyMemory(base, offset + j, bytes, BYTE_ARRAY_OFFSET, i - j);
return fromBytes(bytes);
} else {
return EMPTY_UTF8;
@@ -363,14 +366,14 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
* Returns the byte at position `i`.
*/
private byte getByte(int i) {
return base.getByte(i);
return Platform.getByte(base, offset + i);
}
private boolean matchAt(final UTF8String s, int pos) {
if (s.numBytes + pos > numBytes || pos < 0) {
return false;
}
return ByteArrayMethods.arrayEqualsBlock(base, pos, s.base, 0, s.numBytes);
return ByteArrayMethods.arrayEquals(base, offset + pos, s.base, s.offset, s.numBytes);
}
public boolean startsWith(final UTF8String prefix) {
@@ -497,7 +500,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
for (int i = 0; i < numBytes; i++) {
if (getByte(i) == (byte) ',') {
if (i - (lastComma + 1) == match.numBytes &&
ByteArrayMethods.arrayEqualsBlock(base, lastComma + 1, match.base, 0, match.numBytes)) {
ByteArrayMethods.arrayEquals(base, offset + (lastComma + 1), match.base, match.offset,
match.numBytes)) {
return n;
}
lastComma = i;
@@ -505,7 +509,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
}
}
if (numBytes - (lastComma + 1) == match.numBytes &&
ByteArrayMethods.arrayEqualsBlock(base, lastComma + 1, match.base, 0, match.numBytes)) {
ByteArrayMethods.arrayEquals(base, offset + (lastComma + 1), match.base, match.offset,
match.numBytes)) {
return n;
}
return 0;
@@ -520,7 +525,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
private UTF8String copyUTF8String(int start, int end) {
int len = end - start + 1;
byte[] newBytes = new byte[len];
base.writeTo(start, newBytes, BYTE_ARRAY_OFFSET, len);
copyMemory(base, offset + start, newBytes, BYTE_ARRAY_OFFSET, len);
return UTF8String.fromBytes(newBytes);
}
@@ -667,7 +672,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
int i = 0; // position in byte
while (i < numBytes) {
int len = numBytesForFirstByte(getByte(i));
base.writeTo(i, result, BYTE_ARRAY_OFFSET + result.length - i - len, len);
copyMemory(this.base, this.offset + i, result,
BYTE_ARRAY_OFFSET + result.length - i - len, len);
i += len;
}
@@ -681,7 +687,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
}
byte[] newBytes = new byte[numBytes * times];
base.writeTo(0, newBytes, BYTE_ARRAY_OFFSET, numBytes);
copyMemory(this.base, this.offset, newBytes, BYTE_ARRAY_OFFSET, numBytes);
int copied = 1;
while (copied < times) {
@@ -718,7 +724,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
if (i + v.numBytes > numBytes) {
return -1;
}
if (ByteArrayMethods.arrayEqualsBlock(base, i, v.base, 0, v.numBytes)) {
if (ByteArrayMethods.arrayEquals(base, offset + i, v.base, v.offset, v.numBytes)) {
return c;
}
i += numBytesForFirstByte(getByte(i));
@@ -734,7 +740,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
private int find(UTF8String str, int start) {
assert (str.numBytes > 0);
while (start <= numBytes - str.numBytes) {
if (ByteArrayMethods.arrayEqualsBlock(base, start, str.base, 0, str.numBytes)) {
if (ByteArrayMethods.arrayEquals(base, offset + start, str.base, str.offset, str.numBytes)) {
return start;
}
start += 1;
@@ -748,7 +754,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
private int rfind(UTF8String str, int start) {
assert (str.numBytes > 0);
while (start >= 0) {
if (ByteArrayMethods.arrayEqualsBlock(base, start, str.base, 0, str.numBytes)) {
if (ByteArrayMethods.arrayEquals(base, offset + start, str.base, str.offset, str.numBytes)) {
return start;
}
start -= 1;
@@ -781,7 +787,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
return EMPTY_UTF8;
}
byte[] bytes = new byte[idx];
base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, idx);
copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, idx);
return fromBytes(bytes);
} else {
@@ -801,7 +807,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
}
int size = numBytes - delim.numBytes - idx;
byte[] bytes = new byte[size];
base.writeTo(idx + delim.numBytes, bytes, BYTE_ARRAY_OFFSET, size);
copyMemory(base, offset + idx + delim.numBytes, bytes, BYTE_ARRAY_OFFSET, size);
return fromBytes(bytes);
}
}
@@ -824,15 +830,15 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
UTF8String remain = pad.substring(0, spaces - padChars * count);
byte[] data = new byte[this.numBytes + pad.numBytes * count + remain.numBytes];
base.writeTo(0, data, BYTE_ARRAY_OFFSET, this.numBytes);
copyMemory(this.base, this.offset, data, BYTE_ARRAY_OFFSET, this.numBytes);
int offset = this.numBytes;
int idx = 0;
while (idx < count) {
pad.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
copyMemory(pad.base, pad.offset, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
++ idx;
offset += pad.numBytes;
}
remain.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
copyMemory(remain.base, remain.offset, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
return UTF8String.fromBytes(data);
}
@@ -860,13 +866,13 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
int offset = 0;
int idx = 0;
while (idx < count) {
pad.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
copyMemory(pad.base, pad.offset, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
++ idx;
offset += pad.numBytes;
}
remain.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
copyMemory(remain.base, remain.offset, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
offset += remain.numBytes;
base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, numBytes());
copyMemory(this.base, this.offset, data, BYTE_ARRAY_OFFSET + offset, numBytes());
return UTF8String.fromBytes(data);
}
@@ -891,8 +897,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
int offset = 0;
for (int i = 0; i < inputs.length; i++) {
int len = inputs[i].numBytes;
inputs[i].base.writeTo(
0,
copyMemory(
inputs[i].base, inputs[i].offset,
result, BYTE_ARRAY_OFFSET + offset,
len);
offset += len;
@@ -931,8 +937,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
for (int i = 0, j = 0; i < inputs.length; i++) {
if (inputs[i] != null) {
int len = inputs[i].numBytes;
inputs[i].base.writeTo(
0,
copyMemory(
inputs[i].base, inputs[i].offset,
result, BYTE_ARRAY_OFFSET + offset,
len);
offset += len;
@@ -940,8 +946,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
j++;
// Add separator if this is not the last input.
if (j < numInputs) {
separator.base.writeTo(
0,
copyMemory(
separator.base, separator.offset,
result, BYTE_ARRAY_OFFSET + offset,
separator.numBytes);
offset += separator.numBytes;
@@ -1215,7 +1221,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
public UTF8String copy() {
byte[] bytes = new byte[numBytes];
base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, numBytes);
copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, numBytes);
return fromBytes(bytes);
}
@@ -1223,10 +1229,11 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
public int compareTo(@Nonnull final UTF8String other) {
int len = Math.min(numBytes, other.numBytes);
int wordMax = (len / 8) * 8;
MemoryBlock rbase = other.base;
long roffset = other.offset;
Object rbase = other.base;
for (int i = 0; i < wordMax; i += 8) {
long left = base.getLong(i);
long right = rbase.getLong(i);
long left = getLong(base, offset + i);
long right = getLong(rbase, roffset + i);
if (left != right) {
if (IS_LITTLE_ENDIAN) {
return Long.compareUnsigned(Long.reverseBytes(left), Long.reverseBytes(right));
@@ -1237,7 +1244,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
}
for (int i = wordMax; i < len; i++) {
// In UTF-8, the byte should be unsigned, so we should compare them as unsigned int.
int res = (getByte(i) & 0xFF) - (rbase.getByte(i) & 0xFF);
int res = (getByte(i) & 0xFF) - (Platform.getByte(rbase, roffset + i) & 0xFF);
if (res != 0) {
return res;
}
@@ -1256,7 +1263,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
if (numBytes != o.numBytes) {
return false;
}
return ByteArrayMethods.arrayEqualsBlock(base, 0, o.base, 0, numBytes);
return ByteArrayMethods.arrayEquals(base, offset, o.base, o.offset, numBytes);
} else {
return false;
}
@@ -1312,8 +1319,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
num_bytes_j != numBytesForFirstByte(s.getByte(i_bytes))) {
cost = 1;
} else {
cost = (ByteArrayMethods.arrayEqualsBlock(t.base, j_bytes, s.base,
i_bytes, num_bytes_j)) ? 0 : 1;
cost = (ByteArrayMethods.arrayEquals(t.base, t.offset + j_bytes, s.base,
s.offset + i_bytes, num_bytes_j)) ? 0 : 1;
}
d[i + 1] = Math.min(Math.min(d[i] + 1, p[i + 1] + 1), p[i] + cost);
}
@@ -1328,7 +1335,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
@Override
public int hashCode() {
return Murmur3_x86_32.hashUnsafeBytesBlock(base,42);
return Murmur3_x86_32.hashUnsafeBytes(base, offset, numBytes, 42);
}
/**
@ -1391,10 +1398,10 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
offset = BYTE_ARRAY_OFFSET;
numBytes = in.readInt();
byte[] bytes = new byte[numBytes];
in.readFully(bytes);
base = ByteArrayMemoryBlock.fromArray(bytes);
base = new byte[numBytes];
in.readFully((byte[]) base);
}
@Override
@@ -1406,10 +1413,10 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
@Override
public void read(Kryo kryo, Input in) {
numBytes = in.readInt();
byte[] bytes = new byte[numBytes];
in.read(bytes);
base = ByteArrayMemoryBlock.fromArray(bytes);
this.offset = BYTE_ARRAY_OFFSET;
this.numBytes = in.readInt();
this.base = new byte[numBytes];
in.read((byte[]) base);
}
}

common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java

@@ -81,7 +81,7 @@ public class PlatformUtilSuite {
MemoryAllocator.HEAP.free(block);
Assert.assertNull(block.getBaseObject());
Assert.assertEquals(0, block.getBaseOffset());
Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.getPageNumber());
Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.pageNumber);
}
@Test
@@ -92,7 +92,7 @@
MemoryAllocator.UNSAFE.free(block);
Assert.assertNull(block.getBaseObject());
Assert.assertEquals(0, block.getBaseOffset());
Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.getPageNumber());
Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.pageNumber);
}
@Test(expected = AssertionError.class)

common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java

@@ -20,13 +20,14 @@ package org.apache.spark.unsafe.array;
import org.junit.Assert;
import org.junit.Test;
import org.apache.spark.unsafe.memory.OnHeapMemoryBlock;
import org.apache.spark.unsafe.memory.MemoryBlock;
public class LongArraySuite {
@Test
public void basicTest() {
LongArray arr = new LongArray(new OnHeapMemoryBlock(16));
long[] bytes = new long[2];
LongArray arr = new LongArray(MemoryBlock.fromLongArray(bytes));
arr.set(0, 1L);
arr.set(1, 2L);
arr.set(1, 3L);

common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java

@@ -70,24 +70,6 @@ public class Murmur3_x86_32Suite {
Murmur3_x86_32.hashUnsafeBytes2(tes, Platform.BYTE_ARRAY_OFFSET, tes.length, 0));
}
@Test
public void testKnownWordsInputs() {
byte[] bytes = new byte[16];
long offset = Platform.BYTE_ARRAY_OFFSET;
for (int i = 0; i < 16; i++) {
bytes[i] = 0;
}
Assert.assertEquals(-300363099, Murmur3_x86_32.hashUnsafeWords(bytes, offset, 16, 42));
for (int i = 0; i < 16; i++) {
bytes[i] = -1;
}
Assert.assertEquals(-1210324667, Murmur3_x86_32.hashUnsafeWords(bytes, offset, 16, 42));
for (int i = 0; i < 16; i++) {
bytes[i] = (byte)i;
}
Assert.assertEquals(-634919701, Murmur3_x86_32.hashUnsafeWords(bytes, offset, 16, 42));
}
@Test
public void randomizedStressTest() {
int size = 65536;

common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java (deleted)

@@ -1,179 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.unsafe.memory;
import org.apache.spark.unsafe.Platform;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteOrder;
import static org.hamcrest.core.StringContains.containsString;
public class MemoryBlockSuite {
private static final boolean bigEndianPlatform =
ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
private void check(MemoryBlock memory, Object obj, long offset, int length) {
memory.setPageNumber(1);
memory.fill((byte)-1);
memory.putBoolean(0, true);
memory.putByte(1, (byte)127);
memory.putShort(2, (short)257);
memory.putInt(4, 0x20000002);
memory.putLong(8, 0x1234567089ABCDEFL);
memory.putFloat(16, 1.0F);
memory.putLong(20, 0x1234567089ABCDEFL);
memory.putDouble(28, 2.0);
MemoryBlock.copyMemory(memory, 0L, memory, 36, 4);
int[] a = new int[2];
a[0] = 0x12345678;
a[1] = 0x13579BDF;
memory.copyFrom(a, Platform.INT_ARRAY_OFFSET, 40, 8);
byte[] b = new byte[8];
memory.writeTo(40, b, Platform.BYTE_ARRAY_OFFSET, 8);
Assert.assertEquals(obj, memory.getBaseObject());
Assert.assertEquals(offset, memory.getBaseOffset());
Assert.assertEquals(length, memory.size());
Assert.assertEquals(1, memory.getPageNumber());
Assert.assertEquals(true, memory.getBoolean(0));
Assert.assertEquals((byte)127, memory.getByte(1 ));
Assert.assertEquals((short)257, memory.getShort(2));
Assert.assertEquals(0x20000002, memory.getInt(4));
Assert.assertEquals(0x1234567089ABCDEFL, memory.getLong(8));
Assert.assertEquals(1.0F, memory.getFloat(16), 0);
Assert.assertEquals(0x1234567089ABCDEFL, memory.getLong(20));
Assert.assertEquals(2.0, memory.getDouble(28), 0);
Assert.assertEquals(true, memory.getBoolean(36));
Assert.assertEquals((byte)127, memory.getByte(37 ));
Assert.assertEquals((short)257, memory.getShort(38));
Assert.assertEquals(a[0], memory.getInt(40));
Assert.assertEquals(a[1], memory.getInt(44));
if (bigEndianPlatform) {
Assert.assertEquals(a[0],
((int)b[0] & 0xff) << 24 | ((int)b[1] & 0xff) << 16 |
((int)b[2] & 0xff) << 8 | ((int)b[3] & 0xff));
Assert.assertEquals(a[1],
((int)b[4] & 0xff) << 24 | ((int)b[5] & 0xff) << 16 |
((int)b[6] & 0xff) << 8 | ((int)b[7] & 0xff));
} else {
Assert.assertEquals(a[0],
((int)b[3] & 0xff) << 24 | ((int)b[2] & 0xff) << 16 |
((int)b[1] & 0xff) << 8 | ((int)b[0] & 0xff));
Assert.assertEquals(a[1],
((int)b[7] & 0xff) << 24 | ((int)b[6] & 0xff) << 16 |
((int)b[5] & 0xff) << 8 | ((int)b[4] & 0xff));
}
for (int i = 48; i < memory.size(); i++) {
Assert.assertEquals((byte) -1, memory.getByte(i));
}
assert(memory.subBlock(0, memory.size()) == memory);
try {
memory.subBlock(-8, 8);
Assert.fail();
} catch (Exception expected) {
Assert.assertThat(expected.getMessage(), containsString("non-negative"));
}
try {
memory.subBlock(0, -8);
Assert.fail();
} catch (Exception expected) {
Assert.assertThat(expected.getMessage(), containsString("non-negative"));
}
try {
memory.subBlock(0, length + 8);
Assert.fail();
} catch (Exception expected) {
Assert.assertThat(expected.getMessage(), containsString("should not be larger than"));
}
try {
memory.subBlock(8, length - 4);
Assert.fail();
} catch (Exception expected) {
Assert.assertThat(expected.getMessage(), containsString("should not be larger than"));
}
try {
memory.subBlock(length + 8, 4);
Assert.fail();
} catch (Exception expected) {
Assert.assertThat(expected.getMessage(), containsString("should not be larger than"));
}
memory.setPageNumber(MemoryBlock.NO_PAGE_NUMBER);
}
@Test
public void testByteArrayMemoryBlock() {
byte[] obj = new byte[56];
long offset = Platform.BYTE_ARRAY_OFFSET;
int length = obj.length;
MemoryBlock memory = new ByteArrayMemoryBlock(obj, offset, length);
check(memory, obj, offset, length);
memory = ByteArrayMemoryBlock.fromArray(obj);
check(memory, obj, offset, length);
obj = new byte[112];
memory = new ByteArrayMemoryBlock(obj, offset, length);
check(memory, obj, offset, length);
}
@Test
public void testOnHeapMemoryBlock() {
long[] obj = new long[7];
long offset = Platform.LONG_ARRAY_OFFSET;
int length = obj.length * 8;
MemoryBlock memory = new OnHeapMemoryBlock(obj, offset, length);
check(memory, obj, offset, length);
memory = OnHeapMemoryBlock.fromArray(obj);
check(memory, obj, offset, length);
obj = new long[14];
memory = new OnHeapMemoryBlock(obj, offset, length);
check(memory, obj, offset, length);
}
@Test
public void testOffHeapArrayMemoryBlock() {
MemoryAllocator memoryAllocator = new UnsafeMemoryAllocator();
MemoryBlock memory = memoryAllocator.allocate(56);
Object obj = memory.getBaseObject();
long offset = memory.getBaseOffset();
int length = 56;
check(memory, obj, offset, length);
memoryAllocator.free(memory);
long address = Platform.allocateMemory(112);
memory = new OffHeapMemoryBlock(address, length);
obj = memory.getBaseObject();
offset = memory.getBaseOffset();
check(memory, obj, offset, length);
Platform.freeMemory(address);
}
}

common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java

@@ -25,8 +25,7 @@ import java.nio.charset.StandardCharsets;
import java.util.*;
import com.google.common.collect.ImmutableMap;
import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
import org.apache.spark.unsafe.memory.OnHeapMemoryBlock;
import org.apache.spark.unsafe.Platform;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -513,6 +512,21 @@
assertEquals(fromString("世界千世").soundex(), fromString("世界千世"));
}
@Test
public void writeToOutputStreamUnderflow() throws IOException {
// offset underflow is apparently supported?
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i)
.writeTo(outputStream);
final ByteBuffer buffer = ByteBuffer.wrap(outputStream.toByteArray(), i, test.length);
assertEquals("01234567", StandardCharsets.UTF_8.decode(buffer).toString());
outputStream.reset();
}
}
@Test
public void writeToOutputStreamSlice() throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
@@ -520,7 +534,7 @@
for (int i = 0; i < test.length; ++i) {
for (int j = 0; j < test.length - i; ++j) {
new UTF8String(ByteArrayMemoryBlock.fromArray(test).subBlock(i, j))
UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET + i, j)
.writeTo(outputStream);
assertArrayEquals(Arrays.copyOfRange(test, i, i + j), outputStream.toByteArray());
@ -551,7 +565,7 @@ public class UTF8StringSuite {
for (final long offset : offsets) {
try {
new UTF8String(ByteArrayMemoryBlock.fromArray(test).subBlock(offset, test.length))
fromAddress(test, BYTE_ARRAY_OFFSET + offset, test.length)
.writeTo(outputStream);
throw new IllegalStateException(Long.toString(offset));
@ -578,25 +592,26 @@ public class UTF8StringSuite {
}
@Test
public void writeToOutputStreamLongArray() throws IOException {
public void writeToOutputStreamIntArray() throws IOException {
// verify that writes work on objects that are not byte arrays
final ByteBuffer buffer = StandardCharsets.UTF_8.encode("3千大千世界");
final ByteBuffer buffer = StandardCharsets.UTF_8.encode("大千世界");
buffer.position(0);
buffer.order(ByteOrder.nativeOrder());
final int length = buffer.limit();
assertEquals(16, length);
assertEquals(12, length);
final int longs = length / 8;
final long[] array = new long[longs];
final int ints = length / 4;
final int[] array = new int[ints];
for (int i = 0; i < longs; ++i) {
array[i] = buffer.getLong();
for (int i = 0; i < ints; ++i) {
array[i] = buffer.getInt();
}
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
new UTF8String(OnHeapMemoryBlock.fromArray(array)).writeTo(outputStream);
assertEquals("3千大千世界", outputStream.toString("UTF-8"));
fromAddress(array, Platform.INT_ARRAY_OFFSET, length)
.writeTo(outputStream);
assertEquals("大千世界", outputStream.toString("UTF-8"));
}
@Test
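
For illustration, a minimal sketch of the fromAddress path restored above; it wraps any base object plus an unsafe offset without copying (imports as in this suite):

  byte[] bytes = "spark".getBytes(StandardCharsets.UTF_8);
  UTF8String s = UTF8String.fromAddress(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length);
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  s.writeTo(out);   // writes the five UTF-8 bytes of "spark"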

View file

@ -311,7 +311,7 @@ public class TaskMemoryManager {
// this could trigger spilling to free some pages.
return allocatePage(size, consumer);
}
page.setPageNumber(pageNumber);
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
@ -323,25 +323,25 @@ public class TaskMemoryManager {
* Free a block of memory allocated via {@link TaskMemoryManager#allocatePage}.
*/
public void freePage(MemoryBlock page, MemoryConsumer consumer) {
assert (page.getPageNumber() != MemoryBlock.NO_PAGE_NUMBER) :
assert (page.pageNumber != MemoryBlock.NO_PAGE_NUMBER) :
"Called freePage() on memory that wasn't allocated with allocatePage()";
assert (page.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
assert (page.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
"Called freePage() on a memory block that has already been freed";
assert (page.getPageNumber() != MemoryBlock.FREED_IN_TMM_PAGE_NUMBER) :
assert (page.pageNumber != MemoryBlock.FREED_IN_TMM_PAGE_NUMBER) :
"Called freePage() on a memory block that has already been freed";
assert(allocatedPages.get(page.getPageNumber()));
pageTable[page.getPageNumber()] = null;
assert(allocatedPages.get(page.pageNumber));
pageTable[page.pageNumber] = null;
synchronized (this) {
allocatedPages.clear(page.getPageNumber());
allocatedPages.clear(page.pageNumber);
}
if (logger.isTraceEnabled()) {
logger.trace("Freed page number {} ({} bytes)", page.getPageNumber(), page.size());
logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
}
long pageSize = page.size();
// Clear the page number before passing the block to the MemoryAllocator's free().
// Doing this allows the MemoryAllocator to detect when a TaskMemoryManager-managed
// page has been inappropriately directly freed without calling TMM.freePage().
page.setPageNumber(MemoryBlock.FREED_IN_TMM_PAGE_NUMBER);
page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
memoryManager.tungstenMemoryAllocator().free(page);
releaseExecutionMemory(pageSize, consumer);
}
@ -363,7 +363,7 @@ public class TaskMemoryManager {
// relative to the page's base offset; this relative offset will fit in 51 bits.
offsetInPage -= page.getBaseOffset();
}
return encodePageNumberAndOffset(page.getPageNumber(), offsetInPage);
return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
}
@VisibleForTesting
@ -434,7 +434,7 @@ public class TaskMemoryManager {
for (MemoryBlock page : pageTable) {
if (page != null) {
logger.debug("unreleased page: " + page + " in task " + taskAttemptId);
page.setPageNumber(MemoryBlock.FREED_IN_TMM_PAGE_NUMBER);
page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
memoryManager.tungstenMemoryAllocator().free(page);
}
}
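
For context, a hedged sketch of the page-address encoding referenced above: since the in-page offset fits in 51 bits, the remaining upper 13 bits of a long can carry the page number (the real code uses named constants for these widths):

  // Pack: upper 13 bits = page number, lower 51 bits = offset in page
  long encoded = (((long) pageNumber) << 51) | (offsetInPage & ((1L << 51) - 1));
  int decodedPage = (int) (encoded >>> 51);          // recover the page number
  long decodedOffset = encoded & ((1L << 51) - 1);   // recover the offset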

View file

@ -20,6 +20,7 @@ package org.apache.spark.shuffle.sort;
import java.util.Comparator;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.collection.Sorter;
@ -112,7 +113,13 @@ final class ShuffleInMemorySorter {
public void expandPointerArray(LongArray newArray) {
assert(newArray.size() > array.size());
MemoryBlock.copyMemory(array.memoryBlock(), newArray.memoryBlock(), pos * 8L);
Platform.copyMemory(
array.getBaseObject(),
array.getBaseOffset(),
newArray.getBaseObject(),
newArray.getBaseOffset(),
pos * 8L
);
consumer.freeArray(array);
array = newArray;
usableCapacity = getUsableCapacity();
@ -181,7 +188,10 @@ final class ShuffleInMemorySorter {
PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
} else {
MemoryBlock unused = array.memoryBlock().subBlock(pos * 8L, (array.size() - pos) * 8L);
MemoryBlock unused = new MemoryBlock(
array.getBaseObject(),
array.getBaseOffset() + pos * 8L,
(array.size() - pos) * 8L);
LongArray buffer = new LongArray(unused);
Sorter<PackedRecordPointer, LongArray> sorter =
new Sorter<>(new ShuffleSortDataFormat(buffer));
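
For illustration, the Platform.copyMemory pattern used in expandPointerArray above, in isolation with hypothetical arrays (the same call shape reappears in ShuffleSortDataFormat.copyRange below):

  long[] src = {1L, 2L, 3L};
  long[] dst = new long[6];
  // Copy 3 longs (24 bytes) starting at each array's unsafe base offset
  Platform.copyMemory(
    src, Platform.LONG_ARRAY_OFFSET,
    dst, Platform.LONG_ARRAY_OFFSET,
    3 * 8L);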

View file

@ -17,8 +17,8 @@
package org.apache.spark.shuffle.sort;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.collection.SortDataFormat;
final class ShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, LongArray> {
@ -60,8 +60,13 @@ final class ShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, Lo
@Override
public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) {
MemoryBlock.copyMemory(src.memoryBlock(), srcPos * 8L,
dst.memoryBlock(), dstPos * 8L, length * 8L);
Platform.copyMemory(
src.getBaseObject(),
src.getBaseOffset() + srcPos * 8L,
dst.getBaseObject(),
dst.getBaseOffset() + dstPos * 8L,
length * 8L
);
}
@Override

View file

@ -544,7 +544,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// is accessing the current record. We free this page in that caller's next loadNext()
// call.
for (MemoryBlock page : allocatedPages) {
if (!loaded || page.getPageNumber() !=
if (!loaded || page.pageNumber !=
((UnsafeInMemorySorter.SortedIterator)upstream).getCurrentPageNumber()) {
released += page.size();
freePage(page);

View file

@ -26,6 +26,7 @@ import org.apache.spark.TaskContext;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
@ -215,7 +216,12 @@ public final class UnsafeInMemorySorter {
if (newArray.size() < array.size()) {
throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
}
MemoryBlock.copyMemory(array.memoryBlock(), newArray.memoryBlock(), pos * 8L);
Platform.copyMemory(
array.getBaseObject(),
array.getBaseOffset(),
newArray.getBaseObject(),
newArray.getBaseOffset(),
pos * 8L);
consumer.freeArray(array);
array = newArray;
usableCapacity = getUsableCapacity();
@ -342,7 +348,10 @@ public final class UnsafeInMemorySorter {
array, nullBoundaryPos, (pos - nullBoundaryPos) / 2L, 0, 7,
radixSortSupport.sortDescending(), radixSortSupport.sortSigned());
} else {
MemoryBlock unused = array.memoryBlock().subBlock(pos * 8L, (array.size() - pos) * 8L);
MemoryBlock unused = new MemoryBlock(
array.getBaseObject(),
array.getBaseOffset() + pos * 8L,
(array.size() - pos) * 8L);
LongArray buffer = new LongArray(unused);
Sorter<RecordPointerAndKeyPrefix, LongArray> sorter =
new Sorter<>(new UnsafeSortDataFormat(buffer));
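
For illustration, a minimal sketch of carving sort scratch space out of the unused tail of a pointer array, as the restored code above does (sizes are hypothetical):

  long[] backing = new long[256];
  LongArray array = new LongArray(MemoryBlock.fromLongArray(backing));
  int pos = 100;   // slots currently holding record pointers
  // View over the unused tail: starts pos longs past the base, covers the rest
  MemoryBlock unused = new MemoryBlock(
    array.getBaseObject(),
    array.getBaseOffset() + pos * 8L,
    (array.size() - pos) * 8L);
  LongArray buffer = new LongArray(unused);   // 156 free slots reused for sorting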

View file

@ -76,7 +76,7 @@ public class TaskMemoryManagerSuite {
final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
final MemoryBlock dataPage = manager.allocatePage(256, c);
c.freePage(dataPage);
Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, dataPage.getPageNumber());
Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, dataPage.pageNumber);
}
@Test(expected = AssertionError.class)

View file

@ -26,7 +26,7 @@ import org.apache.spark._
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.unsafe.array.LongArray
import org.apache.spark.unsafe.memory.OnHeapMemoryBlock
import org.apache.spark.unsafe.memory.MemoryBlock
import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordPointerAndKeyPrefix, UnsafeSortDataFormat}
class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
@ -105,8 +105,9 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
// the form [150000000, 150000001, 150000002, ...., 300000000, 0, 1, 2, ..., 149999999]
// that can trigger copyRange() in TimSort.mergeLo() or TimSort.mergeHi()
val ref = Array.tabulate[Long](size) { i => if (i < size / 2) size / 2 + i else i }
val buf = new LongArray(OnHeapMemoryBlock.fromArray(ref))
val tmpBuf = new LongArray(new OnHeapMemoryBlock((size/2) * 8L))
val buf = new LongArray(MemoryBlock.fromLongArray(ref))
val tmp = new Array[Long](size/2)
val tmpBuf = new LongArray(MemoryBlock.fromLongArray(tmp))
new Sorter(new UnsafeSortDataFormat(tmpBuf)).sort(
buf, 0, size, new Comparator[RecordPointerAndKeyPrefix] {

View file

@ -27,7 +27,7 @@ import com.google.common.primitives.Ints
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.unsafe.array.LongArray
import org.apache.spark.unsafe.memory.OnHeapMemoryBlock
import org.apache.spark.unsafe.memory.MemoryBlock
import org.apache.spark.util.collection.Sorter
import org.apache.spark.util.random.XORShiftRandom
@ -78,14 +78,14 @@ class RadixSortSuite extends SparkFunSuite with Logging {
private def generateTestData(size: Long, rand: => Long): (Array[JLong], LongArray) = {
val ref = Array.tabulate[Long](Ints.checkedCast(size)) { i => rand }
val extended = ref ++ Array.fill[Long](Ints.checkedCast(size))(0)
(ref.map(i => new JLong(i)), new LongArray(OnHeapMemoryBlock.fromArray(extended)))
(ref.map(i => new JLong(i)), new LongArray(MemoryBlock.fromLongArray(extended)))
}
private def generateKeyPrefixTestData(size: Long, rand: => Long): (LongArray, LongArray) = {
val ref = Array.tabulate[Long](Ints.checkedCast(size * 2)) { i => rand }
val extended = ref ++ Array.fill[Long](Ints.checkedCast(size * 2))(0)
(new LongArray(OnHeapMemoryBlock.fromArray(ref)),
new LongArray(OnHeapMemoryBlock.fromArray(extended)))
(new LongArray(MemoryBlock.fromLongArray(ref)),
new LongArray(MemoryBlock.fromLongArray(extended)))
}
private def collectToArray(array: LongArray, offset: Int, length: Long): Array[Long] = {
@ -110,7 +110,7 @@ class RadixSortSuite extends SparkFunSuite with Logging {
}
private def referenceKeyPrefixSort(buf: LongArray, lo: Long, hi: Long, refCmp: PrefixComparator) {
val sortBuffer = new LongArray(new OnHeapMemoryBlock(buf.size() * 8L))
val sortBuffer = new LongArray(MemoryBlock.fromLongArray(new Array[Long](buf.size().toInt)))
new Sorter(new UnsafeSortDataFormat(sortBuffer)).sort(
buf, Ints.checkedCast(lo), Ints.checkedCast(hi), new Comparator[RecordPointerAndKeyPrefix] {
override def compare(

View file

@ -29,7 +29,7 @@ import org.apache.spark.mllib.feature.{HashingTF => OldHashingTF}
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.hash.Murmur3_x86_32.{hashInt, hashLong, hashUnsafeBytes2Block}
import org.apache.spark.unsafe.hash.Murmur3_x86_32.{hashInt, hashLong, hashUnsafeBytes2}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.OpenHashMap
@ -244,7 +244,8 @@ object FeatureHasher extends DefaultParamsReadable[FeatureHasher] {
case f: Float => hashInt(java.lang.Float.floatToIntBits(f), seed)
case d: Double => hashLong(java.lang.Double.doubleToLongBits(d), seed)
case s: String =>
hashUnsafeBytes2Block(UTF8String.fromString(s).getMemoryBlock, seed)
val utf8 = UTF8String.fromString(s)
hashUnsafeBytes2(utf8.getBaseObject, utf8.getBaseOffset, utf8.numBytes(), seed)
case _ => throw new SparkException("FeatureHasher with murmur3 algorithm does not " +
s"support type ${term.getClass.getCanonicalName} of input data.")
}
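
For illustration, the restored string-hashing path above as plain Java (the seed value is arbitrary here):

  UTF8String utf8 = UTF8String.fromString("feature");
  int hash = Murmur3_x86_32.hashUnsafeBytes2(
    utf8.getBaseObject(), utf8.getBaseOffset(), utf8.numBytes(), 42);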

View file

@ -160,7 +160,7 @@ object HashingTF {
case d: Double => hashLong(java.lang.Double.doubleToLongBits(d), seed)
case s: String =>
val utf8 = UTF8String.fromString(s)
hashUnsafeBytesBlock(utf8.getMemoryBlock(), seed)
hashUnsafeBytes(utf8.getBaseObject, utf8.getBaseOffset, utf8.numBytes(), seed)
case _ => throw new SparkException("HashingTF with murmur3 algorithm does not " +
s"support type ${term.getClass.getCanonicalName} of input data.")
}

View file

@ -27,7 +27,6 @@ import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.bitset.BitSetMethods;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
@ -241,8 +240,7 @@ public final class UnsafeArrayData extends ArrayData {
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
final int size = (int) offsetAndSize;
MemoryBlock mb = MemoryBlock.allocateFromObject(baseObject, baseOffset + offset, size);
return new UTF8String(mb);
return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
}
@Override
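
For context, a worked sketch of the offsetAndSize packing decoded above: a single long carries the field offset in its high 32 bits and the byte size in its low 32 bits:

  long offsetAndSize = (42L << 32) | 7L;       // offset 42, size 7
  int offset = (int) (offsetAndSize >> 32);    // high half: 42
  int size = (int) offsetAndSize;              // low half (truncating cast): 7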

View file

@ -37,7 +37,6 @@ import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.bitset.BitSetMethods;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
@ -417,8 +416,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
final int size = (int) offsetAndSize;
MemoryBlock mb = MemoryBlock.allocateFromObject(baseObject, baseOffset + offset, size);
return new UTF8String(mb);
return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
}
@Override

View file

@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.catalyst.expressions;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.types.UTF8String;
// scalastyle: off
@ -72,13 +72,13 @@ public final class XXH64 {
return fmix(hash);
}
public long hashUnsafeWordsBlock(MemoryBlock mb) {
return hashUnsafeWordsBlock(mb, seed);
public long hashUnsafeWords(Object base, long offset, int length) {
return hashUnsafeWords(base, offset, length, seed);
}
public static long hashUnsafeWordsBlock(MemoryBlock mb, long seed) {
assert (mb.size() % 8 == 0) : "lengthInBytes must be a multiple of 8 (word-aligned)";
long hash = hashBytesByWordsBlock(mb, seed);
public static long hashUnsafeWords(Object base, long offset, int length, long seed) {
assert (length % 8 == 0) : "lengthInBytes must be a multiple of 8 (word-aligned)";
long hash = hashBytesByWords(base, offset, length, seed);
return fmix(hash);
}
@ -86,22 +86,20 @@ public final class XXH64 {
return hashUnsafeBytes(base, offset, length, seed);
}
public static long hashUnsafeBytesBlock(MemoryBlock mb, long seed) {
long offset = 0;
long length = mb.size();
public static long hashUnsafeBytes(Object base, long offset, int length, long seed) {
assert (length >= 0) : "lengthInBytes cannot be negative";
long hash = hashBytesByWordsBlock(mb, seed);
long hash = hashBytesByWords(base, offset, length, seed);
long end = offset + length;
offset += length & -8;
if (offset + 4L <= end) {
hash ^= (mb.getInt(offset) & 0xFFFFFFFFL) * PRIME64_1;
hash ^= (Platform.getInt(base, offset) & 0xFFFFFFFFL) * PRIME64_1;
hash = Long.rotateLeft(hash, 23) * PRIME64_2 + PRIME64_3;
offset += 4L;
}
while (offset < end) {
hash ^= (mb.getByte(offset) & 0xFFL) * PRIME64_5;
hash ^= (Platform.getByte(base, offset) & 0xFFL) * PRIME64_5;
hash = Long.rotateLeft(hash, 11) * PRIME64_1;
offset++;
}
@ -109,11 +107,7 @@ public final class XXH64 {
}
public static long hashUTF8String(UTF8String str, long seed) {
return hashUnsafeBytesBlock(str.getMemoryBlock(), seed);
}
public static long hashUnsafeBytes(Object base, long offset, int length, long seed) {
return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, length), seed);
return hashUnsafeBytes(str.getBaseObject(), str.getBaseOffset(), str.numBytes(), seed);
}
private static long fmix(long hash) {
@ -125,31 +119,30 @@ public final class XXH64 {
return hash;
}
private static long hashBytesByWordsBlock(MemoryBlock mb, long seed) {
long offset = 0;
long length = mb.size();
private static long hashBytesByWords(Object base, long offset, int length, long seed) {
long end = offset + length;
long hash;
if (length >= 32) {
long limit = length - 32;
long limit = end - 32;
long v1 = seed + PRIME64_1 + PRIME64_2;
long v2 = seed + PRIME64_2;
long v3 = seed;
long v4 = seed - PRIME64_1;
do {
v1 += mb.getLong(offset) * PRIME64_2;
v1 += Platform.getLong(base, offset) * PRIME64_2;
v1 = Long.rotateLeft(v1, 31);
v1 *= PRIME64_1;
v2 += mb.getLong(offset + 8) * PRIME64_2;
v2 += Platform.getLong(base, offset + 8) * PRIME64_2;
v2 = Long.rotateLeft(v2, 31);
v2 *= PRIME64_1;
v3 += mb.getLong(offset + 16) * PRIME64_2;
v3 += Platform.getLong(base, offset + 16) * PRIME64_2;
v3 = Long.rotateLeft(v3, 31);
v3 *= PRIME64_1;
v4 += mb.getLong(offset + 24) * PRIME64_2;
v4 += Platform.getLong(base, offset + 24) * PRIME64_2;
v4 = Long.rotateLeft(v4, 31);
v4 *= PRIME64_1;
@ -190,9 +183,9 @@ public final class XXH64 {
hash += length;
long limit = length - 8;
long limit = end - 8;
while (offset <= limit) {
long k1 = mb.getLong(offset);
long k1 = Platform.getLong(base, offset);
hash ^= Long.rotateLeft(k1 * PRIME64_2, 31) * PRIME64_1;
hash = Long.rotateLeft(hash, 27) * PRIME64_1 + PRIME64_4;
offset += 8L;
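
For illustration, a minimal call into the restored Platform-based entry point shown above (input and seed are arbitrary):

  byte[] data = "hello xxh64".getBytes(StandardCharsets.UTF_8);
  long hash = XXH64.hashUnsafeBytes(data, Platform.BYTE_ARRAY_OFFSET, data.length, 0L);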

View file

@ -19,8 +19,6 @@ package org.apache.spark.sql.catalyst.expressions.codegen;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.types.UTF8String;
/**
@ -31,34 +29,43 @@ public class UTF8StringBuilder {
private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
private ByteArrayMemoryBlock buffer;
private int length = 0;
private byte[] buffer;
private int cursor = Platform.BYTE_ARRAY_OFFSET;
public UTF8StringBuilder() {
// Since initial buffer size is 16 in `StringBuilder`, we set the same size here
this.buffer = new ByteArrayMemoryBlock(16);
this.buffer = new byte[16];
}
// Grows the buffer by at least `neededSize`
private void grow(int neededSize) {
if (neededSize > ARRAY_MAX - length) {
if (neededSize > ARRAY_MAX - totalSize()) {
throw new UnsupportedOperationException(
"Cannot grow internal buffer by size " + neededSize + " because the size after growing " +
"exceeds size limitation " + ARRAY_MAX);
}
final int requestedSize = length + neededSize;
if (buffer.size() < requestedSize) {
int newLength = requestedSize < ARRAY_MAX / 2 ? requestedSize * 2 : ARRAY_MAX;
final ByteArrayMemoryBlock tmp = new ByteArrayMemoryBlock(newLength);
MemoryBlock.copyMemory(buffer, tmp, length);
final int length = totalSize() + neededSize;
if (buffer.length < length) {
int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
final byte[] tmp = new byte[newLength];
Platform.copyMemory(
buffer,
Platform.BYTE_ARRAY_OFFSET,
tmp,
Platform.BYTE_ARRAY_OFFSET,
totalSize());
buffer = tmp;
}
}
private int totalSize() {
return cursor - Platform.BYTE_ARRAY_OFFSET;
}
public void append(UTF8String value) {
grow(value.numBytes());
value.writeToMemory(buffer.getByteArray(), length + Platform.BYTE_ARRAY_OFFSET);
length += value.numBytes();
value.writeToMemory(buffer, cursor);
cursor += value.numBytes();
}
public void append(String value) {
@ -66,6 +73,6 @@ public class UTF8StringBuilder {
}
public UTF8String build() {
return UTF8String.fromBytes(buffer.getByteArray(), 0, length);
return UTF8String.fromBytes(buffer, 0, totalSize());
}
}
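
For illustration, a minimal usage sketch of the builder as restored above: append writes bytes at cursor via Platform, and build copies out only what was written:

  UTF8StringBuilder sb = new UTF8StringBuilder();   // starts with a 16-byte buffer
  sb.append("Hello, ");
  sb.append(UTF8String.fromString("世界"));          // grows the buffer on demand
  UTF8String result = sb.build();                   // "Hello, 世界"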

View file

@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.hash.Murmur3_x86_32
import org.apache.spark.unsafe.memory.MemoryBlock
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
////////////////////////////////////////////////////////////////////////////////////////////////////
@ -362,7 +361,10 @@ abstract class HashExpression[E] extends Expression {
}
protected def genHashString(input: String, result: String): String = {
s"$result = $hasherClassName.hashUTF8String($input, $result);"
val baseObject = s"$input.getBaseObject()"
val baseOffset = s"$input.getBaseOffset()"
val numBytes = s"$input.numBytes()"
s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes, $result);"
}
protected def genHashForMap(
@ -469,8 +471,6 @@ abstract class InterpretedHashFunction {
protected def hashUnsafeBytes(base: AnyRef, offset: Long, length: Int, seed: Long): Long
protected def hashUnsafeBytesBlock(base: MemoryBlock, seed: Long): Long
/**
* Computes hash of a given `value` of type `dataType`. The caller needs to check the validity
* of input `value`.
@ -496,7 +496,8 @@ abstract class InterpretedHashFunction {
case c: CalendarInterval => hashInt(c.months, hashLong(c.microseconds, seed))
case a: Array[Byte] =>
hashUnsafeBytes(a, Platform.BYTE_ARRAY_OFFSET, a.length, seed)
case s: UTF8String => hashUnsafeBytesBlock(s.getMemoryBlock(), seed)
case s: UTF8String =>
hashUnsafeBytes(s.getBaseObject, s.getBaseOffset, s.numBytes(), seed)
case array: ArrayData =>
val elementType = dataType match {
@ -583,15 +584,9 @@ object Murmur3HashFunction extends InterpretedHashFunction {
Murmur3_x86_32.hashLong(l, seed.toInt)
}
override protected def hashUnsafeBytes(
base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
Murmur3_x86_32.hashUnsafeBytes(base, offset, len, seed.toInt)
}
override protected def hashUnsafeBytesBlock(
base: MemoryBlock, seed: Long): Long = {
Murmur3_x86_32.hashUnsafeBytesBlock(base, seed.toInt)
}
}
/**
@ -616,14 +611,9 @@ object XxHash64Function extends InterpretedHashFunction {
override protected def hashLong(l: Long, seed: Long): Long = XXH64.hashLong(l, seed)
override protected def hashUnsafeBytes(
base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
XXH64.hashUnsafeBytes(base, offset, len, seed)
}
override protected def hashUnsafeBytesBlock(base: MemoryBlock, seed: Long): Long = {
XXH64.hashUnsafeBytesBlock(base, seed)
}
}
/**
@ -730,7 +720,10 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {
"""
override protected def genHashString(input: String, result: String): String = {
s"$result = $hasherClassName.hashUTF8String($input);"
val baseObject = s"$input.getBaseObject()"
val baseOffset = s"$input.getBaseOffset()"
val numBytes = s"$input.numBytes()"
s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes);"
}
override protected def genHashForArray(
@ -824,14 +817,10 @@ object HiveHashFunction extends InterpretedHashFunction {
HiveHasher.hashLong(l)
}
override protected def hashUnsafeBytes(
base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
HiveHasher.hashUnsafeBytes(base, offset, len)
}
override protected def hashUnsafeBytesBlock(
base: MemoryBlock, seed: Long): Long = HiveHasher.hashUnsafeBytesBlock(base)
private val HIVE_DECIMAL_MAX_PRECISION = 38
private val HIVE_DECIMAL_MAX_SCALE = 38
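
For context, a hedged sketch of the Java that genHashString above emits for Murmur3, assuming hasherClassName resolves to the Murmur3_x86_32 class, with hypothetical codegen variables value1 (input) and value2 (result):

  value2 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(
    value1.getBaseObject(), value1.getBaseOffset(), value1.numBytes(), value2);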

View file

@ -17,8 +17,7 @@
package org.apache.spark.sql.catalyst.expressions;
import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.types.UTF8String;
import org.junit.Assert;
import org.junit.Test;
@ -54,7 +53,7 @@ public class HiveHasherSuite {
for (int i = 0; i < inputs.length; i++) {
UTF8String s = UTF8String.fromString("val_" + inputs[i]);
int hash = HiveHasher.hashUnsafeBytesBlock(s.getMemoryBlock());
int hash = HiveHasher.hashUnsafeBytes(s.getBaseObject(), s.getBaseOffset(), s.numBytes());
Assert.assertEquals(expected[i], ((31 * inputs[i]) + hash));
}
}
@ -90,13 +89,13 @@ public class HiveHasherSuite {
int byteArrSize = rand.nextInt(100) * 8;
byte[] bytes = new byte[byteArrSize];
rand.nextBytes(bytes);
MemoryBlock mb = ByteArrayMemoryBlock.fromArray(bytes);
Assert.assertEquals(
HiveHasher.hashUnsafeBytesBlock(mb),
HiveHasher.hashUnsafeBytesBlock(mb));
HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
hashcodes.add(HiveHasher.hashUnsafeBytesBlock(mb));
hashcodes.add(HiveHasher.hashUnsafeBytes(
bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
}
// A very loose bound.
@ -113,13 +112,13 @@ public class HiveHasherSuite {
byte[] strBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
byte[] paddedBytes = new byte[byteArrSize];
System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length);
MemoryBlock mb = ByteArrayMemoryBlock.fromArray(paddedBytes);
Assert.assertEquals(
HiveHasher.hashUnsafeBytesBlock(mb),
HiveHasher.hashUnsafeBytesBlock(mb));
HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
hashcodes.add(HiveHasher.hashUnsafeBytesBlock(mb));
hashcodes.add(HiveHasher.hashUnsafeBytes(
paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
}
// A very loose bound.
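
For illustration, the restored HiveHasher call in isolation (input is arbitrary):

  byte[] bytes = "val_0".getBytes(StandardCharsets.UTF_8);
  int hash = HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length);
  // Matches Hive's String.hashCode-style fold over the input bytes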

View file

@ -24,8 +24,6 @@ import java.util.Random;
import java.util.Set;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.junit.Assert;
import org.junit.Test;
@ -144,13 +142,13 @@ public class XXH64Suite {
int byteArrSize = rand.nextInt(100) * 8;
byte[] bytes = new byte[byteArrSize];
rand.nextBytes(bytes);
MemoryBlock mb = ByteArrayMemoryBlock.fromArray(bytes);
Assert.assertEquals(
hasher.hashUnsafeWordsBlock(mb),
hasher.hashUnsafeWordsBlock(mb));
hasher.hashUnsafeWords(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
hasher.hashUnsafeWords(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
hashcodes.add(hasher.hashUnsafeWordsBlock(mb));
hashcodes.add(hasher.hashUnsafeWords(
bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
}
// A very loose bound.
@ -167,13 +165,13 @@ public class XXH64Suite {
byte[] strBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
byte[] paddedBytes = new byte[byteArrSize];
System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length);
MemoryBlock mb = ByteArrayMemoryBlock.fromArray(paddedBytes);
Assert.assertEquals(
hasher.hashUnsafeWordsBlock(mb),
hasher.hashUnsafeWordsBlock(mb));
hasher.hashUnsafeWords(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
hasher.hashUnsafeWords(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
hashcodes.add(hasher.hashUnsafeWordsBlock(mb));
hashcodes.add(hasher.hashUnsafeWords(
paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
}
// A very loose bound.

View file

@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.OffHeapMemoryBlock;
import org.apache.spark.unsafe.types.UTF8String;
/**
@ -207,7 +206,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
@Override
protected UTF8String getBytesAsUTF8String(int rowId, int count) {
return new UTF8String(new OffHeapMemoryBlock(data + rowId, count));
return UTF8String.fromAddress(null, data + rowId, count);
}
//

View file

@ -25,7 +25,6 @@ import org.apache.arrow.vector.holders.NullableVarCharHolder;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.arrow.ArrowUtils;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.memory.OffHeapMemoryBlock;
import org.apache.spark.unsafe.types.UTF8String;
/**
@ -378,10 +377,9 @@ public final class ArrowColumnVector extends ColumnVector {
if (stringResult.isSet == 0) {
return null;
} else {
return new UTF8String(new OffHeapMemoryBlock(
return UTF8String.fromAddress(null,
stringResult.buffer.memoryAddress() + stringResult.start,
stringResult.end - stringResult.start
));
stringResult.end - stringResult.start);
}
}
}

View file

@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.benchmark
import java.util.{Arrays, Comparator}
import org.apache.spark.unsafe.array.LongArray
import org.apache.spark.unsafe.memory.OnHeapMemoryBlock
import org.apache.spark.unsafe.memory.MemoryBlock
import org.apache.spark.util.Benchmark
import org.apache.spark.util.collection.Sorter
import org.apache.spark.util.collection.unsafe.sort._
@ -36,7 +36,7 @@ import org.apache.spark.util.random.XORShiftRandom
class SortBenchmark extends BenchmarkBase {
private def referenceKeyPrefixSort(buf: LongArray, lo: Int, hi: Int, refCmp: PrefixComparator) {
val sortBuffer = new LongArray(new OnHeapMemoryBlock(buf.size() * 8L))
val sortBuffer = new LongArray(MemoryBlock.fromLongArray(new Array[Long](buf.size().toInt)))
new Sorter(new UnsafeSortDataFormat(sortBuffer)).sort(
buf, lo, hi, new Comparator[RecordPointerAndKeyPrefix] {
override def compare(
@ -50,8 +50,8 @@ class SortBenchmark extends BenchmarkBase {
private def generateKeyPrefixTestData(size: Int, rand: => Long): (LongArray, LongArray) = {
val ref = Array.tabulate[Long](size * 2) { i => rand }
val extended = ref ++ Array.fill[Long](size * 2)(0)
(new LongArray(OnHeapMemoryBlock.fromArray(ref)),
new LongArray(OnHeapMemoryBlock.fromArray(extended)))
(new LongArray(MemoryBlock.fromLongArray(ref)),
new LongArray(MemoryBlock.fromLongArray(extended)))
}
ignore("sort") {
@ -60,7 +60,7 @@ class SortBenchmark extends BenchmarkBase {
val benchmark = new Benchmark("radix sort " + size, size)
benchmark.addTimerCase("reference TimSort key prefix array") { timer =>
val array = Array.tabulate[Long](size * 2) { i => rand.nextLong }
val buf = new LongArray(OnHeapMemoryBlock.fromArray(array))
val buf = new LongArray(MemoryBlock.fromLongArray(array))
timer.startTiming()
referenceKeyPrefixSort(buf, 0, size, PrefixComparators.BINARY)
timer.stopTiming()
@ -78,7 +78,7 @@ class SortBenchmark extends BenchmarkBase {
array(i) = rand.nextLong & 0xff
i += 1
}
val buf = new LongArray(OnHeapMemoryBlock.fromArray(array))
val buf = new LongArray(MemoryBlock.fromLongArray(array))
timer.startTiming()
RadixSort.sort(buf, size, 0, 7, false, false)
timer.stopTiming()
@ -90,7 +90,7 @@ class SortBenchmark extends BenchmarkBase {
array(i) = rand.nextLong & 0xffff
i += 1
}
val buf = new LongArray(OnHeapMemoryBlock.fromArray(array))
val buf = new LongArray(MemoryBlock.fromLongArray(array))
timer.startTiming()
RadixSort.sort(buf, size, 0, 7, false, false)
timer.stopTiming()
@ -102,7 +102,7 @@ class SortBenchmark extends BenchmarkBase {
array(i) = rand.nextLong
i += 1
}
val buf = new LongArray(OnHeapMemoryBlock.fromArray(array))
val buf = new LongArray(MemoryBlock.fromLongArray(array))
timer.startTiming()
RadixSort.sort(buf, size, 0, 7, false, false)
timer.stopTiming()

View file

@ -22,13 +22,13 @@ import java.io.File
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.memory.{MemoryManager, TaskMemoryManager, TestMemoryManager}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.unsafe.memory.OnHeapMemoryBlock
import org.apache.spark.unsafe.memory.MemoryBlock
import org.apache.spark.util.Utils
class RowQueueSuite extends SparkFunSuite {
test("in-memory queue") {
val page = new OnHeapMemoryBlock((1<<10) * 8L)
val page = MemoryBlock.fromLongArray(new Array[Long](1<<10))
val queue = new InMemoryRowQueue(page, 1) {
override def close() {}
}