[SPARK-23762][SQL] UTF8StringBuilder uses MemoryBlock
## What changes were proposed in this pull request? This PR tries to use `MemoryBlock` in `UTF8StringBuilder`. In general, there are two advantages to using `MemoryBlock`: 1. It has cleaner API calls than using a Java array or `PlatformMemory`. 2. It improves runtime performance of memory access compared with using `Object`. ## How was this patch tested? Added `UTF8StringBuilderSuite`. Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #20871 from kiszk/SPARK-23762.
This commit is contained in:
parent
6a2289ecf0
commit
0b19122d43
|
@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions.codegen;
|
||||||
|
|
||||||
import org.apache.spark.unsafe.Platform;
|
import org.apache.spark.unsafe.Platform;
|
||||||
import org.apache.spark.unsafe.array.ByteArrayMethods;
|
import org.apache.spark.unsafe.array.ByteArrayMethods;
|
||||||
|
import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
|
||||||
|
import org.apache.spark.unsafe.memory.MemoryBlock;
|
||||||
import org.apache.spark.unsafe.types.UTF8String;
|
import org.apache.spark.unsafe.types.UTF8String;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -29,43 +31,34 @@ public class UTF8StringBuilder {
|
||||||
|
|
||||||
private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
|
private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
|
||||||
|
|
||||||
private byte[] buffer;
|
private ByteArrayMemoryBlock buffer;
|
||||||
private int cursor = Platform.BYTE_ARRAY_OFFSET;
|
private int length = 0;
|
||||||
|
|
||||||
public UTF8StringBuilder() {
|
public UTF8StringBuilder() {
|
||||||
// Since initial buffer size is 16 in `StringBuilder`, we set the same size here
|
// Since initial buffer size is 16 in `StringBuilder`, we set the same size here
|
||||||
this.buffer = new byte[16];
|
this.buffer = new ByteArrayMemoryBlock(16);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Grows the buffer by at least `neededSize`
|
// Grows the buffer by at least `neededSize`
|
||||||
private void grow(int neededSize) {
|
private void grow(int neededSize) {
|
||||||
if (neededSize > ARRAY_MAX - totalSize()) {
|
if (neededSize > ARRAY_MAX - length) {
|
||||||
throw new UnsupportedOperationException(
|
throw new UnsupportedOperationException(
|
||||||
"Cannot grow internal buffer by size " + neededSize + " because the size after growing " +
|
"Cannot grow internal buffer by size " + neededSize + " because the size after growing " +
|
||||||
"exceeds size limitation " + ARRAY_MAX);
|
"exceeds size limitation " + ARRAY_MAX);
|
||||||
}
|
}
|
||||||
final int length = totalSize() + neededSize;
|
final int requestedSize = length + neededSize;
|
||||||
if (buffer.length < length) {
|
if (buffer.size() < requestedSize) {
|
||||||
int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
|
int newLength = requestedSize < ARRAY_MAX / 2 ? requestedSize * 2 : ARRAY_MAX;
|
||||||
final byte[] tmp = new byte[newLength];
|
final ByteArrayMemoryBlock tmp = new ByteArrayMemoryBlock(newLength);
|
||||||
Platform.copyMemory(
|
MemoryBlock.copyMemory(buffer, tmp, length);
|
||||||
buffer,
|
|
||||||
Platform.BYTE_ARRAY_OFFSET,
|
|
||||||
tmp,
|
|
||||||
Platform.BYTE_ARRAY_OFFSET,
|
|
||||||
totalSize());
|
|
||||||
buffer = tmp;
|
buffer = tmp;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private int totalSize() {
|
|
||||||
return cursor - Platform.BYTE_ARRAY_OFFSET;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void append(UTF8String value) {
|
public void append(UTF8String value) {
|
||||||
grow(value.numBytes());
|
grow(value.numBytes());
|
||||||
value.writeToMemory(buffer, cursor);
|
value.writeToMemory(buffer.getByteArray(), length + Platform.BYTE_ARRAY_OFFSET);
|
||||||
cursor += value.numBytes();
|
length += value.numBytes();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void append(String value) {
|
public void append(String value) {
|
||||||
|
@ -73,6 +66,6 @@ public class UTF8StringBuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
public UTF8String build() {
|
public UTF8String build() {
|
||||||
return UTF8String.fromBytes(buffer, 0, totalSize());
|
return UTF8String.fromBytes(buffer.getByteArray(), 0, length);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.spark.sql.catalyst.expressions.codegen
|
||||||
|
|
||||||
|
import org.apache.spark.SparkFunSuite
|
||||||
|
import org.apache.spark.unsafe.types.UTF8String
|
||||||
|
|
||||||
|
class UTF8StringBuilderSuite extends SparkFunSuite {
|
||||||
|
|
||||||
|
test("basic test") {
|
||||||
|
val sb = new UTF8StringBuilder()
|
||||||
|
assert(sb.build() === UTF8String.EMPTY_UTF8)
|
||||||
|
|
||||||
|
sb.append("")
|
||||||
|
assert(sb.build() === UTF8String.EMPTY_UTF8)
|
||||||
|
|
||||||
|
sb.append("abcd")
|
||||||
|
assert(sb.build() === UTF8String.fromString("abcd"))
|
||||||
|
|
||||||
|
sb.append(UTF8String.fromString("1234"))
|
||||||
|
assert(sb.build() === UTF8String.fromString("abcd1234"))
|
||||||
|
|
||||||
|
// expect to grow an internal buffer
|
||||||
|
sb.append(UTF8String.fromString("efgijk567890"))
|
||||||
|
assert(sb.build() === UTF8String.fromString("abcd1234efgijk567890"))
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue