diff --git a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java index 62b75ae8aa..73577437ac 100644 --- a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java +++ b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java @@ -17,8 +17,7 @@ package org.apache.spark.sql.catalyst.expressions; -import org.apache.spark.unsafe.memory.MemoryBlock; -import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.unsafe.Platform; /** * Simulates Hive's hashing function from Hive v1.2.1 @@ -39,21 +38,12 @@ public class HiveHasher { return (int) ((input >>> 32) ^ input); } - public static int hashUnsafeBytesBlock(MemoryBlock mb) { - long lengthInBytes = mb.size(); + public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) { assert (lengthInBytes >= 0): "lengthInBytes cannot be negative"; int result = 0; - for (long i = 0; i < lengthInBytes; i++) { - result = (result * 31) + (int) mb.getByte(i); + for (int i = 0; i < lengthInBytes; i++) { + result = (result * 31) + (int) Platform.getByte(base, offset + i); } return result; } - - public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) { - return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes)); - } - - public static int hashUTF8String(UTF8String str) { - return hashUnsafeBytesBlock(str.getMemoryBlock()); - } } diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java index 54dcadf3a7..aca6fca00c 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java @@ -187,7 +187,7 @@ public final class Platform { } public static void copyMemory( - Object src, long srcOffset, Object dst, long dstOffset, long length) { + Object src, long srcOffset, Object dst, long dstOffset, long length) { // Check if dstOffset is before or after srcOffset to determine if we should copy // forward or backwards. This is necessary in case src and dst overlap. if (dstOffset < srcOffset) { diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java index ef0f78d95d..cec8c30887 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java @@ -18,7 +18,6 @@ package org.apache.spark.unsafe.array; import org.apache.spark.unsafe.Platform; -import org.apache.spark.unsafe.memory.MemoryBlock; public class ByteArrayMethods { @@ -53,25 +52,15 @@ public class ByteArrayMethods { public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15; private static final boolean unaligned = Platform.unaligned(); - /** - * MemoryBlock equality check for MemoryBlocks. - * @return true if the arrays are equal, false otherwise - */ - public static boolean arrayEqualsBlock( - MemoryBlock leftBase, long leftOffset, MemoryBlock rightBase, long rightOffset, long length) { - return arrayEquals(leftBase.getBaseObject(), leftBase.getBaseOffset() + leftOffset, - rightBase.getBaseObject(), rightBase.getBaseOffset() + rightOffset, length); - } - /** * Optimized byte array equality check for byte arrays. * @return true if the arrays are equal, false otherwise */ public static boolean arrayEquals( - Object leftBase, long leftOffset, Object rightBase, long rightOffset, long length) { + Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) { int i = 0; - // check if starts align and we can get both offsets to be aligned + // check if stars align and we can get both offsets to be aligned if ((leftOffset % 8) == (rightOffset % 8)) { while ((leftOffset + i) % 8 != 0 && i < length) { if (Platform.getByte(leftBase, leftOffset + i) != diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java index b74d2de069..2cd39bd60c 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java @@ -17,6 +17,7 @@ package org.apache.spark.unsafe.array; +import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.memory.MemoryBlock; /** @@ -32,12 +33,16 @@ public final class LongArray { private static final long WIDTH = 8; private final MemoryBlock memory; + private final Object baseObj; + private final long baseOffset; private final long length; public LongArray(MemoryBlock memory) { assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size >= Integer.MAX_VALUE elements"; this.memory = memory; + this.baseObj = memory.getBaseObject(); + this.baseOffset = memory.getBaseOffset(); this.length = memory.size() / WIDTH; } @@ -46,11 +51,11 @@ public final class LongArray { } public Object getBaseObject() { - return memory.getBaseObject(); + return baseObj; } public long getBaseOffset() { - return memory.getBaseOffset(); + return baseOffset; } /** @@ -64,8 +69,8 @@ public final class LongArray { * Fill this all with 0L. */ public void zeroOut() { - for (long off = 0; off < length * WIDTH; off += WIDTH) { - memory.putLong(off, 0); + for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) { + Platform.putLong(baseObj, off, 0); } } @@ -75,7 +80,7 @@ public final class LongArray { public void set(int index, long value) { assert index >= 0 : "index (" + index + ") should >= 0"; assert index < length : "index (" + index + ") should < length (" + length + ")"; - memory.putLong(index * WIDTH, value); + Platform.putLong(baseObj, baseOffset + index * WIDTH, value); } /** @@ -84,6 +89,6 @@ public final class LongArray { public long get(int index) { assert index >= 0 : "index (" + index + ") should >= 0"; assert index < length : "index (" + index + ") should < length (" + length + ")"; - return memory.getLong(index * WIDTH); + return Platform.getLong(baseObj, baseOffset + index * WIDTH); } } diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java index 566f116154..d239de6083 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java @@ -17,11 +17,7 @@ package org.apache.spark.unsafe.hash; -import com.google.common.primitives.Ints; - import org.apache.spark.unsafe.Platform; -import org.apache.spark.unsafe.memory.MemoryBlock; -import org.apache.spark.unsafe.types.UTF8String; /** * 32-bit Murmur3 hasher. This is based on Guava's Murmur3_32HashFunction. @@ -53,74 +49,49 @@ public final class Murmur3_x86_32 { } public int hashUnsafeWords(Object base, long offset, int lengthInBytes) { - return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); - } - - public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) { - // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. - int lengthInBytes = Ints.checkedCast(base.size()); - assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; - int h1 = hashBytesByIntBlock(base, lengthInBytes, seed); - return fmix(h1, lengthInBytes); + return hashUnsafeWords(base, offset, lengthInBytes, seed); } public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) { // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method. - return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); + assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)"; + int h1 = hashBytesByInt(base, offset, lengthInBytes, seed); + return fmix(h1, lengthInBytes); } - public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) { - return hashUnsafeBytesBlock(base, Ints.checkedCast(base.size()), seed); - } - - private static int hashUnsafeBytesBlock(MemoryBlock base, int lengthInBytes, int seed) { + public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) { // This is not compatible with original and another implementations. // But remain it for backward compatibility for the components existing before 2.3. assert (lengthInBytes >= 0): "lengthInBytes cannot be negative"; int lengthAligned = lengthInBytes - lengthInBytes % 4; - int h1 = hashBytesByIntBlock(base, lengthAligned, seed); - long offset = base.getBaseOffset(); - Object o = base.getBaseObject(); + int h1 = hashBytesByInt(base, offset, lengthAligned, seed); for (int i = lengthAligned; i < lengthInBytes; i++) { - int halfWord = Platform.getByte(o, offset + i); + int halfWord = Platform.getByte(base, offset + i); int k1 = mixK1(halfWord); h1 = mixH1(h1, k1); } return fmix(h1, lengthInBytes); } - public static int hashUTF8String(UTF8String str, int seed) { - return hashUnsafeBytesBlock(str.getMemoryBlock(), str.numBytes(), seed); - } - - public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) { - return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); - } - public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) { - return hashUnsafeBytes2Block(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed); - } - - public static int hashUnsafeBytes2Block(MemoryBlock base, int seed) { - // This is compatible with original and other implementations. + // This is compatible with original and another implementations. // Use this method for new components after Spark 2.3. - int lengthInBytes = Ints.checkedCast(base.size()); - assert (lengthInBytes >= 0) : "lengthInBytes cannot be negative"; + assert (lengthInBytes >= 0): "lengthInBytes cannot be negative"; int lengthAligned = lengthInBytes - lengthInBytes % 4; - int h1 = hashBytesByIntBlock(base, lengthAligned, seed); + int h1 = hashBytesByInt(base, offset, lengthAligned, seed); int k1 = 0; for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) { - k1 ^= (base.getByte(i) & 0xFF) << shift; + k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift; } h1 ^= mixK1(k1); return fmix(h1, lengthInBytes); } - private static int hashBytesByIntBlock(MemoryBlock base, int lengthInBytes, int seed) { + private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) { assert (lengthInBytes % 4 == 0); int h1 = seed; for (int i = 0; i < lengthInBytes; i += 4) { - int halfWord = base.getInt(i); + int halfWord = Platform.getInt(base, offset + i); int k1 = mixK1(halfWord); h1 = mixH1(h1, k1); } diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java deleted file mode 100644 index 9f238632bc..0000000000 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.unsafe.memory; - -import com.google.common.primitives.Ints; - -import org.apache.spark.unsafe.Platform; - -/** - * A consecutive block of memory with a byte array on Java heap. - */ -public final class ByteArrayMemoryBlock extends MemoryBlock { - - private final byte[] array; - - public ByteArrayMemoryBlock(byte[] obj, long offset, long size) { - super(obj, offset, size); - this.array = obj; - assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) : - "The sum of size " + size + " and offset " + offset + " should not be larger than " + - "the size of the given memory space " + (obj.length + Platform.BYTE_ARRAY_OFFSET); - } - - public ByteArrayMemoryBlock(long length) { - this(new byte[Ints.checkedCast(length)], Platform.BYTE_ARRAY_OFFSET, length); - } - - @Override - public MemoryBlock subBlock(long offset, long size) { - checkSubBlockRange(offset, size); - if (offset == 0 && size == this.size()) return this; - return new ByteArrayMemoryBlock(array, this.offset + offset, size); - } - - public byte[] getByteArray() { return array; } - - /** - * Creates a memory block pointing to the memory used by the byte array. - */ - public static ByteArrayMemoryBlock fromArray(final byte[] array) { - return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, array.length); - } - - @Override - public int getInt(long offset) { - return Platform.getInt(array, this.offset + offset); - } - - @Override - public void putInt(long offset, int value) { - Platform.putInt(array, this.offset + offset, value); - } - - @Override - public boolean getBoolean(long offset) { - return Platform.getBoolean(array, this.offset + offset); - } - - @Override - public void putBoolean(long offset, boolean value) { - Platform.putBoolean(array, this.offset + offset, value); - } - - @Override - public byte getByte(long offset) { - return array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)]; - } - - @Override - public void putByte(long offset, byte value) { - array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)] = value; - } - - @Override - public short getShort(long offset) { - return Platform.getShort(array, this.offset + offset); - } - - @Override - public void putShort(long offset, short value) { - Platform.putShort(array, this.offset + offset, value); - } - - @Override - public long getLong(long offset) { - return Platform.getLong(array, this.offset + offset); - } - - @Override - public void putLong(long offset, long value) { - Platform.putLong(array, this.offset + offset, value); - } - - @Override - public float getFloat(long offset) { - return Platform.getFloat(array, this.offset + offset); - } - - @Override - public void putFloat(long offset, float value) { - Platform.putFloat(array, this.offset + offset, value); - } - - @Override - public double getDouble(long offset) { - return Platform.getDouble(array, this.offset + offset); - } - - @Override - public void putDouble(long offset, double value) { - Platform.putDouble(array, this.offset + offset, value); - } -} diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java index 36caf80888..2733760dd1 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java @@ -23,6 +23,8 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.Map; +import org.apache.spark.unsafe.Platform; + /** * A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM long primitive array. */ @@ -56,7 +58,7 @@ public class HeapMemoryAllocator implements MemoryAllocator { final long[] array = arrayReference.get(); if (array != null) { assert (array.length * 8L >= size); - MemoryBlock memory = OnHeapMemoryBlock.fromArray(array, size); + MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size); if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) { memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE); } @@ -68,7 +70,7 @@ public class HeapMemoryAllocator implements MemoryAllocator { } } long[] array = new long[numWords]; - MemoryBlock memory = OnHeapMemoryBlock.fromArray(array, size); + MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size); if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) { memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE); } @@ -77,13 +79,12 @@ public class HeapMemoryAllocator implements MemoryAllocator { @Override public void free(MemoryBlock memory) { - assert(memory instanceof OnHeapMemoryBlock); - assert (memory.getBaseObject() != null) : + assert (memory.obj != null) : "baseObject was null; are you trying to use the on-heap allocator to free off-heap memory?"; - assert (memory.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) : + assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) : "page has already been freed"; - assert ((memory.getPageNumber() == MemoryBlock.NO_PAGE_NUMBER) - || (memory.getPageNumber() == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) : + assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER) + || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) : "TMM-allocated pages must first be freed via TMM.freePage(), not directly in allocator " + "free()"; @@ -93,12 +94,12 @@ public class HeapMemoryAllocator implements MemoryAllocator { } // Mark the page as freed (so we can detect double-frees). - memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER); + memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER; // As an additional layer of defense against use-after-free bugs, we mutate the // MemoryBlock to null out its reference to the long[] array. - long[] array = ((OnHeapMemoryBlock)memory).getLongArray(); - memory.resetObjAndOffset(); + long[] array = (long[]) memory.obj; + memory.setObjAndOffset(null, 0); long alignedSize = ((size + 7) / 8) * 8; if (shouldPool(alignedSize)) { diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java index 38315fb97b..7b588681d9 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java @@ -38,7 +38,7 @@ public interface MemoryAllocator { void free(MemoryBlock memory); - UnsafeMemoryAllocator UNSAFE = new UnsafeMemoryAllocator(); + MemoryAllocator UNSAFE = new UnsafeMemoryAllocator(); - HeapMemoryAllocator HEAP = new HeapMemoryAllocator(); + MemoryAllocator HEAP = new HeapMemoryAllocator(); } diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java index ca7213bbf9..c333857358 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java @@ -22,10 +22,10 @@ import javax.annotation.Nullable; import org.apache.spark.unsafe.Platform; /** - * A representation of a consecutive memory block in Spark. It defines the common interfaces - * for memory accessing and mutating. + * A consecutive block of memory, starting at a {@link MemoryLocation} with a fixed size. */ -public abstract class MemoryBlock { +public class MemoryBlock extends MemoryLocation { + /** Special `pageNumber` value for pages which were not allocated by TaskMemoryManagers */ public static final int NO_PAGE_NUMBER = -1; @@ -45,163 +45,38 @@ public abstract class MemoryBlock { */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - @Nullable - protected Object obj; - - protected long offset; - - protected long length; + private final long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field can be updated using setPageNumber method so that - * this can be modified by the TaskMemoryManager, which lives in a different package. + * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, + * which lives in a different package. */ - private int pageNumber = NO_PAGE_NUMBER; + public int pageNumber = NO_PAGE_NUMBER; - protected MemoryBlock(@Nullable Object obj, long offset, long length) { - if (offset < 0 || length < 0) { - throw new IllegalArgumentException( - "Length " + length + " and offset " + offset + "must be non-negative"); - } - this.obj = obj; - this.offset = offset; + public MemoryBlock(@Nullable Object obj, long offset, long length) { + super(obj, offset); this.length = length; } - protected MemoryBlock() { - this(null, 0, 0); - } - - public final Object getBaseObject() { - return obj; - } - - public final long getBaseOffset() { - return offset; - } - - public void resetObjAndOffset() { - this.obj = null; - this.offset = 0; - } - /** * Returns the size of the memory block. */ - public final long size() { + public long size() { return length; } - public final void setPageNumber(int pageNum) { - pageNumber = pageNum; - } - - public final int getPageNumber() { - return pageNumber; + /** + * Creates a memory block pointing to the memory used by the long array. + */ + public static MemoryBlock fromLongArray(final long[] array) { + return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); } /** * Fills the memory block with the specified byte value. */ - public final void fill(byte value) { + public void fill(byte value) { Platform.setMemory(obj, offset, length, value); } - - /** - * Instantiate MemoryBlock for given object type with new offset - */ - public static final MemoryBlock allocateFromObject(Object obj, long offset, long length) { - MemoryBlock mb = null; - if (obj instanceof byte[]) { - byte[] array = (byte[])obj; - mb = new ByteArrayMemoryBlock(array, offset, length); - } else if (obj instanceof long[]) { - long[] array = (long[])obj; - mb = new OnHeapMemoryBlock(array, offset, length); - } else if (obj == null) { - // we assume that to pass null pointer means off-heap - mb = new OffHeapMemoryBlock(offset, length); - } else { - throw new UnsupportedOperationException( - "Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now"); - } - return mb; - } - - /** - * Just instantiate the sub-block with the same type of MemoryBlock with the new size and relative - * offset from the original offset. The data is not copied. - * If parameters are invalid, an exception is thrown. - */ - public abstract MemoryBlock subBlock(long offset, long size); - - protected void checkSubBlockRange(long offset, long size) { - if (offset < 0 || size < 0) { - throw new ArrayIndexOutOfBoundsException( - "Size " + size + " and offset " + offset + " must be non-negative"); - } - if (offset + size > length) { - throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " + - offset + " should not be larger than the length " + length + " in the MemoryBlock"); - } - } - - /** - * getXXX/putXXX does not ensure guarantee behavior if the offset is invalid. e.g cause illegal - * memory access, throw an exception, or etc. - * getXXX/putXXX uses an index based on this.offset that includes the size of metadata such as - * JVM object header. The offset is 0-based and is expected as an logical offset in the memory - * block. - */ - public abstract int getInt(long offset); - - public abstract void putInt(long offset, int value); - - public abstract boolean getBoolean(long offset); - - public abstract void putBoolean(long offset, boolean value); - - public abstract byte getByte(long offset); - - public abstract void putByte(long offset, byte value); - - public abstract short getShort(long offset); - - public abstract void putShort(long offset, short value); - - public abstract long getLong(long offset); - - public abstract void putLong(long offset, long value); - - public abstract float getFloat(long offset); - - public abstract void putFloat(long offset, float value); - - public abstract double getDouble(long offset); - - public abstract void putDouble(long offset, double value); - - public static final void copyMemory( - MemoryBlock src, long srcOffset, MemoryBlock dst, long dstOffset, long length) { - assert(srcOffset + length <= src.length && dstOffset + length <= dst.length); - Platform.copyMemory(src.getBaseObject(), src.getBaseOffset() + srcOffset, - dst.getBaseObject(), dst.getBaseOffset() + dstOffset, length); - } - - public static final void copyMemory(MemoryBlock src, MemoryBlock dst, long length) { - assert(length <= src.length && length <= dst.length); - Platform.copyMemory(src.getBaseObject(), src.getBaseOffset(), - dst.getBaseObject(), dst.getBaseOffset(), length); - } - - public final void copyFrom(Object src, long srcOffset, long dstOffset, long length) { - assert(length <= this.length - srcOffset); - Platform.copyMemory(src, srcOffset, obj, offset + dstOffset, length); - } - - public final void writeTo(long srcOffset, Object dst, long dstOffset, long length) { - assert(length <= this.length - srcOffset); - Platform.copyMemory(obj, offset + srcOffset, dst, dstOffset, length); - } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilderSuite.scala b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java similarity index 51% rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilderSuite.scala rename to common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java index 1b25a4b191..74ebc87dc9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilderSuite.scala +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java @@ -1,42 +1,54 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.expressions.codegen - -import org.apache.spark.SparkFunSuite -import org.apache.spark.unsafe.types.UTF8String - -class UTF8StringBuilderSuite extends SparkFunSuite { - - test("basic test") { - val sb = new UTF8StringBuilder() - assert(sb.build() === UTF8String.EMPTY_UTF8) - - sb.append("") - assert(sb.build() === UTF8String.EMPTY_UTF8) - - sb.append("abcd") - assert(sb.build() === UTF8String.fromString("abcd")) - - sb.append(UTF8String.fromString("1234")) - assert(sb.build() === UTF8String.fromString("abcd1234")) - - // expect to grow an internal buffer - sb.append(UTF8String.fromString("efgijk567890")) - assert(sb.build() === UTF8String.fromString("abcd1234efgijk567890")) - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import javax.annotation.Nullable; + +/** + * A memory location. Tracked either by a memory address (with off-heap allocation), + * or by an offset from a JVM object (in-heap allocation). + */ +public class MemoryLocation { + + @Nullable + Object obj; + + long offset; + + public MemoryLocation(@Nullable Object obj, long offset) { + this.obj = obj; + this.offset = offset; + } + + public MemoryLocation() { + this(null, 0); + } + + public void setObjAndOffset(Object newObj, long newOffset) { + this.obj = newObj; + this.offset = newOffset; + } + + public final Object getBaseObject() { + return obj; + } + + public final long getBaseOffset() { + return offset; + } +} diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java deleted file mode 100644 index 3431b08980..0000000000 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.unsafe.memory; - -import org.apache.spark.unsafe.Platform; - -public class OffHeapMemoryBlock extends MemoryBlock { - public static final OffHeapMemoryBlock NULL = new OffHeapMemoryBlock(0, 0); - - public OffHeapMemoryBlock(long address, long size) { - super(null, address, size); - } - - @Override - public MemoryBlock subBlock(long offset, long size) { - checkSubBlockRange(offset, size); - if (offset == 0 && size == this.size()) return this; - return new OffHeapMemoryBlock(this.offset + offset, size); - } - - @Override - public final int getInt(long offset) { - return Platform.getInt(null, this.offset + offset); - } - - @Override - public final void putInt(long offset, int value) { - Platform.putInt(null, this.offset + offset, value); - } - - @Override - public final boolean getBoolean(long offset) { - return Platform.getBoolean(null, this.offset + offset); - } - - @Override - public final void putBoolean(long offset, boolean value) { - Platform.putBoolean(null, this.offset + offset, value); - } - - @Override - public final byte getByte(long offset) { - return Platform.getByte(null, this.offset + offset); - } - - @Override - public final void putByte(long offset, byte value) { - Platform.putByte(null, this.offset + offset, value); - } - - @Override - public final short getShort(long offset) { - return Platform.getShort(null, this.offset + offset); - } - - @Override - public final void putShort(long offset, short value) { - Platform.putShort(null, this.offset + offset, value); - } - - @Override - public final long getLong(long offset) { - return Platform.getLong(null, this.offset + offset); - } - - @Override - public final void putLong(long offset, long value) { - Platform.putLong(null, this.offset + offset, value); - } - - @Override - public final float getFloat(long offset) { - return Platform.getFloat(null, this.offset + offset); - } - - @Override - public final void putFloat(long offset, float value) { - Platform.putFloat(null, this.offset + offset, value); - } - - @Override - public final double getDouble(long offset) { - return Platform.getDouble(null, this.offset + offset); - } - - @Override - public final void putDouble(long offset, double value) { - Platform.putDouble(null, this.offset + offset, value); - } -} diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java deleted file mode 100644 index ee42bc27c9..0000000000 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.unsafe.memory; - -import com.google.common.primitives.Ints; - -import org.apache.spark.unsafe.Platform; - -/** - * A consecutive block of memory with a long array on Java heap. - */ -public final class OnHeapMemoryBlock extends MemoryBlock { - - private final long[] array; - - public OnHeapMemoryBlock(long[] obj, long offset, long size) { - super(obj, offset, size); - this.array = obj; - assert(offset + size <= obj.length * 8L + Platform.LONG_ARRAY_OFFSET) : - "The sum of size " + size + " and offset " + offset + " should not be larger than " + - "the size of the given memory space " + (obj.length * 8L + Platform.LONG_ARRAY_OFFSET); - } - - public OnHeapMemoryBlock(long size) { - this(new long[Ints.checkedCast((size + 7) / 8)], Platform.LONG_ARRAY_OFFSET, size); - } - - @Override - public MemoryBlock subBlock(long offset, long size) { - checkSubBlockRange(offset, size); - if (offset == 0 && size == this.size()) return this; - return new OnHeapMemoryBlock(array, this.offset + offset, size); - } - - public long[] getLongArray() { return array; } - - /** - * Creates a memory block pointing to the memory used by the long array. - */ - public static OnHeapMemoryBlock fromArray(final long[] array) { - return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); - } - - public static OnHeapMemoryBlock fromArray(final long[] array, long size) { - return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size); - } - - @Override - public int getInt(long offset) { - return Platform.getInt(array, this.offset + offset); - } - - @Override - public void putInt(long offset, int value) { - Platform.putInt(array, this.offset + offset, value); - } - - @Override - public boolean getBoolean(long offset) { - return Platform.getBoolean(array, this.offset + offset); - } - - @Override - public void putBoolean(long offset, boolean value) { - Platform.putBoolean(array, this.offset + offset, value); - } - - @Override - public byte getByte(long offset) { - return Platform.getByte(array, this.offset + offset); - } - - @Override - public void putByte(long offset, byte value) { - Platform.putByte(array, this.offset + offset, value); - } - - @Override - public short getShort(long offset) { - return Platform.getShort(array, this.offset + offset); - } - - @Override - public void putShort(long offset, short value) { - Platform.putShort(array, this.offset + offset, value); - } - - @Override - public long getLong(long offset) { - return Platform.getLong(array, this.offset + offset); - } - - @Override - public void putLong(long offset, long value) { - Platform.putLong(array, this.offset + offset, value); - } - - @Override - public float getFloat(long offset) { - return Platform.getFloat(array, this.offset + offset); - } - - @Override - public void putFloat(long offset, float value) { - Platform.putFloat(array, this.offset + offset, value); - } - - @Override - public double getDouble(long offset) { - return Platform.getDouble(array, this.offset + offset); - } - - @Override - public void putDouble(long offset, double value) { - Platform.putDouble(array, this.offset + offset, value); - } -} diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java index 5310bdf277..4368fb615b 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java @@ -25,9 +25,9 @@ import org.apache.spark.unsafe.Platform; public class UnsafeMemoryAllocator implements MemoryAllocator { @Override - public OffHeapMemoryBlock allocate(long size) throws OutOfMemoryError { + public MemoryBlock allocate(long size) throws OutOfMemoryError { long address = Platform.allocateMemory(size); - OffHeapMemoryBlock memory = new OffHeapMemoryBlock(address, size); + MemoryBlock memory = new MemoryBlock(null, address, size); if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) { memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE); } @@ -36,25 +36,22 @@ public class UnsafeMemoryAllocator implements MemoryAllocator { @Override public void free(MemoryBlock memory) { - assert(memory instanceof OffHeapMemoryBlock) : - "UnsafeMemoryAllocator can only free OffHeapMemoryBlock."; - if (memory == OffHeapMemoryBlock.NULL) return; - assert (memory.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) : + assert (memory.obj == null) : + "baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?"; + assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) : "page has already been freed"; - assert ((memory.getPageNumber() == MemoryBlock.NO_PAGE_NUMBER) - || (memory.getPageNumber() == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) : + assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER) + || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) : "TMM-allocated pages must be freed via TMM.freePage(), not directly in allocator free()"; if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) { memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE); } - Platform.freeMemory(memory.offset); - // As an additional layer of defense against use-after-free bugs, we mutate the // MemoryBlock to reset its pointer. - memory.resetObjAndOffset(); + memory.offset = 0; // Mark the page as freed (so we can detect double-frees). - memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER); + memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER; } } diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index e91fc43914..dff4a73f3e 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -34,8 +34,6 @@ import com.google.common.primitives.Ints; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.unsafe.hash.Murmur3_x86_32; -import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock; -import org.apache.spark.unsafe.memory.MemoryBlock; import static org.apache.spark.unsafe.Platform.*; @@ -53,13 +51,12 @@ public final class UTF8String implements Comparable, Externalizable, // These are only updated by readExternal() or read() @Nonnull - private MemoryBlock base; - // While numBytes has the same value as base.size(), to keep as int avoids cast from long to int + private Object base; + private long offset; private int numBytes; - public MemoryBlock getMemoryBlock() { return base; } - public Object getBaseObject() { return base.getBaseObject(); } - public long getBaseOffset() { return base.getBaseOffset(); } + public Object getBaseObject() { return base; } + public long getBaseOffset() { return offset; } /** * A char in UTF-8 encoding can take 1-4 bytes depending on the first byte which @@ -112,8 +109,7 @@ public final class UTF8String implements Comparable, Externalizable, */ public static UTF8String fromBytes(byte[] bytes) { if (bytes != null) { - return new UTF8String( - new ByteArrayMemoryBlock(bytes, BYTE_ARRAY_OFFSET, bytes.length)); + return new UTF8String(bytes, BYTE_ARRAY_OFFSET, bytes.length); } else { return null; } @@ -126,13 +122,19 @@ public final class UTF8String implements Comparable, Externalizable, */ public static UTF8String fromBytes(byte[] bytes, int offset, int numBytes) { if (bytes != null) { - return new UTF8String( - new ByteArrayMemoryBlock(bytes, BYTE_ARRAY_OFFSET + offset, numBytes)); + return new UTF8String(bytes, BYTE_ARRAY_OFFSET + offset, numBytes); } else { return null; } } + /** + * Creates an UTF8String from given address (base and offset) and length. + */ + public static UTF8String fromAddress(Object base, long offset, int numBytes) { + return new UTF8String(base, offset, numBytes); + } + /** * Creates an UTF8String from String. */ @@ -149,13 +151,16 @@ public final class UTF8String implements Comparable, Externalizable, return fromBytes(spaces); } - public UTF8String(MemoryBlock base) { + protected UTF8String(Object base, long offset, int numBytes) { this.base = base; - this.numBytes = Ints.checkedCast(base.size()); + this.offset = offset; + this.numBytes = numBytes; } // for serialization - public UTF8String() {} + public UTF8String() { + this(null, 0, 0); + } /** * Writes the content of this string into a memory address, identified by an object and an offset. @@ -163,7 +168,7 @@ public final class UTF8String implements Comparable, Externalizable, * bytes in this string. */ public void writeToMemory(Object target, long targetOffset) { - base.writeTo(0, target, targetOffset, numBytes); + Platform.copyMemory(base, offset, target, targetOffset, numBytes); } public void writeTo(ByteBuffer buffer) { @@ -183,9 +188,8 @@ public final class UTF8String implements Comparable, Externalizable, */ @Nonnull public ByteBuffer getByteBuffer() { - long offset = base.getBaseOffset(); - if (base instanceof ByteArrayMemoryBlock && offset >= BYTE_ARRAY_OFFSET) { - final byte[] bytes = ((ByteArrayMemoryBlock) base).getByteArray(); + if (base instanceof byte[] && offset >= BYTE_ARRAY_OFFSET) { + final byte[] bytes = (byte[]) base; // the offset includes an object header... this is only needed for unsafe copies final long arrayOffset = offset - BYTE_ARRAY_OFFSET; @@ -252,12 +256,12 @@ public final class UTF8String implements Comparable, Externalizable, long mask = 0; if (IS_LITTLE_ENDIAN) { if (numBytes >= 8) { - p = base.getLong(0); + p = Platform.getLong(base, offset); } else if (numBytes > 4) { - p = base.getLong(0); + p = Platform.getLong(base, offset); mask = (1L << (8 - numBytes) * 8) - 1; } else if (numBytes > 0) { - p = (long) base.getInt(0); + p = (long) Platform.getInt(base, offset); mask = (1L << (8 - numBytes) * 8) - 1; } else { p = 0; @@ -266,12 +270,12 @@ public final class UTF8String implements Comparable, Externalizable, } else { // byteOrder == ByteOrder.BIG_ENDIAN if (numBytes >= 8) { - p = base.getLong(0); + p = Platform.getLong(base, offset); } else if (numBytes > 4) { - p = base.getLong(0); + p = Platform.getLong(base, offset); mask = (1L << (8 - numBytes) * 8) - 1; } else if (numBytes > 0) { - p = ((long) base.getInt(0)) << 32; + p = ((long) Platform.getInt(base, offset)) << 32; mask = (1L << (8 - numBytes) * 8) - 1; } else { p = 0; @@ -286,13 +290,12 @@ public final class UTF8String implements Comparable, Externalizable, */ public byte[] getBytes() { // avoid copy if `base` is `byte[]` - long offset = base.getBaseOffset(); - if (offset == BYTE_ARRAY_OFFSET && base instanceof ByteArrayMemoryBlock - && (((ByteArrayMemoryBlock) base).getByteArray()).length == numBytes) { - return ((ByteArrayMemoryBlock) base).getByteArray(); + if (offset == BYTE_ARRAY_OFFSET && base instanceof byte[] + && ((byte[]) base).length == numBytes) { + return (byte[]) base; } else { byte[] bytes = new byte[numBytes]; - base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, numBytes); + copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, numBytes); return bytes; } } @@ -322,7 +325,7 @@ public final class UTF8String implements Comparable, Externalizable, if (i > j) { byte[] bytes = new byte[i - j]; - base.writeTo(j, bytes, BYTE_ARRAY_OFFSET, i - j); + copyMemory(base, offset + j, bytes, BYTE_ARRAY_OFFSET, i - j); return fromBytes(bytes); } else { return EMPTY_UTF8; @@ -363,14 +366,14 @@ public final class UTF8String implements Comparable, Externalizable, * Returns the byte at position `i`. */ private byte getByte(int i) { - return base.getByte(i); + return Platform.getByte(base, offset + i); } private boolean matchAt(final UTF8String s, int pos) { if (s.numBytes + pos > numBytes || pos < 0) { return false; } - return ByteArrayMethods.arrayEqualsBlock(base, pos, s.base, 0, s.numBytes); + return ByteArrayMethods.arrayEquals(base, offset + pos, s.base, s.offset, s.numBytes); } public boolean startsWith(final UTF8String prefix) { @@ -497,7 +500,8 @@ public final class UTF8String implements Comparable, Externalizable, for (int i = 0; i < numBytes; i++) { if (getByte(i) == (byte) ',') { if (i - (lastComma + 1) == match.numBytes && - ByteArrayMethods.arrayEqualsBlock(base, lastComma + 1, match.base, 0, match.numBytes)) { + ByteArrayMethods.arrayEquals(base, offset + (lastComma + 1), match.base, match.offset, + match.numBytes)) { return n; } lastComma = i; @@ -505,7 +509,8 @@ public final class UTF8String implements Comparable, Externalizable, } } if (numBytes - (lastComma + 1) == match.numBytes && - ByteArrayMethods.arrayEqualsBlock(base, lastComma + 1, match.base, 0, match.numBytes)) { + ByteArrayMethods.arrayEquals(base, offset + (lastComma + 1), match.base, match.offset, + match.numBytes)) { return n; } return 0; @@ -520,7 +525,7 @@ public final class UTF8String implements Comparable, Externalizable, private UTF8String copyUTF8String(int start, int end) { int len = end - start + 1; byte[] newBytes = new byte[len]; - base.writeTo(start, newBytes, BYTE_ARRAY_OFFSET, len); + copyMemory(base, offset + start, newBytes, BYTE_ARRAY_OFFSET, len); return UTF8String.fromBytes(newBytes); } @@ -667,7 +672,8 @@ public final class UTF8String implements Comparable, Externalizable, int i = 0; // position in byte while (i < numBytes) { int len = numBytesForFirstByte(getByte(i)); - base.writeTo(i, result, BYTE_ARRAY_OFFSET + result.length - i - len, len); + copyMemory(this.base, this.offset + i, result, + BYTE_ARRAY_OFFSET + result.length - i - len, len); i += len; } @@ -681,7 +687,7 @@ public final class UTF8String implements Comparable, Externalizable, } byte[] newBytes = new byte[numBytes * times]; - base.writeTo(0, newBytes, BYTE_ARRAY_OFFSET, numBytes); + copyMemory(this.base, this.offset, newBytes, BYTE_ARRAY_OFFSET, numBytes); int copied = 1; while (copied < times) { @@ -718,7 +724,7 @@ public final class UTF8String implements Comparable, Externalizable, if (i + v.numBytes > numBytes) { return -1; } - if (ByteArrayMethods.arrayEqualsBlock(base, i, v.base, 0, v.numBytes)) { + if (ByteArrayMethods.arrayEquals(base, offset + i, v.base, v.offset, v.numBytes)) { return c; } i += numBytesForFirstByte(getByte(i)); @@ -734,7 +740,7 @@ public final class UTF8String implements Comparable, Externalizable, private int find(UTF8String str, int start) { assert (str.numBytes > 0); while (start <= numBytes - str.numBytes) { - if (ByteArrayMethods.arrayEqualsBlock(base, start, str.base, 0, str.numBytes)) { + if (ByteArrayMethods.arrayEquals(base, offset + start, str.base, str.offset, str.numBytes)) { return start; } start += 1; @@ -748,7 +754,7 @@ public final class UTF8String implements Comparable, Externalizable, private int rfind(UTF8String str, int start) { assert (str.numBytes > 0); while (start >= 0) { - if (ByteArrayMethods.arrayEqualsBlock(base, start, str.base, 0, str.numBytes)) { + if (ByteArrayMethods.arrayEquals(base, offset + start, str.base, str.offset, str.numBytes)) { return start; } start -= 1; @@ -781,7 +787,7 @@ public final class UTF8String implements Comparable, Externalizable, return EMPTY_UTF8; } byte[] bytes = new byte[idx]; - base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, idx); + copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, idx); return fromBytes(bytes); } else { @@ -801,7 +807,7 @@ public final class UTF8String implements Comparable, Externalizable, } int size = numBytes - delim.numBytes - idx; byte[] bytes = new byte[size]; - base.writeTo(idx + delim.numBytes, bytes, BYTE_ARRAY_OFFSET, size); + copyMemory(base, offset + idx + delim.numBytes, bytes, BYTE_ARRAY_OFFSET, size); return fromBytes(bytes); } } @@ -824,15 +830,15 @@ public final class UTF8String implements Comparable, Externalizable, UTF8String remain = pad.substring(0, spaces - padChars * count); byte[] data = new byte[this.numBytes + pad.numBytes * count + remain.numBytes]; - base.writeTo(0, data, BYTE_ARRAY_OFFSET, this.numBytes); + copyMemory(this.base, this.offset, data, BYTE_ARRAY_OFFSET, this.numBytes); int offset = this.numBytes; int idx = 0; while (idx < count) { - pad.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes); + copyMemory(pad.base, pad.offset, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes); ++ idx; offset += pad.numBytes; } - remain.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes); + copyMemory(remain.base, remain.offset, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes); return UTF8String.fromBytes(data); } @@ -860,13 +866,13 @@ public final class UTF8String implements Comparable, Externalizable, int offset = 0; int idx = 0; while (idx < count) { - pad.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes); + copyMemory(pad.base, pad.offset, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes); ++ idx; offset += pad.numBytes; } - remain.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes); + copyMemory(remain.base, remain.offset, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes); offset += remain.numBytes; - base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, numBytes()); + copyMemory(this.base, this.offset, data, BYTE_ARRAY_OFFSET + offset, numBytes()); return UTF8String.fromBytes(data); } @@ -891,8 +897,8 @@ public final class UTF8String implements Comparable, Externalizable, int offset = 0; for (int i = 0; i < inputs.length; i++) { int len = inputs[i].numBytes; - inputs[i].base.writeTo( - 0, + copyMemory( + inputs[i].base, inputs[i].offset, result, BYTE_ARRAY_OFFSET + offset, len); offset += len; @@ -931,8 +937,8 @@ public final class UTF8String implements Comparable, Externalizable, for (int i = 0, j = 0; i < inputs.length; i++) { if (inputs[i] != null) { int len = inputs[i].numBytes; - inputs[i].base.writeTo( - 0, + copyMemory( + inputs[i].base, inputs[i].offset, result, BYTE_ARRAY_OFFSET + offset, len); offset += len; @@ -940,8 +946,8 @@ public final class UTF8String implements Comparable, Externalizable, j++; // Add separator if this is not the last input. if (j < numInputs) { - separator.base.writeTo( - 0, + copyMemory( + separator.base, separator.offset, result, BYTE_ARRAY_OFFSET + offset, separator.numBytes); offset += separator.numBytes; @@ -1215,7 +1221,7 @@ public final class UTF8String implements Comparable, Externalizable, public UTF8String copy() { byte[] bytes = new byte[numBytes]; - base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, numBytes); + copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, numBytes); return fromBytes(bytes); } @@ -1223,10 +1229,11 @@ public final class UTF8String implements Comparable, Externalizable, public int compareTo(@Nonnull final UTF8String other) { int len = Math.min(numBytes, other.numBytes); int wordMax = (len / 8) * 8; - MemoryBlock rbase = other.base; + long roffset = other.offset; + Object rbase = other.base; for (int i = 0; i < wordMax; i += 8) { - long left = base.getLong(i); - long right = rbase.getLong(i); + long left = getLong(base, offset + i); + long right = getLong(rbase, roffset + i); if (left != right) { if (IS_LITTLE_ENDIAN) { return Long.compareUnsigned(Long.reverseBytes(left), Long.reverseBytes(right)); @@ -1237,7 +1244,7 @@ public final class UTF8String implements Comparable, Externalizable, } for (int i = wordMax; i < len; i++) { // In UTF-8, the byte should be unsigned, so we should compare them as unsigned int. - int res = (getByte(i) & 0xFF) - (rbase.getByte(i) & 0xFF); + int res = (getByte(i) & 0xFF) - (Platform.getByte(rbase, roffset + i) & 0xFF); if (res != 0) { return res; } @@ -1256,7 +1263,7 @@ public final class UTF8String implements Comparable, Externalizable, if (numBytes != o.numBytes) { return false; } - return ByteArrayMethods.arrayEqualsBlock(base, 0, o.base, 0, numBytes); + return ByteArrayMethods.arrayEquals(base, offset, o.base, o.offset, numBytes); } else { return false; } @@ -1312,8 +1319,8 @@ public final class UTF8String implements Comparable, Externalizable, num_bytes_j != numBytesForFirstByte(s.getByte(i_bytes))) { cost = 1; } else { - cost = (ByteArrayMethods.arrayEqualsBlock(t.base, j_bytes, s.base, - i_bytes, num_bytes_j)) ? 0 : 1; + cost = (ByteArrayMethods.arrayEquals(t.base, t.offset + j_bytes, s.base, + s.offset + i_bytes, num_bytes_j)) ? 0 : 1; } d[i + 1] = Math.min(Math.min(d[i] + 1, p[i + 1] + 1), p[i] + cost); } @@ -1328,7 +1335,7 @@ public final class UTF8String implements Comparable, Externalizable, @Override public int hashCode() { - return Murmur3_x86_32.hashUnsafeBytesBlock(base,42); + return Murmur3_x86_32.hashUnsafeBytes(base, offset, numBytes, 42); } /** @@ -1391,10 +1398,10 @@ public final class UTF8String implements Comparable, Externalizable, } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + offset = BYTE_ARRAY_OFFSET; numBytes = in.readInt(); - byte[] bytes = new byte[numBytes]; - in.readFully(bytes); - base = ByteArrayMemoryBlock.fromArray(bytes); + base = new byte[numBytes]; + in.readFully((byte[]) base); } @Override @@ -1406,10 +1413,10 @@ public final class UTF8String implements Comparable, Externalizable, @Override public void read(Kryo kryo, Input in) { - numBytes = in.readInt(); - byte[] bytes = new byte[numBytes]; - in.read(bytes); - base = ByteArrayMemoryBlock.fromArray(bytes); + this.offset = BYTE_ARRAY_OFFSET; + this.numBytes = in.readInt(); + this.base = new byte[numBytes]; + in.read((byte[]) base); } } diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java index 583a148b38..3ad9ac7b4d 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java @@ -81,7 +81,7 @@ public class PlatformUtilSuite { MemoryAllocator.HEAP.free(block); Assert.assertNull(block.getBaseObject()); Assert.assertEquals(0, block.getBaseOffset()); - Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.getPageNumber()); + Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.pageNumber); } @Test @@ -92,7 +92,7 @@ public class PlatformUtilSuite { MemoryAllocator.UNSAFE.free(block); Assert.assertNull(block.getBaseObject()); Assert.assertEquals(0, block.getBaseOffset()); - Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.getPageNumber()); + Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.pageNumber); } @Test(expected = AssertionError.class) diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java index 8c2e98c2bf..fb8e53b334 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java @@ -20,13 +20,14 @@ package org.apache.spark.unsafe.array; import org.junit.Assert; import org.junit.Test; -import org.apache.spark.unsafe.memory.OnHeapMemoryBlock; +import org.apache.spark.unsafe.memory.MemoryBlock; public class LongArraySuite { @Test public void basicTest() { - LongArray arr = new LongArray(new OnHeapMemoryBlock(16)); + long[] bytes = new long[2]; + LongArray arr = new LongArray(MemoryBlock.fromLongArray(bytes)); arr.set(0, 1L); arr.set(1, 2L); arr.set(1, 3L); diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java index d989877172..6348a73bf3 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java @@ -70,24 +70,6 @@ public class Murmur3_x86_32Suite { Murmur3_x86_32.hashUnsafeBytes2(tes, Platform.BYTE_ARRAY_OFFSET, tes.length, 0)); } - @Test - public void testKnownWordsInputs() { - byte[] bytes = new byte[16]; - long offset = Platform.BYTE_ARRAY_OFFSET; - for (int i = 0; i < 16; i++) { - bytes[i] = 0; - } - Assert.assertEquals(-300363099, Murmur3_x86_32.hashUnsafeWords(bytes, offset, 16, 42)); - for (int i = 0; i < 16; i++) { - bytes[i] = -1; - } - Assert.assertEquals(-1210324667, Murmur3_x86_32.hashUnsafeWords(bytes, offset, 16, 42)); - for (int i = 0; i < 16; i++) { - bytes[i] = (byte)i; - } - Assert.assertEquals(-634919701, Murmur3_x86_32.hashUnsafeWords(bytes, offset, 16, 42)); - } - @Test public void randomizedStressTest() { int size = 65536; diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java deleted file mode 100644 index ef5ff8ee70..0000000000 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.unsafe.memory; - -import org.apache.spark.unsafe.Platform; -import org.junit.Assert; -import org.junit.Test; - -import java.nio.ByteOrder; - -import static org.hamcrest.core.StringContains.containsString; - -public class MemoryBlockSuite { - private static final boolean bigEndianPlatform = - ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN); - - private void check(MemoryBlock memory, Object obj, long offset, int length) { - memory.setPageNumber(1); - memory.fill((byte)-1); - memory.putBoolean(0, true); - memory.putByte(1, (byte)127); - memory.putShort(2, (short)257); - memory.putInt(4, 0x20000002); - memory.putLong(8, 0x1234567089ABCDEFL); - memory.putFloat(16, 1.0F); - memory.putLong(20, 0x1234567089ABCDEFL); - memory.putDouble(28, 2.0); - MemoryBlock.copyMemory(memory, 0L, memory, 36, 4); - int[] a = new int[2]; - a[0] = 0x12345678; - a[1] = 0x13579BDF; - memory.copyFrom(a, Platform.INT_ARRAY_OFFSET, 40, 8); - byte[] b = new byte[8]; - memory.writeTo(40, b, Platform.BYTE_ARRAY_OFFSET, 8); - - Assert.assertEquals(obj, memory.getBaseObject()); - Assert.assertEquals(offset, memory.getBaseOffset()); - Assert.assertEquals(length, memory.size()); - Assert.assertEquals(1, memory.getPageNumber()); - Assert.assertEquals(true, memory.getBoolean(0)); - Assert.assertEquals((byte)127, memory.getByte(1 )); - Assert.assertEquals((short)257, memory.getShort(2)); - Assert.assertEquals(0x20000002, memory.getInt(4)); - Assert.assertEquals(0x1234567089ABCDEFL, memory.getLong(8)); - Assert.assertEquals(1.0F, memory.getFloat(16), 0); - Assert.assertEquals(0x1234567089ABCDEFL, memory.getLong(20)); - Assert.assertEquals(2.0, memory.getDouble(28), 0); - Assert.assertEquals(true, memory.getBoolean(36)); - Assert.assertEquals((byte)127, memory.getByte(37 )); - Assert.assertEquals((short)257, memory.getShort(38)); - Assert.assertEquals(a[0], memory.getInt(40)); - Assert.assertEquals(a[1], memory.getInt(44)); - if (bigEndianPlatform) { - Assert.assertEquals(a[0], - ((int)b[0] & 0xff) << 24 | ((int)b[1] & 0xff) << 16 | - ((int)b[2] & 0xff) << 8 | ((int)b[3] & 0xff)); - Assert.assertEquals(a[1], - ((int)b[4] & 0xff) << 24 | ((int)b[5] & 0xff) << 16 | - ((int)b[6] & 0xff) << 8 | ((int)b[7] & 0xff)); - } else { - Assert.assertEquals(a[0], - ((int)b[3] & 0xff) << 24 | ((int)b[2] & 0xff) << 16 | - ((int)b[1] & 0xff) << 8 | ((int)b[0] & 0xff)); - Assert.assertEquals(a[1], - ((int)b[7] & 0xff) << 24 | ((int)b[6] & 0xff) << 16 | - ((int)b[5] & 0xff) << 8 | ((int)b[4] & 0xff)); - } - for (int i = 48; i < memory.size(); i++) { - Assert.assertEquals((byte) -1, memory.getByte(i)); - } - - assert(memory.subBlock(0, memory.size()) == memory); - - try { - memory.subBlock(-8, 8); - Assert.fail(); - } catch (Exception expected) { - Assert.assertThat(expected.getMessage(), containsString("non-negative")); - } - - try { - memory.subBlock(0, -8); - Assert.fail(); - } catch (Exception expected) { - Assert.assertThat(expected.getMessage(), containsString("non-negative")); - } - - try { - memory.subBlock(0, length + 8); - Assert.fail(); - } catch (Exception expected) { - Assert.assertThat(expected.getMessage(), containsString("should not be larger than")); - } - - try { - memory.subBlock(8, length - 4); - Assert.fail(); - } catch (Exception expected) { - Assert.assertThat(expected.getMessage(), containsString("should not be larger than")); - } - - try { - memory.subBlock(length + 8, 4); - Assert.fail(); - } catch (Exception expected) { - Assert.assertThat(expected.getMessage(), containsString("should not be larger than")); - } - - memory.setPageNumber(MemoryBlock.NO_PAGE_NUMBER); - } - - @Test - public void testByteArrayMemoryBlock() { - byte[] obj = new byte[56]; - long offset = Platform.BYTE_ARRAY_OFFSET; - int length = obj.length; - - MemoryBlock memory = new ByteArrayMemoryBlock(obj, offset, length); - check(memory, obj, offset, length); - - memory = ByteArrayMemoryBlock.fromArray(obj); - check(memory, obj, offset, length); - - obj = new byte[112]; - memory = new ByteArrayMemoryBlock(obj, offset, length); - check(memory, obj, offset, length); - } - - @Test - public void testOnHeapMemoryBlock() { - long[] obj = new long[7]; - long offset = Platform.LONG_ARRAY_OFFSET; - int length = obj.length * 8; - - MemoryBlock memory = new OnHeapMemoryBlock(obj, offset, length); - check(memory, obj, offset, length); - - memory = OnHeapMemoryBlock.fromArray(obj); - check(memory, obj, offset, length); - - obj = new long[14]; - memory = new OnHeapMemoryBlock(obj, offset, length); - check(memory, obj, offset, length); - } - - @Test - public void testOffHeapArrayMemoryBlock() { - MemoryAllocator memoryAllocator = new UnsafeMemoryAllocator(); - MemoryBlock memory = memoryAllocator.allocate(56); - Object obj = memory.getBaseObject(); - long offset = memory.getBaseOffset(); - int length = 56; - - check(memory, obj, offset, length); - memoryAllocator.free(memory); - - long address = Platform.allocateMemory(112); - memory = new OffHeapMemoryBlock(address, length); - obj = memory.getBaseObject(); - offset = memory.getBaseOffset(); - check(memory, obj, offset, length); - Platform.freeMemory(address); - } -} diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java index 42dda30480..dae13f03b0 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java @@ -25,8 +25,7 @@ import java.nio.charset.StandardCharsets; import java.util.*; import com.google.common.collect.ImmutableMap; -import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock; -import org.apache.spark.unsafe.memory.OnHeapMemoryBlock; +import org.apache.spark.unsafe.Platform; import org.junit.Test; import static org.junit.Assert.*; @@ -513,6 +512,21 @@ public class UTF8StringSuite { assertEquals(fromString("世界千世").soundex(), fromString("世界千世")); } + @Test + public void writeToOutputStreamUnderflow() throws IOException { + // offset underflow is apparently supported? + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8); + + for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) { + UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i) + .writeTo(outputStream); + final ByteBuffer buffer = ByteBuffer.wrap(outputStream.toByteArray(), i, test.length); + assertEquals("01234567", StandardCharsets.UTF_8.decode(buffer).toString()); + outputStream.reset(); + } + } + @Test public void writeToOutputStreamSlice() throws IOException { final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); @@ -520,7 +534,7 @@ public class UTF8StringSuite { for (int i = 0; i < test.length; ++i) { for (int j = 0; j < test.length - i; ++j) { - new UTF8String(ByteArrayMemoryBlock.fromArray(test).subBlock(i, j)) + UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET + i, j) .writeTo(outputStream); assertArrayEquals(Arrays.copyOfRange(test, i, i + j), outputStream.toByteArray()); @@ -551,7 +565,7 @@ public class UTF8StringSuite { for (final long offset : offsets) { try { - new UTF8String(ByteArrayMemoryBlock.fromArray(test).subBlock(offset, test.length)) + fromAddress(test, BYTE_ARRAY_OFFSET + offset, test.length) .writeTo(outputStream); throw new IllegalStateException(Long.toString(offset)); @@ -578,25 +592,26 @@ public class UTF8StringSuite { } @Test - public void writeToOutputStreamLongArray() throws IOException { + public void writeToOutputStreamIntArray() throws IOException { // verify that writes work on objects that are not byte arrays - final ByteBuffer buffer = StandardCharsets.UTF_8.encode("3千大千世界"); + final ByteBuffer buffer = StandardCharsets.UTF_8.encode("大千世界"); buffer.position(0); buffer.order(ByteOrder.nativeOrder()); final int length = buffer.limit(); - assertEquals(16, length); + assertEquals(12, length); - final int longs = length / 8; - final long[] array = new long[longs]; + final int ints = length / 4; + final int[] array = new int[ints]; - for (int i = 0; i < longs; ++i) { - array[i] = buffer.getLong(); + for (int i = 0; i < ints; ++i) { + array[i] = buffer.getInt(); } final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - new UTF8String(OnHeapMemoryBlock.fromArray(array)).writeTo(outputStream); - assertEquals("3千大千世界", outputStream.toString("UTF-8")); + fromAddress(array, Platform.INT_ARRAY_OFFSET, length) + .writeTo(outputStream); + assertEquals("大千世界", outputStream.toString("UTF-8")); } @Test diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java index 8651a639c0..d07faf1da1 100644 --- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java +++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java @@ -311,7 +311,7 @@ public class TaskMemoryManager { // this could trigger spilling to free some pages. return allocatePage(size, consumer); } - page.setPageNumber(pageNumber); + page.pageNumber = pageNumber; pageTable[pageNumber] = page; if (logger.isTraceEnabled()) { logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired); @@ -323,25 +323,25 @@ public class TaskMemoryManager { * Free a block of memory allocated via {@link TaskMemoryManager#allocatePage}. */ public void freePage(MemoryBlock page, MemoryConsumer consumer) { - assert (page.getPageNumber() != MemoryBlock.NO_PAGE_NUMBER) : + assert (page.pageNumber != MemoryBlock.NO_PAGE_NUMBER) : "Called freePage() on memory that wasn't allocated with allocatePage()"; - assert (page.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) : + assert (page.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) : "Called freePage() on a memory block that has already been freed"; - assert (page.getPageNumber() != MemoryBlock.FREED_IN_TMM_PAGE_NUMBER) : + assert (page.pageNumber != MemoryBlock.FREED_IN_TMM_PAGE_NUMBER) : "Called freePage() on a memory block that has already been freed"; - assert(allocatedPages.get(page.getPageNumber())); - pageTable[page.getPageNumber()] = null; + assert(allocatedPages.get(page.pageNumber)); + pageTable[page.pageNumber] = null; synchronized (this) { - allocatedPages.clear(page.getPageNumber()); + allocatedPages.clear(page.pageNumber); } if (logger.isTraceEnabled()) { - logger.trace("Freed page number {} ({} bytes)", page.getPageNumber(), page.size()); + logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size()); } long pageSize = page.size(); // Clear the page number before passing the block to the MemoryAllocator's free(). // Doing this allows the MemoryAllocator to detect when a TaskMemoryManager-managed // page has been inappropriately directly freed without calling TMM.freePage(). - page.setPageNumber(MemoryBlock.FREED_IN_TMM_PAGE_NUMBER); + page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER; memoryManager.tungstenMemoryAllocator().free(page); releaseExecutionMemory(pageSize, consumer); } @@ -363,7 +363,7 @@ public class TaskMemoryManager { // relative to the page's base offset; this relative offset will fit in 51 bits. offsetInPage -= page.getBaseOffset(); } - return encodePageNumberAndOffset(page.getPageNumber(), offsetInPage); + return encodePageNumberAndOffset(page.pageNumber, offsetInPage); } @VisibleForTesting @@ -434,7 +434,7 @@ public class TaskMemoryManager { for (MemoryBlock page : pageTable) { if (page != null) { logger.debug("unreleased page: " + page + " in task " + taskAttemptId); - page.setPageNumber(MemoryBlock.FREED_IN_TMM_PAGE_NUMBER); + page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER; memoryManager.tungstenMemoryAllocator().free(page); } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java index 4b48599ad3..0d069125dc 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java @@ -20,6 +20,7 @@ package org.apache.spark.shuffle.sort; import java.util.Comparator; import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.LongArray; import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.collection.Sorter; @@ -112,7 +113,13 @@ final class ShuffleInMemorySorter { public void expandPointerArray(LongArray newArray) { assert(newArray.size() > array.size()); - MemoryBlock.copyMemory(array.memoryBlock(), newArray.memoryBlock(), pos * 8L); + Platform.copyMemory( + array.getBaseObject(), + array.getBaseOffset(), + newArray.getBaseObject(), + newArray.getBaseOffset(), + pos * 8L + ); consumer.freeArray(array); array = newArray; usableCapacity = getUsableCapacity(); @@ -181,7 +188,10 @@ final class ShuffleInMemorySorter { PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX, PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false); } else { - MemoryBlock unused = array.memoryBlock().subBlock(pos * 8L, (array.size() - pos) * 8L); + MemoryBlock unused = new MemoryBlock( + array.getBaseObject(), + array.getBaseOffset() + pos * 8L, + (array.size() - pos) * 8L); LongArray buffer = new LongArray(unused); Sorter sorter = new Sorter<>(new ShuffleSortDataFormat(buffer)); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java index 254449e954..717bdd79d4 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java @@ -17,8 +17,8 @@ package org.apache.spark.shuffle.sort; +import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.LongArray; -import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.collection.SortDataFormat; final class ShuffleSortDataFormat extends SortDataFormat { @@ -60,8 +60,13 @@ final class ShuffleSortDataFormat extends SortDataFormat sorter = new Sorter<>(new UnsafeSortDataFormat(buffer)); diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java index d7d2d0b012..a0664b30d6 100644 --- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java +++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java @@ -76,7 +76,7 @@ public class TaskMemoryManagerSuite { final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP); final MemoryBlock dataPage = manager.allocatePage(256, c); c.freePage(dataPage); - Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, dataPage.getPageNumber()); + Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, dataPage.pageNumber); } @Test(expected = AssertionError.class) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index 3e56db5ea1..47173b89e9 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark._ import org.apache.spark.memory.MemoryTestingUtils import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.unsafe.array.LongArray -import org.apache.spark.unsafe.memory.OnHeapMemoryBlock +import org.apache.spark.unsafe.memory.MemoryBlock import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordPointerAndKeyPrefix, UnsafeSortDataFormat} class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { @@ -105,8 +105,9 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { // the form [150000000, 150000001, 150000002, ...., 300000000, 0, 1, 2, ..., 149999999] // that can trigger copyRange() in TimSort.mergeLo() or TimSort.mergeHi() val ref = Array.tabulate[Long](size) { i => if (i < size / 2) size / 2 + i else i } - val buf = new LongArray(OnHeapMemoryBlock.fromArray(ref)) - val tmpBuf = new LongArray(new OnHeapMemoryBlock((size/2) * 8L)) + val buf = new LongArray(MemoryBlock.fromLongArray(ref)) + val tmp = new Array[Long](size/2) + val tmpBuf = new LongArray(MemoryBlock.fromLongArray(tmp)) new Sorter(new UnsafeSortDataFormat(tmpBuf)).sort( buf, 0, size, new Comparator[RecordPointerAndKeyPrefix] { diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala index ddf3740e76..d5956ea320 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala @@ -27,7 +27,7 @@ import com.google.common.primitives.Ints import org.apache.spark.SparkFunSuite import org.apache.spark.internal.Logging import org.apache.spark.unsafe.array.LongArray -import org.apache.spark.unsafe.memory.OnHeapMemoryBlock +import org.apache.spark.unsafe.memory.MemoryBlock import org.apache.spark.util.collection.Sorter import org.apache.spark.util.random.XORShiftRandom @@ -78,14 +78,14 @@ class RadixSortSuite extends SparkFunSuite with Logging { private def generateTestData(size: Long, rand: => Long): (Array[JLong], LongArray) = { val ref = Array.tabulate[Long](Ints.checkedCast(size)) { i => rand } val extended = ref ++ Array.fill[Long](Ints.checkedCast(size))(0) - (ref.map(i => new JLong(i)), new LongArray(OnHeapMemoryBlock.fromArray(extended))) + (ref.map(i => new JLong(i)), new LongArray(MemoryBlock.fromLongArray(extended))) } private def generateKeyPrefixTestData(size: Long, rand: => Long): (LongArray, LongArray) = { val ref = Array.tabulate[Long](Ints.checkedCast(size * 2)) { i => rand } val extended = ref ++ Array.fill[Long](Ints.checkedCast(size * 2))(0) - (new LongArray(OnHeapMemoryBlock.fromArray(ref)), - new LongArray(OnHeapMemoryBlock.fromArray(extended))) + (new LongArray(MemoryBlock.fromLongArray(ref)), + new LongArray(MemoryBlock.fromLongArray(extended))) } private def collectToArray(array: LongArray, offset: Int, length: Long): Array[Long] = { @@ -110,7 +110,7 @@ class RadixSortSuite extends SparkFunSuite with Logging { } private def referenceKeyPrefixSort(buf: LongArray, lo: Long, hi: Long, refCmp: PrefixComparator) { - val sortBuffer = new LongArray(new OnHeapMemoryBlock(buf.size() * 8L)) + val sortBuffer = new LongArray(MemoryBlock.fromLongArray(new Array[Long](buf.size().toInt))) new Sorter(new UnsafeSortDataFormat(sortBuffer)).sort( buf, Ints.checkedCast(lo), Ints.checkedCast(hi), new Comparator[RecordPointerAndKeyPrefix] { override def compare( diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala index dc38ee326e..dc18e1d348 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala @@ -29,7 +29,7 @@ import org.apache.spark.mllib.feature.{HashingTF => OldHashingTF} import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.hash.Murmur3_x86_32.{hashInt, hashLong, hashUnsafeBytes2Block} +import org.apache.spark.unsafe.hash.Murmur3_x86_32.{hashInt, hashLong, hashUnsafeBytes2} import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils import org.apache.spark.util.collection.OpenHashMap @@ -244,7 +244,8 @@ object FeatureHasher extends DefaultParamsReadable[FeatureHasher] { case f: Float => hashInt(java.lang.Float.floatToIntBits(f), seed) case d: Double => hashLong(java.lang.Double.doubleToLongBits(d), seed) case s: String => - hashUnsafeBytes2Block(UTF8String.fromString(s).getMemoryBlock, seed) + val utf8 = UTF8String.fromString(s) + hashUnsafeBytes2(utf8.getBaseObject, utf8.getBaseOffset, utf8.numBytes(), seed) case _ => throw new SparkException("FeatureHasher with murmur3 algorithm does not " + s"support type ${term.getClass.getCanonicalName} of input data.") } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala index 7b73b286fb..8935c8496c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala @@ -160,7 +160,7 @@ object HashingTF { case d: Double => hashLong(java.lang.Double.doubleToLongBits(d), seed) case s: String => val utf8 = UTF8String.fromString(s) - hashUnsafeBytesBlock(utf8.getMemoryBlock(), seed) + hashUnsafeBytes(utf8.getBaseObject, utf8.getBaseOffset, utf8.numBytes(), seed) case _ => throw new SparkException("HashingTF with murmur3 algorithm does not " + s"support type ${term.getClass.getCanonicalName} of input data.") } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java index 9e7b15d339..9002abdcfd 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java @@ -27,7 +27,6 @@ import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.unsafe.bitset.BitSetMethods; import org.apache.spark.unsafe.hash.Murmur3_x86_32; -import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; @@ -241,8 +240,7 @@ public final class UnsafeArrayData extends ArrayData { final long offsetAndSize = getLong(ordinal); final int offset = (int) (offsetAndSize >> 32); final int size = (int) offsetAndSize; - MemoryBlock mb = MemoryBlock.allocateFromObject(baseObject, baseOffset + offset, size); - return new UTF8String(mb); + return UTF8String.fromAddress(baseObject, baseOffset + offset, size); } @Override diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index 469b0e60cc..a76e6ef8c9 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -37,7 +37,6 @@ import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.unsafe.bitset.BitSetMethods; import org.apache.spark.unsafe.hash.Murmur3_x86_32; -import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; @@ -417,8 +416,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo final long offsetAndSize = getLong(ordinal); final int offset = (int) (offsetAndSize >> 32); final int size = (int) offsetAndSize; - MemoryBlock mb = MemoryBlock.allocateFromObject(baseObject, baseOffset + offset, size); - return new UTF8String(mb); + return UTF8String.fromAddress(baseObject, baseOffset + offset, size); } @Override diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java index 8e9c0a2e9d..eb5051b284 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.catalyst.expressions; -import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.types.UTF8String; // scalastyle: off @@ -72,13 +72,13 @@ public final class XXH64 { return fmix(hash); } - public long hashUnsafeWordsBlock(MemoryBlock mb) { - return hashUnsafeWordsBlock(mb, seed); + public long hashUnsafeWords(Object base, long offset, int length) { + return hashUnsafeWords(base, offset, length, seed); } - public static long hashUnsafeWordsBlock(MemoryBlock mb, long seed) { - assert (mb.size() % 8 == 0) : "lengthInBytes must be a multiple of 8 (word-aligned)"; - long hash = hashBytesByWordsBlock(mb, seed); + public static long hashUnsafeWords(Object base, long offset, int length, long seed) { + assert (length % 8 == 0) : "lengthInBytes must be a multiple of 8 (word-aligned)"; + long hash = hashBytesByWords(base, offset, length, seed); return fmix(hash); } @@ -86,22 +86,20 @@ public final class XXH64 { return hashUnsafeBytes(base, offset, length, seed); } - public static long hashUnsafeBytesBlock(MemoryBlock mb, long seed) { - long offset = 0; - long length = mb.size(); + public static long hashUnsafeBytes(Object base, long offset, int length, long seed) { assert (length >= 0) : "lengthInBytes cannot be negative"; - long hash = hashBytesByWordsBlock(mb, seed); + long hash = hashBytesByWords(base, offset, length, seed); long end = offset + length; offset += length & -8; if (offset + 4L <= end) { - hash ^= (mb.getInt(offset) & 0xFFFFFFFFL) * PRIME64_1; + hash ^= (Platform.getInt(base, offset) & 0xFFFFFFFFL) * PRIME64_1; hash = Long.rotateLeft(hash, 23) * PRIME64_2 + PRIME64_3; offset += 4L; } while (offset < end) { - hash ^= (mb.getByte(offset) & 0xFFL) * PRIME64_5; + hash ^= (Platform.getByte(base, offset) & 0xFFL) * PRIME64_5; hash = Long.rotateLeft(hash, 11) * PRIME64_1; offset++; } @@ -109,11 +107,7 @@ public final class XXH64 { } public static long hashUTF8String(UTF8String str, long seed) { - return hashUnsafeBytesBlock(str.getMemoryBlock(), seed); - } - - public static long hashUnsafeBytes(Object base, long offset, int length, long seed) { - return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, length), seed); + return hashUnsafeBytes(str.getBaseObject(), str.getBaseOffset(), str.numBytes(), seed); } private static long fmix(long hash) { @@ -125,31 +119,30 @@ public final class XXH64 { return hash; } - private static long hashBytesByWordsBlock(MemoryBlock mb, long seed) { - long offset = 0; - long length = mb.size(); + private static long hashBytesByWords(Object base, long offset, int length, long seed) { + long end = offset + length; long hash; if (length >= 32) { - long limit = length - 32; + long limit = end - 32; long v1 = seed + PRIME64_1 + PRIME64_2; long v2 = seed + PRIME64_2; long v3 = seed; long v4 = seed - PRIME64_1; do { - v1 += mb.getLong(offset) * PRIME64_2; + v1 += Platform.getLong(base, offset) * PRIME64_2; v1 = Long.rotateLeft(v1, 31); v1 *= PRIME64_1; - v2 += mb.getLong(offset + 8) * PRIME64_2; + v2 += Platform.getLong(base, offset + 8) * PRIME64_2; v2 = Long.rotateLeft(v2, 31); v2 *= PRIME64_1; - v3 += mb.getLong(offset + 16) * PRIME64_2; + v3 += Platform.getLong(base, offset + 16) * PRIME64_2; v3 = Long.rotateLeft(v3, 31); v3 *= PRIME64_1; - v4 += mb.getLong(offset + 24) * PRIME64_2; + v4 += Platform.getLong(base, offset + 24) * PRIME64_2; v4 = Long.rotateLeft(v4, 31); v4 *= PRIME64_1; @@ -190,9 +183,9 @@ public final class XXH64 { hash += length; - long limit = length - 8; + long limit = end - 8; while (offset <= limit) { - long k1 = mb.getLong(offset); + long k1 = Platform.getLong(base, offset); hash ^= Long.rotateLeft(k1 * PRIME64_2, 31) * PRIME64_1; hash = Long.rotateLeft(hash, 27) * PRIME64_1 + PRIME64_4; offset += 8L; diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java index f8000d78cd..f0f66bae24 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java @@ -19,8 +19,6 @@ package org.apache.spark.sql.catalyst.expressions.codegen; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.ByteArrayMethods; -import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock; -import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.unsafe.types.UTF8String; /** @@ -31,34 +29,43 @@ public class UTF8StringBuilder { private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH; - private ByteArrayMemoryBlock buffer; - private int length = 0; + private byte[] buffer; + private int cursor = Platform.BYTE_ARRAY_OFFSET; public UTF8StringBuilder() { // Since initial buffer size is 16 in `StringBuilder`, we set the same size here - this.buffer = new ByteArrayMemoryBlock(16); + this.buffer = new byte[16]; } // Grows the buffer by at least `neededSize` private void grow(int neededSize) { - if (neededSize > ARRAY_MAX - length) { + if (neededSize > ARRAY_MAX - totalSize()) { throw new UnsupportedOperationException( "Cannot grow internal buffer by size " + neededSize + " because the size after growing " + "exceeds size limitation " + ARRAY_MAX); } - final int requestedSize = length + neededSize; - if (buffer.size() < requestedSize) { - int newLength = requestedSize < ARRAY_MAX / 2 ? requestedSize * 2 : ARRAY_MAX; - final ByteArrayMemoryBlock tmp = new ByteArrayMemoryBlock(newLength); - MemoryBlock.copyMemory(buffer, tmp, length); + final int length = totalSize() + neededSize; + if (buffer.length < length) { + int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX; + final byte[] tmp = new byte[newLength]; + Platform.copyMemory( + buffer, + Platform.BYTE_ARRAY_OFFSET, + tmp, + Platform.BYTE_ARRAY_OFFSET, + totalSize()); buffer = tmp; } } + private int totalSize() { + return cursor - Platform.BYTE_ARRAY_OFFSET; + } + public void append(UTF8String value) { grow(value.numBytes()); - value.writeToMemory(buffer.getByteArray(), length + Platform.BYTE_ARRAY_OFFSET); - length += value.numBytes(); + value.writeToMemory(buffer, cursor); + cursor += value.numBytes(); } public void append(String value) { @@ -66,6 +73,6 @@ public class UTF8StringBuilder { } public UTF8String build() { - return UTF8String.fromBytes(buffer.getByteArray(), 0, length); + return UTF8String.fromBytes(buffer, 0, totalSize()); } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala index a754e87a17..742a4f87a9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala @@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.hash.Murmur3_x86_32 -import org.apache.spark.unsafe.memory.MemoryBlock import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -362,7 +361,10 @@ abstract class HashExpression[E] extends Expression { } protected def genHashString(input: String, result: String): String = { - s"$result = $hasherClassName.hashUTF8String($input, $result);" + val baseObject = s"$input.getBaseObject()" + val baseOffset = s"$input.getBaseOffset()" + val numBytes = s"$input.numBytes()" + s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes, $result);" } protected def genHashForMap( @@ -469,8 +471,6 @@ abstract class InterpretedHashFunction { protected def hashUnsafeBytes(base: AnyRef, offset: Long, length: Int, seed: Long): Long - protected def hashUnsafeBytesBlock(base: MemoryBlock, seed: Long): Long - /** * Computes hash of a given `value` of type `dataType`. The caller needs to check the validity * of input `value`. @@ -496,7 +496,8 @@ abstract class InterpretedHashFunction { case c: CalendarInterval => hashInt(c.months, hashLong(c.microseconds, seed)) case a: Array[Byte] => hashUnsafeBytes(a, Platform.BYTE_ARRAY_OFFSET, a.length, seed) - case s: UTF8String => hashUnsafeBytesBlock(s.getMemoryBlock(), seed) + case s: UTF8String => + hashUnsafeBytes(s.getBaseObject, s.getBaseOffset, s.numBytes(), seed) case array: ArrayData => val elementType = dataType match { @@ -583,15 +584,9 @@ object Murmur3HashFunction extends InterpretedHashFunction { Murmur3_x86_32.hashLong(l, seed.toInt) } - override protected def hashUnsafeBytes( - base: AnyRef, offset: Long, len: Int, seed: Long): Long = { + override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = { Murmur3_x86_32.hashUnsafeBytes(base, offset, len, seed.toInt) } - - override protected def hashUnsafeBytesBlock( - base: MemoryBlock, seed: Long): Long = { - Murmur3_x86_32.hashUnsafeBytesBlock(base, seed.toInt) - } } /** @@ -616,14 +611,9 @@ object XxHash64Function extends InterpretedHashFunction { override protected def hashLong(l: Long, seed: Long): Long = XXH64.hashLong(l, seed) - override protected def hashUnsafeBytes( - base: AnyRef, offset: Long, len: Int, seed: Long): Long = { + override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = { XXH64.hashUnsafeBytes(base, offset, len, seed) } - - override protected def hashUnsafeBytesBlock(base: MemoryBlock, seed: Long): Long = { - XXH64.hashUnsafeBytesBlock(base, seed) - } } /** @@ -730,7 +720,10 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] { """ override protected def genHashString(input: String, result: String): String = { - s"$result = $hasherClassName.hashUTF8String($input);" + val baseObject = s"$input.getBaseObject()" + val baseOffset = s"$input.getBaseOffset()" + val numBytes = s"$input.numBytes()" + s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes);" } override protected def genHashForArray( @@ -824,14 +817,10 @@ object HiveHashFunction extends InterpretedHashFunction { HiveHasher.hashLong(l) } - override protected def hashUnsafeBytes( - base: AnyRef, offset: Long, len: Int, seed: Long): Long = { + override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = { HiveHasher.hashUnsafeBytes(base, offset, len) } - override protected def hashUnsafeBytesBlock( - base: MemoryBlock, seed: Long): Long = HiveHasher.hashUnsafeBytesBlock(base) - private val HIVE_DECIMAL_MAX_PRECISION = 38 private val HIVE_DECIMAL_MAX_SCALE = 38 diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java index 76930f9368..b67c6f3e6e 100644 --- a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java @@ -17,8 +17,7 @@ package org.apache.spark.sql.catalyst.expressions; -import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock; -import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.types.UTF8String; import org.junit.Assert; import org.junit.Test; @@ -54,7 +53,7 @@ public class HiveHasherSuite { for (int i = 0; i < inputs.length; i++) { UTF8String s = UTF8String.fromString("val_" + inputs[i]); - int hash = HiveHasher.hashUnsafeBytesBlock(s.getMemoryBlock()); + int hash = HiveHasher.hashUnsafeBytes(s.getBaseObject(), s.getBaseOffset(), s.numBytes()); Assert.assertEquals(expected[i], ((31 * inputs[i]) + hash)); } } @@ -90,13 +89,13 @@ public class HiveHasherSuite { int byteArrSize = rand.nextInt(100) * 8; byte[] bytes = new byte[byteArrSize]; rand.nextBytes(bytes); - MemoryBlock mb = ByteArrayMemoryBlock.fromArray(bytes); Assert.assertEquals( - HiveHasher.hashUnsafeBytesBlock(mb), - HiveHasher.hashUnsafeBytesBlock(mb)); + HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize), + HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); - hashcodes.add(HiveHasher.hashUnsafeBytesBlock(mb)); + hashcodes.add(HiveHasher.hashUnsafeBytes( + bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); } // A very loose bound. @@ -113,13 +112,13 @@ public class HiveHasherSuite { byte[] strBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8); byte[] paddedBytes = new byte[byteArrSize]; System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length); - MemoryBlock mb = ByteArrayMemoryBlock.fromArray(paddedBytes); Assert.assertEquals( - HiveHasher.hashUnsafeBytesBlock(mb), - HiveHasher.hashUnsafeBytesBlock(mb)); + HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize), + HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); - hashcodes.add(HiveHasher.hashUnsafeBytesBlock(mb)); + hashcodes.add(HiveHasher.hashUnsafeBytes( + paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); } // A very loose bound. diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java index cd8bce623c..1baee91b34 100644 --- a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java @@ -24,8 +24,6 @@ import java.util.Random; import java.util.Set; import org.apache.spark.unsafe.Platform; -import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock; -import org.apache.spark.unsafe.memory.MemoryBlock; import org.junit.Assert; import org.junit.Test; @@ -144,13 +142,13 @@ public class XXH64Suite { int byteArrSize = rand.nextInt(100) * 8; byte[] bytes = new byte[byteArrSize]; rand.nextBytes(bytes); - MemoryBlock mb = ByteArrayMemoryBlock.fromArray(bytes); Assert.assertEquals( - hasher.hashUnsafeWordsBlock(mb), - hasher.hashUnsafeWordsBlock(mb)); + hasher.hashUnsafeWords(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize), + hasher.hashUnsafeWords(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); - hashcodes.add(hasher.hashUnsafeWordsBlock(mb)); + hashcodes.add(hasher.hashUnsafeWords( + bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); } // A very loose bound. @@ -167,13 +165,13 @@ public class XXH64Suite { byte[] strBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8); byte[] paddedBytes = new byte[byteArrSize]; System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length); - MemoryBlock mb = ByteArrayMemoryBlock.fromArray(paddedBytes); Assert.assertEquals( - hasher.hashUnsafeWordsBlock(mb), - hasher.hashUnsafeWordsBlock(mb)); + hasher.hashUnsafeWords(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize), + hasher.hashUnsafeWords(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); - hashcodes.add(hasher.hashUnsafeWordsBlock(mb)); + hashcodes.add(hasher.hashUnsafeWords( + paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize)); } // A very loose bound. diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 6fdadde628..5e0cf7d370 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.Platform; -import org.apache.spark.unsafe.memory.OffHeapMemoryBlock; import org.apache.spark.unsafe.types.UTF8String; /** @@ -207,7 +206,7 @@ public final class OffHeapColumnVector extends WritableColumnVector { @Override protected UTF8String getBytesAsUTF8String(int rowId, int count) { - return new UTF8String(new OffHeapMemoryBlock(data + rowId, count)); + return UTF8String.fromAddress(null, data + rowId, count); } // diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java index 1c9beda404..5f58b031f6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java @@ -25,7 +25,6 @@ import org.apache.arrow.vector.holders.NullableVarCharHolder; import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.execution.arrow.ArrowUtils; import org.apache.spark.sql.types.*; -import org.apache.spark.unsafe.memory.OffHeapMemoryBlock; import org.apache.spark.unsafe.types.UTF8String; /** @@ -378,10 +377,9 @@ public final class ArrowColumnVector extends ColumnVector { if (stringResult.isSet == 0) { return null; } else { - return new UTF8String(new OffHeapMemoryBlock( + return UTF8String.fromAddress(null, stringResult.buffer.memoryAddress() + stringResult.start, - stringResult.end - stringResult.start - )); + stringResult.end - stringResult.start); } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala index 470b93efd1..50ae26a3ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.benchmark import java.util.{Arrays, Comparator} import org.apache.spark.unsafe.array.LongArray -import org.apache.spark.unsafe.memory.OnHeapMemoryBlock +import org.apache.spark.unsafe.memory.MemoryBlock import org.apache.spark.util.Benchmark import org.apache.spark.util.collection.Sorter import org.apache.spark.util.collection.unsafe.sort._ @@ -36,7 +36,7 @@ import org.apache.spark.util.random.XORShiftRandom class SortBenchmark extends BenchmarkBase { private def referenceKeyPrefixSort(buf: LongArray, lo: Int, hi: Int, refCmp: PrefixComparator) { - val sortBuffer = new LongArray(new OnHeapMemoryBlock(buf.size() * 8L)) + val sortBuffer = new LongArray(MemoryBlock.fromLongArray(new Array[Long](buf.size().toInt))) new Sorter(new UnsafeSortDataFormat(sortBuffer)).sort( buf, lo, hi, new Comparator[RecordPointerAndKeyPrefix] { override def compare( @@ -50,8 +50,8 @@ class SortBenchmark extends BenchmarkBase { private def generateKeyPrefixTestData(size: Int, rand: => Long): (LongArray, LongArray) = { val ref = Array.tabulate[Long](size * 2) { i => rand } val extended = ref ++ Array.fill[Long](size * 2)(0) - (new LongArray(OnHeapMemoryBlock.fromArray(ref)), - new LongArray(OnHeapMemoryBlock.fromArray(extended))) + (new LongArray(MemoryBlock.fromLongArray(ref)), + new LongArray(MemoryBlock.fromLongArray(extended))) } ignore("sort") { @@ -60,7 +60,7 @@ class SortBenchmark extends BenchmarkBase { val benchmark = new Benchmark("radix sort " + size, size) benchmark.addTimerCase("reference TimSort key prefix array") { timer => val array = Array.tabulate[Long](size * 2) { i => rand.nextLong } - val buf = new LongArray(OnHeapMemoryBlock.fromArray(array)) + val buf = new LongArray(MemoryBlock.fromLongArray(array)) timer.startTiming() referenceKeyPrefixSort(buf, 0, size, PrefixComparators.BINARY) timer.stopTiming() @@ -78,7 +78,7 @@ class SortBenchmark extends BenchmarkBase { array(i) = rand.nextLong & 0xff i += 1 } - val buf = new LongArray(OnHeapMemoryBlock.fromArray(array)) + val buf = new LongArray(MemoryBlock.fromLongArray(array)) timer.startTiming() RadixSort.sort(buf, size, 0, 7, false, false) timer.stopTiming() @@ -90,7 +90,7 @@ class SortBenchmark extends BenchmarkBase { array(i) = rand.nextLong & 0xffff i += 1 } - val buf = new LongArray(OnHeapMemoryBlock.fromArray(array)) + val buf = new LongArray(MemoryBlock.fromLongArray(array)) timer.startTiming() RadixSort.sort(buf, size, 0, 7, false, false) timer.stopTiming() @@ -102,7 +102,7 @@ class SortBenchmark extends BenchmarkBase { array(i) = rand.nextLong i += 1 } - val buf = new LongArray(OnHeapMemoryBlock.fromArray(array)) + val buf = new LongArray(MemoryBlock.fromLongArray(array)) timer.startTiming() RadixSort.sort(buf, size, 0, 7, false, false) timer.stopTiming() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala index 25ee95daa0..ffda33cf90 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala @@ -22,13 +22,13 @@ import java.io.File import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.memory.{MemoryManager, TaskMemoryManager, TestMemoryManager} import org.apache.spark.sql.catalyst.expressions.UnsafeRow -import org.apache.spark.unsafe.memory.OnHeapMemoryBlock +import org.apache.spark.unsafe.memory.MemoryBlock import org.apache.spark.util.Utils class RowQueueSuite extends SparkFunSuite { test("in-memory queue") { - val page = new OnHeapMemoryBlock((1<<10) * 8L) + val page = MemoryBlock.fromLongArray(new Array[Long](1<<10)) val queue = new InMemoryRowQueue(page, 1) { override def close() {} }