diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java index 611b2a217a..e36efa3b0f 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java @@ -18,6 +18,7 @@ package org.apache.spark.unsafe.types; import java.io.Serializable; +import java.util.Locale; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -66,6 +67,10 @@ public final class CalendarInterval implements Serializable { } } + /** + * Convert a string to CalendarInterval. Return null if the input string is not a valid interval. + * This method is case-sensitive and all characters in the input string should be in lower case. + */ public static CalendarInterval fromString(String s) { if (s == null) { return null; @@ -87,6 +92,26 @@ public final class CalendarInterval implements Serializable { } } + /** + * Convert a string to CalendarInterval. Unlike fromString, this method is case-insensitive and + * will throw IllegalArgumentException when the input string is not a valid interval. + * + * @throws IllegalArgumentException if the string is not a valid interval. + */ + public static CalendarInterval fromCaseInsensitiveString(String s) { + if (s == null || s.trim().isEmpty()) { + throw new IllegalArgumentException("Interval cannot be null or blank."); + } + String sInLowerCase = s.trim().toLowerCase(Locale.ROOT); + String interval = + sInLowerCase.startsWith("interval ") ? 
sInLowerCase : "interval " + sInLowerCase; + CalendarInterval cal = fromString(interval); + if (cal == null) { + throw new IllegalArgumentException("Invalid interval: " + s); + } + return cal; + } + public static long toLongWithRange(String fieldName, String s, long minValue, long maxValue) throws IllegalArgumentException { long result = 0; diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java index 1e556913b2..994af8f082 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java @@ -104,6 +104,31 @@ public class CalendarIntervalSuite { assertNull(fromString(input)); } + @Test + public void fromCaseInsensitiveStringTest() { + for (String input : new String[]{"5 MINUTES", "5 minutes", "5 Minutes"}) { + assertEquals(fromCaseInsensitiveString(input), new CalendarInterval(0, 5L * 60 * 1_000_000)); + } + + for (String input : new String[]{null, "", " "}) { + try { + fromCaseInsensitiveString(input); + fail("Expected to throw an exception for the invalid input"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("cannot be null or blank")); + } + } + + for (String input : new String[]{"interval", "interval1 day", "foo", "foo 1 day"}) { + try { + fromCaseInsensitiveString(input); + fail("Expected to throw an exception for the invalid input"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Invalid interval")); + } + } + } + @Test public void fromYearMonthStringTest() { String input; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala index 8e48856d46..9aae678deb 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.catalyst.expressions -import org.apache.commons.lang3.StringUtils - import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure @@ -104,20 +102,7 @@ object TimeWindow { * precision. */ private def getIntervalInMicroSeconds(interval: String): Long = { - if (StringUtils.isBlank(interval)) { - throw new IllegalArgumentException( - "The window duration, slide duration and start time cannot be null or blank.") - } - val intervalString = if (interval.startsWith("interval")) { - interval - } else { - "interval " + interval - } - val cal = CalendarInterval.fromString(intervalString) - if (cal == null) { - throw new IllegalArgumentException( - s"The provided interval ($interval) did not correspond to a valid interval string.") - } + val cal = CalendarInterval.fromCaseInsensitiveString(interval) if (cal.months > 0) { throw new IllegalArgumentException( s"Intervals greater than a month is not supported ($interval).") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 5d6e5306f1..74cb3e6274 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -695,8 +695,14 @@ class Dataset[T] private[sql]( // defined on a derived column cannot referenced elsewhere in the plan. 
def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan { val parsedDelay = - Option(CalendarInterval.fromString("interval " + delayThreshold)) - .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'")) + try { + CalendarInterval.fromCaseInsensitiveString(delayThreshold) + } catch { + case e: IllegalArgumentException => + throw new AnalysisException( + s"Unable to parse time delay '$delayThreshold'", + cause = Some(e)) + } require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0, s"delay threshold ($delayThreshold) should not be negative.") EliminateEventTimeWatermark( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala index fcb230bd08..dda9d41f63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala @@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.streaming import java.sql.Date import java.util.concurrent.TimeUnit -import org.apache.commons.lang3.StringUtils - import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout} import org.apache.spark.sql.execution.streaming.GroupStateImpl._ import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout} @@ -161,20 +159,7 @@ private[sql] class GroupStateImpl[S] private( def getTimeoutTimestamp: Long = timeoutTimestamp private def parseDuration(duration: String): Long = { - if (StringUtils.isBlank(duration)) { - throw new IllegalArgumentException( - "Provided duration is null or blank.") - } - val intervalString = if (duration.startsWith("interval")) { - duration - } else { - "interval " + duration - } - val cal = CalendarInterval.fromString(intervalString) - if (cal == null) { - throw new IllegalArgumentException( - s"Provided duration 
($duration) is not valid.") - } + val cal = CalendarInterval.fromCaseInsensitiveString(duration) if (cal.milliseconds < 0 || cal.months < 0) { throw new IllegalArgumentException(s"Provided duration ($duration) is not positive") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala index fd0ff31199..bd343f3806 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala @@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit import scala.concurrent.duration.Duration -import org.apache.commons.lang3.StringUtils - import org.apache.spark.annotation.Evolving import org.apache.spark.sql.streaming.Trigger import org.apache.spark.unsafe.types.CalendarInterval @@ -38,18 +36,7 @@ case class ContinuousTrigger(intervalMs: Long) extends Trigger { private[sql] object ContinuousTrigger { def apply(interval: String): ContinuousTrigger = { - if (StringUtils.isBlank(interval)) { - throw new IllegalArgumentException( - "interval cannot be null or blank.") - } - val cal = if (interval.startsWith("interval")) { - CalendarInterval.fromString(interval) - } else { - CalendarInterval.fromString("interval " + interval) - } - if (cal == null) { - throw new IllegalArgumentException(s"Invalid interval: $interval") - } + val cal = CalendarInterval.fromCaseInsensitiveString(interval) if (cal.months > 0) { throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala index 38b0776ec1..417d698bdb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala @@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit import scala.concurrent.duration.Duration -import org.apache.commons.lang3.StringUtils - import org.apache.spark.annotation.Evolving import org.apache.spark.unsafe.types.CalendarInterval @@ -76,18 +74,7 @@ object ProcessingTime { */ @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0") def apply(interval: String): ProcessingTime = { - if (StringUtils.isBlank(interval)) { - throw new IllegalArgumentException( - "interval cannot be null or blank.") - } - val cal = if (interval.startsWith("interval")) { - CalendarInterval.fromString(interval) - } else { - CalendarInterval.fromString("interval " + interval) - } - if (cal == null) { - throw new IllegalArgumentException(s"Invalid interval: $interval") - } + val cal = CalendarInterval.fromCaseInsensitiveString(interval) if (cal.months > 0) { throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") }