[SPARK-2190][SQL] Specialized ColumnType for Timestamp
JIRA issue: [SPARK-2190](https://issues.apache.org/jira/browse/SPARK-2190) Added specialized in-memory column type for `Timestamp`. Whitelisted all timestamp related Hive tests except `timestamp_udf`, which is timezone sensitive. Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #1440 from liancheng/timestamp-column-type and squashes the following commits: e682175 [Cheng Lian] Enabled more timezone sensitive Hive tests. 53a358f [Cheng Lian] Fixed failed test suites 01b592d [Cheng Lian] Fixed SimpleDateFormat thread safety issue 2a59343 [Cheng Lian] Removed timezone sensitive Hive timestamp tests 45dd05d [Cheng Lian] Added Timestamp specific in-memory columnar representation
This commit is contained in:
parent
db56f2df1b
commit
cd273a2381
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -19,6 +19,7 @@ conf/spark-env.sh
|
||||||
conf/streaming-env.sh
|
conf/streaming-env.sh
|
||||||
conf/log4j.properties
|
conf/log4j.properties
|
||||||
conf/spark-defaults.conf
|
conf/spark-defaults.conf
|
||||||
|
conf/hive-site.xml
|
||||||
docs/_site
|
docs/_site
|
||||||
docs/api
|
docs/api
|
||||||
target/
|
target/
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.spark.sql.catalyst.expressions
|
package org.apache.spark.sql.catalyst.expressions
|
||||||
|
|
||||||
import java.sql.Timestamp
|
import java.sql.Timestamp
|
||||||
|
import java.text.{DateFormat, SimpleDateFormat}
|
||||||
|
|
||||||
import org.apache.spark.sql.catalyst.types._
|
import org.apache.spark.sql.catalyst.types._
|
||||||
|
|
||||||
|
@ -41,6 +42,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
|
||||||
// UDFToString
|
// UDFToString
|
||||||
private[this] def castToString: Any => Any = child.dataType match {
|
private[this] def castToString: Any => Any = child.dataType match {
|
||||||
case BinaryType => buildCast[Array[Byte]](_, new String(_, "UTF-8"))
|
case BinaryType => buildCast[Array[Byte]](_, new String(_, "UTF-8"))
|
||||||
|
case TimestampType => buildCast[Timestamp](_, timestampToString)
|
||||||
case _ => buildCast[Any](_, _.toString)
|
case _ => buildCast[Any](_, _.toString)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -126,6 +128,18 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
|
||||||
ts.getTime / 1000 + ts.getNanos.toDouble / 1000000000
|
ts.getTime / 1000 + ts.getNanos.toDouble / 1000000000
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Converts Timestamp to string according to Hive TimestampWritable convention
|
||||||
|
private[this] def timestampToString(ts: Timestamp): String = {
|
||||||
|
val timestampString = ts.toString
|
||||||
|
val formatted = Cast.threadLocalDateFormat.get.format(ts)
|
||||||
|
|
||||||
|
if (timestampString.length > 19 && timestampString.substring(19) != ".0") {
|
||||||
|
formatted + timestampString.substring(19)
|
||||||
|
} else {
|
||||||
|
formatted
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private[this] def castToLong: Any => Any = child.dataType match {
|
private[this] def castToLong: Any => Any = child.dataType match {
|
||||||
case StringType =>
|
case StringType =>
|
||||||
buildCast[String](_, s => try s.toLong catch {
|
buildCast[String](_, s => try s.toLong catch {
|
||||||
|
@ -249,3 +263,12 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
|
||||||
if (evaluated == null) null else cast(evaluated)
|
if (evaluated == null) null else cast(evaluated)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
object Cast {
|
||||||
|
// `SimpleDateFormat` is not thread-safe.
|
||||||
|
private[sql] val threadLocalDateFormat = new ThreadLocal[DateFormat] {
|
||||||
|
override def initialValue() = {
|
||||||
|
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -158,7 +158,7 @@ class ExpressionEvaluationSuite extends FunSuite {
|
||||||
checkEvaluation("abc" like regEx, true, new GenericRow(Array[Any]("a%")))
|
checkEvaluation("abc" like regEx, true, new GenericRow(Array[Any]("a%")))
|
||||||
checkEvaluation("abc" like regEx, false, new GenericRow(Array[Any]("b%")))
|
checkEvaluation("abc" like regEx, false, new GenericRow(Array[Any]("b%")))
|
||||||
checkEvaluation("abc" like regEx, false, new GenericRow(Array[Any]("bc%")))
|
checkEvaluation("abc" like regEx, false, new GenericRow(Array[Any]("bc%")))
|
||||||
|
|
||||||
checkEvaluation(Literal(null, StringType) like regEx, null, new GenericRow(Array[Any]("bc%")))
|
checkEvaluation(Literal(null, StringType) like regEx, null, new GenericRow(Array[Any]("bc%")))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -203,7 +203,7 @@ class ExpressionEvaluationSuite extends FunSuite {
|
||||||
|
|
||||||
test("data type casting") {
|
test("data type casting") {
|
||||||
|
|
||||||
val sts = "1970-01-01 00:00:01.0"
|
val sts = "1970-01-01 00:00:01.1"
|
||||||
val ts = Timestamp.valueOf(sts)
|
val ts = Timestamp.valueOf(sts)
|
||||||
|
|
||||||
checkEvaluation("abdef" cast StringType, "abdef")
|
checkEvaluation("abdef" cast StringType, "abdef")
|
||||||
|
@ -293,7 +293,7 @@ class ExpressionEvaluationSuite extends FunSuite {
|
||||||
// A test for higher precision than millis
|
// A test for higher precision than millis
|
||||||
checkEvaluation(Cast(Cast(0.00000001, TimestampType), DoubleType), 0.00000001)
|
checkEvaluation(Cast(Cast(0.00000001, TimestampType), DoubleType), 0.00000001)
|
||||||
}
|
}
|
||||||
|
|
||||||
test("null checking") {
|
test("null checking") {
|
||||||
val row = new GenericRow(Array[Any]("^Ba*n", null, true, null))
|
val row = new GenericRow(Array[Any]("^Ba*n", null, true, null))
|
||||||
val c1 = 'a.string.at(0)
|
val c1 = 'a.string.at(0)
|
||||||
|
@ -312,7 +312,7 @@ class ExpressionEvaluationSuite extends FunSuite {
|
||||||
|
|
||||||
checkEvaluation(IsNull(Literal(null, ShortType)), true)
|
checkEvaluation(IsNull(Literal(null, ShortType)), true)
|
||||||
checkEvaluation(IsNotNull(Literal(null, ShortType)), false)
|
checkEvaluation(IsNotNull(Literal(null, ShortType)), false)
|
||||||
|
|
||||||
checkEvaluation(Coalesce(c1 :: c2 :: Nil), "^Ba*n", row)
|
checkEvaluation(Coalesce(c1 :: c2 :: Nil), "^Ba*n", row)
|
||||||
checkEvaluation(Coalesce(Literal(null, StringType) :: Nil), null, row)
|
checkEvaluation(Coalesce(Literal(null, StringType) :: Nil), null, row)
|
||||||
checkEvaluation(Coalesce(Literal(null, StringType) :: c1 :: c2 :: Nil), "^Ba*n", row)
|
checkEvaluation(Coalesce(Literal(null, StringType) :: c1 :: c2 :: Nil), "^Ba*n", row)
|
||||||
|
@ -323,11 +323,11 @@ class ExpressionEvaluationSuite extends FunSuite {
|
||||||
checkEvaluation(If(Literal(null, BooleanType), c2, c1), "^Ba*n", row)
|
checkEvaluation(If(Literal(null, BooleanType), c2, c1), "^Ba*n", row)
|
||||||
checkEvaluation(If(Literal(true, BooleanType), c1, c2), "^Ba*n", row)
|
checkEvaluation(If(Literal(true, BooleanType), c1, c2), "^Ba*n", row)
|
||||||
checkEvaluation(If(Literal(false, BooleanType), c2, c1), "^Ba*n", row)
|
checkEvaluation(If(Literal(false, BooleanType), c2, c1), "^Ba*n", row)
|
||||||
checkEvaluation(If(Literal(false, BooleanType),
|
checkEvaluation(If(Literal(false, BooleanType),
|
||||||
Literal("a", StringType), Literal("b", StringType)), "b", row)
|
Literal("a", StringType), Literal("b", StringType)), "b", row)
|
||||||
|
|
||||||
checkEvaluation(In(c1, c1 :: c2 :: Nil), true, row)
|
checkEvaluation(In(c1, c1 :: c2 :: Nil), true, row)
|
||||||
checkEvaluation(In(Literal("^Ba*n", StringType),
|
checkEvaluation(In(Literal("^Ba*n", StringType),
|
||||||
Literal("^Ba*n", StringType) :: Nil), true, row)
|
Literal("^Ba*n", StringType) :: Nil), true, row)
|
||||||
checkEvaluation(In(Literal("^Ba*n", StringType),
|
checkEvaluation(In(Literal("^Ba*n", StringType),
|
||||||
Literal("^Ba*n", StringType) :: c2 :: Nil), true, row)
|
Literal("^Ba*n", StringType) :: c2 :: Nil), true, row)
|
||||||
|
@ -378,7 +378,7 @@ class ExpressionEvaluationSuite extends FunSuite {
|
||||||
|
|
||||||
test("complex type") {
|
test("complex type") {
|
||||||
val row = new GenericRow(Array[Any](
|
val row = new GenericRow(Array[Any](
|
||||||
"^Ba*n", // 0
|
"^Ba*n", // 0
|
||||||
null.asInstanceOf[String], // 1
|
null.asInstanceOf[String], // 1
|
||||||
new GenericRow(Array[Any]("aa", "bb")), // 2
|
new GenericRow(Array[Any]("aa", "bb")), // 2
|
||||||
Map("aa"->"bb"), // 3
|
Map("aa"->"bb"), // 3
|
||||||
|
@ -391,18 +391,18 @@ class ExpressionEvaluationSuite extends FunSuite {
|
||||||
val typeMap = MapType(StringType, StringType)
|
val typeMap = MapType(StringType, StringType)
|
||||||
val typeArray = ArrayType(StringType)
|
val typeArray = ArrayType(StringType)
|
||||||
|
|
||||||
checkEvaluation(GetItem(BoundReference(3, AttributeReference("c", typeMap)()),
|
checkEvaluation(GetItem(BoundReference(3, AttributeReference("c", typeMap)()),
|
||||||
Literal("aa")), "bb", row)
|
Literal("aa")), "bb", row)
|
||||||
checkEvaluation(GetItem(Literal(null, typeMap), Literal("aa")), null, row)
|
checkEvaluation(GetItem(Literal(null, typeMap), Literal("aa")), null, row)
|
||||||
checkEvaluation(GetItem(Literal(null, typeMap), Literal(null, StringType)), null, row)
|
checkEvaluation(GetItem(Literal(null, typeMap), Literal(null, StringType)), null, row)
|
||||||
checkEvaluation(GetItem(BoundReference(3, AttributeReference("c", typeMap)()),
|
checkEvaluation(GetItem(BoundReference(3, AttributeReference("c", typeMap)()),
|
||||||
Literal(null, StringType)), null, row)
|
Literal(null, StringType)), null, row)
|
||||||
|
|
||||||
checkEvaluation(GetItem(BoundReference(4, AttributeReference("c", typeArray)()),
|
checkEvaluation(GetItem(BoundReference(4, AttributeReference("c", typeArray)()),
|
||||||
Literal(1)), "bb", row)
|
Literal(1)), "bb", row)
|
||||||
checkEvaluation(GetItem(Literal(null, typeArray), Literal(1)), null, row)
|
checkEvaluation(GetItem(Literal(null, typeArray), Literal(1)), null, row)
|
||||||
checkEvaluation(GetItem(Literal(null, typeArray), Literal(null, IntegerType)), null, row)
|
checkEvaluation(GetItem(Literal(null, typeArray), Literal(null, IntegerType)), null, row)
|
||||||
checkEvaluation(GetItem(BoundReference(4, AttributeReference("c", typeArray)()),
|
checkEvaluation(GetItem(BoundReference(4, AttributeReference("c", typeArray)()),
|
||||||
Literal(null, IntegerType)), null, row)
|
Literal(null, IntegerType)), null, row)
|
||||||
|
|
||||||
checkEvaluation(GetField(BoundReference(2, AttributeReference("c", typeS)()), "a"), "aa", row)
|
checkEvaluation(GetField(BoundReference(2, AttributeReference("c", typeS)()), "a"), "aa", row)
|
||||||
|
|
|
@ -90,6 +90,9 @@ private[sql] class FloatColumnAccessor(buffer: ByteBuffer)
|
||||||
private[sql] class StringColumnAccessor(buffer: ByteBuffer)
|
private[sql] class StringColumnAccessor(buffer: ByteBuffer)
|
||||||
extends NativeColumnAccessor(buffer, STRING)
|
extends NativeColumnAccessor(buffer, STRING)
|
||||||
|
|
||||||
|
private[sql] class TimestampColumnAccessor(buffer: ByteBuffer)
|
||||||
|
extends NativeColumnAccessor(buffer, TIMESTAMP)
|
||||||
|
|
||||||
private[sql] class BinaryColumnAccessor(buffer: ByteBuffer)
|
private[sql] class BinaryColumnAccessor(buffer: ByteBuffer)
|
||||||
extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer, BINARY)
|
extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer, BINARY)
|
||||||
with NullableColumnAccessor
|
with NullableColumnAccessor
|
||||||
|
@ -105,16 +108,17 @@ private[sql] object ColumnAccessor {
|
||||||
val columnTypeId = dup.getInt()
|
val columnTypeId = dup.getInt()
|
||||||
|
|
||||||
columnTypeId match {
|
columnTypeId match {
|
||||||
case INT.typeId => new IntColumnAccessor(dup)
|
case INT.typeId => new IntColumnAccessor(dup)
|
||||||
case LONG.typeId => new LongColumnAccessor(dup)
|
case LONG.typeId => new LongColumnAccessor(dup)
|
||||||
case FLOAT.typeId => new FloatColumnAccessor(dup)
|
case FLOAT.typeId => new FloatColumnAccessor(dup)
|
||||||
case DOUBLE.typeId => new DoubleColumnAccessor(dup)
|
case DOUBLE.typeId => new DoubleColumnAccessor(dup)
|
||||||
case BOOLEAN.typeId => new BooleanColumnAccessor(dup)
|
case BOOLEAN.typeId => new BooleanColumnAccessor(dup)
|
||||||
case BYTE.typeId => new ByteColumnAccessor(dup)
|
case BYTE.typeId => new ByteColumnAccessor(dup)
|
||||||
case SHORT.typeId => new ShortColumnAccessor(dup)
|
case SHORT.typeId => new ShortColumnAccessor(dup)
|
||||||
case STRING.typeId => new StringColumnAccessor(dup)
|
case STRING.typeId => new StringColumnAccessor(dup)
|
||||||
case BINARY.typeId => new BinaryColumnAccessor(dup)
|
case TIMESTAMP.typeId => new TimestampColumnAccessor(dup)
|
||||||
case GENERIC.typeId => new GenericColumnAccessor(dup)
|
case BINARY.typeId => new BinaryColumnAccessor(dup)
|
||||||
|
case GENERIC.typeId => new GenericColumnAccessor(dup)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -109,6 +109,9 @@ private[sql] class FloatColumnBuilder extends NativeColumnBuilder(new FloatColum
|
||||||
|
|
||||||
private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStats, STRING)
|
private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStats, STRING)
|
||||||
|
|
||||||
|
private[sql] class TimestampColumnBuilder
|
||||||
|
extends NativeColumnBuilder(new TimestampColumnStats, TIMESTAMP)
|
||||||
|
|
||||||
private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(BINARY)
|
private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(BINARY)
|
||||||
|
|
||||||
// TODO (lian) Add support for array, struct and map
|
// TODO (lian) Add support for array, struct and map
|
||||||
|
|
|
@ -344,21 +344,52 @@ private[sql] class StringColumnStats extends BasicColumnStats(STRING) {
|
||||||
}
|
}
|
||||||
|
|
||||||
override def contains(row: Row, ordinal: Int) = {
|
override def contains(row: Row, ordinal: Int) = {
|
||||||
!(upperBound eq null) && {
|
(upperBound ne null) && {
|
||||||
val field = columnType.getField(row, ordinal)
|
val field = columnType.getField(row, ordinal)
|
||||||
lowerBound.compareTo(field) <= 0 && field.compareTo(upperBound) <= 0
|
lowerBound.compareTo(field) <= 0 && field.compareTo(upperBound) <= 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def isAbove(row: Row, ordinal: Int) = {
|
override def isAbove(row: Row, ordinal: Int) = {
|
||||||
!(upperBound eq null) && {
|
(upperBound ne null) && {
|
||||||
val field = columnType.getField(row, ordinal)
|
val field = columnType.getField(row, ordinal)
|
||||||
field.compareTo(upperBound) < 0
|
field.compareTo(upperBound) < 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def isBelow(row: Row, ordinal: Int) = {
|
override def isBelow(row: Row, ordinal: Int) = {
|
||||||
!(lowerBound eq null) && {
|
(lowerBound ne null) && {
|
||||||
|
val field = columnType.getField(row, ordinal)
|
||||||
|
lowerBound.compareTo(field) < 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private[sql] class TimestampColumnStats extends BasicColumnStats(TIMESTAMP) {
|
||||||
|
override def initialBounds = (null, null)
|
||||||
|
|
||||||
|
override def gatherStats(row: Row, ordinal: Int) {
|
||||||
|
val field = columnType.getField(row, ordinal)
|
||||||
|
if ((upperBound eq null) || field.compareTo(upperBound) > 0) _upper = field
|
||||||
|
if ((lowerBound eq null) || field.compareTo(lowerBound) < 0) _lower = field
|
||||||
|
}
|
||||||
|
|
||||||
|
override def contains(row: Row, ordinal: Int) = {
|
||||||
|
(upperBound ne null) && {
|
||||||
|
val field = columnType.getField(row, ordinal)
|
||||||
|
lowerBound.compareTo(field) <= 0 && field.compareTo(upperBound) <= 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Returns `true` when an upper bound has been recorded and the field read from
 * `row` at position `ordinal` compares strictly less than that upper bound.
 * Returns `false` when no upper bound has been recorded yet.
 *
 * @param row     row to read the timestamp field from
 * @param ordinal position of the field within `row`
 */
override def isAbove(row: Row, ordinal: Int) = {
  // Guard on the bound that is actually dereferenced below. The previous guard
  // tested `lowerBound`, which was only safe because `gatherStats` assigns both
  // bounds together; checking `upperBound` keeps this method correct on its own
  // and matches `StringColumnStats.isAbove`.
  (upperBound ne null) && {
    val field = columnType.getField(row, ordinal)
    field.compareTo(upperBound) < 0
  }
}
|
||||||
|
|
||||||
|
override def isBelow(row: Row, ordinal: Int) = {
|
||||||
|
(lowerBound ne null) && {
|
||||||
val field = columnType.getField(row, ordinal)
|
val field = columnType.getField(row, ordinal)
|
||||||
lowerBound.compareTo(field) < 0
|
lowerBound.compareTo(field) < 0
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,8 @@ import java.nio.ByteBuffer
|
||||||
|
|
||||||
import scala.reflect.runtime.universe.TypeTag
|
import scala.reflect.runtime.universe.TypeTag
|
||||||
|
|
||||||
|
import java.sql.Timestamp
|
||||||
|
|
||||||
import org.apache.spark.sql.Row
|
import org.apache.spark.sql.Row
|
||||||
import org.apache.spark.sql.catalyst.expressions.MutableRow
|
import org.apache.spark.sql.catalyst.expressions.MutableRow
|
||||||
import org.apache.spark.sql.catalyst.types._
|
import org.apache.spark.sql.catalyst.types._
|
||||||
|
@ -221,6 +223,26 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
|
||||||
override def getField(row: Row, ordinal: Int) = row.getString(ordinal)
|
override def getField(row: Row, ordinal: Int) = row.getString(ordinal)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 8, 12) {
|
||||||
|
override def extract(buffer: ByteBuffer) = {
|
||||||
|
val timestamp = new Timestamp(buffer.getLong())
|
||||||
|
timestamp.setNanos(buffer.getInt())
|
||||||
|
timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
override def append(v: Timestamp, buffer: ByteBuffer) {
|
||||||
|
buffer.putLong(v.getTime).putInt(v.getNanos)
|
||||||
|
}
|
||||||
|
|
||||||
|
override def getField(row: Row, ordinal: Int) = {
|
||||||
|
row(ordinal).asInstanceOf[Timestamp]
|
||||||
|
}
|
||||||
|
|
||||||
|
override def setField(row: MutableRow, ordinal: Int, value: Timestamp) {
|
||||||
|
row(ordinal) = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private[sql] sealed abstract class ByteArrayColumnType[T <: DataType](
|
private[sql] sealed abstract class ByteArrayColumnType[T <: DataType](
|
||||||
typeId: Int,
|
typeId: Int,
|
||||||
defaultSize: Int)
|
defaultSize: Int)
|
||||||
|
@ -240,7 +262,7 @@ private[sql] sealed abstract class ByteArrayColumnType[T <: DataType](
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](8, 16) {
|
private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](9, 16) {
|
||||||
override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]) {
|
override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]) {
|
||||||
row(ordinal) = value
|
row(ordinal) = value
|
||||||
}
|
}
|
||||||
|
@ -251,7 +273,7 @@ private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](8, 16) {
|
||||||
// Used to process generic objects (all types other than those listed above). Objects should be
|
// Used to process generic objects (all types other than those listed above). Objects should be
|
||||||
// serialized first before appending to the column `ByteBuffer`, and is also extracted as serialized
|
// serialized first before appending to the column `ByteBuffer`, and is also extracted as serialized
|
||||||
// byte array.
|
// byte array.
|
||||||
private[sql] object GENERIC extends ByteArrayColumnType[DataType](9, 16) {
|
private[sql] object GENERIC extends ByteArrayColumnType[DataType](10, 16) {
|
||||||
override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]) {
|
override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]) {
|
||||||
row(ordinal) = SparkSqlSerializer.deserialize[Any](value)
|
row(ordinal) = SparkSqlSerializer.deserialize[Any](value)
|
||||||
}
|
}
|
||||||
|
@ -262,16 +284,17 @@ private[sql] object GENERIC extends ByteArrayColumnType[DataType](9, 16) {
|
||||||
private[sql] object ColumnType {
|
private[sql] object ColumnType {
|
||||||
def apply(dataType: DataType): ColumnType[_, _] = {
|
def apply(dataType: DataType): ColumnType[_, _] = {
|
||||||
dataType match {
|
dataType match {
|
||||||
case IntegerType => INT
|
case IntegerType => INT
|
||||||
case LongType => LONG
|
case LongType => LONG
|
||||||
case FloatType => FLOAT
|
case FloatType => FLOAT
|
||||||
case DoubleType => DOUBLE
|
case DoubleType => DOUBLE
|
||||||
case BooleanType => BOOLEAN
|
case BooleanType => BOOLEAN
|
||||||
case ByteType => BYTE
|
case ByteType => BYTE
|
||||||
case ShortType => SHORT
|
case ShortType => SHORT
|
||||||
case StringType => STRING
|
case StringType => STRING
|
||||||
case BinaryType => BINARY
|
case BinaryType => BINARY
|
||||||
case _ => GENERIC
|
case TimestampType => TIMESTAMP
|
||||||
|
case _ => GENERIC
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,14 +22,15 @@ import org.scalatest.FunSuite
|
||||||
import org.apache.spark.sql.catalyst.types._
|
import org.apache.spark.sql.catalyst.types._
|
||||||
|
|
||||||
class ColumnStatsSuite extends FunSuite {
|
class ColumnStatsSuite extends FunSuite {
|
||||||
testColumnStats(classOf[BooleanColumnStats], BOOLEAN)
|
testColumnStats(classOf[BooleanColumnStats], BOOLEAN)
|
||||||
testColumnStats(classOf[ByteColumnStats], BYTE)
|
testColumnStats(classOf[ByteColumnStats], BYTE)
|
||||||
testColumnStats(classOf[ShortColumnStats], SHORT)
|
testColumnStats(classOf[ShortColumnStats], SHORT)
|
||||||
testColumnStats(classOf[IntColumnStats], INT)
|
testColumnStats(classOf[IntColumnStats], INT)
|
||||||
testColumnStats(classOf[LongColumnStats], LONG)
|
testColumnStats(classOf[LongColumnStats], LONG)
|
||||||
testColumnStats(classOf[FloatColumnStats], FLOAT)
|
testColumnStats(classOf[FloatColumnStats], FLOAT)
|
||||||
testColumnStats(classOf[DoubleColumnStats], DOUBLE)
|
testColumnStats(classOf[DoubleColumnStats], DOUBLE)
|
||||||
testColumnStats(classOf[StringColumnStats], STRING)
|
testColumnStats(classOf[StringColumnStats], STRING)
|
||||||
|
testColumnStats(classOf[TimestampColumnStats], TIMESTAMP)
|
||||||
|
|
||||||
def testColumnStats[T <: NativeType, U <: NativeColumnStats[T]](
|
def testColumnStats[T <: NativeType, U <: NativeColumnStats[T]](
|
||||||
columnStatsClass: Class[U],
|
columnStatsClass: Class[U],
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.spark.sql.columnar
|
package org.apache.spark.sql.columnar
|
||||||
|
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
|
import java.sql.Timestamp
|
||||||
|
|
||||||
import org.scalatest.FunSuite
|
import org.scalatest.FunSuite
|
||||||
|
|
||||||
|
@ -32,7 +33,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
|
||||||
test("defaultSize") {
|
test("defaultSize") {
|
||||||
val checks = Map(
|
val checks = Map(
|
||||||
INT -> 4, SHORT -> 2, LONG -> 8, BYTE -> 1, DOUBLE -> 8, FLOAT -> 4,
|
INT -> 4, SHORT -> 2, LONG -> 8, BYTE -> 1, DOUBLE -> 8, FLOAT -> 4,
|
||||||
BOOLEAN -> 1, STRING -> 8, BINARY -> 16, GENERIC -> 16)
|
BOOLEAN -> 1, STRING -> 8, TIMESTAMP -> 12, BINARY -> 16, GENERIC -> 16)
|
||||||
|
|
||||||
checks.foreach { case (columnType, expectedSize) =>
|
checks.foreach { case (columnType, expectedSize) =>
|
||||||
assertResult(expectedSize, s"Wrong defaultSize for $columnType") {
|
assertResult(expectedSize, s"Wrong defaultSize for $columnType") {
|
||||||
|
@ -52,14 +53,15 @@ class ColumnTypeSuite extends FunSuite with Logging {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
checkActualSize(INT, Int.MaxValue, 4)
|
checkActualSize(INT, Int.MaxValue, 4)
|
||||||
checkActualSize(SHORT, Short.MaxValue, 2)
|
checkActualSize(SHORT, Short.MaxValue, 2)
|
||||||
checkActualSize(LONG, Long.MaxValue, 8)
|
checkActualSize(LONG, Long.MaxValue, 8)
|
||||||
checkActualSize(BYTE, Byte.MaxValue, 1)
|
checkActualSize(BYTE, Byte.MaxValue, 1)
|
||||||
checkActualSize(DOUBLE, Double.MaxValue, 8)
|
checkActualSize(DOUBLE, Double.MaxValue, 8)
|
||||||
checkActualSize(FLOAT, Float.MaxValue, 4)
|
checkActualSize(FLOAT, Float.MaxValue, 4)
|
||||||
checkActualSize(BOOLEAN, true, 1)
|
checkActualSize(BOOLEAN, true, 1)
|
||||||
checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length)
|
checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length)
|
||||||
|
checkActualSize(TIMESTAMP, new Timestamp(0L), 12)
|
||||||
|
|
||||||
val binary = Array.fill[Byte](4)(0: Byte)
|
val binary = Array.fill[Byte](4)(0: Byte)
|
||||||
checkActualSize(BINARY, binary, 4 + 4)
|
checkActualSize(BINARY, binary, 4 + 4)
|
||||||
|
@ -188,17 +190,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
|
||||||
}
|
}
|
||||||
|
|
||||||
private def hexDump(value: Any): String = {
|
private def hexDump(value: Any): String = {
|
||||||
if (value.isInstanceOf[String]) {
|
value.toString.map(ch => Integer.toHexString(ch & 0xffff)).mkString(" ")
|
||||||
val sb = new StringBuilder()
|
|
||||||
for (ch <- value.asInstanceOf[String].toCharArray) {
|
|
||||||
sb.append(Integer.toHexString(ch & 0xffff)).append(' ')
|
|
||||||
}
|
|
||||||
if (! sb.isEmpty) sb.setLength(sb.length - 1)
|
|
||||||
sb.toString()
|
|
||||||
} else {
|
|
||||||
// for now ..
|
|
||||||
hexDump(value.toString)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private def dumpBuffer(buff: ByteBuffer): Any = {
|
private def dumpBuffer(buff: ByteBuffer): Any = {
|
||||||
|
@ -207,7 +199,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
|
||||||
val b = buff.get()
|
val b = buff.get()
|
||||||
sb.append(Integer.toHexString(b & 0xff)).append(' ')
|
sb.append(Integer.toHexString(b & 0xff)).append(' ')
|
||||||
}
|
}
|
||||||
if (! sb.isEmpty) sb.setLength(sb.length - 1)
|
if (sb.nonEmpty) sb.setLength(sb.length - 1)
|
||||||
sb.toString()
|
sb.toString()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.spark.sql.columnar
|
||||||
import scala.collection.immutable.HashSet
|
import scala.collection.immutable.HashSet
|
||||||
import scala.util.Random
|
import scala.util.Random
|
||||||
|
|
||||||
|
import java.sql.Timestamp
|
||||||
|
|
||||||
import org.apache.spark.sql.Row
|
import org.apache.spark.sql.Row
|
||||||
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
|
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
|
||||||
import org.apache.spark.sql.catalyst.types.{DataType, NativeType}
|
import org.apache.spark.sql.catalyst.types.{DataType, NativeType}
|
||||||
|
@ -39,15 +41,19 @@ object ColumnarTestUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
(columnType match {
|
(columnType match {
|
||||||
case BYTE => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte
|
case BYTE => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte
|
||||||
case SHORT => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort
|
case SHORT => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort
|
||||||
case INT => Random.nextInt()
|
case INT => Random.nextInt()
|
||||||
case LONG => Random.nextLong()
|
case LONG => Random.nextLong()
|
||||||
case FLOAT => Random.nextFloat()
|
case FLOAT => Random.nextFloat()
|
||||||
case DOUBLE => Random.nextDouble()
|
case DOUBLE => Random.nextDouble()
|
||||||
case STRING => Random.nextString(Random.nextInt(32))
|
case STRING => Random.nextString(Random.nextInt(32))
|
||||||
case BOOLEAN => Random.nextBoolean()
|
case BOOLEAN => Random.nextBoolean()
|
||||||
case BINARY => randomBytes(Random.nextInt(32))
|
case BINARY => randomBytes(Random.nextInt(32))
|
||||||
|
case TIMESTAMP =>
|
||||||
|
val timestamp = new Timestamp(Random.nextLong())
|
||||||
|
timestamp.setNanos(Random.nextInt(999999999))
|
||||||
|
timestamp
|
||||||
case _ =>
|
case _ =>
|
||||||
// Using a random one-element map instead of an arbitrary object
|
// Using a random one-element map instead of an arbitrary object
|
||||||
Map(Random.nextInt() -> Random.nextString(Random.nextInt(32)))
|
Map(Random.nextInt() -> Random.nextString(Random.nextInt(32)))
|
||||||
|
@ -96,5 +102,4 @@ object ColumnarTestUtils {
|
||||||
|
|
||||||
(values, rows)
|
(values, rows)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.spark.sql.hive
|
package org.apache.spark.sql.hive
|
||||||
|
|
||||||
import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
|
import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
|
||||||
|
import java.sql.Timestamp
|
||||||
import java.util.{ArrayList => JArrayList}
|
import java.util.{ArrayList => JArrayList}
|
||||||
|
|
||||||
import scala.collection.JavaConversions._
|
import scala.collection.JavaConversions._
|
||||||
|
@ -28,6 +29,7 @@ import org.apache.hadoop.hive.conf.HiveConf
|
||||||
import org.apache.hadoop.hive.ql.Driver
|
import org.apache.hadoop.hive.ql.Driver
|
||||||
import org.apache.hadoop.hive.ql.processors._
|
import org.apache.hadoop.hive.ql.processors._
|
||||||
import org.apache.hadoop.hive.ql.session.SessionState
|
import org.apache.hadoop.hive.ql.session.SessionState
|
||||||
|
import org.apache.hadoop.hive.serde2.io.TimestampWritable
|
||||||
|
|
||||||
import org.apache.spark.SparkContext
|
import org.apache.spark.SparkContext
|
||||||
import org.apache.spark.rdd.RDD
|
import org.apache.spark.rdd.RDD
|
||||||
|
@ -266,6 +268,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
|
||||||
toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
|
toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
|
||||||
}.toSeq.sorted.mkString("{", ",", "}")
|
}.toSeq.sorted.mkString("{", ",", "}")
|
||||||
case (null, _) => "NULL"
|
case (null, _) => "NULL"
|
||||||
|
case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString
|
||||||
case (other, tpe) if primitiveTypes contains tpe => other.toString
|
case (other, tpe) if primitiveTypes contains tpe => other.toString
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
1.293872461E9
|
|
@ -0,0 +1 @@
|
||||||
|
2011-01-01 01:01:01
|
|
@ -0,0 +1 @@
|
||||||
|
true
|
|
@ -0,0 +1 @@
|
||||||
|
77
|
|
@ -0,0 +1 @@
|
||||||
|
-4787
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1.29387251E9
|
|
@ -0,0 +1 @@
|
||||||
|
1.293872461E9
|
|
@ -0,0 +1 @@
|
||||||
|
2011-01-01 01:01:01
|
|
@ -0,0 +1 @@
|
||||||
|
true
|
|
@ -0,0 +1 @@
|
||||||
|
77
|
|
@ -0,0 +1 @@
|
||||||
|
-4787
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1.29387251E9
|
|
@ -0,0 +1 @@
|
||||||
|
1.2938724611E9
|
|
@ -0,0 +1 @@
|
||||||
|
2011-01-01 01:01:01.1
|
|
@ -0,0 +1 @@
|
||||||
|
true
|
|
@ -0,0 +1 @@
|
||||||
|
77
|
|
@ -0,0 +1 @@
|
||||||
|
-4787
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1.29387251E9
|
|
@ -0,0 +1 @@
|
||||||
|
1.2938724610001E9
|
|
@ -0,0 +1 @@
|
||||||
|
2011-01-01 01:01:01.0001
|
|
@ -0,0 +1 @@
|
||||||
|
true
|
|
@ -0,0 +1 @@
|
||||||
|
true
|
|
@ -0,0 +1 @@
|
||||||
|
77
|
|
@ -0,0 +1 @@
|
||||||
|
-4787
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1.29387251E9
|
|
@ -0,0 +1 @@
|
||||||
|
1.2938724610001E9
|
|
@ -0,0 +1 @@
|
||||||
|
2011-01-01 01:01:01.0001
|
|
@ -0,0 +1 @@
|
||||||
|
true
|
|
@ -0,0 +1 @@
|
||||||
|
77
|
|
@ -0,0 +1 @@
|
||||||
|
77
|
|
@ -0,0 +1 @@
|
||||||
|
-4787
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1.29387251E9
|
|
@ -0,0 +1 @@
|
||||||
|
1.293872461001E9
|
|
@ -0,0 +1 @@
|
||||||
|
2011-01-01 01:01:01.001000011
|
|
@ -0,0 +1 @@
|
||||||
|
-4787
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1.29387251E9
|
|
@ -0,0 +1 @@
|
||||||
|
1.293872461E9
|
|
@ -0,0 +1 @@
|
||||||
|
2011-01-01 01:01:01
|
|
@ -0,0 +1 @@
|
||||||
|
true
|
|
@ -0,0 +1 @@
|
||||||
|
77
|
|
@ -0,0 +1 @@
|
||||||
|
-4787
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1.29387251E9
|
|
@ -0,0 +1 @@
|
||||||
|
1.293872461E9
|
|
@ -0,0 +1 @@
|
||||||
|
2011-01-01 01:01:01
|
|
@ -0,0 +1 @@
|
||||||
|
true
|
|
@ -0,0 +1 @@
|
||||||
|
77
|
|
@ -0,0 +1 @@
|
||||||
|
-4787
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1.29387251E9
|
|
@ -0,0 +1 @@
|
||||||
|
1.2938724611E9
|
|
@ -0,0 +1 @@
|
||||||
|
2011-01-01 01:01:01.1
|
|
@ -0,0 +1 @@
|
||||||
|
true
|
|
@ -0,0 +1 @@
|
||||||
|
77
|
|
@ -0,0 +1 @@
|
||||||
|
-4787
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1293872461
|
|
@ -0,0 +1 @@
|
||||||
|
1.29387251E9
|
|
@ -0,0 +1 @@
|
||||||
|
1.2938724610001E9
|
|
@ -0,0 +1 @@
|
||||||
|
2011-01-01 01:01:01.0001
|
|
@ -0,0 +1 @@
|
||||||
|
true
|
|
@ -0,0 +1 @@
|
||||||
|
true
|
|
@ -0,0 +1 @@
|
||||||
|
77
|
Some files were not shown because too many files have changed in this diff Show more
Loading…
Reference in a new issue