[SPARK-9867] [SQL] Move utilities for binary data into ByteArray

The utilities such as Substring#substringBinarySQL and BinaryPrefixComparator#computePrefix for binary data are put together in ByteArray for easy-to-read.

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #8122 from maropu/CleanUpForBinaryType.
This commit is contained in:
Takeshi YAMAMURO 2015-10-01 21:33:27 -04:00 committed by Reynold Xin
parent 01cd688f52
commit 2272962eb0
3 changed files with 52 additions and 51 deletions

View file

@ -21,6 +21,7 @@ import com.google.common.primitives.UnsignedLongs;
import org.apache.spark.annotation.Private;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.types.ByteArray;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.util.Utils;
@ -62,21 +63,7 @@ public class PrefixComparators {
}
public static long computePrefix(byte[] bytes) {
if (bytes == null) {
return 0L;
} else {
/**
* TODO: If a wrapper for BinaryType is created (SPARK-8786),
* these codes below will be in the wrapper class.
*/
final int minLen = Math.min(bytes.length, 8);
long p = 0;
for (int i = 0; i < minLen; ++i) {
p |= (128L + Platform.getByte(bytes, Platform.BYTE_ARRAY_OFFSET + i))
<< (56 - 8 * i);
}
return p;
}
return ByteArray.getPrefix(bytes);
}
}

View file

@ -18,14 +18,12 @@
package org.apache.spark.sql.catalyst.expressions
import java.text.DecimalFormat
import java.util.Arrays
import java.util.{Map => JMap, HashMap}
import java.util.Locale
import java.util.{HashMap, Locale, Map => JMap}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.unsafe.types.{ByteArray, UTF8String}
////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines expressions for string operations.
@ -690,34 +688,6 @@ case class StringSpace(child: Expression)
override def prettyName: String = "space"
}
object Substring {
def subStringBinarySQL(bytes: Array[Byte], pos: Int, len: Int): Array[Byte] = {
if (pos > bytes.length) {
return Array[Byte]()
}
var start = if (pos > 0) {
pos - 1
} else if (pos < 0) {
bytes.length + pos
} else {
0
}
val end = if ((bytes.length - start) < len) {
bytes.length
} else {
start + len
}
start = Math.max(start, 0) // underflow
if (start < end) {
Arrays.copyOfRange(bytes, start, end)
} else {
Array[Byte]()
}
}
}
/**
* A function that takes a substring of its first argument starting at a given position.
* Defined for String and Binary types.
@ -740,18 +710,17 @@ case class Substring(str: Expression, pos: Expression, len: Expression)
str.dataType match {
case StringType => string.asInstanceOf[UTF8String]
.substringSQL(pos.asInstanceOf[Int], len.asInstanceOf[Int])
case BinaryType => Substring.subStringBinarySQL(string.asInstanceOf[Array[Byte]],
case BinaryType => ByteArray.subStringSQL(string.asInstanceOf[Array[Byte]],
pos.asInstanceOf[Int], len.asInstanceOf[Int])
}
}
override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
val cls = classOf[Substring].getName
defineCodeGen(ctx, ev, (string, pos, len) => {
str.dataType match {
case StringType => s"$string.substringSQL($pos, $len)"
case BinaryType => s"$cls.subStringBinarySQL($string, $pos, $len)"
case BinaryType => s"${classOf[ByteArray].getName}.subStringSQL($string, $pos, $len)"
}
})
}

View file

@ -19,7 +19,11 @@ package org.apache.spark.unsafe.types;
import org.apache.spark.unsafe.Platform;
public class ByteArray {
import java.util.Arrays;
public final class ByteArray {
public static final byte[] EMPTY_BYTE = new byte[0];
/**
* Writes the content of a byte array into a memory address, identified by an object and an
@ -29,4 +33,45 @@ public class ByteArray {
public static void writeToMemory(byte[] src, Object target, long targetOffset) {
Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET, target, targetOffset, src.length);
}
/**
* Returns a 64-bit integer that can be used as the prefix used in sorting.
*/
public static long getPrefix(byte[] bytes) {
if (bytes == null) {
return 0L;
} else {
final int minLen = Math.min(bytes.length, 8);
long p = 0;
for (int i = 0; i < minLen; ++i) {
p |= (128L + Platform.getByte(bytes, Platform.BYTE_ARRAY_OFFSET + i))
<< (56 - 8 * i);
}
return p;
}
}
public static byte[] subStringSQL(byte[] bytes, int pos, int len) {
// This pos calculation is according to UTF8String#subStringSQL
if (pos > bytes.length) {
return EMPTY_BYTE;
}
int start = 0;
int end;
if (pos > 0) {
start = pos - 1;
} else if (pos < 0) {
start = bytes.length + pos;
}
if ((bytes.length - start) < len) {
end = bytes.length;
} else {
end = start + len;
}
start = Math.max(start, 0); // underflow
if (start >= end) {
return EMPTY_BYTE;
}
return Arrays.copyOfRange(bytes, start, end);
}
}