Revert "[SPARK-4508] [SQL] build native date type to conform behavior to Hive"
This reverts commit 1646f89d96.

Parent: cfea30037f
Commit: eccb9fbb2d
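The reverted commit had replaced the JVM-level representation of Spark SQL's DateType, a java.sql.Date, with a primitive Int counting days since the Unix epoch. This revert restores java.sql.Date across Catalyst, the in-memory columnar cache, the JSON and Python converters, and the Hive shims, and deletes the DateUtils helper that performed the day-count conversion. A minimal sketch of the two representations (illustrative values only, not Spark internals):

// Representation removed by this revert (SPARK-4508): days since 1970-01-01.
val nativeDate: Int = 0
// Representation this revert restores: a boxed date over epoch milliseconds.
val boxedDate: java.sql.Date = java.sql.Date.valueOf("1970-01-01")
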
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import scala.util.hashing.MurmurHash3
 
 import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.types.DateUtils
+
 
 object Row {
   /**

@@ -17,13 +17,14 @@
 
 package org.apache.spark.sql.catalyst
 
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 
 import org.apache.spark.util.Utils
 import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute, AttributeReference, Row}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.types._
+
 
 /**
  * A default version of ScalaReflection that uses the runtime universe.
  */

@@ -71,7 +72,6 @@ trait ScalaReflection {
     }.toArray)
     case (d: BigDecimal, _) => Decimal(d)
     case (d: java.math.BigDecimal, _) => Decimal(d)
-    case (d: java.sql.Date, _) => DateUtils.fromJavaDate(d)
     case (other, _) => other
   }
 

@@ -85,7 +85,6 @@ trait ScalaReflection {
     }
     case (r: Row, s: StructType) => convertRowToScala(r, s)
     case (d: Decimal, _: DecimalType) => d.toJavaBigDecimal
-    case (i: Int, DateType) => DateUtils.toJavaDate(i)
     case (other, _) => other
   }
 

@@ -160,7 +159,7 @@ trait ScalaReflection {
       valueDataType, valueContainsNull = valueNullable), nullable = true)
     case t if t <:< typeOf[String] => Schema(StringType, nullable = true)
     case t if t <:< typeOf[Timestamp] => Schema(TimestampType, nullable = true)
-    case t if t <:< typeOf[java.sql.Date] => Schema(DateType, nullable = true)
+    case t if t <:< typeOf[Date] => Schema(DateType, nullable = true)
     case t if t <:< typeOf[BigDecimal] => Schema(DecimalType.Unlimited, nullable = true)
     case t if t <:< typeOf[java.math.BigDecimal] => Schema(DecimalType.Unlimited, nullable = true)
     case t if t <:< typeOf[Decimal] => Schema(DecimalType.Unlimited, nullable = true)

@@ -192,7 +191,7 @@ trait ScalaReflection {
     case obj: LongType.JvmType => LongType
     case obj: FloatType.JvmType => FloatType
     case obj: DoubleType.JvmType => DoubleType
-    case obj: java.sql.Date => DateType
+    case obj: DateType.JvmType => DateType
     case obj: java.math.BigDecimal => DecimalType.Unlimited
     case obj: Decimal => DecimalType.Unlimited
     case obj: TimestampType.JvmType => TimestampType

@@ -52,7 +52,6 @@ class SqlParser extends AbstractSparkSQLParser {
   protected val CAST = Keyword("CAST")
   protected val COALESCE = Keyword("COALESCE")
   protected val COUNT = Keyword("COUNT")
-  protected val DATE = Keyword("DATE")
   protected val DECIMAL = Keyword("DECIMAL")
   protected val DESC = Keyword("DESC")
   protected val DISTINCT = Keyword("DISTINCT")

@@ -384,7 +383,6 @@ class SqlParser extends AbstractSparkSQLParser {
     | DOUBLE ^^^ DoubleType
     | fixedDecimalType
     | DECIMAL ^^^ DecimalType.Unlimited
-    | DATE ^^^ DateType
     )
 
   protected lazy val fixedDecimalType: Parser[DataType] =

@@ -113,7 +113,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
   // UDFToString
   private[this] def castToString(from: DataType): Any => Any = from match {
     case BinaryType => buildCast[Array[Byte]](_, new String(_, "UTF-8"))
-    case DateType => buildCast[Int](_, d => DateUtils.toString(d))
+    case DateType => buildCast[Date](_, dateToString)
     case TimestampType => buildCast[Timestamp](_, timestampToString)
     case _ => buildCast[Any](_, _.toString)
   }

@@ -131,7 +131,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
       buildCast[Timestamp](_, t => t.getTime() != 0 || t.getNanos() != 0)
     case DateType =>
       // Hive would return null when cast from date to boolean
-      buildCast[Int](_, d => null)
+      buildCast[Date](_, d => null)
     case LongType =>
       buildCast[Long](_, _ != 0)
     case IntegerType =>

@@ -171,7 +171,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case ByteType =>
       buildCast[Byte](_, b => new Timestamp(b))
     case DateType =>
-      buildCast[Int](_, d => new Timestamp(DateUtils.toJavaDate(d).getTime))
+      buildCast[Date](_, d => new Timestamp(d.getTime))
     // TimestampWritable.decimalToTimestamp
     case DecimalType() =>
       buildCast[Decimal](_, d => decimalToTimestamp(d))

@@ -224,24 +224,37 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     }
   }
 
+  // Converts Timestamp to string according to Hive TimestampWritable convention
+  private[this] def timestampToDateString(ts: Timestamp): String = {
+    Cast.threadLocalDateFormat.get.format(ts)
+  }
+
   // DateConverter
   private[this] def castToDate(from: DataType): Any => Any = from match {
     case StringType =>
       buildCast[String](_, s =>
-        try DateUtils.fromJavaDate(Date.valueOf(s))
-        catch { case _: java.lang.IllegalArgumentException => null }
-      )
+        try Date.valueOf(s) catch { case _: java.lang.IllegalArgumentException => null })
     case TimestampType =>
       // throw valid precision more than seconds, according to Hive.
      // Timestamp.nanos is in 0 to 999,999,999, no more than a second.
-      buildCast[Timestamp](_, t => DateUtils.millisToDays(t.getTime))
+      buildCast[Timestamp](_, t => new Date(Math.floor(t.getTime / 1000.0).toLong * 1000))
     // Hive throws this exception as a Semantic Exception
-    // It is never possible to compare result when hive return with exception,
-    // so we can return null
+    // It is never possible to compare result when hive return with exception, so we can return null
     // NULL is more reasonable here, since the query itself obeys the grammar.
     case _ => _ => null
   }
 
+  // Date cannot be cast to long, according to hive
+  private[this] def dateToLong(d: Date) = null
+
+  // Date cannot be cast to double, according to hive
+  private[this] def dateToDouble(d: Date) = null
+
+  // Converts Date to string according to Hive DateWritable convention
+  private[this] def dateToString(d: Date): String = {
+    Cast.threadLocalDateFormat.get.format(d)
+  }
+
   // LongConverter
   private[this] def castToLong(from: DataType): Any => Any = from match {
     case StringType =>

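The restored Timestamp-to-Date cast keeps whole seconds inside the resulting Date, while the reverted Int-based version collapsed the value to a day count; that difference is why the test expectation further down moves from zts ("… 00:00:00") back to sts ("… 00:00:02"). A standalone sketch of the two behaviors, using a plain day-count stand-in (the real DateUtils also applied the local timezone offset):

import java.sql.{Date, Timestamp}

val t = Timestamp.valueOf("1970-01-01 00:00:02.1")
// Restored behavior: truncate to whole seconds but keep them in the Date.
val restored = new Date(Math.floor(t.getTime / 1000.0).toLong * 1000)
// Reverted behavior (simplified): whole days since the epoch; time of day is lost.
val MILLIS_PER_DAY = 86400000L
val asDays = (t.getTime / MILLIS_PER_DAY).toInt
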
@@ -251,7 +264,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case BooleanType =>
       buildCast[Boolean](_, b => if (b) 1L else 0L)
     case DateType =>
-      buildCast[Int](_, d => null)
+      buildCast[Date](_, d => dateToLong(d))
     case TimestampType =>
       buildCast[Timestamp](_, t => timestampToLong(t))
     case x: NumericType =>

@@ -267,7 +280,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case BooleanType =>
       buildCast[Boolean](_, b => if (b) 1 else 0)
     case DateType =>
-      buildCast[Int](_, d => null)
+      buildCast[Date](_, d => dateToLong(d))
     case TimestampType =>
       buildCast[Timestamp](_, t => timestampToLong(t).toInt)
     case x: NumericType =>

@@ -283,7 +296,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case BooleanType =>
       buildCast[Boolean](_, b => if (b) 1.toShort else 0.toShort)
     case DateType =>
-      buildCast[Int](_, d => null)
+      buildCast[Date](_, d => dateToLong(d))
     case TimestampType =>
       buildCast[Timestamp](_, t => timestampToLong(t).toShort)
     case x: NumericType =>

@@ -299,7 +312,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case BooleanType =>
       buildCast[Boolean](_, b => if (b) 1.toByte else 0.toByte)
     case DateType =>
-      buildCast[Int](_, d => null)
+      buildCast[Date](_, d => dateToLong(d))
     case TimestampType =>
       buildCast[Timestamp](_, t => timestampToLong(t).toByte)
     case x: NumericType =>

@@ -329,7 +342,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case BooleanType =>
       buildCast[Boolean](_, b => changePrecision(if (b) Decimal(1) else Decimal(0), target))
     case DateType =>
-      buildCast[Int](_, d => null) // date can't cast to decimal in Hive
+      buildCast[Date](_, d => null) // date can't cast to decimal in Hive
     case TimestampType =>
       // Note that we lose precision here.
       buildCast[Timestamp](_, t => changePrecision(Decimal(timestampToDouble(t)), target))

@@ -354,7 +367,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case BooleanType =>
       buildCast[Boolean](_, b => if (b) 1d else 0d)
     case DateType =>
-      buildCast[Int](_, d => null)
+      buildCast[Date](_, d => dateToDouble(d))
     case TimestampType =>
       buildCast[Timestamp](_, t => timestampToDouble(t))
     case x: NumericType =>

@@ -370,7 +383,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case BooleanType =>
       buildCast[Boolean](_, b => if (b) 1f else 0f)
     case DateType =>
-      buildCast[Int](_, d => null)
+      buildCast[Date](_, d => dateToDouble(d))
     case TimestampType =>
       buildCast[Timestamp](_, t => timestampToDouble(t).toFloat)
     case x: NumericType =>

@@ -428,17 +441,17 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
   }
 
 object Cast {
-  // `SimpleDateFormat` is not thread-safe.
-  private[sql] val threadLocalTimestampFormat = new ThreadLocal[DateFormat] {
-    override def initialValue() = {
-      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-    }
-  }
-
   // `SimpleDateFormat` is not thread-safe.
   private[sql] val threadLocalDateFormat = new ThreadLocal[DateFormat] {
     override def initialValue() = {
       new SimpleDateFormat("yyyy-MM-dd")
     }
   }
+
+  // `SimpleDateFormat` is not thread-safe.
+  private[sql] val threadLocalTimestampFormat = new ThreadLocal[DateFormat] {
+    override def initialValue() = {
+      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+    }
+  }
 }

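Both formatter fields exist because java.text.SimpleDateFormat carries mutable state and is documented as not thread-safe; the ThreadLocal gives each task thread its own instance. The pattern in isolation:

import java.text.{DateFormat, SimpleDateFormat}

// One formatter per thread; sharing a single SimpleDateFormat across threads
// can corrupt its internal calendar state mid-format.
val threadLocalDateFormat = new ThreadLocal[DateFormat] {
  override def initialValue() = new SimpleDateFormat("yyyy-MM-dd")
}
val formatted = threadLocalDateFormat.get.format(new java.util.Date(0L))
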
@@ -246,9 +246,6 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
           new String(${eval.primitiveTerm}.asInstanceOf[Array[Byte]])
         """.children
 
-      case Cast(child @ DateType(), StringType) =>
-        child.castOrNull(c => q"org.apache.spark.sql.types.DateUtils.toString($c)", StringType)
-
       case Cast(child @ NumericType(), IntegerType) =>
         child.castOrNull(c => q"$c.toInt", IntegerType)
 

@@ -35,7 +35,7 @@ object Literal {
     case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
     case d: Decimal => Literal(d, DecimalType.Unlimited)
     case t: Timestamp => Literal(t, TimestampType)
-    case d: Date => Literal(DateUtils.fromJavaDate(d), DateType)
+    case d: Date => Literal(d, DateType)
     case a: Array[Byte] => Literal(a, BinaryType)
     case null => Literal(null, NullType)
   }

@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.types
-
-import java.sql.Date
-import java.util.{Calendar, TimeZone}
-
-import org.apache.spark.sql.catalyst.expressions.Cast
-
-/**
- * helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
- */
-object DateUtils {
-  private val MILLIS_PER_DAY = 86400000
-
-  // Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
-  private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
-    override protected def initialValue: TimeZone = {
-      Calendar.getInstance.getTimeZone
-    }
-  }
-
-  private def javaDateToDays(d: Date): Int = {
-    millisToDays(d.getTime)
-  }
-
-  def millisToDays(millisLocal: Long): Int = {
-    ((millisLocal + LOCAL_TIMEZONE.get().getOffset(millisLocal)) / MILLIS_PER_DAY).toInt
-  }
-
-  private def toMillisSinceEpoch(days: Int): Long = {
-    val millisUtc = days.toLong * MILLIS_PER_DAY
-    millisUtc - LOCAL_TIMEZONE.get().getOffset(millisUtc)
-  }
-
-  def fromJavaDate(date: java.sql.Date): Int = {
-    javaDateToDays(date)
-  }
-
-  def toJavaDate(daysSinceEpoch: Int): java.sql.Date = {
-    new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch))
-  }
-
-  def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days))
-}

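The deleted DateUtils object was the heart of the reverted design: it mapped a java.sql.Date to an Int day count by shifting local-time millis onto UTC-aligned day boundaries, and back. A worked round trip under the same formulas (using the default TimeZone directly rather than the thread-local instance, and ignoring DST edge cases):

import java.util.TimeZone

val MILLIS_PER_DAY = 86400000L
val tz = TimeZone.getDefault

def millisToDays(millisLocal: Long): Int =
  ((millisLocal + tz.getOffset(millisLocal)) / MILLIS_PER_DAY).toInt

def toMillisSinceEpoch(days: Int): Long = {
  val millisUtc = days.toLong * MILLIS_PER_DAY
  millisUtc - tz.getOffset(millisUtc)
}

val d = java.sql.Date.valueOf("2015-01-28")
val days = millisToDays(d.getTime)
assert(new java.sql.Date(toMillisSinceEpoch(days)).toString == "2015-01-28")
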
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.types
 
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 
 import scala.math.Numeric.{FloatAsIfIntegral, DoubleAsIfIntegral}
 import scala.reflect.ClassTag

@@ -387,16 +387,18 @@ case object TimestampType extends NativeType {
  */
 @DeveloperApi
 case object DateType extends NativeType {
-  private[sql] type JvmType = Int
+  private[sql] type JvmType = Date
 
   @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
 
-  private[sql] val ordering = implicitly[Ordering[JvmType]]
+  private[sql] val ordering = new Ordering[JvmType] {
+    def compare(x: Date, y: Date) = x.compareTo(y)
+  }
 
   /**
-   * The default size of a value of the DateType is 4 bytes.
+   * The default size of a value of the DateType is 8 bytes.
    */
-  override def defaultSize: Int = 4
+  override def defaultSize: Int = 8
 }
 

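The hand-written Ordering returns here presumably because java.sql.Date only inherits Comparable[java.util.Date], so implicitly[Ordering[JvmType]] does not resolve the way it does when JvmType is Int (an assumption about the implicit search, but it matches the shape of the change). In isolation:

import java.sql.Date

val dateOrdering: Ordering[Date] = new Ordering[Date] {
  def compare(x: Date, y: Date) = x.compareTo(y)
}
assert(dateOrdering.lt(Date.valueOf("1970-01-01"), Date.valueOf("1970-01-02")))
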
@@ -303,7 +303,6 @@ class ExpressionEvaluationSuite extends FunSuite {
 
     val sd = "1970-01-01"
-    val d = Date.valueOf(sd)
     val zts = sd + " 00:00:00"
     val sts = sd + " 00:00:02"
     val nts = sts + ".1"
     val ts = Timestamp.valueOf(nts)

@@ -320,14 +319,14 @@ class ExpressionEvaluationSuite extends FunSuite {
     checkEvaluation(Cast(Literal(1.toDouble) cast TimestampType, DoubleType), 1.toDouble)
 
     checkEvaluation(Cast(Literal(sd) cast DateType, StringType), sd)
-    checkEvaluation(Cast(Literal(d) cast StringType, DateType), 0)
+    checkEvaluation(Cast(Literal(d) cast StringType, DateType), d)
     checkEvaluation(Cast(Literal(nts) cast TimestampType, StringType), nts)
     checkEvaluation(Cast(Literal(ts) cast StringType, TimestampType), ts)
     // all convert to string type to check
     checkEvaluation(
       Cast(Cast(Literal(nts) cast TimestampType, DateType), StringType), sd)
     checkEvaluation(
-      Cast(Cast(Literal(ts) cast DateType, TimestampType), StringType), zts)
+      Cast(Cast(Literal(ts) cast DateType, TimestampType), StringType), sts)
 
     checkEvaluation(Cast("abdef" cast BinaryType, StringType), "abdef")

@@ -378,8 +377,8 @@ class ExpressionEvaluationSuite extends FunSuite {
   }
 
   test("date") {
-    val d1 = DateUtils.fromJavaDate(Date.valueOf("1970-01-01"))
-    val d2 = DateUtils.fromJavaDate(Date.valueOf("1970-01-02"))
+    val d1 = Date.valueOf("1970-01-01")
+    val d2 = Date.valueOf("1970-01-02")
     checkEvaluation(Literal(d1) < Literal(d2), true)
   }
 

@@ -460,21 +459,22 @@ class ExpressionEvaluationSuite extends FunSuite {
 
   test("date casting") {
     val d = Date.valueOf("1970-01-01")
-    checkEvaluation(Cast(Literal(d), ShortType), null)
-    checkEvaluation(Cast(Literal(d), IntegerType), null)
-    checkEvaluation(Cast(Literal(d), LongType), null)
-    checkEvaluation(Cast(Literal(d), FloatType), null)
-    checkEvaluation(Cast(Literal(d), DoubleType), null)
-    checkEvaluation(Cast(Literal(d), DecimalType.Unlimited), null)
-    checkEvaluation(Cast(Literal(d), DecimalType(10, 2)), null)
-    checkEvaluation(Cast(Literal(d), StringType), "1970-01-01")
-    checkEvaluation(Cast(Cast(Literal(d), TimestampType), StringType), "1970-01-01 00:00:00")
+    checkEvaluation(Cast(d, ShortType), null)
+    checkEvaluation(Cast(d, IntegerType), null)
+    checkEvaluation(Cast(d, LongType), null)
+    checkEvaluation(Cast(d, FloatType), null)
+    checkEvaluation(Cast(d, DoubleType), null)
+    checkEvaluation(Cast(d, DecimalType.Unlimited), null)
+    checkEvaluation(Cast(d, DecimalType(10, 2)), null)
+    checkEvaluation(Cast(d, StringType), "1970-01-01")
+    checkEvaluation(Cast(Cast(d, TimestampType), StringType), "1970-01-01 00:00:00")
   }
 
   test("timestamp casting") {
     val millis = 15 * 1000 + 2
+    val seconds = millis * 1000 + 2
     val ts = new Timestamp(millis)
-    val ts1 = new Timestamp(15 * 1000) // a timestamp without the milliseconds part
+    val tss = new Timestamp(seconds)
     checkEvaluation(Cast(ts, ShortType), 15)
     checkEvaluation(Cast(ts, IntegerType), 15)

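These tests pin down the Hive-compatible cast matrix for dates: every numeric target yields null, while string and timestamp round-trip. The same contract as plain Scala (checkEvaluation is the suite's helper; these are stand-in values):

import java.sql.{Date, Timestamp}

val d = Date.valueOf("1970-01-01")
val asString = d.toString                  // "1970-01-01"
val asTimestamp = new Timestamp(d.getTime) // midnight on the same day
val asLong: Any = null                     // date -> numeric is null, per Hive
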
@@ -106,7 +106,7 @@ class DataTypeSuite extends FunSuite {
     checkDefaultSize(DoubleType, 8)
     checkDefaultSize(DecimalType(10, 5), 4096)
     checkDefaultSize(DecimalType.Unlimited, 4096)
-    checkDefaultSize(DateType, 4)
+    checkDefaultSize(DateType, 8)
     checkDefaultSize(TimestampType, 8)
     checkDefaultSize(StringType, 4096)
     checkDefaultSize(BinaryType, 4096)

@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.columnar
 
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{AttributeMap, Attribute, AttributeReference}

@@ -215,7 +215,22 @@ private[sql] class StringColumnStats extends ColumnStats {
   def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
 }
 
-private[sql] class DateColumnStats extends IntColumnStats
+private[sql] class DateColumnStats extends ColumnStats {
+  protected var upper: Date = null
+  protected var lower: Date = null
+
+  override def gatherStats(row: Row, ordinal: Int) {
+    super.gatherStats(row, ordinal)
+    if (!row.isNullAt(ordinal)) {
+      val value = row(ordinal).asInstanceOf[Date]
+      if (upper == null || value.compareTo(upper) > 0) upper = value
+      if (lower == null || value.compareTo(lower) < 0) lower = value
+      sizeInBytes += DATE.defaultSize
+    }
+  }
+
+  def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
+}
 
 private[sql] class TimestampColumnStats extends ColumnStats {
   protected var upper: Timestamp = null

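The restored DateColumnStats computes a running min/max over non-null Dates by explicit comparison, where the reverted one-liner inherited the same logic from IntColumnStats for free. The core of gatherStats as a standalone sketch:

import java.sql.Date

val byDate = Ordering.fromLessThan[Date]((a, b) => a.compareTo(b) < 0)
val values = Seq(Date.valueOf("1970-01-02"), Date.valueOf("1970-01-01"))
val (lower, upper) = (values.min(byDate), values.max(byDate))
assert(lower.toString == "1970-01-01" && upper.toString == "1970-01-02")
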
@@ -335,20 +335,21 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
   }
 }
 
-private[sql] object DATE extends NativeColumnType(DateType, 8, 4) {
+private[sql] object DATE extends NativeColumnType(DateType, 8, 8) {
   override def extract(buffer: ByteBuffer) = {
-    buffer.getInt
+    val date = new Date(buffer.getLong())
+    date
   }
 
-  override def append(v: Int, buffer: ByteBuffer): Unit = {
-    buffer.putInt(v)
+  override def append(v: Date, buffer: ByteBuffer): Unit = {
+    buffer.putLong(v.getTime)
   }
 
   override def getField(row: Row, ordinal: Int) = {
-    row(ordinal).asInstanceOf[Int]
+    row(ordinal).asInstanceOf[Date]
   }
 
-  def setField(row: MutableRow, ordinal: Int, value: Int): Unit = {
+  override def setField(row: MutableRow, ordinal: Int, value: Date): Unit = {
     row(ordinal) = value
   }
 }

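The restored DATE column type serializes a Date as its epoch-millisecond Long, which is where the 4-byte-to-8-byte size change throughout this commit comes from. The round trip in isolation:

import java.nio.ByteBuffer
import java.sql.Date

val buffer = ByteBuffer.allocate(8)
val in = Date.valueOf("2015-01-28")
buffer.putLong(in.getTime)             // append: 8 bytes
buffer.flip()
val out = new Date(buffer.getLong())   // extract
assert(out.getTime == in.getTime)
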
@@ -135,8 +135,6 @@ object EvaluatePython {
 
     case (ud, udt: UserDefinedType[_]) => toJava(udt.serialize(ud), udt.sqlType)
 
-    case (date: Int, DateType) => DateUtils.toJavaDate(date)
-
     // Pyrolite can handle Timestamp and Decimal
     case (other, _) => other
   }

@@ -173,7 +171,7 @@ object EvaluatePython {
     }): Row
 
     case (c: java.util.Calendar, DateType) =>
-      DateUtils.fromJavaDate(new java.sql.Date(c.getTime().getTime()))
+      new java.sql.Date(c.getTime().getTime())
 
     case (c: java.util.Calendar, TimestampType) =>
       new java.sql.Timestamp(c.getTime().getTime())

@@ -377,12 +377,10 @@ private[sql] object JsonRDD extends Logging {
       }
     }
 
-  private def toDate(value: Any): Int = {
+  private def toDate(value: Any): Date = {
     value match {
       // only support string as date
-      case value: java.lang.String =>
-        DateUtils.millisToDays(DataTypeConversions.stringToTime(value).getTime)
-      case value: java.sql.Date => DateUtils.fromJavaDate(value)
+      case value: java.lang.String => new Date(DataTypeConversions.stringToTime(value).getTime)
     }
   }
 

@@ -296,13 +296,6 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       mapData.collect().take(1).map(Row.fromTuple).toSeq)
   }
 
-  test("date row") {
-    checkAnswer(sql(
-      """select cast("2015-01-28" as date) from testData limit 1"""),
-      Row(java.sql.Date.valueOf("2015-01-28"))
-    )
-  }
-
   test("from follow multiple brackets") {
     checkAnswer(sql(
       "select key from ((select * from testData limit 1) union all (select * from testData limit 1)) x limit 1"),

@@ -83,8 +83,7 @@ class ScalaReflectionRelationSuite extends FunSuite {
 
     assert(sql("SELECT * FROM reflectData").collect().head ===
       Row("a", 1, 1L, 1.toFloat, 1.toDouble, 1.toShort, 1.toByte, true,
-        new java.math.BigDecimal(1), new Date(70, 0, 1), // This is 1970-01-01
-        new Timestamp(12345), Seq(1,2,3)))
+        new java.math.BigDecimal(1), new Date(12345), new Timestamp(12345), Seq(1,2,3)))
   }
 
   test("query case class RDD with nulls") {

@@ -30,7 +30,7 @@ class ColumnStatsSuite extends FunSuite {
   testColumnStats(classOf[FloatColumnStats], FLOAT, Row(Float.MaxValue, Float.MinValue, 0))
   testColumnStats(classOf[DoubleColumnStats], DOUBLE, Row(Double.MaxValue, Double.MinValue, 0))
   testColumnStats(classOf[StringColumnStats], STRING, Row(null, null, 0))
-  testColumnStats(classOf[DateColumnStats], DATE, Row(Int.MaxValue, Int.MinValue, 0))
+  testColumnStats(classOf[DateColumnStats], DATE, Row(null, null, 0))
   testColumnStats(classOf[TimestampColumnStats], TIMESTAMP, Row(null, null, 0))
 
   def testColumnStats[T <: NativeType, U <: ColumnStats](

@@ -18,7 +18,7 @@
 package org.apache.spark.sql.columnar
 
 import java.nio.ByteBuffer
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 
 import org.scalatest.FunSuite
 

@@ -34,7 +34,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
   test("defaultSize") {
     val checks = Map(
       INT -> 4, SHORT -> 2, LONG -> 8, BYTE -> 1, DOUBLE -> 8, FLOAT -> 4, BOOLEAN -> 1,
-      STRING -> 8, DATE -> 4, TIMESTAMP -> 12, BINARY -> 16, GENERIC -> 16)
+      STRING -> 8, DATE -> 8, TIMESTAMP -> 12, BINARY -> 16, GENERIC -> 16)
 
     checks.foreach { case (columnType, expectedSize) =>
       assertResult(expectedSize, s"Wrong defaultSize for $columnType") {

@@ -64,7 +64,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
     checkActualSize(FLOAT, Float.MaxValue, 4)
     checkActualSize(BOOLEAN, true, 1)
     checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length)
-    checkActualSize(DATE, 0, 4)
+    checkActualSize(DATE, new Date(0L), 8)
     checkActualSize(TIMESTAMP, new Timestamp(0L), 12)
 
     val binary = Array.fill[Byte](4)(0: Byte)

@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql.columnar
 
-import java.sql.Timestamp
-
 import scala.collection.immutable.HashSet
 import scala.util.Random
 
+import java.sql.{Date, Timestamp}
+
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
 import org.apache.spark.sql.types.{DataType, NativeType}

@@ -50,7 +50,7 @@ object ColumnarTestUtils {
     case STRING => Random.nextString(Random.nextInt(32))
     case BOOLEAN => Random.nextBoolean()
     case BINARY => randomBytes(Random.nextInt(32))
-    case DATE => Random.nextInt()
+    case DATE => new Date(Random.nextLong())
     case TIMESTAMP =>
       val timestamp = new Timestamp(Random.nextLong())
       timestamp.setNanos(Random.nextInt(999999999))

@@ -67,15 +67,14 @@ class JsonSuite extends QueryTest {
     checkTypePromotion(Timestamp.valueOf(strTime), enforceCorrectType(strTime, TimestampType))
 
     val strDate = "2014-10-15"
-    checkTypePromotion(
-      DateUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
+    checkTypePromotion(Date.valueOf(strDate), enforceCorrectType(strDate, DateType))
 
     val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
     checkTypePromotion(new Timestamp(3601000), enforceCorrectType(ISO8601Time1, TimestampType))
-    checkTypePromotion(DateUtils.millisToDays(3601000), enforceCorrectType(ISO8601Time1, DateType))
+    checkTypePromotion(new Date(3601000), enforceCorrectType(ISO8601Time1, DateType))
     val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
     checkTypePromotion(new Timestamp(10801000), enforceCorrectType(ISO8601Time2, TimestampType))
-    checkTypePromotion(DateUtils.millisToDays(10801000), enforceCorrectType(ISO8601Time2, DateType))
+    checkTypePromotion(new Date(10801000), enforceCorrectType(ISO8601Time2, DateType))
   }
 
   test("Get compatible type") {

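Note what the restored JSON expectations encode: promoting an ISO-8601 timestamp string to DateType keeps the millisecond value inside the java.sql.Date (3601000 ms is 01:00:01 UTC), whereas the reverted Int encoding normalized it to a whole day. Illustrated directly:

import java.sql.Date

val promoted = new Date(3601000L)
assert(promoted.getTime == 3601000L)   // time-of-day millis survive in the Date
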
@@ -357,7 +357,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "database_drop",
     "database_location",
     "database_properties",
-    "date_1",
     "date_2",
     "date_3",
     "date_4",

@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.{BufferedReader, InputStreamReader, PrintStream}
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions

@@ -409,7 +409,7 @@ private object HiveContext {
       toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
     }.toSeq.sorted.mkString("{", ",", "}")
     case (null, _) => "NULL"
-    case (d: Int, DateType) => new DateWritable(d).toString
+    case (d: Date, DateType) => new DateWritable(d).toString
     case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString
     case (bin: Array[Byte], BinaryType) => new String(bin, "UTF-8")
     case (decimal: java.math.BigDecimal, DecimalType()) =>

@@ -267,8 +267,7 @@ private[hive] trait HiveInspectors {
       val temp = new Array[Byte](writable.getLength)
       System.arraycopy(writable.getBytes, 0, temp, 0, temp.length)
       temp
-    case poi: WritableConstantDateObjectInspector =>
-      DateUtils.fromJavaDate(poi.getWritableConstantValue.get())
+    case poi: WritableConstantDateObjectInspector => poi.getWritableConstantValue.get()
     case mi: StandardConstantMapObjectInspector =>
       // take the value from the map inspector object, rather than the input data
       mi.getWritableConstantValue.map { case (k, v) =>

@@ -305,8 +304,7 @@ private[hive] trait HiveInspectors {
       System.arraycopy(bw.getBytes(), 0, result, 0, bw.getLength())
       result
     case x: DateObjectInspector if x.preferWritable() =>
-      DateUtils.fromJavaDate(x.getPrimitiveWritableObject(data).get())
-    case x: DateObjectInspector => DateUtils.fromJavaDate(x.getPrimitiveJavaObject(data))
+      x.getPrimitiveWritableObject(data).get()
     // org.apache.hadoop.hive.serde2.io.TimestampWritable.set will reset current time object
     // if next timestamp is null, so Timestamp object is cloned
     case x: TimestampObjectInspector if x.preferWritable() =>

@@ -345,9 +343,6 @@ private[hive] trait HiveInspectors {
     case _: JavaHiveDecimalObjectInspector =>
       (o: Any) => HiveShim.createDecimal(o.asInstanceOf[Decimal].toJavaBigDecimal)
 
-    case _: JavaDateObjectInspector =>
-      (o: Any) => DateUtils.toJavaDate(o.asInstanceOf[Int])
-
     case soi: StandardStructObjectInspector =>
       val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
       (o: Any) => {

@@ -431,7 +426,7 @@ private[hive] trait HiveInspectors {
       case _: BinaryObjectInspector if x.preferWritable() => HiveShim.getBinaryWritable(a)
       case _: BinaryObjectInspector => a.asInstanceOf[Array[Byte]]
       case _: DateObjectInspector if x.preferWritable() => HiveShim.getDateWritable(a)
-      case _: DateObjectInspector => DateUtils.toJavaDate(a.asInstanceOf[Int])
+      case _: DateObjectInspector => a.asInstanceOf[java.sql.Date]
      case _: TimestampObjectInspector if x.preferWritable() => HiveShim.getTimestampWritable(a)
       case _: TimestampObjectInspector => a.asInstanceOf[java.sql.Timestamp]
     }

@@ -34,7 +34,6 @@ import org.apache.spark.SerializableWritable
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.DateUtils
 
 /**
  * A trait for subclasses that handle table scans.

@@ -307,7 +306,7 @@ private[hive] object HadoopTableReader extends HiveInspectors {
         row.update(ordinal, oi.getPrimitiveJavaObject(value).clone())
       case oi: DateObjectInspector =>
         (value: Any, row: MutableRow, ordinal: Int) =>
-          row.update(ordinal, DateUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
+          row.update(ordinal, oi.getPrimitiveJavaObject(value))
       case oi: BinaryObjectInspector =>
         (value: Any, row: MutableRow, ordinal: Int) =>
           row.update(ordinal, oi.getPrimitiveJavaObject(value))

@@ -1 +0,0 @@
-1970-01-01	1970-01-01	1969-12-31 16:00:00	1969-12-31 16:00:00	1970-01-01 00:00:00

@@ -1 +0,0 @@
-true

@@ -1 +0,0 @@
-true

@@ -1 +0,0 @@
-0

@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.util
+import java.sql.Date
 import java.util.{Locale, TimeZone}
 
 import org.apache.hadoop.hive.ql.udf.UDAFPercentile

@@ -75,7 +76,7 @@ class HiveInspectorSuite extends FunSuite with HiveInspectors {
     Literal(0.asInstanceOf[Float]) ::
     Literal(0.asInstanceOf[Double]) ::
     Literal("0") ::
-    Literal(new java.sql.Date(114, 8, 23)) ::
+    Literal(new Date(2014, 9, 23)) ::
     Literal(Decimal(BigDecimal(123.123))) ::
     Literal(new java.sql.Timestamp(123123)) ::
     Literal(Array[Byte](1,2,3)) ::

@@ -142,6 +143,7 @@ class HiveInspectorSuite extends FunSuite with HiveInspectors {
     case (r1: Array[Byte], r2: Array[Byte])
       if r1 != null && r2 != null && r1.length == r2.length =>
       r1.zip(r2).map { case (b1, b2) => assert(b1 === b2) }
+    case (r1: Date, r2: Date) => assert(r1.compareTo(r2) === 0)
     case (r1, r2) => assert(r1 === r2)
   }
 }

@@ -253,30 +253,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
 
   createQueryTest("Cast Timestamp to Timestamp in UDF",
     """
-      | SELECT DATEDIFF(CAST(value AS timestamp), CAST('2002-03-21 00:00:00' AS timestamp))
-      | FROM src LIMIT 1
-    """.stripMargin)
-
-  createQueryTest("Date comparison test 1",
-    """
-      | SELECT
-      | CAST(CAST('1970-01-01 22:00:00' AS timestamp) AS date) ==
-      | CAST(CAST('1970-01-01 23:00:00' AS timestamp) AS date)
-      | FROM src LIMIT 1
-    """.stripMargin)
-
-  createQueryTest("Date comparison test 2",
-    "SELECT CAST(CAST(0 AS timestamp) AS date) > CAST(0 AS timestamp) FROM src LIMIT 1")
-
-  createQueryTest("Date cast",
-    """
-      | SELECT
-      | CAST(CAST(0 AS timestamp) AS date),
-      | CAST(CAST(CAST(0 AS timestamp) AS date) AS string),
-      | CAST(0 AS timestamp),
-      | CAST(CAST(0 AS timestamp) AS string),
-      | CAST(CAST(CAST('1970-01-01 23:00:00' AS timestamp) AS date) AS timestamp)
-      | FROM src LIMIT 1
+      | SELECT DATEDIFF(CAST(value AS timestamp), CAST('2002-03-21 00:00:00' AS timestamp))
+      | FROM src LIMIT 1
     """.stripMargin)
 
   createQueryTest("Simple Average",

@@ -160,7 +160,7 @@ private[hive] object HiveShim {
     if (value == null) null else new hadoopIo.BytesWritable(value.asInstanceOf[Array[Byte]])
 
   def getDateWritable(value: Any): hiveIo.DateWritable =
-    if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[Int])
+    if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[java.sql.Date])
 
   def getTimestampWritable(value: Any): hiveIo.TimestampWritable =
     if (value == null) {

@@ -263,7 +263,7 @@ private[hive] object HiveShim {
     }
 
   def getDateWritable(value: Any): hiveIo.DateWritable =
-    if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[Int])
+    if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[java.sql.Date])
 
   def getTimestampWritable(value: Any): hiveIo.TimestampWritable =
     if (value == null) {

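Both shim variants compile against the same Hive class here: org.apache.hadoop.hive.serde2.io.DateWritable provides both an Int (day-count) constructor and a java.sql.Date constructor, so the revert only switches which overload the value is funneled through.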