[SPARK-35815][SQL] Allow delayThreshold for watermark to be represented as ANSI interval literals

### What changes were proposed in this pull request?

This PR extends `withWatermark` so that the `delayThreshold` can also be specified as an ANSI interval literal.
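For illustration, both the conventional form and an ANSI interval literal can now be passed as `delayThreshold`. A minimal batch-mode sketch (the session setup, data, and column names are placeholders; the interval strings mirror the new tests):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.timestamp_seconds

object WatermarkDelayDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("watermark-delay-demo").getOrCreate()
    import spark.implicits._

    // Placeholder data: a timestamp column derived from integer seconds.
    val events = Seq(1, 2, 3).toDF("value")
      .withColumn("eventTime", timestamp_seconds($"value"))

    // Conventional form (already supported before this change).
    events.withWatermark("eventTime", "10 seconds")

    // ANSI interval literals (accepted with this change).
    events.withWatermark("eventTime", "INTERVAL '10' SECOND")
    events.withWatermark("eventTime", "INTERVAL '1 2:3:4' DAY TO SECOND")

    spark.stop()
  }
}
```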

### Why are the changes needed?

A `delayThreshold` is semantically an interval value, so it should be representable as an ANSI interval literal as well as in the conventional `1 second` form.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes #33456 from sarutak/delayThreshold-interval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
Commit 07fa38e2c1 (parent 3a1db2ddd4)
Authored by Kousuke Saruta on 2021-07-22 17:36:22 +03:00; committed by Max Gekk
3 changed files with 71 additions and 8 deletions


```diff
@@ -2265,7 +2265,7 @@ private[spark] object QueryCompilationErrors {
       s"""Cannot resolve column name "$colName" among (${fieldsStr})${extraMsg}""")
   }
 
-  def cannotParseTimeDelayError(delayThreshold: String, e: IllegalArgumentException): Throwable = {
+  def cannotParseTimeDelayError(delayThreshold: String, e: Throwable): Throwable = {
     new AnalysisException(s"Unable to parse time delay '$delayThreshold'", cause = Some(e))
   }
```


```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import java.io.{ByteArrayOutputStream, CharArrayWriter, DataOutputStream}
+import java.util.Locale
 
 import scala.annotation.varargs
 import scala.collection.JavaConverters._
@@ -43,7 +44,7 @@ import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
-import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException, ParserUtils}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
@@ -64,7 +65,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.array.ByteArrayMethods
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 import org.apache.spark.util.Utils
 
 private[sql] object Dataset {
@@ -740,11 +741,19 @@ class Dataset[T] private[sql](
   // We only accept an existing column name, not a derived column here as a watermark that is
   // defined on a derived column cannot referenced elsewhere in the plan.
   def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan {
-    val parsedDelay =
-      try {
+    val parsedDelay = try {
+      if (delayThreshold.toLowerCase(Locale.ROOT).trim.startsWith("interval")) {
+        CatalystSqlParser.parseExpression(delayThreshold) match {
+          case Literal(months: Int, _: YearMonthIntervalType) =>
+            new CalendarInterval(months, 0, 0)
+          case Literal(micros: Long, _: DayTimeIntervalType) =>
+            new CalendarInterval(0, 0, micros)
+        }
+      } else {
         IntervalUtils.stringToInterval(UTF8String.fromString(delayThreshold))
+      }
     } catch {
-      case e: IllegalArgumentException =>
+      case NonFatal(e) =>
         throw QueryCompilationErrors.cannotParseTimeDelayError(delayThreshold, e)
     }
     require(!IntervalUtils.isNegative(parsedDelay),
```
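The error handling is widened from `IllegalArgumentException` to `Throwable`/`NonFatal` because the new branch can surface a `ParseException` from `CatalystSqlParser` or a `MatchError` when the parsed expression is not a year-month or day-time interval literal. As a standalone sketch (not the production code path), the conversion the new branch performs can be tried out like this:

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.{DayTimeIntervalType, YearMonthIntervalType}
import org.apache.spark.unsafe.types.CalendarInterval

// Mirrors the new branch in withWatermark: an ANSI interval literal parses to a
// typed interval Literal, which is then widened into a CalendarInterval.
def ansiDelayToCalendarInterval(delayThreshold: String): CalendarInterval =
  CatalystSqlParser.parseExpression(delayThreshold) match {
    case Literal(months: Int, _: YearMonthIntervalType) =>
      new CalendarInterval(months, 0, 0)  // year-month intervals carry only a month count
    case Literal(micros: Long, _: DayTimeIntervalType) =>
      new CalendarInterval(0, 0, micros)  // day-time intervals carry only microseconds
  }

ansiDelayToCalendarInterval("INTERVAL '1-2' YEAR TO MONTH")     // 14 months
ansiDelayToCalendarInterval("INTERVAL '1 2:3:4' DAY TO SECOND") // 93784000000 microseconds
```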


```diff
@@ -31,6 +31,7 @@ import org.scalatest.matchers.should.Matchers._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Dataset}
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.UTC
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.MemorySink
@@ -765,6 +766,59 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     }
   }
 
+  test("SPARK-35815: Support ANSI intervals for delay threshold") {
+    val DAYS_PER_MONTH = 31
+    Seq(
+      // Conventional form and some variants
+      (Seq("3 days", "Interval 3 day", "inTerval '3' day"), 3 * MILLIS_PER_DAY),
+      (Seq(" 5 hours", "INTERVAL 5 hour", "interval '5' hour"), 5 * MILLIS_PER_HOUR),
+      (Seq("\t8 minutes", "interval 8 minute", "interval '8' minute"), 8 * MILLIS_PER_MINUTE),
+      (Seq("10 seconds", "interval 10 second", "interval '10' second"), 10 * MILLIS_PER_SECOND),
+      (Seq("1 years", "interval 1 year", "interval '1' year"),
+        MONTHS_PER_YEAR * DAYS_PER_MONTH * MILLIS_PER_DAY),
+      (Seq("1 months", "interval 1 month", "interval '1' month"), DAYS_PER_MONTH * MILLIS_PER_DAY),
+      (Seq(
+        "1 day 2 hours 3 minutes 4 seconds",
+        " interval 1 day 2 hours 3 minutes 4 seconds",
+        "\tinterval '1' day '2' hours '3' minutes '4' seconds",
+        "interval '1 2:3:4' day to second"),
+        MILLIS_PER_DAY + 2 * MILLIS_PER_HOUR + 3 * MILLIS_PER_MINUTE + 4 * MILLIS_PER_SECOND),
+      (Seq(
+        " 1 year 2 months",
+        "interval 1 year 2 month",
+        "interval '1' year '2' month",
+        "\tinterval '1-2' year to month"),
+        (MONTHS_PER_YEAR * DAYS_PER_MONTH + 2 * DAYS_PER_MONTH) * MILLIS_PER_DAY)
+    ).foreach { case (delayThresholdVariants, expectedMs) =>
+      delayThresholdVariants.foreach { case delayThreshold =>
+        val df = MemoryStream[Int].toDF
+          .withColumn("eventTime", timestamp_seconds($"value"))
+          .withWatermark("eventTime", delayThreshold)
+        val eventTimeAttr = df.queryExecution.analyzed.output.find(a => a.name == "eventTime")
+        assert(eventTimeAttr.isDefined)
+        val metadata = eventTimeAttr.get.metadata
+        assert(metadata.contains(EventTimeWatermark.delayKey))
+        assert(metadata.getLong(EventTimeWatermark.delayKey) === expectedMs)
+      }
+    }
+
+    // Invalid interval patterns
+    Seq(
+      "1 foo",
+      "interva 2 day",
+      "intrval '3' day",
+      "interval 4 foo",
+      "interval '5' foo",
+      "interval '1 2:3:4' day to hour",
+      "interval '1 2' year to month").foreach { delayThreshold =>
+      intercept[AnalysisException] {
+        val df = MemoryStream[Int].toDF
+          .withColumn("eventTime", timestamp_seconds($"value"))
+          .withWatermark("eventTime", delayThreshold)
+      }
+    }
+  }
+
   private def dfWithMultipleWatermarks(
       input1: MemoryStream[Int],
       input2: MemoryStream[Int]): Dataset[_] = {
```