[SPARK-8189] [SQL] use Long for TimestampType in SQL
This PR change to use Long as internal type for TimestampType for efficiency, which means it will the precision below 100ns. Author: Davies Liu <davies@databricks.com> Closes #6733 from davies/timestamp and squashes the following commits: d9565fa [Davies Liu] remove print 65cf2f1 [Davies Liu] fix Timestamp in SparkR 86fecfb [Davies Liu] disable two timestamp tests 8f77ee0 [Davies Liu] fix scala style 246ee74 [Davies Liu] address comments 309d2e1 [Davies Liu] use Long for TimestampType in SQL
This commit is contained in:
parent
b928f54384
commit
37719e0cd0
|
@ -18,7 +18,7 @@
|
|||
package org.apache.spark.api.r
|
||||
|
||||
import java.io.{DataInputStream, DataOutputStream}
|
||||
import java.sql.{Date, Time}
|
||||
import java.sql.{Timestamp, Date, Time}
|
||||
|
||||
import scala.collection.JavaConversions._
|
||||
|
||||
|
@ -107,9 +107,12 @@ private[spark] object SerDe {
|
|||
Date.valueOf(readString(in))
|
||||
}
|
||||
|
||||
def readTime(in: DataInputStream): Time = {
|
||||
val t = in.readDouble()
|
||||
new Time((t * 1000L).toLong)
|
||||
def readTime(in: DataInputStream): Timestamp = {
|
||||
val seconds = in.readDouble()
|
||||
val sec = Math.floor(seconds).toLong
|
||||
val t = new Timestamp(sec * 1000L)
|
||||
t.setNanos(((seconds - sec) * 1e9).toInt)
|
||||
t
|
||||
}
|
||||
|
||||
def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
|
||||
|
@ -227,6 +230,9 @@ private[spark] object SerDe {
|
|||
case "java.sql.Time" =>
|
||||
writeType(dos, "time")
|
||||
writeTime(dos, value.asInstanceOf[Time])
|
||||
case "java.sql.Timestamp" =>
|
||||
writeType(dos, "time")
|
||||
writeTime(dos, value.asInstanceOf[Timestamp])
|
||||
case "[B" =>
|
||||
writeType(dos, "raw")
|
||||
writeBytes(dos, value.asInstanceOf[Array[Byte]])
|
||||
|
@ -289,6 +295,9 @@ private[spark] object SerDe {
|
|||
out.writeDouble(value.getTime.toDouble / 1000.0)
|
||||
}
|
||||
|
||||
def writeTime(out: DataOutputStream, value: Timestamp): Unit = {
|
||||
out.writeDouble((value.getTime / 1000).toDouble + value.getNanos.toDouble / 1e9)
|
||||
}
|
||||
|
||||
// NOTE: Only works for ASCII right now
|
||||
def writeString(out: DataOutputStream, value: String): Unit = {
|
||||
|
|
|
@ -19,6 +19,7 @@ import sys
|
|||
import decimal
|
||||
import time
|
||||
import datetime
|
||||
import calendar
|
||||
import keyword
|
||||
import warnings
|
||||
import json
|
||||
|
@ -654,6 +655,8 @@ def _need_python_to_sql_conversion(dataType):
|
|||
_need_python_to_sql_conversion(dataType.valueType)
|
||||
elif isinstance(dataType, UserDefinedType):
|
||||
return True
|
||||
elif isinstance(dataType, TimestampType):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
@ -707,6 +710,14 @@ def _python_to_sql_converter(dataType):
|
|||
return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()])
|
||||
elif isinstance(dataType, UserDefinedType):
|
||||
return lambda obj: dataType.serialize(obj)
|
||||
elif isinstance(dataType, TimestampType):
|
||||
|
||||
def to_posix_timstamp(dt):
|
||||
if dt.tzinfo is None:
|
||||
return int(time.mktime(dt.timetuple()) * 1e7 + dt.microsecond * 10)
|
||||
else:
|
||||
return int(calendar.timegm(dt.utctimetuple()) * 1e7 + dt.microsecond * 10)
|
||||
return to_posix_timstamp
|
||||
else:
|
||||
raise ValueError("Unexpected type %r" % dataType)
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.spark.sql;
|
|||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Date;
|
||||
import java.sql.Timestamp;
|
||||
import java.util.List;
|
||||
|
||||
import scala.collection.Seq;
|
||||
|
@ -103,6 +104,11 @@ public abstract class BaseRow implements Row {
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Timestamp getTimestamp(int i) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Seq<T> getSeq(int i) {
|
||||
throw new UnsupportedOperationException();
|
||||
|
|
|
@ -260,9 +260,15 @@ trait Row extends Serializable {
|
|||
*
|
||||
* @throws ClassCastException when data type does not match.
|
||||
*/
|
||||
// TODO(davies): This is not the right default implementation, we use Int as Date internally
|
||||
def getDate(i: Int): java.sql.Date = apply(i).asInstanceOf[java.sql.Date]
|
||||
|
||||
/**
|
||||
* Returns the value at position i of date type as java.sql.Timestamp.
|
||||
*
|
||||
* @throws ClassCastException when data type does not match.
|
||||
*/
|
||||
def getTimestamp(i: Int): java.sql.Timestamp = apply(i).asInstanceOf[java.sql.Timestamp]
|
||||
|
||||
/**
|
||||
* Returns the value at position i of array type as a Scala Seq.
|
||||
*
|
||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst
|
|||
|
||||
import java.lang.{Iterable => JavaIterable}
|
||||
import java.math.{BigDecimal => JavaBigDecimal}
|
||||
import java.sql.Date
|
||||
import java.sql.{Timestamp, Date}
|
||||
import java.util.{Map => JavaMap}
|
||||
import javax.annotation.Nullable
|
||||
|
||||
|
@ -58,6 +58,7 @@ object CatalystTypeConverters {
|
|||
case structType: StructType => StructConverter(structType)
|
||||
case StringType => StringConverter
|
||||
case DateType => DateConverter
|
||||
case TimestampType => TimestampConverter
|
||||
case dt: DecimalType => BigDecimalConverter
|
||||
case BooleanType => BooleanConverter
|
||||
case ByteType => ByteConverter
|
||||
|
@ -274,6 +275,15 @@ object CatalystTypeConverters {
|
|||
override def toScalaImpl(row: Row, column: Int): Date = toScala(row.getInt(column))
|
||||
}
|
||||
|
||||
private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] {
|
||||
override def toCatalystImpl(scalaValue: Timestamp): Long =
|
||||
DateUtils.fromJavaTimestamp(scalaValue)
|
||||
override def toScala(catalystValue: Any): Timestamp =
|
||||
if (catalystValue == null) null
|
||||
else DateUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
|
||||
override def toScalaImpl(row: Row, column: Int): Timestamp = toScala(row.getLong(column))
|
||||
}
|
||||
|
||||
private object BigDecimalConverter extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {
|
||||
override def toCatalystImpl(scalaValue: Any): Decimal = scalaValue match {
|
||||
case d: BigDecimal => Decimal(d)
|
||||
|
@ -367,6 +377,7 @@ object CatalystTypeConverters {
|
|||
def convertToCatalyst(a: Any): Any = a match {
|
||||
case s: String => StringConverter.toCatalyst(s)
|
||||
case d: Date => DateConverter.toCatalyst(d)
|
||||
case t: Timestamp => TimestampConverter.toCatalyst(t)
|
||||
case d: BigDecimal => BigDecimalConverter.toCatalyst(d)
|
||||
case d: JavaBigDecimal => BigDecimalConverter.toCatalyst(d)
|
||||
case seq: Seq[Any] => seq.map(convertToCatalyst)
|
||||
|
|
|
@ -113,7 +113,8 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
|
|||
private[this] def castToString(from: DataType): Any => Any = from match {
|
||||
case BinaryType => buildCast[Array[Byte]](_, UTF8String(_))
|
||||
case DateType => buildCast[Int](_, d => UTF8String(DateUtils.toString(d)))
|
||||
case TimestampType => buildCast[Timestamp](_, t => UTF8String(timestampToString(t)))
|
||||
case TimestampType => buildCast[Long](_,
|
||||
t => UTF8String(timestampToString(DateUtils.toJavaTimestamp(t))))
|
||||
case _ => buildCast[Any](_, o => UTF8String(o.toString))
|
||||
}
|
||||
|
||||
|
@ -127,7 +128,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
|
|||
case StringType =>
|
||||
buildCast[UTF8String](_, _.length() != 0)
|
||||
case TimestampType =>
|
||||
buildCast[Timestamp](_, t => t.getTime() != 0 || t.getNanos() != 0)
|
||||
buildCast[Long](_, t => t != 0)
|
||||
case DateType =>
|
||||
// Hive would return null when cast from date to boolean
|
||||
buildCast[Int](_, d => null)
|
||||
|
@ -158,20 +159,21 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
|
|||
if (periodIdx != -1 && n.length() - periodIdx > 9) {
|
||||
n = n.substring(0, periodIdx + 10)
|
||||
}
|
||||
try Timestamp.valueOf(n) catch { case _: java.lang.IllegalArgumentException => null }
|
||||
try DateUtils.fromJavaTimestamp(Timestamp.valueOf(n))
|
||||
catch { case _: java.lang.IllegalArgumentException => null }
|
||||
})
|
||||
case BooleanType =>
|
||||
buildCast[Boolean](_, b => new Timestamp(if (b) 1 else 0))
|
||||
buildCast[Boolean](_, b => (if (b) 1L else 0))
|
||||
case LongType =>
|
||||
buildCast[Long](_, l => new Timestamp(l))
|
||||
buildCast[Long](_, l => longToTimestamp(l))
|
||||
case IntegerType =>
|
||||
buildCast[Int](_, i => new Timestamp(i))
|
||||
buildCast[Int](_, i => longToTimestamp(i.toLong))
|
||||
case ShortType =>
|
||||
buildCast[Short](_, s => new Timestamp(s))
|
||||
buildCast[Short](_, s => longToTimestamp(s.toLong))
|
||||
case ByteType =>
|
||||
buildCast[Byte](_, b => new Timestamp(b))
|
||||
buildCast[Byte](_, b => longToTimestamp(b.toLong))
|
||||
case DateType =>
|
||||
buildCast[Int](_, d => new Timestamp(DateUtils.toJavaDate(d).getTime))
|
||||
buildCast[Int](_, d => DateUtils.toMillisSinceEpoch(d) * 10000)
|
||||
// TimestampWritable.decimalToTimestamp
|
||||
case DecimalType() =>
|
||||
buildCast[Decimal](_, d => decimalToTimestamp(d))
|
||||
|
@ -191,25 +193,17 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
|
|||
})
|
||||
}
|
||||
|
||||
private[this] def decimalToTimestamp(d: Decimal) = {
|
||||
val seconds = Math.floor(d.toDouble).toLong
|
||||
val bd = (d.toBigDecimal - seconds) * 1000000000
|
||||
val nanos = bd.intValue()
|
||||
|
||||
val millis = seconds * 1000
|
||||
val t = new Timestamp(millis)
|
||||
|
||||
// remaining fractional portion as nanos
|
||||
t.setNanos(nanos)
|
||||
t
|
||||
private[this] def decimalToTimestamp(d: Decimal): Long = {
|
||||
(d.toBigDecimal * 10000000L).longValue()
|
||||
}
|
||||
|
||||
// Timestamp to long, converting milliseconds to seconds
|
||||
private[this] def timestampToLong(ts: Timestamp) = Math.floor(ts.getTime / 1000.0).toLong
|
||||
|
||||
private[this] def timestampToDouble(ts: Timestamp) = {
|
||||
// First part is the seconds since the beginning of time, followed by nanosecs.
|
||||
Math.floor(ts.getTime / 1000.0).toLong + ts.getNanos.toDouble / 1000000000
|
||||
// converting milliseconds to 100ns
|
||||
private[this] def longToTimestamp(t: Long): Long = t * 10000L
|
||||
// converting 100ns to seconds
|
||||
private[this] def timestampToLong(ts: Long): Long = math.floor(ts.toDouble / 10000000L).toLong
|
||||
// converting 100ns to seconds in double
|
||||
private[this] def timestampToDouble(ts: Long): Double = {
|
||||
ts / 10000000.0
|
||||
}
|
||||
|
||||
// Converts Timestamp to string according to Hive TimestampWritable convention
|
||||
|
@ -234,7 +228,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
|
|||
case TimestampType =>
|
||||
// throw valid precision more than seconds, according to Hive.
|
||||
// Timestamp.nanos is in 0 to 999,999,999, no more than a second.
|
||||
buildCast[Timestamp](_, t => DateUtils.millisToDays(t.getTime))
|
||||
buildCast[Long](_, t => DateUtils.millisToDays(t / 10000L))
|
||||
// Hive throws this exception as a Semantic Exception
|
||||
// It is never possible to compare result when hive return with exception,
|
||||
// so we can return null
|
||||
|
@ -253,7 +247,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
|
|||
case DateType =>
|
||||
buildCast[Int](_, d => null)
|
||||
case TimestampType =>
|
||||
buildCast[Timestamp](_, t => timestampToLong(t))
|
||||
buildCast[Long](_, t => timestampToLong(t))
|
||||
case x: NumericType =>
|
||||
b => x.numeric.asInstanceOf[Numeric[Any]].toLong(b)
|
||||
}
|
||||
|
@ -269,7 +263,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
|
|||
case DateType =>
|
||||
buildCast[Int](_, d => null)
|
||||
case TimestampType =>
|
||||
buildCast[Timestamp](_, t => timestampToLong(t).toInt)
|
||||
buildCast[Long](_, t => timestampToLong(t).toInt)
|
||||
case x: NumericType =>
|
||||
b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b)
|
||||
}
|
||||
|
@ -285,7 +279,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
|
|||
case DateType =>
|
||||
buildCast[Int](_, d => null)
|
||||
case TimestampType =>
|
||||
buildCast[Timestamp](_, t => timestampToLong(t).toShort)
|
||||
buildCast[Long](_, t => timestampToLong(t).toShort)
|
||||
case x: NumericType =>
|
||||
b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b).toShort
|
||||
}
|
||||
|
@ -301,7 +295,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
|
|||
case DateType =>
|
||||
buildCast[Int](_, d => null)
|
||||
case TimestampType =>
|
||||
buildCast[Timestamp](_, t => timestampToLong(t).toByte)
|
||||
buildCast[Long](_, t => timestampToLong(t).toByte)
|
||||
case x: NumericType =>
|
||||
b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b).toByte
|
||||
}
|
||||
|
@ -334,7 +328,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
|
|||
buildCast[Int](_, d => null) // date can't cast to decimal in Hive
|
||||
case TimestampType =>
|
||||
// Note that we lose precision here.
|
||||
buildCast[Timestamp](_, t => changePrecision(Decimal(timestampToDouble(t)), target))
|
||||
buildCast[Long](_, t => changePrecision(Decimal(timestampToDouble(t)), target))
|
||||
case DecimalType() =>
|
||||
b => changePrecision(b.asInstanceOf[Decimal].clone(), target)
|
||||
case LongType =>
|
||||
|
@ -358,7 +352,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
|
|||
case DateType =>
|
||||
buildCast[Int](_, d => null)
|
||||
case TimestampType =>
|
||||
buildCast[Timestamp](_, t => timestampToDouble(t))
|
||||
buildCast[Long](_, t => timestampToDouble(t))
|
||||
case x: NumericType =>
|
||||
b => x.numeric.asInstanceOf[Numeric[Any]].toDouble(b)
|
||||
}
|
||||
|
@ -374,7 +368,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
|
|||
case DateType =>
|
||||
buildCast[Int](_, d => null)
|
||||
case TimestampType =>
|
||||
buildCast[Timestamp](_, t => timestampToDouble(t).toFloat)
|
||||
buildCast[Long](_, t => timestampToDouble(t).toFloat)
|
||||
case x: NumericType =>
|
||||
b => x.numeric.asInstanceOf[Numeric[Any]].toFloat(b)
|
||||
}
|
||||
|
|
|
@ -203,6 +203,7 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR
|
|||
case BooleanType => new MutableBoolean
|
||||
case LongType => new MutableLong
|
||||
case DateType => new MutableInt // We use INT for DATE internally
|
||||
case TimestampType => new MutableLong // We use Long for Timestamp internally
|
||||
case _ => new MutableAny
|
||||
}.toArray)
|
||||
|
||||
|
|
|
@ -122,7 +122,7 @@ class CodeGenContext {
|
|||
case BinaryType => "byte[]"
|
||||
case StringType => stringType
|
||||
case DateType => "int"
|
||||
case TimestampType => "java.sql.Timestamp"
|
||||
case TimestampType => "long"
|
||||
case dt: OpenHashSetUDT if dt.elementType == IntegerType => classOf[IntegerHashSet].getName
|
||||
case dt: OpenHashSetUDT if dt.elementType == LongType => classOf[LongHashSet].getName
|
||||
case _ => "Object"
|
||||
|
@ -140,6 +140,7 @@ class CodeGenContext {
|
|||
case FloatType => "Float"
|
||||
case BooleanType => "Boolean"
|
||||
case DateType => "Integer"
|
||||
case TimestampType => "Long"
|
||||
case _ => javaType(dt)
|
||||
}
|
||||
|
||||
|
@ -155,6 +156,7 @@ class CodeGenContext {
|
|||
case DoubleType => "-1.0"
|
||||
case IntegerType => "-1"
|
||||
case DateType => "-1"
|
||||
case TimestampType => "-1L"
|
||||
case _ => "null"
|
||||
}
|
||||
|
||||
|
|
|
@ -73,7 +73,9 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
|
|||
|
||||
val specificAccessorFunctions = ctx.nativeTypes.map { dataType =>
|
||||
val cases = expressions.zipWithIndex.map {
|
||||
case (e, i) if e.dataType == dataType =>
|
||||
case (e, i) if e.dataType == dataType
|
||||
|| dataType == IntegerType && e.dataType == DateType
|
||||
|| dataType == LongType && e.dataType == TimestampType =>
|
||||
s"case $i: return c$i;"
|
||||
case _ => ""
|
||||
}.mkString("\n ")
|
||||
|
@ -96,7 +98,9 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
|
|||
|
||||
val specificMutatorFunctions = ctx.nativeTypes.map { dataType =>
|
||||
val cases = expressions.zipWithIndex.map {
|
||||
case (e, i) if e.dataType == dataType =>
|
||||
case (e, i) if e.dataType == dataType
|
||||
|| dataType == IntegerType && e.dataType == DateType
|
||||
|| dataType == LongType && e.dataType == TimestampType =>
|
||||
s"case $i: { c$i = value; return; }"
|
||||
case _ => ""
|
||||
}.mkString("\n")
|
||||
|
@ -119,7 +123,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
|
|||
val nonNull = e.dataType match {
|
||||
case BooleanType => s"$col ? 0 : 1"
|
||||
case ByteType | ShortType | IntegerType | DateType => s"$col"
|
||||
case LongType => s"$col ^ ($col >>> 32)"
|
||||
case LongType | TimestampType => s"$col ^ ($col >>> 32)"
|
||||
case FloatType => s"Float.floatToIntBits($col)"
|
||||
case DoubleType =>
|
||||
s"(int)(Double.doubleToLongBits($col) ^ (Double.doubleToLongBits($col) >>> 32))"
|
||||
|
|
|
@ -37,7 +37,7 @@ object Literal {
|
|||
case d: BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
|
||||
case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
|
||||
case d: Decimal => Literal(d, DecimalType.Unlimited)
|
||||
case t: Timestamp => Literal(t, TimestampType)
|
||||
case t: Timestamp => Literal(DateUtils.fromJavaTimestamp(t), TimestampType)
|
||||
case d: Date => Literal(DateUtils.fromJavaDate(d), DateType)
|
||||
case a: Array[Byte] => Literal(a, BinaryType)
|
||||
case null => Literal(null, NullType)
|
||||
|
@ -100,7 +100,7 @@ case class Literal protected (value: Any, dataType: DataType) extends LeafExpres
|
|||
ev.isNull = "false"
|
||||
ev.primitive = value.toString
|
||||
""
|
||||
case FloatType => // This must go before NumericType
|
||||
case FloatType =>
|
||||
val v = value.asInstanceOf[Float]
|
||||
if (v.isNaN || v.isInfinite) {
|
||||
super.genCode(ctx, ev)
|
||||
|
@ -109,7 +109,7 @@ case class Literal protected (value: Any, dataType: DataType) extends LeafExpres
|
|||
ev.primitive = s"${value}f"
|
||||
""
|
||||
}
|
||||
case DoubleType => // This must go before NumericType
|
||||
case DoubleType =>
|
||||
val v = value.asInstanceOf[Double]
|
||||
if (v.isNaN || v.isInfinite) {
|
||||
super.genCode(ctx, ev)
|
||||
|
@ -118,15 +118,18 @@ case class Literal protected (value: Any, dataType: DataType) extends LeafExpres
|
|||
ev.primitive = s"${value}"
|
||||
""
|
||||
}
|
||||
|
||||
case ByteType | ShortType => // This must go before NumericType
|
||||
case ByteType | ShortType =>
|
||||
ev.isNull = "false"
|
||||
ev.primitive = s"(${ctx.javaType(dataType)})$value"
|
||||
""
|
||||
case dt: NumericType if !dt.isInstanceOf[DecimalType] =>
|
||||
case IntegerType | DateType =>
|
||||
ev.isNull = "false"
|
||||
ev.primitive = value.toString
|
||||
""
|
||||
case TimestampType | LongType =>
|
||||
ev.isNull = "false"
|
||||
ev.primitive = s"${value}L"
|
||||
""
|
||||
// eval() version may be faster for non-primitive types
|
||||
case other =>
|
||||
super.genCode(ctx, ev)
|
||||
|
|
|
@ -254,9 +254,9 @@ abstract class BinaryComparison extends BinaryExpression with Predicate {
|
|||
case dt: NumericType if ctx.isNativeType(dt) => defineCodeGen (ctx, ev, {
|
||||
(c1, c3) => s"$c1 $symbol $c3"
|
||||
})
|
||||
case TimestampType =>
|
||||
// java.sql.Timestamp does not have compare()
|
||||
super.genCode(ctx, ev)
|
||||
case DateType | TimestampType => defineCodeGen (ctx, ev, {
|
||||
(c1, c3) => s"$c1 $symbol $c3"
|
||||
})
|
||||
case other => defineCodeGen (ctx, ev, {
|
||||
(c1, c2) => s"$c1.compare($c2) $symbol 0"
|
||||
})
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.spark.sql.catalyst.util
|
||||
|
||||
import java.sql.Date
|
||||
import java.sql.{Timestamp, Date}
|
||||
import java.text.SimpleDateFormat
|
||||
import java.util.{Calendar, TimeZone}
|
||||
|
||||
|
@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Cast
|
|||
*/
|
||||
object DateUtils {
|
||||
private val MILLIS_PER_DAY = 86400000
|
||||
private val HUNDRED_NANOS_PER_SECOND = 10000000L
|
||||
|
||||
// Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
|
||||
private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
|
||||
|
@ -45,17 +46,17 @@ object DateUtils {
|
|||
((millisLocal + LOCAL_TIMEZONE.get().getOffset(millisLocal)) / MILLIS_PER_DAY).toInt
|
||||
}
|
||||
|
||||
private def toMillisSinceEpoch(days: Int): Long = {
|
||||
def toMillisSinceEpoch(days: Int): Long = {
|
||||
val millisUtc = days.toLong * MILLIS_PER_DAY
|
||||
millisUtc - LOCAL_TIMEZONE.get().getOffset(millisUtc)
|
||||
}
|
||||
|
||||
def fromJavaDate(date: java.sql.Date): Int = {
|
||||
def fromJavaDate(date: Date): Int = {
|
||||
javaDateToDays(date)
|
||||
}
|
||||
|
||||
def toJavaDate(daysSinceEpoch: Int): java.sql.Date = {
|
||||
new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch))
|
||||
def toJavaDate(daysSinceEpoch: Int): Date = {
|
||||
new Date(toMillisSinceEpoch(daysSinceEpoch))
|
||||
}
|
||||
|
||||
def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days))
|
||||
|
@ -64,9 +65,9 @@ object DateUtils {
|
|||
if (!s.contains('T')) {
|
||||
// JDBC escape string
|
||||
if (s.contains(' ')) {
|
||||
java.sql.Timestamp.valueOf(s)
|
||||
Timestamp.valueOf(s)
|
||||
} else {
|
||||
java.sql.Date.valueOf(s)
|
||||
Date.valueOf(s)
|
||||
}
|
||||
} else if (s.endsWith("Z")) {
|
||||
// this is zero timezone of ISO8601
|
||||
|
@ -87,4 +88,33 @@ object DateUtils {
|
|||
ISO8601GMT.parse(s)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a java.sql.Timestamp from number of 100ns since epoch
|
||||
*/
|
||||
def toJavaTimestamp(num100ns: Long): Timestamp = {
|
||||
// setNanos() will overwrite the millisecond part, so the milliseconds should be
|
||||
// cut off at seconds
|
||||
var seconds = num100ns / HUNDRED_NANOS_PER_SECOND
|
||||
var nanos = num100ns % HUNDRED_NANOS_PER_SECOND
|
||||
// setNanos() can not accept negative value
|
||||
if (nanos < 0) {
|
||||
nanos += HUNDRED_NANOS_PER_SECOND
|
||||
seconds -= 1
|
||||
}
|
||||
val t = new Timestamp(seconds * 1000)
|
||||
t.setNanos(nanos.toInt * 100)
|
||||
t
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the number of 100ns since epoch from java.sql.Timestamp.
|
||||
*/
|
||||
def fromJavaTimestamp(t: Timestamp): Long = {
|
||||
if (t != null) {
|
||||
t.getTime() * 10000L + (t.getNanos().toLong / 100) % 10000L
|
||||
} else {
|
||||
0L
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
|
||||
package org.apache.spark.sql.types
|
||||
|
||||
import java.sql.Timestamp
|
||||
|
||||
import scala.math.Ordering
|
||||
import scala.reflect.runtime.universe.typeTag
|
||||
|
||||
|
@ -38,18 +36,16 @@ class TimestampType private() extends AtomicType {
|
|||
// The companion object and this class is separated so the companion object also subclasses
|
||||
// this type. Otherwise, the companion object would be of type "TimestampType$" in byte code.
|
||||
// Defined with a private constructor so the companion object is the only possible instantiation.
|
||||
private[sql] type InternalType = Timestamp
|
||||
private[sql] type InternalType = Long
|
||||
|
||||
@transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
|
||||
|
||||
private[sql] val ordering = new Ordering[InternalType] {
|
||||
def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y)
|
||||
}
|
||||
private[sql] val ordering = implicitly[Ordering[InternalType]]
|
||||
|
||||
/**
|
||||
* The default size of a value of the TimestampType is 12 bytes.
|
||||
*/
|
||||
override def defaultSize: Int = 12
|
||||
override def defaultSize: Int = 8
|
||||
|
||||
private[spark] override def asNullable: TimestampType = this
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
|
|||
import java.sql.{Timestamp, Date}
|
||||
|
||||
import org.apache.spark.SparkFunSuite
|
||||
import org.apache.spark.sql.catalyst.util.DateUtils
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
/**
|
||||
|
@ -137,7 +138,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
|
|||
checkEvaluation(cast(cast(sd, DateType), StringType), sd)
|
||||
checkEvaluation(cast(cast(d, StringType), DateType), 0)
|
||||
checkEvaluation(cast(cast(nts, TimestampType), StringType), nts)
|
||||
checkEvaluation(cast(cast(ts, StringType), TimestampType), ts)
|
||||
checkEvaluation(cast(cast(ts, StringType), TimestampType), DateUtils.fromJavaTimestamp(ts))
|
||||
|
||||
// all convert to string type to check
|
||||
checkEvaluation(cast(cast(cast(nts, TimestampType), DateType), StringType), sd)
|
||||
|
@ -269,9 +270,9 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
|
|||
checkEvaluation(cast(ts, LongType), 15.toLong)
|
||||
checkEvaluation(cast(ts, FloatType), 15.002f)
|
||||
checkEvaluation(cast(ts, DoubleType), 15.002)
|
||||
checkEvaluation(cast(cast(tss, ShortType), TimestampType), ts)
|
||||
checkEvaluation(cast(cast(tss, IntegerType), TimestampType), ts)
|
||||
checkEvaluation(cast(cast(tss, LongType), TimestampType), ts)
|
||||
checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateUtils.fromJavaTimestamp(ts))
|
||||
checkEvaluation(cast(cast(tss, IntegerType), TimestampType), DateUtils.fromJavaTimestamp(ts))
|
||||
checkEvaluation(cast(cast(tss, LongType), TimestampType), DateUtils.fromJavaTimestamp(ts))
|
||||
checkEvaluation(
|
||||
cast(cast(millis.toFloat / 1000, TimestampType), FloatType),
|
||||
millis.toFloat / 1000)
|
||||
|
@ -283,7 +284,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
|
|||
Decimal(1))
|
||||
|
||||
// A test for higher precision than millis
|
||||
checkEvaluation(cast(cast(0.00000001, TimestampType), DoubleType), 0.00000001)
|
||||
checkEvaluation(cast(cast(0.0000001, TimestampType), DoubleType), 0.0000001)
|
||||
|
||||
checkEvaluation(cast(Double.NaN, TimestampType), null)
|
||||
checkEvaluation(cast(1.0 / 0.0, TimestampType), null)
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.catalyst.util
|
||||
|
||||
import java.sql.Timestamp
|
||||
|
||||
import org.apache.spark.SparkFunSuite
|
||||
|
||||
|
||||
class DateUtilsSuite extends SparkFunSuite {
|
||||
|
||||
test("timestamp") {
|
||||
val now = new Timestamp(System.currentTimeMillis())
|
||||
now.setNanos(100)
|
||||
val ns = DateUtils.fromJavaTimestamp(now)
|
||||
assert(ns % 10000000L == 1)
|
||||
assert(DateUtils.toJavaTimestamp(ns) == now)
|
||||
|
||||
List(-111111111111L, -1L, 0, 1L, 111111111111L).foreach { t =>
|
||||
val ts = DateUtils.toJavaTimestamp(t)
|
||||
assert(DateUtils.fromJavaTimestamp(ts) == t)
|
||||
assert(DateUtils.toJavaTimestamp(DateUtils.fromJavaTimestamp(ts)) == ts)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -190,7 +190,7 @@ class DataTypeSuite extends SparkFunSuite {
|
|||
checkDefaultSize(DecimalType(10, 5), 4096)
|
||||
checkDefaultSize(DecimalType.Unlimited, 4096)
|
||||
checkDefaultSize(DateType, 4)
|
||||
checkDefaultSize(TimestampType, 12)
|
||||
checkDefaultSize(TimestampType, 8)
|
||||
checkDefaultSize(StringType, 4096)
|
||||
checkDefaultSize(BinaryType, 4096)
|
||||
checkDefaultSize(ArrayType(DoubleType, true), 800)
|
||||
|
|
|
@ -17,10 +17,8 @@
|
|||
|
||||
package org.apache.spark.sql.columnar
|
||||
|
||||
import java.sql.Timestamp
|
||||
|
||||
import org.apache.spark.sql.Row
|
||||
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, Attribute, AttributeReference}
|
||||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
private[sql] class ColumnStatisticsSchema(a: Attribute) extends Serializable {
|
||||
|
@ -234,22 +232,7 @@ private[sql] class StringColumnStats extends ColumnStats {
|
|||
|
||||
private[sql] class DateColumnStats extends IntColumnStats
|
||||
|
||||
private[sql] class TimestampColumnStats extends ColumnStats {
|
||||
protected var upper: Timestamp = null
|
||||
protected var lower: Timestamp = null
|
||||
|
||||
override def gatherStats(row: Row, ordinal: Int): Unit = {
|
||||
super.gatherStats(row, ordinal)
|
||||
if (!row.isNullAt(ordinal)) {
|
||||
val value = row(ordinal).asInstanceOf[Timestamp]
|
||||
if (upper == null || value.compareTo(upper) > 0) upper = value
|
||||
if (lower == null || value.compareTo(lower) < 0) lower = value
|
||||
sizeInBytes += TIMESTAMP.defaultSize
|
||||
}
|
||||
}
|
||||
|
||||
override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
|
||||
}
|
||||
private[sql] class TimestampColumnStats extends LongColumnStats
|
||||
|
||||
private[sql] class BinaryColumnStats extends ColumnStats {
|
||||
override def gatherStats(row: Row, ordinal: Int): Unit = {
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.spark.sql.columnar
|
||||
|
||||
import java.nio.ByteBuffer
|
||||
import java.sql.Timestamp
|
||||
|
||||
import scala.reflect.runtime.universe.TypeTag
|
||||
|
||||
|
@ -355,22 +354,20 @@ private[sql] object DATE extends NativeColumnType(DateType, 8, 4) {
|
|||
}
|
||||
}
|
||||
|
||||
private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 12) {
|
||||
override def extract(buffer: ByteBuffer): Timestamp = {
|
||||
val timestamp = new Timestamp(buffer.getLong())
|
||||
timestamp.setNanos(buffer.getInt())
|
||||
timestamp
|
||||
private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 8) {
|
||||
override def extract(buffer: ByteBuffer): Long = {
|
||||
buffer.getLong
|
||||
}
|
||||
|
||||
override def append(v: Timestamp, buffer: ByteBuffer): Unit = {
|
||||
buffer.putLong(v.getTime).putInt(v.getNanos)
|
||||
override def append(v: Long, buffer: ByteBuffer): Unit = {
|
||||
buffer.putLong(v)
|
||||
}
|
||||
|
||||
override def getField(row: Row, ordinal: Int): Timestamp = {
|
||||
row(ordinal).asInstanceOf[Timestamp]
|
||||
override def getField(row: Row, ordinal: Int): Long = {
|
||||
row(ordinal).asInstanceOf[Long]
|
||||
}
|
||||
|
||||
override def setField(row: MutableRow, ordinal: Int, value: Timestamp): Unit = {
|
||||
override def setField(row: MutableRow, ordinal: Int, value: Long): Unit = {
|
||||
row(ordinal) = value
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,14 +20,13 @@ package org.apache.spark.sql.execution
|
|||
import java.io._
|
||||
import java.math.{BigDecimal, BigInteger}
|
||||
import java.nio.ByteBuffer
|
||||
import java.sql.Timestamp
|
||||
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
import org.apache.spark.serializer._
|
||||
import org.apache.spark.Logging
|
||||
import org.apache.spark.serializer._
|
||||
import org.apache.spark.sql.Row
|
||||
import org.apache.spark.sql.catalyst.expressions.{SpecificMutableRow, MutableRow, GenericMutableRow}
|
||||
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, MutableRow, SpecificMutableRow}
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
/**
|
||||
|
@ -304,11 +303,7 @@ private[sql] object SparkSqlSerializer2 {
|
|||
out.writeByte(NULL)
|
||||
} else {
|
||||
out.writeByte(NOT_NULL)
|
||||
val timestamp = row.getAs[java.sql.Timestamp](i)
|
||||
val time = timestamp.getTime
|
||||
val nanos = timestamp.getNanos
|
||||
out.writeLong(time - (nanos / 1000000)) // Write the milliseconds value.
|
||||
out.writeInt(nanos) // Write the nanoseconds part.
|
||||
out.writeLong(row.getAs[Long](i))
|
||||
}
|
||||
|
||||
case StringType =>
|
||||
|
@ -429,11 +424,7 @@ private[sql] object SparkSqlSerializer2 {
|
|||
if (in.readByte() == NULL) {
|
||||
mutableRow.setNullAt(i)
|
||||
} else {
|
||||
val time = in.readLong() // Read the milliseconds value.
|
||||
val nanos = in.readInt() // Read the nanoseconds part.
|
||||
val timestamp = new Timestamp(time)
|
||||
timestamp.setNanos(nanos)
|
||||
mutableRow.update(i, timestamp)
|
||||
mutableRow.update(i, in.readLong())
|
||||
}
|
||||
|
||||
case StringType =>
|
||||
|
|
|
@ -170,6 +170,8 @@ package object debug {
|
|||
case (_: Short, ShortType) =>
|
||||
case (_: Boolean, BooleanType) =>
|
||||
case (_: Double, DoubleType) =>
|
||||
case (_: Int, DateType) =>
|
||||
case (_: Long, TimestampType) =>
|
||||
case (v, udt: UserDefinedType[_]) => typeCheck(v, udt.sqlType)
|
||||
|
||||
case (d, t) => sys.error(s"Invalid data found: got $d (${d.getClass}) expected $t")
|
||||
|
|
|
@ -148,6 +148,7 @@ object EvaluatePython {
|
|||
case (ud, udt: UserDefinedType[_]) => toJava(udt.serialize(ud), udt.sqlType)
|
||||
|
||||
case (date: Int, DateType) => DateUtils.toJavaDate(date)
|
||||
case (t: Long, TimestampType) => DateUtils.toJavaTimestamp(t)
|
||||
case (s: UTF8String, StringType) => s.toString
|
||||
|
||||
// Pyrolite can handle Timestamp and Decimal
|
||||
|
@ -186,10 +187,12 @@ object EvaluatePython {
|
|||
}): Row
|
||||
|
||||
case (c: java.util.Calendar, DateType) =>
|
||||
DateUtils.fromJavaDate(new java.sql.Date(c.getTime().getTime()))
|
||||
DateUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis))
|
||||
|
||||
case (c: java.util.Calendar, TimestampType) =>
|
||||
new java.sql.Timestamp(c.getTime().getTime())
|
||||
c.getTimeInMillis * 10000L
|
||||
case (t: java.sql.Timestamp, TimestampType) =>
|
||||
DateUtils.fromJavaTimestamp(t)
|
||||
|
||||
case (_, udt: UserDefinedType[_]) =>
|
||||
fromJava(obj, udt.sqlType)
|
||||
|
|
|
@ -385,7 +385,7 @@ private[sql] class JDBCRDD(
|
|||
// DateUtils.fromJavaDate does not handle null value, so we need to check it.
|
||||
val dateVal = rs.getDate(pos)
|
||||
if (dateVal != null) {
|
||||
mutableRow.update(i, DateUtils.fromJavaDate(dateVal))
|
||||
mutableRow.setInt(i, DateUtils.fromJavaDate(dateVal))
|
||||
} else {
|
||||
mutableRow.update(i, null)
|
||||
}
|
||||
|
@ -417,7 +417,13 @@ private[sql] class JDBCRDD(
|
|||
case LongConversion => mutableRow.setLong(i, rs.getLong(pos))
|
||||
// TODO(davies): use getBytes for better performance, if the encoding is UTF-8
|
||||
case StringConversion => mutableRow.setString(i, rs.getString(pos))
|
||||
case TimestampConversion => mutableRow.update(i, rs.getTimestamp(pos))
|
||||
case TimestampConversion =>
|
||||
val t = rs.getTimestamp(pos)
|
||||
if (t != null) {
|
||||
mutableRow.setLong(i, DateUtils.fromJavaTimestamp(t))
|
||||
} else {
|
||||
mutableRow.update(i, null)
|
||||
}
|
||||
case BinaryConversion => mutableRow.update(i, rs.getBytes(pos))
|
||||
case BinaryLongConversion => {
|
||||
val bytes = rs.getBytes(pos)
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.spark.sql.json
|
||||
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.sql.Timestamp
|
||||
|
||||
import scala.collection.Map
|
||||
|
||||
|
@ -65,10 +64,10 @@ private[sql] object JacksonParser {
|
|||
DateUtils.millisToDays(DateUtils.stringToTime(parser.getText).getTime)
|
||||
|
||||
case (VALUE_STRING, TimestampType) =>
|
||||
new Timestamp(DateUtils.stringToTime(parser.getText).getTime)
|
||||
DateUtils.stringToTime(parser.getText).getTime * 10000L
|
||||
|
||||
case (VALUE_NUMBER_INT, TimestampType) =>
|
||||
new Timestamp(parser.getLongValue)
|
||||
parser.getLongValue * 10000L
|
||||
|
||||
case (_, StringType) =>
|
||||
val writer = new ByteArrayOutputStream()
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
|
||||
package org.apache.spark.sql.json
|
||||
|
||||
import java.sql.Timestamp
|
||||
|
||||
import scala.collection.Map
|
||||
import scala.collection.convert.Wrappers.{JListWrapper, JMapWrapper}
|
||||
|
||||
|
@ -398,11 +396,11 @@ private[sql] object JsonRDD extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
private def toTimestamp(value: Any): Timestamp = {
|
||||
private def toTimestamp(value: Any): Long = {
|
||||
value match {
|
||||
case value: java.lang.Integer => new Timestamp(value.asInstanceOf[Int].toLong)
|
||||
case value: java.lang.Long => new Timestamp(value)
|
||||
case value: java.lang.String => toTimestamp(DateUtils.stringToTime(value).getTime)
|
||||
case value: java.lang.Integer => value.asInstanceOf[Int].toLong * 10000L
|
||||
case value: java.lang.Long => value * 10000L
|
||||
case value: java.lang.String => DateUtils.stringToTime(value).getTime * 10000L
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Co
|
|||
import org.apache.parquet.schema.MessageType
|
||||
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
import org.apache.spark.sql.catalyst.util.DateUtils
|
||||
import org.apache.spark.sql.parquet.CatalystConverter.FieldType
|
||||
import org.apache.spark.sql.types._
|
||||
import org.apache.spark.sql.parquet.timestamp.NanoTime
|
||||
|
@ -266,8 +267,8 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
|
|||
/**
|
||||
* Read a Timestamp value from a Parquet Int96Value
|
||||
*/
|
||||
protected[parquet] def readTimestamp(value: Binary): Timestamp = {
|
||||
CatalystTimestampConverter.convertToTimestamp(value)
|
||||
protected[parquet] def readTimestamp(value: Binary): Long = {
|
||||
DateUtils.fromJavaTimestamp(CatalystTimestampConverter.convertToTimestamp(value))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -401,7 +402,7 @@ private[parquet] class CatalystPrimitiveRowConverter(
|
|||
current.setInt(fieldIndex, value)
|
||||
|
||||
override protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
|
||||
current.update(fieldIndex, value)
|
||||
current.setInt(fieldIndex, value)
|
||||
|
||||
override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
|
||||
current.setLong(fieldIndex, value)
|
||||
|
@ -425,7 +426,7 @@ private[parquet] class CatalystPrimitiveRowConverter(
|
|||
current.update(fieldIndex, UTF8String(value))
|
||||
|
||||
override protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
|
||||
current.update(fieldIndex, readTimestamp(value))
|
||||
current.setLong(fieldIndex, readTimestamp(value))
|
||||
|
||||
override protected[parquet] def updateDecimal(
|
||||
fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.parquet.schema.MessageType
|
|||
|
||||
import org.apache.spark.Logging
|
||||
import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
|
||||
import org.apache.spark.sql.catalyst.util.DateUtils
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
/**
|
||||
|
@ -204,7 +205,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
|
|||
case IntegerType => writer.addInteger(value.asInstanceOf[Int])
|
||||
case ShortType => writer.addInteger(value.asInstanceOf[Short])
|
||||
case LongType => writer.addLong(value.asInstanceOf[Long])
|
||||
case TimestampType => writeTimestamp(value.asInstanceOf[java.sql.Timestamp])
|
||||
case TimestampType => writeTimestamp(value.asInstanceOf[Long])
|
||||
case ByteType => writer.addInteger(value.asInstanceOf[Byte])
|
||||
case DoubleType => writer.addDouble(value.asInstanceOf[Double])
|
||||
case FloatType => writer.addFloat(value.asInstanceOf[Float])
|
||||
|
@ -311,8 +312,9 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
|
|||
writer.addBinary(Binary.fromByteArray(scratchBytes, 0, numBytes))
|
||||
}
|
||||
|
||||
private[parquet] def writeTimestamp(ts: java.sql.Timestamp): Unit = {
|
||||
val binaryNanoTime = CatalystTimestampConverter.convertFromTimestamp(ts)
|
||||
private[parquet] def writeTimestamp(ts: Long): Unit = {
|
||||
val binaryNanoTime = CatalystTimestampConverter.convertFromTimestamp(
|
||||
DateUtils.toJavaTimestamp(ts))
|
||||
writer.addBinary(binaryNanoTime)
|
||||
}
|
||||
}
|
||||
|
@ -357,7 +359,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
|
|||
case FloatType => writer.addFloat(record.getFloat(index))
|
||||
case BooleanType => writer.addBoolean(record.getBoolean(index))
|
||||
case DateType => writer.addInteger(record.getInt(index))
|
||||
case TimestampType => writeTimestamp(record(index).asInstanceOf[java.sql.Timestamp])
|
||||
case TimestampType => writeTimestamp(record(index).asInstanceOf[Long])
|
||||
case d: DecimalType =>
|
||||
if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
|
||||
sys.error(s"Unsupported datatype $d, cannot write to consumer")
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.scalatest.concurrent.Eventually._
|
|||
import org.apache.spark.Accumulators
|
||||
import org.apache.spark.sql.TestData._
|
||||
import org.apache.spark.sql.columnar._
|
||||
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
|
||||
import org.apache.spark.storage.{StorageLevel, RDDBlockId}
|
||||
|
||||
case class BigData(s: String)
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ class ColumnStatsSuite extends SparkFunSuite {
|
|||
testColumnStats(classOf[FixedDecimalColumnStats], FIXED_DECIMAL(15, 10), Row(null, null, 0))
|
||||
testColumnStats(classOf[StringColumnStats], STRING, Row(null, null, 0))
|
||||
testColumnStats(classOf[DateColumnStats], DATE, Row(Int.MaxValue, Int.MinValue, 0))
|
||||
testColumnStats(classOf[TimestampColumnStats], TIMESTAMP, Row(null, null, 0))
|
||||
testColumnStats(classOf[TimestampColumnStats], TIMESTAMP, Row(Long.MaxValue, Long.MinValue, 0))
|
||||
|
||||
def testColumnStats[T <: AtomicType, U <: ColumnStats](
|
||||
columnStatsClass: Class[U],
|
||||
|
|
|
@ -18,17 +18,16 @@
|
|||
package org.apache.spark.sql.columnar
|
||||
|
||||
import java.nio.ByteBuffer
|
||||
import java.sql.Timestamp
|
||||
|
||||
import com.esotericsoftware.kryo.{Serializer, Kryo}
|
||||
import com.esotericsoftware.kryo.io.{Input, Output}
|
||||
import org.apache.spark.serializer.KryoRegistrator
|
||||
import com.esotericsoftware.kryo.{Kryo, Serializer}
|
||||
|
||||
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
|
||||
import org.apache.spark.serializer.KryoRegistrator
|
||||
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
|
||||
import org.apache.spark.sql.columnar.ColumnarTestUtils._
|
||||
import org.apache.spark.sql.execution.SparkSqlSerializer
|
||||
import org.apache.spark.sql.types._
|
||||
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
|
||||
|
||||
class ColumnTypeSuite extends SparkFunSuite with Logging {
|
||||
val DEFAULT_BUFFER_SIZE = 512
|
||||
|
@ -36,7 +35,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
|
|||
test("defaultSize") {
|
||||
val checks = Map(
|
||||
INT -> 4, SHORT -> 2, LONG -> 8, BYTE -> 1, DOUBLE -> 8, FLOAT -> 4,
|
||||
FIXED_DECIMAL(15, 10) -> 8, BOOLEAN -> 1, STRING -> 8, DATE -> 4, TIMESTAMP -> 12,
|
||||
FIXED_DECIMAL(15, 10) -> 8, BOOLEAN -> 1, STRING -> 8, DATE -> 4, TIMESTAMP -> 8,
|
||||
BINARY -> 16, GENERIC -> 16)
|
||||
|
||||
checks.foreach { case (columnType, expectedSize) =>
|
||||
|
@ -69,7 +68,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
|
|||
checkActualSize(BOOLEAN, true, 1)
|
||||
checkActualSize(STRING, UTF8String("hello"), 4 + "hello".getBytes("utf-8").length)
|
||||
checkActualSize(DATE, 0, 4)
|
||||
checkActualSize(TIMESTAMP, new Timestamp(0L), 12)
|
||||
checkActualSize(TIMESTAMP, 0L, 8)
|
||||
|
||||
val binary = Array.fill[Byte](4)(0: Byte)
|
||||
checkActualSize(BINARY, binary, 4 + 4)
|
||||
|
|
|
@ -17,14 +17,12 @@
|
|||
|
||||
package org.apache.spark.sql.columnar
|
||||
|
||||
import java.sql.Timestamp
|
||||
|
||||
import scala.collection.immutable.HashSet
|
||||
import scala.util.Random
|
||||
|
||||
import org.apache.spark.sql.Row
|
||||
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
|
||||
import org.apache.spark.sql.types.{UTF8String, DataType, Decimal, AtomicType}
|
||||
import org.apache.spark.sql.types.{AtomicType, DataType, Decimal, UTF8String}
|
||||
|
||||
object ColumnarTestUtils {
|
||||
def makeNullRow(length: Int): GenericMutableRow = {
|
||||
|
@ -52,10 +50,7 @@ object ColumnarTestUtils {
|
|||
case BOOLEAN => Random.nextBoolean()
|
||||
case BINARY => randomBytes(Random.nextInt(32))
|
||||
case DATE => Random.nextInt()
|
||||
case TIMESTAMP =>
|
||||
val timestamp = new Timestamp(Random.nextLong())
|
||||
timestamp.setNanos(Random.nextInt(999999999))
|
||||
timestamp
|
||||
case TIMESTAMP => Random.nextLong()
|
||||
case _ =>
|
||||
// Using a random one-element map instead of an arbitrary object
|
||||
Map(Random.nextInt() -> Random.nextString(Random.nextInt(32)))
|
||||
|
|
|
@ -326,7 +326,7 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter {
|
|||
assert(cal.get(Calendar.HOUR) === 11)
|
||||
assert(cal.get(Calendar.MINUTE) === 22)
|
||||
assert(cal.get(Calendar.SECOND) === 33)
|
||||
assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543543)
|
||||
assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543500)
|
||||
}
|
||||
|
||||
test("test DATE types") {
|
||||
|
|
|
@ -76,21 +76,25 @@ class JsonSuite extends QueryTest with TestJsonData {
|
|||
checkTypePromotion(
|
||||
Decimal(doubleNumber), enforceCorrectType(doubleNumber, DecimalType.Unlimited))
|
||||
|
||||
checkTypePromotion(new Timestamp(intNumber), enforceCorrectType(intNumber, TimestampType))
|
||||
checkTypePromotion(new Timestamp(intNumber.toLong),
|
||||
checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(intNumber)),
|
||||
enforceCorrectType(intNumber, TimestampType))
|
||||
checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong)),
|
||||
enforceCorrectType(intNumber.toLong, TimestampType))
|
||||
val strTime = "2014-09-30 12:34:56"
|
||||
checkTypePromotion(Timestamp.valueOf(strTime), enforceCorrectType(strTime, TimestampType))
|
||||
checkTypePromotion(DateUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
|
||||
enforceCorrectType(strTime, TimestampType))
|
||||
|
||||
val strDate = "2014-10-15"
|
||||
checkTypePromotion(
|
||||
DateUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
|
||||
|
||||
val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
|
||||
checkTypePromotion(new Timestamp(3601000), enforceCorrectType(ISO8601Time1, TimestampType))
|
||||
checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(3601000)),
|
||||
enforceCorrectType(ISO8601Time1, TimestampType))
|
||||
checkTypePromotion(DateUtils.millisToDays(3601000), enforceCorrectType(ISO8601Time1, DateType))
|
||||
val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
|
||||
checkTypePromotion(new Timestamp(10801000), enforceCorrectType(ISO8601Time2, TimestampType))
|
||||
checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(10801000)),
|
||||
enforceCorrectType(ISO8601Time2, TimestampType))
|
||||
checkTypePromotion(DateUtils.millisToDays(10801000), enforceCorrectType(ISO8601Time2, DateType))
|
||||
}
|
||||
|
||||
|
|
|
@ -252,7 +252,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
|
|||
"load_dyn_part14.*", // These work alone but fail when run with other tests...
|
||||
|
||||
// the answer is sensitive for jdk version
|
||||
"udf_java_method"
|
||||
"udf_java_method",
|
||||
|
||||
// Spark SQL use Long for TimestampType, lose the precision under 100ns
|
||||
"timestamp_1",
|
||||
"timestamp_2"
|
||||
)
|
||||
|
||||
/**
|
||||
|
@ -795,8 +799,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
|
|||
"stats_publisher_error_1",
|
||||
"subq2",
|
||||
"tablename_with_select",
|
||||
"timestamp_1",
|
||||
"timestamp_2",
|
||||
"timestamp_3",
|
||||
"timestamp_comparison",
|
||||
"timestamp_lazy",
|
||||
|
|
|
@ -250,7 +250,8 @@ private[hive] trait HiveInspectors {
|
|||
PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector,
|
||||
poi.getWritableConstantValue.getHiveDecimal)
|
||||
case poi: WritableConstantTimestampObjectInspector =>
|
||||
poi.getWritableConstantValue.getTimestamp.clone()
|
||||
val t = poi.getWritableConstantValue
|
||||
t.getSeconds * 10000000L + t.getNanos / 100L
|
||||
case poi: WritableConstantIntObjectInspector =>
|
||||
poi.getWritableConstantValue.get()
|
||||
case poi: WritableConstantDoubleObjectInspector =>
|
||||
|
@ -313,11 +314,11 @@ private[hive] trait HiveInspectors {
|
|||
case x: DateObjectInspector if x.preferWritable() =>
|
||||
DateUtils.fromJavaDate(x.getPrimitiveWritableObject(data).get())
|
||||
case x: DateObjectInspector => DateUtils.fromJavaDate(x.getPrimitiveJavaObject(data))
|
||||
// org.apache.hadoop.hive.serde2.io.TimestampWritable.set will reset current time object
|
||||
// if next timestamp is null, so Timestamp object is cloned
|
||||
case x: TimestampObjectInspector if x.preferWritable() =>
|
||||
x.getPrimitiveWritableObject(data).getTimestamp.clone()
|
||||
case ti: TimestampObjectInspector => ti.getPrimitiveJavaObject(data).clone()
|
||||
val t = x.getPrimitiveWritableObject(data)
|
||||
t.getSeconds * 10000000L + t.getNanos / 100
|
||||
case ti: TimestampObjectInspector =>
|
||||
DateUtils.fromJavaTimestamp(ti.getPrimitiveJavaObject(data))
|
||||
case _ => pi.getPrimitiveJavaObject(data)
|
||||
}
|
||||
case li: ListObjectInspector =>
|
||||
|
@ -356,6 +357,9 @@ private[hive] trait HiveInspectors {
|
|||
case _: JavaDateObjectInspector =>
|
||||
(o: Any) => DateUtils.toJavaDate(o.asInstanceOf[Int])
|
||||
|
||||
case _: JavaTimestampObjectInspector =>
|
||||
(o: Any) => DateUtils.toJavaTimestamp(o.asInstanceOf[Long])
|
||||
|
||||
case soi: StandardStructObjectInspector =>
|
||||
val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
|
||||
(o: Any) => {
|
||||
|
@ -465,7 +469,7 @@ private[hive] trait HiveInspectors {
|
|||
case _: DateObjectInspector if x.preferWritable() => getDateWritable(a)
|
||||
case _: DateObjectInspector => DateUtils.toJavaDate(a.asInstanceOf[Int])
|
||||
case _: TimestampObjectInspector if x.preferWritable() => getTimestampWritable(a)
|
||||
case _: TimestampObjectInspector => a.asInstanceOf[java.sql.Timestamp]
|
||||
case _: TimestampObjectInspector => DateUtils.toJavaTimestamp(a.asInstanceOf[Long])
|
||||
}
|
||||
case x: SettableStructObjectInspector =>
|
||||
val fieldRefs = x.getAllStructFieldRefs
|
||||
|
@ -727,7 +731,7 @@ private[hive] trait HiveInspectors {
|
|||
TypeInfoFactory.voidTypeInfo, null)
|
||||
|
||||
private def getStringWritable(value: Any): hadoopIo.Text =
|
||||
if (value == null) null else new hadoopIo.Text(value.asInstanceOf[UTF8String].toString)
|
||||
if (value == null) null else new hadoopIo.Text(value.asInstanceOf[UTF8String].getBytes)
|
||||
|
||||
private def getIntWritable(value: Any): hadoopIo.IntWritable =
|
||||
if (value == null) null else new hadoopIo.IntWritable(value.asInstanceOf[Int])
|
||||
|
@ -776,7 +780,7 @@ private[hive] trait HiveInspectors {
|
|||
if (value == null) {
|
||||
null
|
||||
} else {
|
||||
new hiveIo.TimestampWritable(value.asInstanceOf[java.sql.Timestamp])
|
||||
new hiveIo.TimestampWritable(DateUtils.toJavaTimestamp(value.asInstanceOf[Long]))
|
||||
}
|
||||
|
||||
private def getDecimalWritable(value: Any): hiveIo.HiveDecimalWritable =
|
||||
|
|
|
@ -363,10 +363,10 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
|
|||
row.update(ordinal, HiveShim.toCatalystDecimal(oi, value))
|
||||
case oi: TimestampObjectInspector =>
|
||||
(value: Any, row: MutableRow, ordinal: Int) =>
|
||||
row.update(ordinal, oi.getPrimitiveJavaObject(value).clone())
|
||||
row.setLong(ordinal, DateUtils.fromJavaTimestamp(oi.getPrimitiveJavaObject(value)))
|
||||
case oi: DateObjectInspector =>
|
||||
(value: Any, row: MutableRow, ordinal: Int) =>
|
||||
row.update(ordinal, DateUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
|
||||
row.setInt(ordinal, DateUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
|
||||
case oi: BinaryObjectInspector =>
|
||||
(value: Any, row: MutableRow, ordinal: Int) =>
|
||||
row.update(ordinal, oi.getPrimitiveJavaObject(value))
|
||||
|
|
|
@ -1 +1 @@
|
|||
-0.0010000000000000009
|
||||
-0.001
|
||||
|
|
Loading…
Reference in a new issue