[SPARK-29605][SQL] Optimize string to interval casting

### What changes were proposed in this pull request?
In the PR, I propose new function `stringToInterval()` in `IntervalUtils` for converting `UTF8String` to `CalendarInterval`. The function is used in casting a `STRING` column to an `INTERVAL` column.

### Why are the changes needed?
The proposed implementation is ~10 times faster. For example, parsing 9 interval units on JDK 8:
Before:
```
9 units w/ interval                               14004          14125         116          0.1       14003.6       0.0X
9 units w/o interval                              13785          14056         290          0.1       13784.9       0.0X
```
After:
```
9 units w/ interval                                1343           1344           1          0.7        1343.0       0.3X
9 units w/o interval                               1345           1349           8          0.7        1344.6       0.3X
```

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
- By new tests for `stringToInterval` in `IntervalUtilsSuite`
- By existing tests

Closes #26256 from MaxGekk/string-to-interval.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Maxim Gekk 2019-11-07 12:39:52 +08:00 committed by Wenchen Fan
parent 3437862975
commit 29dc59ac29
6 changed files with 304 additions and 98 deletions

View file

@ -370,7 +370,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
return Platform.getByte(base, offset + i);
}
private boolean matchAt(final UTF8String s, int pos) {
public boolean matchAt(final UTF8String s, int pos) {
if (s.numBytes + pos > numBytes || pos < 0) {
return false;
}

View file

@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.UTF8StringBuilder
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
import org.apache.spark.unsafe.types.UTF8String.{IntWrapper, LongWrapper}
object Cast {
@ -466,7 +466,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
// IntervalConverter
private[this] def castToInterval(from: DataType): Any => Any = from match {
case StringType =>
buildCast[UTF8String](_, s => IntervalUtils.safeFromString(s.toString))
buildCast[UTF8String](_, s => IntervalUtils.stringToInterval(s))
}
// LongConverter
@ -1215,7 +1215,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
case StringType =>
val util = IntervalUtils.getClass.getCanonicalName.stripSuffix("$")
(c, evPrim, evNull) =>
code"""$evPrim = $util.safeFromString($c.toString());
code"""$evPrim = $util.stringToInterval($c);
if(${evPrim} == null) {
${evNull} = true;
}

View file

@ -23,7 +23,7 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.types.Decimal
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
object IntervalUtils {
final val MONTHS_PER_YEAR: Int = 12
@ -39,6 +39,7 @@ object IntervalUtils {
final val MICROS_PER_MONTH: Long = DAYS_PER_MONTH * DateTimeUtils.MICROS_PER_DAY
/* 365.25 days per year assumes leap year every four years */
final val MICROS_PER_YEAR: Long = (36525L * DateTimeUtils.MICROS_PER_DAY) / 100
final val DAYS_PER_WEEK: Byte = 7
def getYears(interval: CalendarInterval): Int = {
interval.months / MONTHS_PER_YEAR
@ -389,4 +390,194 @@ object IntervalUtils {
if (num == 0) throw new java.lang.ArithmeticException("divide by zero")
fromDoubles(interval.months / num, interval.days / num, interval.microseconds / num)
}
private object ParseState extends Enumeration {
val PREFIX,
BEGIN_VALUE,
PARSE_SIGN,
PARSE_UNIT_VALUE,
FRACTIONAL_PART,
BEGIN_UNIT_NAME,
UNIT_NAME_SUFFIX,
END_UNIT_NAME = Value
}
private final val intervalStr = UTF8String.fromString("interval ")
private final val yearStr = UTF8String.fromString("year")
private final val monthStr = UTF8String.fromString("month")
private final val weekStr = UTF8String.fromString("week")
private final val dayStr = UTF8String.fromString("day")
private final val hourStr = UTF8String.fromString("hour")
private final val minuteStr = UTF8String.fromString("minute")
private final val secondStr = UTF8String.fromString("second")
private final val millisStr = UTF8String.fromString("millisecond")
private final val microsStr = UTF8String.fromString("microsecond")
def stringToInterval(input: UTF8String): CalendarInterval = {
import ParseState._
if (input == null) {
return null
}
// scalastyle:off caselocale .toLowerCase
val s = input.trim.toLowerCase
// scalastyle:on
val bytes = s.getBytes
if (bytes.length == 0) {
return null
}
var state = PREFIX
var i = 0
var currentValue: Long = 0
var isNegative: Boolean = false
var months: Int = 0
var days: Int = 0
var microseconds: Long = 0
var fractionScale: Int = 0
var fraction: Int = 0
while (i < bytes.length) {
val b = bytes(i)
state match {
case PREFIX =>
if (s.startsWith(intervalStr)) {
if (s.numBytes() == intervalStr.numBytes()) {
return null
} else {
i += intervalStr.numBytes()
}
}
state = BEGIN_VALUE
case BEGIN_VALUE =>
b match {
case ' ' => i += 1
case _ => state = PARSE_SIGN
}
case PARSE_SIGN =>
b match {
case '-' =>
isNegative = true
i += 1
case '+' =>
isNegative = false
i += 1
case _ if '0' <= b && b <= '9' =>
isNegative = false
case _ => return null
}
currentValue = 0
fraction = 0
// Sets the scale to an invalid value to track fraction presence
// in the BEGIN_UNIT_NAME state
fractionScale = -1
state = PARSE_UNIT_VALUE
case PARSE_UNIT_VALUE =>
b match {
case _ if '0' <= b && b <= '9' =>
try {
currentValue = Math.addExact(Math.multiplyExact(10, currentValue), (b - '0'))
} catch {
case _: ArithmeticException => return null
}
case ' ' =>
state = BEGIN_UNIT_NAME
case '.' =>
fractionScale = (DateTimeUtils.NANOS_PER_SECOND / 10).toInt
state = FRACTIONAL_PART
case _ => return null
}
i += 1
case FRACTIONAL_PART =>
b match {
case _ if '0' <= b && b <= '9' && fractionScale > 0 =>
fraction += (b - '0') * fractionScale
fractionScale /= 10
case ' ' =>
fraction /= DateTimeUtils.NANOS_PER_MICROS.toInt
state = BEGIN_UNIT_NAME
case _ => return null
}
i += 1
case BEGIN_UNIT_NAME =>
if (b == ' ') {
i += 1
} else {
// Checks that only seconds can have the fractional part
if (b != 's' && fractionScale >= 0) {
return null
}
if (isNegative) {
currentValue = -currentValue
fraction = -fraction
}
try {
b match {
case 'y' if s.matchAt(yearStr, i) =>
val monthsInYears = Math.multiplyExact(MONTHS_PER_YEAR, currentValue)
months = Math.toIntExact(Math.addExact(months, monthsInYears))
i += yearStr.numBytes()
case 'w' if s.matchAt(weekStr, i) =>
val daysInWeeks = Math.multiplyExact(DAYS_PER_WEEK, currentValue)
days = Math.toIntExact(Math.addExact(days, daysInWeeks))
i += weekStr.numBytes()
case 'd' if s.matchAt(dayStr, i) =>
days = Math.addExact(days, Math.toIntExact(currentValue))
i += dayStr.numBytes()
case 'h' if s.matchAt(hourStr, i) =>
val hoursUs = Math.multiplyExact(currentValue, MICROS_PER_HOUR)
microseconds = Math.addExact(microseconds, hoursUs)
i += hourStr.numBytes()
case 's' if s.matchAt(secondStr, i) =>
val secondsUs = Math.multiplyExact(currentValue, DateTimeUtils.MICROS_PER_SECOND)
microseconds = Math.addExact(Math.addExact(microseconds, secondsUs), fraction)
i += secondStr.numBytes()
case 'm' =>
if (s.matchAt(monthStr, i)) {
months = Math.addExact(months, Math.toIntExact(currentValue))
i += monthStr.numBytes()
} else if (s.matchAt(minuteStr, i)) {
val minutesUs = Math.multiplyExact(currentValue, MICROS_PER_MINUTE)
microseconds = Math.addExact(microseconds, minutesUs)
i += minuteStr.numBytes()
} else if (s.matchAt(millisStr, i)) {
val millisUs = Math.multiplyExact(
currentValue,
DateTimeUtils.MICROS_PER_MILLIS)
microseconds = Math.addExact(microseconds, millisUs)
i += millisStr.numBytes()
} else if (s.matchAt(microsStr, i)) {
microseconds = Math.addExact(microseconds, currentValue)
i += microsStr.numBytes()
} else return null
case _ => return null
}
} catch {
case _: ArithmeticException => return null
}
state = UNIT_NAME_SUFFIX
}
case UNIT_NAME_SUFFIX =>
b match {
case 's' => state = END_UNIT_NAME
case ' ' => state = BEGIN_VALUE
case _ => return null
}
i += 1
case END_UNIT_NAME =>
b match {
case ' ' =>
i += 1
state = BEGIN_VALUE
case _ => return null
}
}
}
val result = state match {
case UNIT_NAME_SUFFIX | END_UNIT_NAME | BEGIN_VALUE =>
new CalendarInterval(months, days, microseconds)
case _ => null
}
result
}
}

View file

@ -22,11 +22,39 @@ import java.util.concurrent.TimeUnit
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{MICROS_PER_MILLIS, MICROS_PER_SECOND}
import org.apache.spark.sql.catalyst.util.IntervalUtils._
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
class IntervalUtilsSuite extends SparkFunSuite {
test("fromString: basic") {
private def checkFromString(input: String, expected: CalendarInterval): Unit = {
assert(fromString(input) === expected)
assert(stringToInterval(UTF8String.fromString(input)) === expected)
}
private def checkFromInvalidString(input: String, errorMsg: String): Unit = {
try {
fromString(input)
fail("Expected to throw an exception for the invalid input")
} catch {
case e: IllegalArgumentException =>
val msg = e.getMessage
assert(msg.contains(errorMsg))
}
assert(stringToInterval(UTF8String.fromString(input)) === null)
}
private def testSingleUnit(
unit: String, number: Int, months: Int, days: Int, microseconds: Long): Unit = {
for (prefix <- Seq("interval ", "")) {
val input1 = prefix + number + " " + unit
val input2 = prefix + number + " " + unit + "s"
val result = new CalendarInterval(months, days, microseconds)
checkFromString(input1, result)
checkFromString(input2, result)
}
}
test("string to interval: basic") {
testSingleUnit("YEAR", 3, 36, 0, 0)
testSingleUnit("Month", 3, 3, 0, 0)
testSingleUnit("Week", 3, 0, 21, 0)
@ -37,60 +65,48 @@ class IntervalUtilsSuite extends SparkFunSuite {
testSingleUnit("MilliSecond", 3, 0, 0, 3 * MICROS_PER_MILLIS)
testSingleUnit("MicroSecond", 3, 0, 0, 3)
for (input <- Seq(null, "", " ")) {
try {
fromString(input)
fail("Expected to throw an exception for the invalid input")
} catch {
case e: IllegalArgumentException =>
val msg = e.getMessage
if (input == null) {
assert(msg.contains("cannot be null"))
}
}
}
checkFromInvalidString(null, "cannot be null")
for (input <- Seq("interval", "interval1 day", "foo", "foo 1 day")) {
try {
fromString(input)
fail("Expected to throw an exception for the invalid input")
} catch {
case e: IllegalArgumentException =>
val msg = e.getMessage
assert(msg.contains("Invalid interval string"))
}
for (input <- Seq("", " ", "interval", "interval1 day", "foo", "foo 1 day")) {
checkFromInvalidString(input, "Invalid interval string")
}
}
test("fromString: random order field") {
val input = "1 day 1 year"
val result = new CalendarInterval(12, 1, 0)
assert(fromString(input) == result)
}
test("fromString: duplicated fields") {
val input = "1 day 1 day"
val result = new CalendarInterval(0, 2, 0)
assert(fromString(input) == result)
}
test("fromString: value with +/-") {
val input = "+1 year -1 day"
val result = new CalendarInterval(12, -1, 0)
assert(fromString(input) == result)
}
private def testSingleUnit(
unit: String, number: Int, months: Int, days: Int, microseconds: Long): Unit = {
for (prefix <- Seq("interval ", "")) {
val input1 = prefix + number + " " + unit
val input2 = prefix + number + " " + unit + "s"
val result = new CalendarInterval(months, days, microseconds)
assert(fromString(input1) == result)
assert(fromString(input2) == result)
test("string to interval: multiple units") {
Seq(
"-1 MONTH 1 day -1 microseconds" -> new CalendarInterval(-1, 1, -1),
" 123 MONTHS 123 DAYS 123 Microsecond " -> new CalendarInterval(123, 123, 123),
"interval -1 day +3 Microseconds" -> new CalendarInterval(0, -1, 3),
" interval 8 years -11 months 123 weeks -1 day " +
"23 hours -22 minutes 1 second -123 millisecond 567 microseconds " ->
new CalendarInterval(85, 860, 81480877567L)).foreach { case (input, expected) =>
checkFromString(input, expected)
}
}
test("string to interval: special cases") {
// Support any order of interval units
checkFromString("1 day 1 year", new CalendarInterval(12, 1, 0))
// Allow duplicated units and summarize their values
checkFromString("1 day 10 day", new CalendarInterval(0, 11, 0))
// Only the seconds units can have the fractional part
checkFromInvalidString("1.5 days", "Error parsing interval string")
checkFromInvalidString("1. hour", "Error parsing interval string")
}
test("string to interval: seconds with fractional part") {
checkFromString("0.1 seconds", new CalendarInterval(0, 0, 100000))
checkFromString("1. seconds", new CalendarInterval(0, 0, 1000000))
checkFromString("123.001 seconds", new CalendarInterval(0, 0, 123001000))
checkFromString("1.001001 seconds", new CalendarInterval(0, 0, 1001001))
checkFromString("1 minute 1.001001 seconds", new CalendarInterval(0, 0, 61001001))
checkFromString("-1.5 seconds", new CalendarInterval(0, 0, -1500000))
// truncate nanoseconds to microseconds
checkFromString("0.999999999 seconds", new CalendarInterval(0, 0, 999999))
checkFromInvalidString("0.123456789123 seconds", "Error parsing interval string")
}
test("from year-month string") {
assert(fromYearMonthString("99-10") === new CalendarInterval(99 * 12 + 10, 0, 0L))
assert(fromYearMonthString("+99-10") === new CalendarInterval(99 * 12 + 10, 0, 0L))

View file

@ -1,25 +1,25 @@
OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
OpenJDK 64-Bit Server VM 11.0.2+9 on Mac OS X 10.15.1
Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
cast strings to intervals: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
prepare string w/ interval 672 728 64 1.5 672.1 1.0X
prepare string w/o interval 580 602 19 1.7 580.4 1.2X
1 units w/ interval 9450 9575 138 0.1 9449.6 0.1X
1 units w/o interval 8948 8968 19 0.1 8948.3 0.1X
2 units w/ interval 10947 10966 19 0.1 10947.1 0.1X
2 units w/o interval 10470 10489 26 0.1 10469.5 0.1X
3 units w/ interval 12265 12333 72 0.1 12264.5 0.1X
3 units w/o interval 12001 12004 3 0.1 12000.6 0.1X
4 units w/ interval 13749 13828 69 0.1 13748.5 0.0X
4 units w/o interval 13467 13479 15 0.1 13467.3 0.0X
5 units w/ interval 15392 15446 51 0.1 15392.1 0.0X
5 units w/o interval 15090 15107 29 0.1 15089.7 0.0X
6 units w/ interval 16696 16714 20 0.1 16695.9 0.0X
6 units w/o interval 16361 16366 5 0.1 16361.4 0.0X
7 units w/ interval 18190 18270 71 0.1 18190.2 0.0X
7 units w/o interval 17757 17767 9 0.1 17756.7 0.0X
8 units w/ interval 19821 19870 43 0.1 19820.7 0.0X
8 units w/o interval 19479 19555 97 0.1 19479.5 0.0X
9 units w/ interval 21417 21481 56 0.0 21417.1 0.0X
9 units w/o interval 21058 21131 86 0.0 21058.2 0.0X
prepare string w/ interval 442 472 41 2.3 442.4 1.0X
prepare string w/o interval 420 423 6 2.4 419.6 1.1X
1 units w/ interval 350 359 9 2.9 349.8 1.3X
1 units w/o interval 316 317 1 3.2 316.4 1.4X
2 units w/ interval 457 459 2 2.2 457.0 1.0X
2 units w/o interval 432 435 3 2.3 432.2 1.0X
3 units w/ interval 610 613 3 1.6 609.8 0.7X
3 units w/o interval 581 583 2 1.7 580.5 0.8X
4 units w/ interval 720 724 4 1.4 720.4 0.6X
4 units w/o interval 699 704 8 1.4 699.4 0.6X
5 units w/ interval 850 850 0 1.2 849.9 0.5X
5 units w/o interval 829 832 5 1.2 828.7 0.5X
6 units w/ interval 927 932 4 1.1 927.1 0.5X
6 units w/o interval 891 892 1 1.1 890.5 0.5X
7 units w/ interval 1033 1040 8 1.0 1033.2 0.4X
7 units w/o interval 1020 1024 5 1.0 1020.2 0.4X
8 units w/ interval 1168 1169 2 0.9 1168.0 0.4X
8 units w/o interval 1155 1157 2 0.9 1154.5 0.4X
9 units w/ interval 1326 1328 3 0.8 1326.1 0.3X
9 units w/o interval 1372 1381 14 0.7 1372.5 0.3X

View file

@ -1,26 +1,25 @@
OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Java HotSpot(TM) 64-Bit Server VM 1.8.0_231-b11 on Mac OS X 10.15.1
Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
cast strings to intervals: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
prepare string w/ interval 596 647 61 1.7 596.0 1.0X
prepare string w/o interval 530 554 22 1.9 530.2 1.1X
1 units w/ interval 9168 9243 66 0.1 9167.8 0.1X
1 units w/o interval 8740 8744 5 0.1 8740.2 0.1X
2 units w/ interval 10815 10874 52 0.1 10815.0 0.1X
2 units w/o interval 10413 10419 11 0.1 10412.8 0.1X
3 units w/ interval 12490 12530 37 0.1 12490.3 0.0X
3 units w/o interval 12173 12180 9 0.1 12172.8 0.0X
4 units w/ interval 13788 13834 43 0.1 13788.0 0.0X
4 units w/o interval 13445 13456 10 0.1 13445.5 0.0X
5 units w/ interval 15313 15330 15 0.1 15312.7 0.0X
5 units w/o interval 14928 14942 16 0.1 14928.0 0.0X
6 units w/ interval 16959 17003 42 0.1 16959.1 0.0X
6 units w/o interval 16623 16627 5 0.1 16623.3 0.0X
7 units w/ interval 18955 18972 21 0.1 18955.4 0.0X
7 units w/o interval 18454 18462 7 0.1 18454.1 0.0X
8 units w/ interval 20835 20843 8 0.0 20835.4 0.0X
8 units w/o interval 20446 20463 19 0.0 20445.7 0.0X
9 units w/ interval 22981 23031 43 0.0 22981.4 0.0X
9 units w/o interval 22581 22603 25 0.0 22581.1 0.0X
prepare string w/ interval 422 437 16 2.4 421.8 1.0X
prepare string w/o interval 369 374 8 2.7 369.4 1.1X
1 units w/ interval 426 430 5 2.3 425.5 1.0X
1 units w/o interval 382 386 5 2.6 382.1 1.1X
2 units w/ interval 519 527 9 1.9 518.5 0.8X
2 units w/o interval 505 512 6 2.0 505.4 0.8X
3 units w/ interval 650 653 3 1.5 649.6 0.6X
3 units w/o interval 630 633 4 1.6 629.7 0.7X
4 units w/ interval 755 761 6 1.3 754.9 0.6X
4 units w/o interval 745 749 3 1.3 745.3 0.6X
5 units w/ interval 882 891 14 1.1 882.0 0.5X
5 units w/o interval 867 870 3 1.2 867.4 0.5X
6 units w/ interval 1008 1013 4 1.0 1008.2 0.4X
6 units w/o interval 990 995 5 1.0 990.4 0.4X
7 units w/ interval 1057 1063 6 0.9 1056.9 0.4X
7 units w/o interval 1042 1046 4 1.0 1042.3 0.4X
8 units w/ interval 1206 1208 2 0.8 1206.0 0.3X
8 units w/o interval 1194 1198 4 0.8 1194.1 0.4X
9 units w/ interval 1322 1324 3 0.8 1321.5 0.3X
9 units w/o interval 1314 1318 4 0.8 1313.6 0.3X