[SPARK-27735][SS] Parsing interval string should be case-insensitive in SS

## What changes were proposed in this pull request?

Several APIs in Structured Streaming require the user to specify an interval string (e.g., watermark delays, trigger intervals, state timeout durations). Right now these APIs don't accept upper-case strings.

This PR adds a new method `fromCaseInsensitiveString` to `CalendarInterval` to support parsing upper-case strings, and fixes all APIs that need to parse an interval string.
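
For illustration, a minimal sketch of the user-facing effect, assuming a streaming Dataset `events` with an event-time column `eventTime` (the names are hypothetical):

```scala
// Before this change, only lower-case interval strings parsed:
events.withWatermark("eventTime", "10 minutes")  // worked
// After this change, casing no longer matters:
events.withWatermark("eventTime", "10 MINUTES")  // now parses to the same delay
```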

## How was this patch tested?

New unit tests.

Closes #24619 from zsxwing/SPARK-27735.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>

CalendarInterval.java

@@ -18,6 +18,7 @@
 package org.apache.spark.unsafe.types;
 
 import java.io.Serializable;
+import java.util.Locale;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -66,6 +67,10 @@ public final class CalendarInterval implements Serializable {
     }
   }
 
+  /**
+   * Convert a string to CalendarInterval. Return null if the input string is not a valid interval.
+   * This method is case-sensitive and all characters in the input string should be in lower case.
+   */
   public static CalendarInterval fromString(String s) {
     if (s == null) {
       return null;
@@ -87,6 +92,26 @@ public final class CalendarInterval implements Serializable {
     }
   }
 
+  /**
+   * Convert a string to CalendarInterval. Unlike fromString, this method is case-insensitive and
+   * will throw IllegalArgumentException when the input string is not a valid interval.
+   *
+   * @throws IllegalArgumentException if the string is not a valid interval.
+   */
+  public static CalendarInterval fromCaseInsensitiveString(String s) {
+    if (s == null || s.trim().isEmpty()) {
+      throw new IllegalArgumentException("Interval cannot be null or blank.");
+    }
+    String sInLowerCase = s.trim().toLowerCase(Locale.ROOT);
+    String interval =
+      sInLowerCase.startsWith("interval ") ? sInLowerCase : "interval " + sInLowerCase;
+    CalendarInterval cal = fromString(interval);
+    if (cal == null) {
+      throw new IllegalArgumentException("Invalid interval: " + s);
+    }
+    return cal;
+  }
+
   public static long toLongWithRange(String fieldName,
       String s, long minValue, long maxValue) throws IllegalArgumentException {
     long result = 0;

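A quick sketch of the new method's contract as exercised from Scala, based on the implementation above:

```scala
import org.apache.spark.unsafe.types.CalendarInterval

CalendarInterval.fromCaseInsensitiveString("5 MINUTES")          // parsed case-insensitively
CalendarInterval.fromCaseInsensitiveString("Interval 5 Minutes") // the "interval " prefix is optional
// fromCaseInsensitiveString("  ")  => IllegalArgumentException("Interval cannot be null or blank.")
// fromCaseInsensitiveString("foo") => IllegalArgumentException("Invalid interval: foo")
```
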
CalendarIntervalSuite.java

@@ -104,6 +104,31 @@ public class CalendarIntervalSuite {
     assertNull(fromString(input));
   }
 
+  @Test
+  public void fromCaseInsensitiveStringTest() {
+    for (String input : new String[]{"5 MINUTES", "5 minutes", "5 Minutes"}) {
+      assertEquals(fromCaseInsensitiveString(input), new CalendarInterval(0, 5L * 60 * 1_000_000));
+    }
+
+    for (String input : new String[]{null, "", " "}) {
+      try {
+        fromCaseInsensitiveString(input);
+        fail("Expected to throw an exception for the invalid input");
+      } catch (IllegalArgumentException e) {
+        assertTrue(e.getMessage().contains("cannot be null or blank"));
+      }
+    }
+
+    for (String input : new String[]{"interval", "interval1 day", "foo", "foo 1 day"}) {
+      try {
+        fromCaseInsensitiveString(input);
+        fail("Expected to throw an exception for the invalid input");
+      } catch (IllegalArgumentException e) {
+        assertTrue(e.getMessage().contains("Invalid interval"));
+      }
+    }
+  }
+
   @Test
   public void fromYearMonthStringTest() {
     String input;

TimeWindow.scala

@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.commons.lang3.StringUtils
-
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
@@ -104,20 +102,7 @@ object TimeWindow {
    * precision.
    */
  private def getIntervalInMicroSeconds(interval: String): Long = {
-    if (StringUtils.isBlank(interval)) {
-      throw new IllegalArgumentException(
-        "The window duration, slide duration and start time cannot be null or blank.")
-    }
-    val intervalString = if (interval.startsWith("interval")) {
-      interval
-    } else {
-      "interval " + interval
-    }
-    val cal = CalendarInterval.fromString(intervalString)
-    if (cal == null) {
-      throw new IllegalArgumentException(
-        s"The provided interval ($interval) did not correspond to a valid interval string.")
-    }
+    val cal = CalendarInterval.fromCaseInsensitiveString(interval)
     if (cal.months > 0) {
       throw new IllegalArgumentException(
         s"Intervals greater than a month is not supported ($interval).")

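`getIntervalInMicroSeconds` parses the duration strings passed to the SQL `window` function, so a hedged sketch of a call that now works (assuming a streaming DataFrame `events` with an `eventTime` column; the names are hypothetical):

```scala
import org.apache.spark.sql.functions.{col, window}

val counts = events
  .groupBy(window(col("eventTime"), "5 MINUTES", "1 MINUTE")) // upper-case durations now parse
  .count()
```
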
Dataset.scala

@@ -695,8 +695,14 @@ class Dataset[T] private[sql](
   // defined on a derived column cannot be referenced elsewhere in the plan.
   def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan {
     val parsedDelay =
-      Option(CalendarInterval.fromString("interval " + delayThreshold))
-        .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+      try {
+        CalendarInterval.fromCaseInsensitiveString(delayThreshold)
+      } catch {
+        case e: IllegalArgumentException =>
+          throw new AnalysisException(
+            s"Unable to parse time delay '$delayThreshold'",
+            cause = Some(e))
+      }
     require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
       s"delay threshold ($delayThreshold) should not be negative.")
     EliminateEventTimeWatermark(

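One behavioral detail of the rewrite above: a bad delay still surfaces as `AnalysisException`, but the parser's `IllegalArgumentException` is now preserved as the cause. A hypothetical illustration, reusing the `events` stream from earlier:

```scala
import org.apache.spark.sql.AnalysisException

try {
  events.withWatermark("eventTime", "not a delay")
} catch {
  case e: AnalysisException =>
    // `cause = Some(e)` above threads the original parse error through.
    assert(e.getCause.isInstanceOf[IllegalArgumentException])
}
```
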
GroupStateImpl.scala

@@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.streaming
 
 import java.sql.Date
 import java.util.concurrent.TimeUnit
 
-import org.apache.commons.lang3.StringUtils
-
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout}
 import org.apache.spark.sql.execution.streaming.GroupStateImpl._
 import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
@@ -161,20 +159,7 @@ private[sql] class GroupStateImpl[S] private(
   def getTimeoutTimestamp: Long = timeoutTimestamp
 
   private def parseDuration(duration: String): Long = {
-    if (StringUtils.isBlank(duration)) {
-      throw new IllegalArgumentException(
-        "Provided duration is null or blank.")
-    }
-    val intervalString = if (duration.startsWith("interval")) {
-      duration
-    } else {
-      "interval " + duration
-    }
-    val cal = CalendarInterval.fromString(intervalString)
-    if (cal == null) {
-      throw new IllegalArgumentException(
-        s"Provided duration ($duration) is not valid.")
-    }
+    val cal = CalendarInterval.fromCaseInsensitiveString(duration)
     if (cal.milliseconds < 0 || cal.months < 0) {
       throw new IllegalArgumentException(s"Provided duration ($duration) is not positive")
     }

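`parseDuration` backs the string overload of `GroupState.setTimeoutDuration`, so a sketch of a call that now accepts upper case (inside a hypothetical `mapGroupsWithState` update function, with processing-time timeout enabled):

```scala
import org.apache.spark.sql.streaming.GroupState

def updateAcrossEvents(key: String, values: Iterator[Long], state: GroupState[Long]): Long = {
  val total = state.getOption.getOrElse(0L) + values.sum
  state.update(total)
  state.setTimeoutDuration("30 SECONDS") // upper-case duration now parses
  total
}
```
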
ContinuousTrigger.scala

@@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit
 
 import scala.concurrent.duration.Duration
 
-import org.apache.commons.lang3.StringUtils
-
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -38,18 +36,7 @@ case class ContinuousTrigger(intervalMs: Long) extends Trigger {
 
 private[sql] object ContinuousTrigger {
   def apply(interval: String): ContinuousTrigger = {
-    if (StringUtils.isBlank(interval)) {
-      throw new IllegalArgumentException(
-        "interval cannot be null or blank.")
-    }
-    val cal = if (interval.startsWith("interval")) {
-      CalendarInterval.fromString(interval)
-    } else {
-      CalendarInterval.fromString("interval " + interval)
-    }
-    if (cal == null) {
-      throw new IllegalArgumentException(s"Invalid interval: $interval")
-    }
+    val cal = CalendarInterval.fromCaseInsensitiveString(interval)
     if (cal.months > 0) {
       throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
     }

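`ContinuousTrigger.apply` is reached via `Trigger.Continuous(interval)`, so a hedged usage sketch (assuming a streaming DataFrame `events` and a sink that supports continuous mode):

```scala
import org.apache.spark.sql.streaming.Trigger

val query = events.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 SECOND")) // upper-case interval now parses
  .start()
```
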
ProcessingTime.scala

@@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit
 
 import scala.concurrent.duration.Duration
 
-import org.apache.commons.lang3.StringUtils
-
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -76,18 +74,7 @@ object ProcessingTime {
    */
   @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
   def apply(interval: String): ProcessingTime = {
-    if (StringUtils.isBlank(interval)) {
-      throw new IllegalArgumentException(
-        "interval cannot be null or blank.")
-    }
-    val cal = if (interval.startsWith("interval")) {
-      CalendarInterval.fromString(interval)
-    } else {
-      CalendarInterval.fromString("interval " + interval)
-    }
-    if (cal == null) {
-      throw new IllegalArgumentException(s"Invalid interval: $interval")
-    }
+    val cal = CalendarInterval.fromCaseInsensitiveString(interval)
     if (cal.months > 0) {
       throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
     }
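
The deprecated `ProcessingTime(interval)` and its `Trigger.ProcessingTime(interval)` replacement share this parsing path, so a matching sketch (assuming a streaming DataFrame `events`):

```scala
import org.apache.spark.sql.streaming.Trigger

val query = events.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 SECONDS")) // upper-case interval now parses
  .start()
```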