[SPARK-36037][SQL] Support ANSI SQL LOCALTIMESTAMP datetime value function

### What changes were proposed in this pull request?
`LOCALTIMESTAMP()` is a datetime value function from ANSI SQL.
The syntax is shown below:
```
<datetime value function> ::=
    <current date value function>
  | <current time value function>
  | <current timestamp value function>
  | <current local time value function>
  | <current local timestamp value function>
<current date value function> ::=
CURRENT_DATE
<current time value function> ::=
CURRENT_TIME [ <left paren> <time precision> <right paren> ]
<current local time value function> ::=
LOCALTIME [ <left paren> <time precision> <right paren> ]
<current timestamp value function> ::=
CURRENT_TIMESTAMP [ <left paren> <timestamp precision> <right paren> ]
<current local timestamp value function> ::=
LOCALTIMESTAMP [ <left paren> <timestamp precision> <right paren> ]
```

`LOCALTIMESTAMP()` returns the current timestamp at the start of query evaluation as TIMESTAMP WITHOUT TIME ZONE. This is similar to `CURRENT_TIMESTAMP()`.
Note that we also need to update the optimization rule `ComputeCurrentTime` so that Spark returns the same result within a single query even if the function is called multiple times.
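For illustration, a minimal sketch of the intended behavior (the `local[1]` master, object name, and printed value are illustrative, not part of this patch):
```
import org.apache.spark.sql.SparkSession

object LocalTimestampDemo extends App {
  val spark = SparkSession.builder().master("local[1]").appName("localtimestamp-demo").getOrCreate()

  // Every call to localtimestamp() inside one query is folded to the same literal,
  // so the comparison below is expected to evaluate to true.
  spark.sql("SELECT localtimestamp() AS a, localtimestamp() = localtimestamp() AS same").show(false)
  // +-----------------------+----+
  // |a                      |same|
  // +-----------------------+----+
  // |2021-07-14 15:38:46.123|true|   <- value is illustrative
  // +-----------------------+----+

  spark.stop()
}
```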

### Why are the changes needed?
`CURRENT_TIMESTAMP()` returns the current timestamp at the start of query evaluation.
`LOCALTIMESTAMP()` returns the current timestamp without time zone at the start of query evaluation.
The `LOCALTIMESTAMP` function is part of the ANSI SQL standard and is useful whenever a result of type timestamp without time zone is needed.
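As a rough illustration of the difference in result types (assuming an existing `SparkSession` named `spark`; the printed schema is the expected output, not captured from a run):
```
// current_timestamp() produces TIMESTAMP, while localtimestamp() produces TIMESTAMP_NTZ.
spark.sql("SELECT current_timestamp() AS ts, localtimestamp() AS ntz").printSchema()
// root
//  |-- ts: timestamp (nullable = false)
//  |-- ntz: timestamp_ntz (nullable = false)
```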

### Does this PR introduce _any_ user-facing change?
Yes. This PR adds the new function `LOCALTIMESTAMP()`.

### How was this patch tested?
New tests.

Closes #33258 from beliefer/SPARK-36037.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

@@ -519,6 +519,7 @@ object FunctionRegistry {
expression[CurrentDate]("current_date"),
expression[CurrentTimestamp]("current_timestamp"),
expression[CurrentTimeZone]("current_timezone"),
expression[LocalTimestamp]("localtimestamp"),
expression[DateDiff]("datediff"),
expression[DateAdd]("date_add"),
expression[DateFormatClass]("date_format"),


@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Attribute, CurrentDate, CurrentTimestamp, GroupingSets, MonotonicallyIncreasingID, Now}
import org.apache.spark.sql.catalyst.expressions.{Attribute, CurrentDate, CurrentTimestampLike, GroupingSets, LocalTimestamp, MonotonicallyIncreasingID}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -417,7 +417,7 @@ object UnsupportedOperationChecker extends Logging {
subPlan.expressions.foreach { e =>
if (e.collectLeaves().exists {
case (_: CurrentTimestamp | _: Now | _: CurrentDate) => true
case (_: CurrentTimestampLike | _: CurrentDate | _: LocalTimestamp) => true
case _ => false
}) {
throwError(s"Continuous processing does not support current time operations.")


@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.expressions
import java.text.ParseException
import java.time.{DateTimeException, LocalDate, LocalDateTime, ZoneId}
import java.time.{DateTimeException, LocalDate, LocalDateTime, ZoneId, ZoneOffset}
import java.time.format.DateTimeParseException
import java.util.Locale
@@ -200,6 +200,44 @@ case class Now() extends CurrentTimestampLike {
override def prettyName: String = "now"
}
/**
* Returns the current timestamp without time zone at the start of query evaluation.
* There is no code generation since this expression should get constant folded by the optimizer.
*/
// scalastyle:off line.size.limit
@ExpressionDescription(
usage = """
_FUNC_() - Returns the current timestamp without time zone at the start of query evaluation. All calls of localtimestamp within the same query return the same value.
_FUNC_ - Returns the current local date-time at the session time zone at the start of query evaluation.
""",
examples = """
Examples:
> SELECT _FUNC_();
2020-04-25 15:49:11.914
""",
group = "datetime_funcs",
since = "3.2.0")
case class LocalTimestamp(timeZoneId: Option[String] = None) extends LeafExpression
with TimeZoneAwareExpression with CodegenFallback {
def this() = this(None)
override def foldable: Boolean = true
override def nullable: Boolean = false
override def dataType: DataType = TimestampNTZType
final override def nodePatternsInternal(): Seq[TreePattern] = Seq(CURRENT_LIKE)
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))
override def eval(input: InternalRow): Any = localDateTimeToMicros(LocalDateTime.now(zoneId))
override def prettyName: String = "localtimestamp"
}
/**
* Expression representing the current batch time, which is used by StreamExecution to
* 1. prevent optimizer from pushing this expression below a stateful operator
@@ -236,6 +274,8 @@ case class CurrentBatchTimestamp(
val timestampUs = millisToMicros(timestampMs)
dataType match {
case _: TimestampType => Literal(timestampUs, TimestampType)
case _: TimestampNTZType =>
Literal(convertTz(timestampUs, ZoneOffset.UTC, zoneId), TimestampNTZType)
case _: DateType => Literal(microsToDays(timestampUs, zoneId), DateType)
}
}
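For reference, a rough standalone sketch of the conversion that `LocalTimestamp.eval` relies on above (a hypothetical reimplementation of `localDateTimeToMicros`, not the actual Spark utility): it interprets the wall-clock `LocalDateTime` as if it were UTC and converts it to microseconds since the epoch.
```
import java.time.{LocalDateTime, ZoneOffset}
import java.util.concurrent.TimeUnit

// Hypothetical sketch: turn a time-zone-free LocalDateTime into epoch microseconds.
def localDateTimeToMicrosSketch(ldt: LocalDateTime): Long = {
  val instant = ldt.toInstant(ZoneOffset.UTC) // treat the wall-clock value as UTC
  TimeUnit.SECONDS.toMicros(instant.getEpochSecond) +
    TimeUnit.NANOSECONDS.toMicros(instant.getNano)
}
```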


@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -81,16 +80,19 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
val timestamp = timeExpr.eval(EmptyRow).asInstanceOf[Long]
val currentTime = Literal.create(timestamp, timeExpr.dataType)
val timezone = Literal.create(conf.sessionLocalTimeZone, StringType)
val localTimestamps = mutable.Map.empty[String, Literal]
plan.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
case currentDate @ CurrentDate(Some(timeZoneId)) =>
currentDates.getOrElseUpdate(timeZoneId, {
Literal.create(
DateTimeUtils.microsToDays(timestamp, currentDate.zoneId),
DateType)
Literal.create(currentDate.eval().asInstanceOf[Int], DateType)
})
case CurrentTimestamp() | Now() => currentTime
case CurrentTimeZone() => timezone
case localTimestamp @ LocalTimestamp(Some(timeZoneId)) =>
localTimestamps.getOrElseUpdate(timeZoneId, {
Literal.create(localTimestamp.eval().asInstanceOf[Long], TimestampNTZType)
})
}
}
}


@@ -96,6 +96,15 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
assert(math.abs(t1 - ct.getTime) < 5000)
}
test("datetime function localtimestamp") {
outstandingTimezonesIds.foreach { zid =>
val ct = LocalTimestamp(Some(zid)).eval(EmptyRow).asInstanceOf[Long]
val t1 = DateTimeUtils.localDateTimeToMicros(
LocalDateTime.now(DateTimeUtils.getZoneId(zid)))
assert(math.abs(t1 - ct) < 5000)
}
}
test("DayOfYear") {
val sdfDay = new SimpleDateFormat("D", Locale.US)


@@ -17,10 +17,10 @@
package org.apache.spark.sql.catalyst.optimizer
import java.time.ZoneId
import java.time.{LocalDateTime, ZoneId}
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, Literal}
import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, Literal, LocalTimestamp}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -81,4 +81,25 @@ class ComputeCurrentTimeSuite extends PlanTest {
assert(lits.size == 1)
assert(lits.head == SQLConf.get.sessionLocalTimeZone)
}
test("analyzer should replace localtimestamp with literals") {
val in = Project(Seq(Alias(LocalTimestamp(), "a")(), Alias(LocalTimestamp(), "b")()),
LocalRelation())
val zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)
val min = DateTimeUtils.localDateTimeToMicros(LocalDateTime.now(zoneId))
val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
val max = DateTimeUtils.localDateTimeToMicros(LocalDateTime.now(zoneId))
val lits = new scala.collection.mutable.ArrayBuffer[Long]
plan.transformAllExpressions { case e: Literal =>
lits += e.value.asInstanceOf[Long]
e
}
assert(lits.size == 2)
assert(lits(0) >= min && lits(0) <= max)
assert(lits(1) >= min && lits(1) <= max)
assert(lits(0) == lits(1))
}
}


@@ -21,7 +21,7 @@ import scala.collection.mutable.{Map => MutableMap}
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, LocalTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream}
import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
@@ -558,6 +558,9 @@ class MicroBatchExecution(
// dummy string to prevent UnresolvedException and to prevent to be used in the future.
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
ct.dataType, Some("Dummy TimeZoneId"))
case lt: LocalTimestamp =>
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
lt.dataType, lt.timeZoneId)
case cd: CurrentDate =>
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
cd.dataType, cd.timeZoneId)


@@ -26,7 +26,7 @@ import scala.collection.mutable.{Map => MutableMap}
import org.apache.spark.SparkEnv
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestampLike, LocalTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream}
import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
@@ -172,9 +172,9 @@ class ContinuousExecution(
}
withNewSources.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
case (_: CurrentTimestamp | _: CurrentDate) =>
throw new IllegalStateException(
"CurrentTimestamp and CurrentDate not yet supported for continuous processing")
case (_: CurrentTimestampLike | _: CurrentDate | _: LocalTimestamp) =>
throw new IllegalStateException("CurrentTimestamp, Now, CurrentDate and LocalTimestamp" +
" not yet supported for continuous processing")
}
reportTimeTaken("queryPlanning") {


@@ -2974,6 +2974,16 @@ object functions {
*/
def current_timestamp(): Column = withExpr { CurrentTimestamp() }
/**
* Returns the current timestamp without time zone at the start of query evaluation
* as a timestamp without time zone column.
* All calls of localtimestamp within the same query return the same value.
*
* @group datetime_funcs
* @since 3.2.0
*/
def localtimestamp(): Column = withExpr { LocalTimestamp() }
/**
* Converts a date/timestamp/string to a value of string in the format specified by the date
* format given by the second argument.
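A hedged usage sketch of the new DataFrame API function shown in the hunk above (assuming an existing `SparkSession` named `spark`; the printed values are illustrative):
```
import org.apache.spark.sql.functions.localtimestamp

// All occurrences within one query are replaced by a single literal by ComputeCurrentTime,
// so every row shows the same value.
val df = spark.range(3).select(localtimestamp().as("evaluated_at"))
df.show(truncate = false)
// +-----------------------+
// |evaluated_at           |
// +-----------------------+
// |2021-07-14 15:38:46.123|
// |2021-07-14 15:38:46.123|
// |2021-07-14 15:38:46.123|
// +-----------------------+
```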


@@ -1,6 +1,6 @@
<!-- Automatically generated by ExpressionsSchemaSuite -->
## Summary
- Number of queries: 359
- Number of queries: 360
- Number of expressions that missing example: 13
- Expressions missing examples: bigint,binary,boolean,date,decimal,double,float,int,smallint,string,timestamp,tinyint,window
## Schema of Built-in Functions
@@ -162,6 +162,7 @@
| org.apache.spark.sql.catalyst.expressions.LessThanOrEqual | <= | SELECT 2 <= 2 | struct<(2 <= 2):boolean> |
| org.apache.spark.sql.catalyst.expressions.Levenshtein | levenshtein | SELECT levenshtein('kitten', 'sitting') | struct<levenshtein(kitten, sitting):int> |
| org.apache.spark.sql.catalyst.expressions.Like | like | SELECT like('Spark', '_park') | struct<Spark LIKE _park:boolean> |
| org.apache.spark.sql.catalyst.expressions.LocalTimestamp | localtimestamp | SELECT localtimestamp() | struct<localtimestamp():timestamp_ntz> |
| org.apache.spark.sql.catalyst.expressions.Log | ln | SELECT ln(1) | struct<ln(1):double> |
| org.apache.spark.sql.catalyst.expressions.Log10 | log10 | SELECT log10(10) | struct<LOG10(10):double> |
| org.apache.spark.sql.catalyst.expressions.Log1p | log1p | SELECT log1p(0) | struct<LOG1P(0):double> |


@@ -24,6 +24,7 @@ select DATE_FROM_UNIX_DATE(0), DATE_FROM_UNIX_DATE(1000), DATE_FROM_UNIX_DATE(nu
select UNIX_DATE(DATE('1970-01-01')), UNIX_DATE(DATE('2020-12-04')), UNIX_DATE(null);
-- [SPARK-16836] current_date and current_timestamp literals
select current_date = current_date(), current_timestamp = current_timestamp();
select localtimestamp() = localtimestamp();
select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd');


@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 205
-- Number of queries: 206
-- !query
@@ -141,6 +141,14 @@ select current_date = current_date(), current_timestamp = current_timestamp()
----------------------^^^
-- !query
select localtimestamp() = localtimestamp()
-- !query schema
struct<(localtimestamp() = localtimestamp()):boolean>
-- !query output
true
-- !query
select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd')
-- !query schema


@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 205
-- Number of queries: 206
-- !query
@@ -135,6 +135,14 @@ struct<(current_date() = current_date()):boolean,(current_timestamp() = current_
true true
-- !query
select localtimestamp() = localtimestamp()
-- !query schema
struct<(localtimestamp() = localtimestamp()):boolean>
-- !query output
true
-- !query
select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd')
-- !query schema


@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 205
-- Number of queries: 206
-- !query
@@ -135,6 +135,14 @@ struct<(current_date() = current_date()):boolean,(current_timestamp() = current_
true true
-- !query
select localtimestamp() = localtimestamp()
-- !query schema
struct<(localtimestamp() = localtimestamp()):boolean>
-- !query output
true
-- !query
select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd')
-- !query schema


@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 205
-- Number of queries: 206
-- !query
@@ -135,6 +135,14 @@ struct<(current_date() = current_date()):boolean,(current_timestamp() = current_
true true
-- !query
select localtimestamp() = localtimestamp()
-- !query schema
struct<(localtimestamp() = localtimestamp()):boolean>
-- !query output
true
-- !query
select to_date(null), to_date('2016-12-31'), to_date('2016-12-31', 'yyyy-MM-dd')
-- !query schema


@@ -175,6 +175,7 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
"org.apache.spark.sql.catalyst.expressions.CurrentTimestamp",
"org.apache.spark.sql.catalyst.expressions.CurrentTimeZone",
"org.apache.spark.sql.catalyst.expressions.Now",
"org.apache.spark.sql.catalyst.expressions.LocalTimestamp",
// Random output without a seed
"org.apache.spark.sql.catalyst.expressions.Rand",
"org.apache.spark.sql.catalyst.expressions.Randn",


@@ -40,7 +40,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode._
import org.apache.spark.sql.streaming.util.{MockSourceProvider, StreamManualClock}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
import org.apache.spark.util.Utils
@@ -406,56 +406,66 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
)
}
testWithAllStateVersions("prune results by current_time, complete mode") {
testWithAllStateVersions("prune results by current_time or localtimestamp, complete mode") {
import testImplicits._
val clock = new StreamManualClock
val inputData = MemoryStream[Long]
val aggregated =
inputData.toDF()
val inputDataOne = MemoryStream[Long]
val aggregatedOne =
inputDataOne.toDF()
.groupBy($"value")
.agg(count("*"))
.where('value >= current_timestamp().cast("long") - 10L)
val inputDataTwo = MemoryStream[Long]
val aggregatedTwo =
inputDataTwo.toDF()
.groupBy($"value")
.agg(count("*"))
.where('value >= localtimestamp().cast(TimestampType).cast("long") - 10L)
testStream(aggregated, Complete)(
StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
Seq((inputDataOne, aggregatedOne), (inputDataTwo, aggregatedTwo)).foreach { x =>
val inputData = x._1
val aggregated = x._2
val clock = new StreamManualClock
testStream(aggregated, Complete)(
StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
// advance clock to 10 seconds, all keys retained
AddData(inputData, 0L, 5L, 5L, 10L),
AdvanceManualClock(10 * 1000),
CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
// advance clock to 10 seconds, all keys retained
AddData(inputData, 0L, 5L, 5L, 10L),
AdvanceManualClock(10 * 1000),
CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
// advance clock to 20 seconds, should retain keys >= 10
AddData(inputData, 15L, 15L, 20L),
AdvanceManualClock(10 * 1000),
CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
// advance clock to 20 seconds, should retain keys >= 10
AddData(inputData, 15L, 15L, 20L),
AdvanceManualClock(10 * 1000),
CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
// advance clock to 30 seconds, should retain keys >= 20
AddData(inputData, 0L, 85L),
AdvanceManualClock(10 * 1000),
CheckLastBatch((20L, 1), (85L, 1)),
// advance clock to 30 seconds, should retain keys >= 20
AddData(inputData, 0L, 85L),
AdvanceManualClock(10 * 1000),
CheckLastBatch((20L, 1), (85L, 1)),
// bounce stream and ensure correct batch timestamp is used
// i.e., we don't take it from the clock, which is at 90 seconds.
StopStream,
AssertOnQuery { q => // clear the sink
q.sink.asInstanceOf[MemorySink].clear()
q.commitLog.purge(3)
// advance by a minute i.e., 90 seconds total
clock.advance(60 * 1000L)
true
},
StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
// The commit log blown, causing the last batch to re-run
CheckLastBatch((20L, 1), (85L, 1)),
AssertOnQuery { q =>
clock.getTimeMillis() == 90000L
},
// bounce stream and ensure correct batch timestamp is used
// i.e., we don't take it from the clock, which is at 90 seconds.
StopStream,
AssertOnQuery { q => // clear the sink
q.sink.asInstanceOf[MemorySink].clear()
q.commitLog.purge(3)
// advance by a minute i.e., 90 seconds total
clock.advance(60 * 1000L)
true
},
StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
// The commit log blown, causing the last batch to re-run
CheckLastBatch((20L, 1), (85L, 1)),
AssertOnQuery { q =>
clock.getTimeMillis() == 90000L
},
// advance clock to 100 seconds, should retain keys >= 90
AddData(inputData, 85L, 90L, 100L, 105L),
AdvanceManualClock(10 * 1000),
CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
)
// advance clock to 100 seconds, should retain keys >= 90
AddData(inputData, 85L, 90L, 100L, 105L),
AdvanceManualClock(10 * 1000),
CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
)
}
}
testWithAllStateVersions("prune results by current_date, complete mode") {