[SPARK-31710][SQL] Fail casting numeric to timestamp by default

## What changes were proposed in this pull request?
This PR makes casting from numeric to timestamp fail by default.

## Why are the changes needed?
Casting from numeric to timestamp is non-standard, and it may produce results that differ between Spark and other systems such as Hive.

## Does this PR introduce any user-facing change?
Yes. Users can no longer cast numeric to timestamp directly; they have to use the dedicated functions TIMESTAMP_SECONDS/TIMESTAMP_MILLIS/TIMESTAMP_MICROS to achieve the same effect.
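
For illustration only, a minimal sketch of the before/after usage (assuming a local SparkSession; the function and config names are the ones introduced by this patch, everything else is hypothetical):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, timestamp_seconds}

val spark = SparkSession.builder().master("local[1]").appName("spark-31710-demo").getOrCreate()

// With the new default, a direct numeric-to-timestamp cast fails analysis with a message
// like "cannot cast int to timestamp, you can enable the casting by setting
// spark.sql.legacy.allowCastNumericToTimestamp to true, ..." (kept commented out here):
// spark.sql("SELECT CAST(1230219000 AS TIMESTAMP)").show()

// Recommended replacements: the dedicated conversion functions.
spark.sql(
  "SELECT TIMESTAMP_SECONDS(1230219000) AS ts_s, TIMESTAMP_MILLIS(1230219000123) AS ts_ms"
).show(truncate = false)

// DataFrame API equivalent using the new timestamp_seconds function.
spark.range(1).select(timestamp_seconds(lit(1230219000)).alias("ts")).show(truncate = false)
```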

## How was this patch tested?
Unit tests added.

Closes #28593 from GuoPhilipse/31710-fix-compatibility.

Lead-authored-by: GuoPhilipse <guofei_ok@126.com>
Co-authored-by: GuoPhilipse <46367746+GuoPhilipse@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
GuoPhilipse 2020-06-16 08:35:35 +00:00 committed by Wenchen Fan
parent fe68e95a5a
commit f0e6d0ec13
30 changed files with 233 additions and 150 deletions


@ -30,6 +30,8 @@ license: |
- In Spark 3.1, `from_unixtime`, `unix_timestamp`,`to_unix_timestamp`, `to_timestamp` and `to_date` will fail if the specified datetime pattern is invalid. In Spark 3.0 or earlier, they result `NULL`.
- In Spark 3.1, casting numeric to timestamp will be forbidden by default. It's strongly recommended to use dedicated functions: TIMESTAMP_SECONDS, TIMESTAMP_MILLIS and TIMESTAMP_MICROS. Or you can set `spark.sql.legacy.allowCastNumericToTimestamp` to true to work around it. See more details in SPARK-31710.
## Upgrading from Spark SQL 2.4 to 3.0
### Dataset/DataFrame APIs


@ -534,7 +534,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
.. note:: Evolving
>>> sdf.select('name', sdf.time.cast('timestamp')).withWatermark('time', '10 minutes')
>>> from pyspark.sql.functions import timestamp_seconds
>>> sdf.select(
... 'name',
... timestamp_seconds(sdf.time).alias('time')).withWatermark('time', '10 minutes')
DataFrame[name: string, time: timestamp]
"""
if not eventTime or type(eventTime) is not str:


@ -1427,6 +1427,19 @@ def to_utc_timestamp(timestamp, tz):
return Column(sc._jvm.functions.to_utc_timestamp(_to_java_column(timestamp), tz))
@since(3.1)
def timestamp_seconds(col):
"""
>>> from pyspark.sql.functions import timestamp_seconds
>>> time_df = spark.createDataFrame([(1230219000,)], ['unix_time'])
>>> time_df.select(timestamp_seconds(time_df.unix_time).alias('ts')).collect()
[Row(ts=datetime.datetime(2008, 12, 25, 7, 30))]
"""
sc = SparkContext._active_spark_context
return Column(sc._jvm.functions.timestamp_seconds(_to_java_column(col)))
@since(2.0)
@ignore_unicode_prefix
def window(timeColumn, windowDuration, slideDuration=None, startTime=None):


@ -644,7 +644,7 @@ class DataFrameTests(ReusedSQLTestCase):
CAST(col6 AS DOUBLE) AS double,
CAST(col7 AS BOOLEAN) AS boolean,
CAST(col8 AS STRING) AS string,
CAST(col9 AS TIMESTAMP) AS timestamp
timestamp_seconds(col9) AS timestamp
FROM VALUES (1, 1, 1, 1, 1, 1, 1, 1, 1),
(NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)
"""


@ -59,7 +59,8 @@ object Cast {
case (StringType, TimestampType) => true
case (BooleanType, TimestampType) => true
case (DateType, TimestampType) => true
case (_: NumericType, TimestampType) => true
case (_: NumericType, TimestampType) =>
SQLConf.get.getConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP)
case (StringType, DateType) => true
case (TimestampType, DateType) => true
@ -266,7 +267,15 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
TypeCheckResult.TypeCheckSuccess
} else {
TypeCheckResult.TypeCheckFailure(
s"cannot cast ${child.dataType.catalogString} to ${dataType.catalogString}")
if (child.dataType.isInstanceOf[NumericType] && dataType.isInstanceOf[TimestampType]) {
s"cannot cast ${child.dataType.catalogString} to ${dataType.catalogString}," +
"you can enable the casting by setting " +
s"${SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key} to true," +
"but we strongly recommend using function " +
"TIMESTAMP_SECONDS/TIMESTAMP_MILLIS/TIMESTAMP_MICROS instead."
} else {
s"cannot cast ${child.dataType.catalogString} to ${dataType.catalogString}"
})
}
}


@ -1549,7 +1549,7 @@ case class ParseToDate(left: Expression, format: Option[Expression], child: Expr
def this(left: Expression, format: Expression) {
this(left, Option(format),
Cast(Cast(UnixTimestamp(left, format), TimestampType), DateType))
Cast(SecondsToTimestamp(UnixTimestamp(left, format)), DateType))
}
def this(left: Expression) = {


@ -2608,6 +2608,15 @@ object SQLConf {
.checkValue(_ > 0, "The timeout value must be positive")
.createWithDefault(10L)
val LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP =
buildConf("spark.sql.legacy.allowCastNumericToTimestamp")
.internal()
.doc("When true, allow casting numeric to timestamp," +
"when false, forbid the cast, more details in SPARK-31710")
.version("3.1.0")
.booleanConf
.createWithDefault(false)
/**
* Holds information about keys that have been deprecated.
*
@ -3196,6 +3205,9 @@ class SQLConf extends Serializable with Logging {
def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID)
def legacyAllowCastNumericToTimestamp: Boolean =
getConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP)
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
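
As a hedged sketch (not part of the diff above), the escape-hatch config added to SQLConf could be toggled at runtime like this, assuming an active SparkSession named `spark`:

```scala
// Re-enabling the legacy cast is discouraged; shown only to illustrate the new flag.
spark.conf.set("spark.sql.legacy.allowCastNumericToTimestamp", "true")
spark.sql("SELECT CAST(0 AS TIMESTAMP) AS legacy_ts").show()   // allowed again under the flag

spark.conf.set("spark.sql.legacy.allowCastNumericToTimestamp", "false")
// spark.sql("SELECT CAST(0 AS TIMESTAMP)")                    // would now fail analysis
```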


@ -50,7 +50,9 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
}
protected def checkNullCast(from: DataType, to: DataType): Unit = {
checkEvaluation(cast(Literal.create(null, from), to, UTC_OPT), null)
withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "true") {
checkEvaluation(cast(Literal.create(null, from), to, UTC_OPT), null)
}
}
test("null cast") {
@ -239,7 +241,9 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
checkCast(1.5, 1.5f)
checkCast(1.5, "1.5")
checkEvaluation(cast(cast(1.toDouble, TimestampType), DoubleType), 1.toDouble)
withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "true") {
checkEvaluation(cast(cast(1.toDouble, TimestampType), DoubleType), 1.toDouble)
}
}
test("cast from string") {
@ -305,17 +309,20 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(cast(cast(cast(cast(
cast(cast("5", ByteType), ShortType), IntegerType), FloatType), DoubleType), LongType),
5.toLong)
checkEvaluation(
cast(cast(cast(cast(cast(cast("5", ByteType), TimestampType),
DecimalType.SYSTEM_DEFAULT), LongType), StringType), ShortType),
5.toShort)
checkEvaluation(
cast(cast(cast(cast(cast(cast("5", TimestampType, UTC_OPT), ByteType),
DecimalType.SYSTEM_DEFAULT), LongType), StringType), ShortType),
null)
checkEvaluation(cast(cast(cast(cast(cast(cast("5", DecimalType.SYSTEM_DEFAULT),
ByteType), TimestampType), LongType), StringType), ShortType),
5.toShort)
withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "true") {
checkEvaluation(
cast(cast(cast(cast(cast(cast("5", ByteType), TimestampType),
DecimalType.SYSTEM_DEFAULT), LongType), StringType), ShortType),
5.toShort)
checkEvaluation(
cast(cast(cast(cast(cast(cast("5", TimestampType, UTC_OPT), ByteType),
DecimalType.SYSTEM_DEFAULT), LongType), StringType), ShortType),
null)
checkEvaluation(cast(cast(cast(cast(cast(cast("5", DecimalType.SYSTEM_DEFAULT),
ByteType), TimestampType), LongType), StringType), ShortType),
5.toShort)
}
checkEvaluation(cast("23", DoubleType), 23d)
checkEvaluation(cast("23", IntegerType), 23)
@ -376,29 +383,32 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(cast(ts, LongType), 15.toLong)
checkEvaluation(cast(ts, FloatType), 15.003f)
checkEvaluation(cast(ts, DoubleType), 15.003)
checkEvaluation(cast(cast(tss, ShortType), TimestampType),
DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
checkEvaluation(cast(cast(tss, IntegerType), TimestampType),
DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
checkEvaluation(cast(cast(tss, LongType), TimestampType),
DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
checkEvaluation(
cast(cast(millis.toFloat / MILLIS_PER_SECOND, TimestampType), FloatType),
millis.toFloat / MILLIS_PER_SECOND)
checkEvaluation(
cast(cast(millis.toDouble / MILLIS_PER_SECOND, TimestampType), DoubleType),
millis.toDouble / MILLIS_PER_SECOND)
checkEvaluation(
cast(cast(Decimal(1), TimestampType), DecimalType.SYSTEM_DEFAULT),
Decimal(1))
// A test for higher precision than millis
checkEvaluation(cast(cast(0.000001, TimestampType), DoubleType), 0.000001)
withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "true") {
checkEvaluation(cast(cast(tss, ShortType), TimestampType),
DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
checkEvaluation(cast(cast(tss, IntegerType), TimestampType),
DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
checkEvaluation(cast(cast(tss, LongType), TimestampType),
DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
checkEvaluation(
cast(cast(millis.toFloat / MILLIS_PER_SECOND, TimestampType), FloatType),
millis.toFloat / MILLIS_PER_SECOND)
checkEvaluation(
cast(cast(millis.toDouble / MILLIS_PER_SECOND, TimestampType), DoubleType),
millis.toDouble / MILLIS_PER_SECOND)
checkEvaluation(
cast(cast(Decimal(1), TimestampType), DecimalType.SYSTEM_DEFAULT),
Decimal(1))
checkEvaluation(cast(Double.NaN, TimestampType), null)
checkEvaluation(cast(1.0 / 0.0, TimestampType), null)
checkEvaluation(cast(Float.NaN, TimestampType), null)
checkEvaluation(cast(1.0f / 0.0f, TimestampType), null)
// A test for higher precision than millis
checkEvaluation(cast(cast(0.000001, TimestampType), DoubleType), 0.000001)
checkEvaluation(cast(Double.NaN, TimestampType), null)
checkEvaluation(cast(1.0 / 0.0, TimestampType), null)
checkEvaluation(cast(Float.NaN, TimestampType), null)
checkEvaluation(cast(1.0f / 0.0f, TimestampType), null)
}
}
test("cast from array") {
@ -1026,8 +1036,11 @@ class CastSuite extends CastSuiteBase {
test("cast from int 2") {
checkEvaluation(cast(1, LongType), 1.toLong)
checkEvaluation(cast(cast(1000, TimestampType), LongType), 1000.toLong)
checkEvaluation(cast(cast(-1200, TimestampType), LongType), -1200.toLong)
withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "true") {
checkEvaluation(cast(cast(1000, TimestampType), LongType), 1000.toLong)
checkEvaluation(cast(cast(-1200, TimestampType), LongType), -1200.toLong)
}
checkEvaluation(cast(123, DecimalType.USER_DEFAULT), Decimal(123))
checkEvaluation(cast(123, DecimalType(3, 0)), Decimal(123))
@ -1310,6 +1323,20 @@ class CastSuite extends CastSuiteBase {
checkEvaluation(cast(negativeTs, LongType), expectedSecs)
}
}
test("SPARK-31710:fail casting from numeric to timestamp by default") {
Seq(true, false).foreach { enable =>
withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> enable.toString) {
assert(cast(2.toByte, TimestampType).resolved == enable)
assert(cast(10.toShort, TimestampType).resolved == enable)
assert(cast(3, TimestampType).resolved == enable)
assert(cast(10L, TimestampType).resolved == enable)
assert(cast(Decimal(1.2), TimestampType).resolved == enable)
assert(cast(1.7f, TimestampType).resolved == enable)
assert(cast(2.3d, TimestampType).resolved == enable)
}
}
}
}
/**


@ -3358,6 +3358,15 @@ object functions {
window(timeColumn, windowDuration, windowDuration, "0 second")
}
/**
* Creates timestamp from the number of seconds since UTC epoch.
* @group = datetime_funcs
* @since = 3.1.0
*/
def timestamp_seconds(e: Column): Column = withExpr {
SecondsToTimestamp(e.expr)
}
//////////////////////////////////////////////////////////////////////////////////////////////
// Collection functions
//////////////////////////////////////////////////////////////////////////////////////////////
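
A brief, assumed usage sketch of the new Scala `timestamp_seconds` function defined above, mirroring the PySpark doctest earlier in this commit (requires an active SparkSession `spark`; output depends on the session time zone):

```scala
import org.apache.spark.sql.functions.timestamp_seconds
import spark.implicits._

val timeDf = Seq(1230219000L).toDF("unix_time")
timeDf.select(timestamp_seconds($"unix_time").alias("ts")).show(truncate = false)
```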


@ -1,15 +1,15 @@
--This test file was converted from window.sql.
-- Test data.
CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
(null, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
(1, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
(1, 2L, 2.5D, date("2017-08-02"), timestamp(1502000000), "a"),
(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "a"),
(1, null, 1.0D, date("2017-08-01"), timestamp(1501545600), "b"),
(2, 3L, 3.3D, date("2017-08-03"), timestamp(1503000000), "b"),
(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "b"),
(null, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"),
(1, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"),
(1, 2L, 2.5D, date("2017-08-02"), timestamp_seconds(1502000000), "a"),
(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "a"),
(1, null, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "b"),
(2, 3L, 3.3D, date("2017-08-03"), timestamp_seconds(1503000000), "b"),
(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "b"),
(null, null, null, null, null, null),
(3, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), null)
(3, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), null)
AS testData(val, val_long, val_double, val_date, val_timestamp, cate);
-- RowsBetween


@ -5,15 +5,15 @@
-- Test data.
CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
(null, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
(1, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
(1, 2L, 2.5D, date("2017-08-02"), timestamp(1502000000), "a"),
(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "a"),
(1, null, 1.0D, date("2017-08-01"), timestamp(1501545600), "b"),
(2, 3L, 3.3D, date("2017-08-03"), timestamp(1503000000), "b"),
(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "b"),
(null, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"),
(1, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"),
(1, 2L, 2.5D, date("2017-08-02"), timestamp_seconds(1502000000), "a"),
(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "a"),
(1, null, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "b"),
(2, 3L, 3.3D, date("2017-08-03"), timestamp_seconds(1503000000), "b"),
(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "b"),
(null, null, null, null, null, null),
(3, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), null)
(3, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), null)
AS testData(val, val_long, val_double, val_date, val_timestamp, cate);
-- RowsBetween


@ -4,15 +4,15 @@
-- !query
CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
(null, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
(1, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
(1, 2L, 2.5D, date("2017-08-02"), timestamp(1502000000), "a"),
(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "a"),
(1, null, 1.0D, date("2017-08-01"), timestamp(1501545600), "b"),
(2, 3L, 3.3D, date("2017-08-03"), timestamp(1503000000), "b"),
(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "b"),
(null, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"),
(1, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"),
(1, 2L, 2.5D, date("2017-08-02"), timestamp_seconds(1502000000), "a"),
(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "a"),
(1, null, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "b"),
(2, 3L, 3.3D, date("2017-08-03"), timestamp_seconds(1503000000), "b"),
(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "b"),
(null, null, null, null, null, null),
(3, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), null)
(3, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), null)
AS testData(val, val_long, val_double, val_date, val_timestamp, cate)
-- !query schema
struct<>


@ -4,15 +4,15 @@
-- !query
CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
(null, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
(1, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
(1, 2L, 2.5D, date("2017-08-02"), timestamp(1502000000), "a"),
(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "a"),
(1, null, 1.0D, date("2017-08-01"), timestamp(1501545600), "b"),
(2, 3L, 3.3D, date("2017-08-03"), timestamp(1503000000), "b"),
(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "b"),
(null, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"),
(1, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"),
(1, 2L, 2.5D, date("2017-08-02"), timestamp_seconds(1502000000), "a"),
(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "a"),
(1, null, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "b"),
(2, 3L, 3.3D, date("2017-08-03"), timestamp_seconds(1503000000), "b"),
(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "b"),
(null, null, null, null, null, null),
(3, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), null)
(3, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), null)
AS testData(val, val_long, val_double, val_date, val_timestamp, cate)
-- !query schema
struct<>


@ -639,7 +639,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
val now = sql("select unix_timestamp()").collect().head.getLong(0)
checkAnswer(
sql(s"select cast ($now as timestamp)"),
sql(s"select timestamp_seconds($now)"),
Row(new java.util.Date(TimeUnit.SECONDS.toMillis(now))))
}
}
@ -716,7 +716,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss")
checkAnswer(df.select(to_timestamp(col("ss"))),
df.select(unix_timestamp(col("ss")).cast("timestamp")))
df.select(timestamp_seconds(unix_timestamp(col("ss")))))
checkAnswer(df.select(to_timestamp(col("ss"))), Seq(
Row(ts1), Row(ts2)))
if (legacyParserPolicy == "legacy") {


@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
import org.apache.spark.sql.functions.timestamp_seconds
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.test.SQLTestData.ArrayData
@ -467,7 +468,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
withTable(table) {
TimeZone.setDefault(srcTimeZone)
spark.range(start, end)
.select('id.cast(TimestampType).cast(t).as(column))
.select(timestamp_seconds($"id").cast(t).as(column))
.write.saveAsTable(table)
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS $column")


@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming
import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.functions.{count, timestamp_seconds, window}
import org.apache.spark.sql.streaming.StreamTest
class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
@ -33,7 +33,7 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
test("SPARK-24156: do not plan a no-data batch again after it has already been planned") {
val inputData = MemoryStream[Int]
val df = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)


@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkException
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.functions.{count, timestamp_seconds, window}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest}
import org.apache.spark.sql.test.SharedSparkSession
@ -163,7 +163,7 @@ class ForeachWriterSuite extends StreamTest with SharedSparkSession with BeforeA
val inputData = MemoryStream[Int]
val windowedAggregation = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
@ -197,7 +197,7 @@ class ForeachWriterSuite extends StreamTest with SharedSparkSession with BeforeA
val inputData = MemoryStream[Int]
val windowedAggregation = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)


@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.UTC
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.MemorySink
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.functions.{count, timestamp_seconds, window}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode._
import org.apache.spark.util.Utils
@ -129,7 +129,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
// No event time metrics when there is no watermarking
val inputData1 = MemoryStream[Int]
val aggWithoutWatermark = inputData1.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
@ -146,7 +146,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
// All event time metrics where watermarking is set
val inputData2 = MemoryStream[Int]
val aggWithWatermark = inputData2.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
@ -169,7 +169,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
// All event time metrics where watermarking is set
val inputData = MemoryStream[Int]
val aggWithWatermark = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
@ -224,7 +224,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
// All event time metrics where watermarking is set
val inputData = MemoryStream[Int]
val aggWithWatermark = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
@ -286,7 +286,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
val inputData = MemoryStream[Int]
val windowedAggregation = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
@ -311,7 +311,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, "10")
val windowedAggregation = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
@ -341,7 +341,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
val input = MemoryStream[Long]
val aggWithWatermark = input.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "2 years 5 months")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
@ -373,7 +373,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
test("recovery") {
val inputData = MemoryStream[Int]
val df = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
@ -408,14 +408,14 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
val first = MemoryStream[Int]
val firstDf = first.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.select('value)
val second = MemoryStream[Int]
val secondDf = second.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "5 seconds")
.select('value)
@ -485,7 +485,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
val inputData = MemoryStream[Int]
val windowedAggregation = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
@ -510,7 +510,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
val inputData = MemoryStream[Int]
val windowedAggregation = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.groupBy($"eventTime")
.agg(count("*") as 'count)
@ -549,8 +549,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
test("the new watermark should override the old one") {
val df = MemoryStream[(Long, Long)].toDF()
.withColumn("first", $"_1".cast("timestamp"))
.withColumn("second", $"_2".cast("timestamp"))
.withColumn("first", timestamp_seconds($"_1"))
.withColumn("second", timestamp_seconds($"_2"))
.withWatermark("first", "1 minute")
.withWatermark("second", "2 minutes")
@ -562,7 +562,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
test("EventTime watermark should be ignored in batch query.") {
val df = testData
.withColumn("eventTime", $"key".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"key"))
.withWatermark("eventTime", "1 minute")
.select("eventTime")
.as[Long]
@ -601,7 +601,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
test("SPARK-27340 Alias on TimeWindow expression cause watermark metadata lost") {
val inputData = MemoryStream[Int]
val aliasWindow = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.select(window($"eventTime", "5 seconds") as 'aliasWindow)
// Check the eventTime metadata is kept in the top level alias.
@ -631,7 +631,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
def testWithFlag(flag: Boolean): Unit = withClue(s"with $flagKey = $flag") {
val inputData = MemoryStream[Int]
val windowedAggregation = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
@ -767,10 +767,10 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
input1: MemoryStream[Int],
input2: MemoryStream[Int]): Dataset[_] = {
val df1 = input1.toDF
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
val df2 = input2.toDF
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "15 seconds")
df1.union(df2).select($"eventTime".cast("int"))
}


@ -210,7 +210,7 @@ abstract class FileStreamSinkSuite extends StreamTest {
val inputData = MemoryStream[Long]
val inputDF = inputData.toDF.toDF("time")
val outputDf = inputDF
.selectExpr("CAST(time AS timestamp) AS timestamp")
.selectExpr("timestamp_seconds(time) AS timestamp")
.withWatermark("timestamp", "10 seconds")
.groupBy(window($"timestamp", "5 seconds"))
.count()


@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.execution.RDDScanExec
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper, MemoryStateStore, StateStore, StateStoreId, StateStoreMetrics, UnsafeRowPair}
import org.apache.spark.sql.functions.timestamp_seconds
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.types.{DataType, IntegerType}
@ -826,7 +827,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
val inputData = MemoryStream[(String, Int)]
val result =
inputData.toDS
.select($"_1".as("key"), $"_2".cast("timestamp").as("eventTime"))
.select($"_1".as("key"), timestamp_seconds($"_2").as("eventTime"))
.withWatermark("eventTime", "10 seconds")
.as[(String, Long)]
.groupByKey(_._1)
@ -901,7 +902,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
val inputData = MemoryStream[(String, Int)]
val result =
inputData.toDS
.select($"_1".as("key"), $"_2".cast("timestamp").as("eventTime"))
.select($"_1".as("key"), timestamp_seconds($"_2").as("eventTime"))
.withWatermark("eventTime", "10 seconds")
.as[(String, Long)]
.groupByKey(_._1)
@ -1111,7 +1112,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
val inputData = MemoryStream[(String, Long)]
val result =
inputData.toDF().toDF("key", "time")
.selectExpr("key", "cast(time as timestamp) as timestamp")
.selectExpr("key", "timestamp_seconds(time) as timestamp")
.withWatermark("timestamp", "10 second")
.as[(String, Long)]
.groupByKey(x => x._1)


@ -188,7 +188,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
testWithAllStateVersions("state metrics - append mode") {
val inputData = MemoryStream[Int]
val aggWithWatermark = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)


@ -86,7 +86,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
test("deduplicate with watermark") {
val inputData = MemoryStream[Int]
val result = inputData.toDS()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.dropDuplicates()
.select($"eventTime".cast("long").as[Long])
@ -113,7 +113,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
test("deduplicate with aggregate - append mode") {
val inputData = MemoryStream[Int]
val windowedaggregate = inputData.toDS()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.dropDuplicates()
.withWatermark("eventTime", "10 seconds")
@ -230,7 +230,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
test("SPARK-19841: watermarkPredicate should filter based on keys") {
val input = MemoryStream[(Int, Int)]
val df = input.toDS.toDF("time", "id")
.withColumn("time", $"time".cast("timestamp"))
.withColumn("time", timestamp_seconds($"time"))
.withWatermark("time", "1 second")
.dropDuplicates("id", "time") // Change the column positions
.select($"id")
@ -249,7 +249,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
test("SPARK-21546: dropDuplicates should ignore watermark when it's not a key") {
val input = MemoryStream[(Int, Int)]
val df = input.toDS.toDF("id", "time")
.withColumn("time", $"time".cast("timestamp"))
.withColumn("time", timestamp_seconds($"time"))
.withWatermark("time", "1 second")
.dropDuplicates("id")
.select($"id", $"time".cast("long"))
@ -265,7 +265,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
def testWithFlag(flag: Boolean): Unit = withClue(s"with $flagKey = $flag") {
val inputData = MemoryStream[Int]
val result = inputData.toDS()
.withColumn("eventTime", $"value".cast("timestamp"))
.withColumn("eventTime", timestamp_seconds($"value"))
.withWatermark("eventTime", "10 seconds")
.dropDuplicates()
.select($"eventTime".cast("long").as[Long])


@ -87,11 +87,12 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
val input2 = MemoryStream[Int]
val df1 = input1.toDF
.select('value as "key", 'value.cast("timestamp") as "timestamp", ('value * 2) as "leftValue")
.select('value as "key", timestamp_seconds($"value") as "timestamp",
('value * 2) as "leftValue")
.select('key, window('timestamp, "10 second"), 'leftValue)
val df2 = input2.toDF
.select('value as "key", 'value.cast("timestamp") as "timestamp",
.select('value as "key", timestamp_seconds($"value") as "timestamp",
('value * 3) as "rightValue")
.select('key, window('timestamp, "10 second"), 'rightValue)
@ -127,12 +128,13 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
val input2 = MemoryStream[Int]
val df1 = input1.toDF
.select('value as "key", 'value.cast("timestamp") as "timestamp", ('value * 2) as "leftValue")
.select('value as "key", timestamp_seconds($"value") as "timestamp",
('value * 2) as "leftValue")
.withWatermark("timestamp", "10 seconds")
.select('key, window('timestamp, "10 second"), 'leftValue)
val df2 = input2.toDF
.select('value as "key", 'value.cast("timestamp") as "timestamp",
.select('value as "key", timestamp_seconds($"value") as "timestamp",
('value * 3) as "rightValue")
.select('key, window('timestamp, "10 second"), 'rightValue)
@ -177,11 +179,12 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
val rightInput = MemoryStream[(Int, Int)]
val df1 = leftInput.toDF.toDF("leftKey", "time")
.select('leftKey, 'time.cast("timestamp") as "leftTime", ('leftKey * 2) as "leftValue")
.select('leftKey, timestamp_seconds($"time") as "leftTime", ('leftKey * 2) as "leftValue")
.withWatermark("leftTime", "10 seconds")
val df2 = rightInput.toDF.toDF("rightKey", "time")
.select('rightKey, 'time.cast("timestamp") as "rightTime", ('rightKey * 3) as "rightValue")
.select('rightKey, timestamp_seconds($"time") as "rightTime",
('rightKey * 3) as "rightValue")
.withWatermark("rightTime", "10 seconds")
val joined =
@ -235,11 +238,12 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
val rightInput = MemoryStream[(Int, Int)]
val df1 = leftInput.toDF.toDF("leftKey", "time")
.select('leftKey, 'time.cast("timestamp") as "leftTime", ('leftKey * 2) as "leftValue")
.select('leftKey, timestamp_seconds($"time") as "leftTime", ('leftKey * 2) as "leftValue")
.withWatermark("leftTime", "20 seconds")
val df2 = rightInput.toDF.toDF("rightKey", "time")
.select('rightKey, 'time.cast("timestamp") as "rightTime", ('rightKey * 3) as "rightValue")
.select('rightKey, timestamp_seconds($"time") as "rightTime",
('rightKey * 3) as "rightValue")
.withWatermark("rightTime", "30 seconds")
val condition = expr(
@ -425,7 +429,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
test("SPARK-26187 restore the stream-stream inner join query from Spark 2.4") {
val inputStream = MemoryStream[(Int, Long)]
val df = inputStream.toDS()
.select(col("_1").as("value"), col("_2").cast("timestamp").as("timestamp"))
.select(col("_1").as("value"), timestamp_seconds($"_2").as("timestamp"))
val leftStream = df.select(col("value").as("leftId"), col("timestamp").as("leftTime"))
@ -500,7 +504,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
val df = input.toDF
.select(
'value as "key",
'value.cast("timestamp") as s"${prefix}Time",
timestamp_seconds($"value") as s"${prefix}Time",
('value * multiplier) as s"${prefix}Value")
.withWatermark(s"${prefix}Time", "10 seconds")
@ -682,11 +686,12 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
val rightInput = MemoryStream[(Int, Int)]
val df1 = leftInput.toDF.toDF("leftKey", "time")
.select('leftKey, 'time.cast("timestamp") as "leftTime", ('leftKey * 2) as "leftValue")
.select('leftKey, timestamp_seconds($"time") as "leftTime", ('leftKey * 2) as "leftValue")
.withWatermark("leftTime", "10 seconds")
val df2 = rightInput.toDF.toDF("rightKey", "time")
.select('rightKey, 'time.cast("timestamp") as "rightTime", ('rightKey * 3) as "rightValue")
.select('rightKey, timestamp_seconds($"time") as "rightTime",
('rightKey * 3) as "rightValue")
.withWatermark("rightTime", "10 seconds")
val joined =
@ -777,7 +782,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
val inputStream = MemoryStream[(Int, Long)]
val df = inputStream.toDS()
.select(col("_1").as("value"), col("_2").cast("timestamp").as("timestamp"))
.select(col("_1").as("value"), timestamp_seconds($"_2").as("timestamp"))
val leftStream = df.select(col("value").as("leftId"), col("timestamp").as("leftTime"))
@ -840,7 +845,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
val inputStream = MemoryStream[(Int, Long)]
val df = inputStream.toDS()
.select(col("_1").as("value"), col("_2").cast("timestamp").as("timestamp"))
.select(col("_1").as("value"), timestamp_seconds($"_2").as("timestamp"))
// we're just flipping "left" and "right" from left outer join and apply right outer join
@ -883,7 +888,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
test("SPARK-26187 restore the stream-stream outer join query from Spark 2.4") {
val inputStream = MemoryStream[(Int, Long)]
val df = inputStream.toDS()
.select(col("_1").as("value"), col("_2").cast("timestamp").as("timestamp"))
.select(col("_1").as("value"), timestamp_seconds($"_2").as("timestamp"))
val leftStream = df.select(col("value").as("leftId"), col("timestamp").as("leftTime"))


@ -39,6 +39,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
private val originalSessionLocalTimeZone = TestHive.conf.sessionLocalTimeZone
private val originalLegacyAllowCastNumericToTimestamp =
TestHive.conf.legacyAllowCastNumericToTimestamp
def testCases: Seq[(String, File)] = {
hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
@ -58,6 +60,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
// Fix session local timezone to America/Los_Angeles for those timezone sensitive tests
// (timestamp_*)
TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles")
// Ensures that cast numeric to timestamp enabled so that we can test them
TestHive.setConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP, true)
RuleExecutor.resetMetrics()
}
@ -68,6 +72,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, originalCrossJoinEnabled)
TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, originalSessionLocalTimeZone)
TestHive.setConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP,
originalLegacyAllowCastNumericToTimestamp)
// For debugging dump some statistics about how much time was spent in various optimizer rules
logWarning(RuleExecutor.dumpTimeSpent())


@ -201,14 +201,17 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
|IF(TRUE, CAST(NULL AS BINARY), CAST("1" AS BINARY)) AS COL18,
|IF(FALSE, CAST(NULL AS DATE), CAST("1970-01-01" AS DATE)) AS COL19,
|IF(TRUE, CAST(NULL AS DATE), CAST("1970-01-01" AS DATE)) AS COL20,
|IF(TRUE, CAST(NULL AS TIMESTAMP), CAST(1 AS TIMESTAMP)) AS COL21,
|IF(TRUE, CAST(NULL AS TIMESTAMP), CAST('1969-12-31 16:00:01' AS TIMESTAMP)) AS COL21,
|IF(FALSE, CAST(NULL AS DECIMAL), CAST(1 AS DECIMAL)) AS COL22,
|IF(TRUE, CAST(NULL AS DECIMAL), CAST(1 AS DECIMAL)) AS COL23
|FROM src LIMIT 1""".stripMargin)
test("constant null testing timestamp") {
val r1 = sql("SELECT IF(FALSE, CAST(NULL AS TIMESTAMP), CAST(1 AS TIMESTAMP)) AS COL20")
.collect().head
var r1 = sql(
"""
|SELECT IF(FALSE, CAST(NULL AS TIMESTAMP),
|CAST('1969-12-31 16:00:01' AS TIMESTAMP)) AS COL20
""".stripMargin).collect().head
assert(new Timestamp(1000) == r1.getTimestamp(0))
}
@ -552,28 +555,22 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
// Jdk version leads to different query output for double, so not use createQueryTest here
test("timestamp cast #1") {
val res = sql("SELECT CAST(CAST(1 AS TIMESTAMP) AS DOUBLE) FROM src LIMIT 1").collect().head
val res = sql("SELECT CAST(TIMESTAMP_SECONDS(1) AS DOUBLE) FROM src LIMIT 1").collect().head
assert(1 == res.getDouble(0))
}
test("timestamp cast #2") {
val res = sql("SELECT CAST(CAST(-1 AS TIMESTAMP) AS DOUBLE) FROM src LIMIT 1").collect().head
val res = sql("SELECT CAST(TIMESTAMP_SECONDS(-1) AS DOUBLE) FROM src LIMIT 1").collect().head
assert(-1 == res.get(0))
}
createQueryTest("timestamp cast #3",
"SELECT CAST(CAST(1.2 AS TIMESTAMP) AS DOUBLE) FROM src LIMIT 1")
createQueryTest("timestamp cast #4",
"SELECT CAST(CAST(-1.2 AS TIMESTAMP) AS DOUBLE) FROM src LIMIT 1")
test("timestamp cast #5") {
val res = sql("SELECT CAST(CAST(1200 AS TIMESTAMP) AS INT) FROM src LIMIT 1").collect().head
test("timestamp cast #3") {
val res = sql("SELECT CAST(TIMESTAMP_SECONDS(1200) AS INT) FROM src LIMIT 1").collect().head
assert(1200 == res.getInt(0))
}
test("timestamp cast #6") {
val res = sql("SELECT CAST(CAST(-1200 AS TIMESTAMP) AS INT) FROM src LIMIT 1").collect().head
test("timestamp cast #4") {
val res = sql("SELECT CAST(TIMESTAMP_SECONDS(-1200) AS INT) FROM src LIMIT 1").collect().head
assert(-1200 == res.getInt(0))
}


@ -434,8 +434,8 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
withTempView("tab1") {
Seq(Tuple1(1451400761)).toDF("test_date").createOrReplaceTempView("tab1")
sql(s"CREATE TEMPORARY FUNCTION testUDFToDate AS '${classOf[GenericUDFToDate].getName}'")
val count = sql("select testUDFToDate(cast(test_date as timestamp))" +
" from tab1 group by testUDFToDate(cast(test_date as timestamp))").count()
val count = sql("select testUDFToDate(timestamp_seconds(test_date))" +
" from tab1 group by testUDFToDate(timestamp_seconds(test_date))").count()
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToDate")
assert(count == 1)
}


@ -1172,7 +1172,7 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
test("SPARK-6785: HiveQuerySuite - Date comparison test 2") {
checkAnswer(
sql("SELECT CAST(CAST(0 AS timestamp) AS date) > CAST(0 AS timestamp) FROM src LIMIT 1"),
sql("SELECT CAST(timestamp_seconds(0) AS date) > timestamp_seconds(0) FROM src LIMIT 1"),
Row(false))
}
@ -1182,10 +1182,10 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
sql(
"""
| SELECT
| CAST(CAST(0 AS timestamp) AS date),
| CAST(CAST(CAST(0 AS timestamp) AS date) AS string),
| CAST(0 AS timestamp),
| CAST(CAST(0 AS timestamp) AS string),
| CAST(timestamp_seconds(0) AS date),
| CAST(CAST(timestamp_seconds(0) AS date) AS string),
| timestamp_seconds(0),
| CAST(timestamp_seconds(0) AS string),
| CAST(CAST(CAST('1970-01-01 23:00:00' AS timestamp) AS date) AS timestamp)
| FROM src LIMIT 1
""".stripMargin),