[SPARK-31762][SQL] Fix perf regression of date/timestamp formatting in toHiveString
### What changes were proposed in this pull request?
1. Add new methods that accept Java date-time types to the `DateFormatter` and `TimestampFormatter` traits. The methods format the input date-time instances to strings:
    - TimestampFormatter:
      - `def format(ts: Timestamp): String`
      - `def format(instant: Instant): String`
    - DateFormatter:
      - `def format(date: Date): String`
      - `def format(localDate: LocalDate): String`
2. Re-use the added methods in `HiveResult.toHiveString`.
3. Borrow the code for formatting `java.sql.Timestamp` from Spark 2.4 `DateTimeUtils.timestampToString` into `FractionTimestampFormatter`, because the legacy formatters don't support variable-length patterns for second fractions.

### Why are the changes needed?
To avoid the unnecessary overhead of converting Java date-time types to micros/days before formatting, only for the formatters to convert the input micros/days back to Java types in order to pass instances to the standard library API.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By existing tests for `toHiveString` and new tests in `TimestampFormatterSuite`.

Closes #28582 from MaxGekk/opt-format-old-types.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
a06768ec4d
commit
5d673319af
|
@ -29,7 +29,10 @@ import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
|
|||
|
||||
sealed trait DateFormatter extends Serializable {
|
||||
def parse(s: String): Int // returns days since epoch
|
||||
|
||||
def format(days: Int): String
|
||||
def format(date: Date): String
|
||||
def format(localDate: LocalDate): String
|
||||
}
|
||||
|
||||
class Iso8601DateFormatter(
|
||||
|
@ -56,22 +59,32 @@ class Iso8601DateFormatter(
|
|||
}
|
||||
}
|
||||
|
||||
override def format(localDate: LocalDate): String = {
|
||||
localDate.format(formatter)
|
||||
}
|
||||
|
||||
override def format(days: Int): String = {
|
||||
LocalDate.ofEpochDay(days).format(formatter)
|
||||
format(LocalDate.ofEpochDay(days))
|
||||
}
|
||||
|
||||
override def format(date: Date): String = {
|
||||
legacyFormatter.format(date)
|
||||
}
|
||||
}
|
||||
|
||||
trait LegacyDateFormatter extends DateFormatter {
|
||||
def parseToDate(s: String): Date
|
||||
def formatDate(d: Date): String
|
||||
|
||||
override def parse(s: String): Int = {
|
||||
fromJavaDate(new java.sql.Date(parseToDate(s).getTime))
|
||||
}
|
||||
|
||||
override def format(days: Int): String = {
|
||||
val date = DateTimeUtils.toJavaDate(days)
|
||||
formatDate(date)
|
||||
format(DateTimeUtils.toJavaDate(days))
|
||||
}
|
||||
|
||||
override def format(localDate: LocalDate): String = {
|
||||
format(localDateToDays(localDate))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -79,14 +92,14 @@ class LegacyFastDateFormatter(pattern: String, locale: Locale) extends LegacyDat
|
|||
@transient
|
||||
private lazy val fdf = FastDateFormat.getInstance(pattern, locale)
|
||||
override def parseToDate(s: String): Date = fdf.parse(s)
|
||||
override def formatDate(d: Date): String = fdf.format(d)
|
||||
override def format(d: Date): String = fdf.format(d)
|
||||
}
|
||||
|
||||
class LegacySimpleDateFormatter(pattern: String, locale: Locale) extends LegacyDateFormatter {
|
||||
@transient
|
||||
private lazy val sdf = new SimpleDateFormat(pattern, locale)
|
||||
override def parseToDate(s: String): Date = sdf.parse(s)
|
||||
override def formatDate(d: Date): String = sdf.format(d)
|
||||
override def format(d: Date): String = sdf.format(d)
|
||||
}
|
||||
|
||||
object DateFormatter {
|
||||
|
|
|
@ -50,7 +50,10 @@ sealed trait TimestampFormatter extends Serializable {
|
|||
@throws(classOf[DateTimeParseException])
|
||||
@throws(classOf[DateTimeException])
|
||||
def parse(s: String): Long
|
||||
|
||||
def format(us: Long): String
|
||||
def format(ts: Timestamp): String
|
||||
def format(instant: Instant): String
|
||||
}
|
||||
|
||||
class Iso8601TimestampFormatter(
|
||||
|
@ -84,9 +87,17 @@ class Iso8601TimestampFormatter(
|
|||
}
|
||||
}
|
||||
|
||||
override def format(instant: Instant): String = {
|
||||
formatter.withZone(zoneId).format(instant)
|
||||
}
|
||||
|
||||
override def format(us: Long): String = {
|
||||
val instant = DateTimeUtils.microsToInstant(us)
|
||||
formatter.withZone(zoneId).format(instant)
|
||||
format(instant)
|
||||
}
|
||||
|
||||
override def format(ts: Timestamp): String = {
|
||||
legacyFormatter.format(ts)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -100,10 +111,27 @@ class Iso8601TimestampFormatter(
|
|||
*/
|
||||
class FractionTimestampFormatter(zoneId: ZoneId)
|
||||
extends Iso8601TimestampFormatter(
|
||||
"", zoneId, TimestampFormatter.defaultLocale, needVarLengthSecondFraction = false) {
|
||||
TimestampFormatter.defaultPattern,
|
||||
zoneId,
|
||||
TimestampFormatter.defaultLocale,
|
||||
needVarLengthSecondFraction = false) {
|
||||
|
||||
@transient
|
||||
override protected lazy val formatter = DateTimeFormatterHelper.fractionFormatter
|
||||
|
||||
// The new formatter will omit the trailing 0 in the timestamp string, but the legacy formatter
|
||||
// can't. Here we borrow the code from Spark 2.4 DateTimeUtils.timestampToString to omit the
|
||||
// trailing 0 for the legacy formatter as well.
|
||||
override def format(ts: Timestamp): String = {
|
||||
val timestampString = ts.toString
|
||||
val formatted = legacyFormatter.format(ts)
|
||||
|
||||
if (timestampString.length > 19 && timestampString.substring(19) != ".0") {
|
||||
formatted + timestampString.substring(19)
|
||||
} else {
|
||||
formatted
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -149,7 +177,7 @@ class LegacyFastTimestampFormatter(
|
|||
fastDateFormat.getTimeZone,
|
||||
fastDateFormat.getPattern.count(_ == 'S'))
|
||||
|
||||
def parse(s: String): SQLTimestamp = {
|
||||
override def parse(s: String): SQLTimestamp = {
|
||||
cal.clear() // Clear the calendar because it can be re-used many times
|
||||
if (!fastDateFormat.parse(s, new ParsePosition(0), cal)) {
|
||||
throw new IllegalArgumentException(s"'$s' is an invalid timestamp")
|
||||
|
@ -160,12 +188,20 @@ class LegacyFastTimestampFormatter(
|
|||
rebaseJulianToGregorianMicros(julianMicros)
|
||||
}
|
||||
|
||||
def format(timestamp: SQLTimestamp): String = {
|
||||
override def format(timestamp: SQLTimestamp): String = {
|
||||
val julianMicros = rebaseGregorianToJulianMicros(timestamp)
|
||||
cal.setTimeInMillis(Math.floorDiv(julianMicros, MICROS_PER_SECOND) * MILLIS_PER_SECOND)
|
||||
cal.setMicros(Math.floorMod(julianMicros, MICROS_PER_SECOND))
|
||||
fastDateFormat.format(cal)
|
||||
}
|
||||
|
||||
override def format(ts: Timestamp): String = {
|
||||
format(fromJavaTimestamp(ts))
|
||||
}
|
||||
|
||||
override def format(instant: Instant): String = {
|
||||
format(instantToMicros(instant))
|
||||
}
|
||||
}
|
||||
|
||||
class LegacySimpleTimestampFormatter(
|
||||
|
@ -187,6 +223,14 @@ class LegacySimpleTimestampFormatter(
|
|||
override def format(us: Long): String = {
|
||||
sdf.format(toJavaTimestamp(us))
|
||||
}
|
||||
|
||||
override def format(ts: Timestamp): String = {
|
||||
sdf.format(ts)
|
||||
}
|
||||
|
||||
override def format(instant: Instant): String = {
|
||||
format(instantToMicros(instant))
|
||||
}
|
||||
}
|
||||
|
||||
object LegacyDateFormats extends Enumeration {
|
||||
|
|
|
@ -22,7 +22,7 @@ import java.time.{DateTimeException, LocalDate, ZoneOffset}
|
|||
import org.apache.spark.{SparkFunSuite, SparkUpgradeException}
|
||||
import org.apache.spark.sql.catalyst.plans.SQLHelper
|
||||
import org.apache.spark.sql.catalyst.util._
|
||||
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, localDateToDays}
|
||||
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
|
||||
|
||||
|
@ -41,8 +41,11 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
|
|||
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
|
||||
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
|
||||
val formatter = DateFormatter(getZoneId(timeZone))
|
||||
val date = formatter.format(17867)
|
||||
assert(date === "2018-12-02")
|
||||
val (days, expected) = (17867, "2018-12-02")
|
||||
val date = formatter.format(days)
|
||||
assert(date === expected)
|
||||
assert(formatter.format(daysToLocalDate(days)) === expected)
|
||||
assert(formatter.format(toJavaDate(days)) === expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -70,8 +73,9 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
|
|||
DateFormatter.defaultLocale,
|
||||
legacyFormat)
|
||||
val days = formatter.parse(date)
|
||||
val formatted = formatter.format(days)
|
||||
assert(date === formatted)
|
||||
assert(date === formatter.format(days))
|
||||
assert(date === formatter.format(daysToLocalDate(days)))
|
||||
assert(date === formatter.format(toJavaDate(days)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -170,7 +174,9 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
|
|||
DateFormatter.defaultLocale,
|
||||
legacyFormat)
|
||||
assert(LocalDate.ofEpochDay(formatter.parse("1000-01-01")) === LocalDate.of(1000, 1, 1))
|
||||
assert(formatter.format(LocalDate.of(1000, 1, 1)) === "1000-01-01")
|
||||
assert(formatter.format(localDateToDays(LocalDate.of(1000, 1, 1))) === "1000-01-01")
|
||||
assert(formatter.format(java.sql.Date.valueOf("1000-01-01")) === "1000-01-01")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,20 +57,29 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper with Matchers
|
|||
test("format timestamps using time zones") {
|
||||
val microsSinceEpoch = 1543745472001234L
|
||||
val expectedTimestamp = Map(
|
||||
"UTC" -> "2018-12-02T10:11:12.001234",
|
||||
PST.getId -> "2018-12-02T02:11:12.001234",
|
||||
CET.getId -> "2018-12-02T11:11:12.001234",
|
||||
"Africa/Dakar" -> "2018-12-02T10:11:12.001234",
|
||||
"America/Los_Angeles" -> "2018-12-02T02:11:12.001234",
|
||||
"Antarctica/Vostok" -> "2018-12-02T16:11:12.001234",
|
||||
"Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234",
|
||||
"Europe/Amsterdam" -> "2018-12-02T11:11:12.001234")
|
||||
"UTC" -> "2018-12-02 10:11:12.001234",
|
||||
PST.getId -> "2018-12-02 02:11:12.001234",
|
||||
CET.getId -> "2018-12-02 11:11:12.001234",
|
||||
"Africa/Dakar" -> "2018-12-02 10:11:12.001234",
|
||||
"America/Los_Angeles" -> "2018-12-02 02:11:12.001234",
|
||||
"Antarctica/Vostok" -> "2018-12-02 16:11:12.001234",
|
||||
"Asia/Hong_Kong" -> "2018-12-02 18:11:12.001234",
|
||||
"Europe/Amsterdam" -> "2018-12-02 11:11:12.001234")
|
||||
DateTimeTestUtils.outstandingTimezonesIds.foreach { zoneId =>
|
||||
val formatter = TimestampFormatter(
|
||||
"yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
|
||||
DateTimeUtils.getZoneId(zoneId))
|
||||
val timestamp = formatter.format(microsSinceEpoch)
|
||||
assert(timestamp === expectedTimestamp(zoneId))
|
||||
Seq(
|
||||
TimestampFormatter(
|
||||
"yyyy-MM-dd HH:mm:ss.SSSSSS",
|
||||
getZoneId(zoneId),
|
||||
// Test only FAST_DATE_FORMAT because other legacy formats don't support formatting
|
||||
// in microsecond precision.
|
||||
LegacyDateFormats.FAST_DATE_FORMAT,
|
||||
needVarLengthSecondFraction = false),
|
||||
TimestampFormatter.getFractionFormatter(getZoneId(zoneId))).foreach { formatter =>
|
||||
val timestamp = formatter.format(microsSinceEpoch)
|
||||
assert(timestamp === expectedTimestamp(zoneId))
|
||||
assert(formatter.format(microsToInstant(microsSinceEpoch)) === expectedTimestamp(zoneId))
|
||||
assert(formatter.format(toJavaTimestamp(microsSinceEpoch)) === expectedTimestamp(zoneId))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -125,20 +134,30 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper with Matchers
|
|||
}
|
||||
|
||||
test("format fraction of second") {
|
||||
val formatter = TimestampFormatter.getFractionFormatter(ZoneOffset.UTC)
|
||||
assert(formatter.format(0) === "1970-01-01 00:00:00")
|
||||
assert(formatter.format(1) === "1970-01-01 00:00:00.000001")
|
||||
assert(formatter.format(1000) === "1970-01-01 00:00:00.001")
|
||||
assert(formatter.format(900000) === "1970-01-01 00:00:00.9")
|
||||
assert(formatter.format(1000000) === "1970-01-01 00:00:01")
|
||||
val formatter = TimestampFormatter.getFractionFormatter(UTC)
|
||||
Seq(
|
||||
0 -> "1970-01-01 00:00:00",
|
||||
1 -> "1970-01-01 00:00:00.000001",
|
||||
1000 -> "1970-01-01 00:00:00.001",
|
||||
900000 -> "1970-01-01 00:00:00.9",
|
||||
1000000 -> "1970-01-01 00:00:01").foreach { case (micros, tsStr) =>
|
||||
assert(formatter.format(micros) === tsStr)
|
||||
assert(formatter.format(microsToInstant(micros)) === tsStr)
|
||||
DateTimeTestUtils.withDefaultTimeZone(UTC) {
|
||||
assert(formatter.format(toJavaTimestamp(micros)) === tsStr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("formatting negative years with default pattern") {
|
||||
val instant = LocalDateTime.of(-99, 1, 1, 0, 0, 0)
|
||||
.atZone(ZoneOffset.UTC)
|
||||
.toInstant
|
||||
val instant = LocalDateTime.of(-99, 1, 1, 0, 0, 0).atZone(UTC).toInstant
|
||||
val micros = DateTimeUtils.instantToMicros(instant)
|
||||
assert(TimestampFormatter(ZoneOffset.UTC).format(micros) === "-0099-01-01 00:00:00")
|
||||
assert(TimestampFormatter(UTC).format(micros) === "-0099-01-01 00:00:00")
|
||||
assert(TimestampFormatter(UTC).format(instant) === "-0099-01-01 00:00:00")
|
||||
DateTimeTestUtils.withDefaultTimeZone(UTC) { // toJavaTimestamp depends on the default time zone
|
||||
assert(TimestampFormatter("yyyy-MM-dd HH:mm:SS G", UTC).format(toJavaTimestamp(micros))
|
||||
=== "0100-01-01 00:00:00 BC")
|
||||
}
|
||||
}
|
||||
|
||||
test("special timestamp values") {
|
||||
|
@ -266,24 +285,31 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper with Matchers
|
|||
|
||||
test("SPARK-31557: rebasing in legacy formatters/parsers") {
|
||||
withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> LegacyBehaviorPolicy.LEGACY.toString) {
|
||||
LegacyDateFormats.values.foreach { legacyFormat =>
|
||||
DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
|
||||
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> zoneId.getId) {
|
||||
DateTimeTestUtils.withDefaultTimeZone(zoneId) {
|
||||
withClue(s"${zoneId.getId} legacyFormat = $legacyFormat") {
|
||||
val formatter = TimestampFormatter(
|
||||
DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
|
||||
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> zoneId.getId) {
|
||||
DateTimeTestUtils.withDefaultTimeZone(zoneId) {
|
||||
withClue(s"zoneId = ${zoneId.getId}") {
|
||||
val formatters = LegacyDateFormats.values.map { legacyFormat =>
|
||||
TimestampFormatter(
|
||||
TimestampFormatter.defaultPattern,
|
||||
zoneId,
|
||||
TimestampFormatter.defaultLocale,
|
||||
legacyFormat,
|
||||
needVarLengthSecondFraction = false)
|
||||
}.toSeq :+ TimestampFormatter.getFractionFormatter(zoneId)
|
||||
formatters.foreach { formatter =>
|
||||
assert(microsToInstant(formatter.parse("1000-01-01 01:02:03"))
|
||||
.atZone(zoneId)
|
||||
.toLocalDateTime === LocalDateTime.of(1000, 1, 1, 1, 2, 3))
|
||||
|
||||
assert(formatter.format(
|
||||
LocalDateTime.of(1000, 1, 1, 1, 2, 3).atZone(zoneId).toInstant) ===
|
||||
"1000-01-01 01:02:03")
|
||||
assert(formatter.format(instantToMicros(
|
||||
LocalDateTime.of(1000, 1, 1, 1, 2, 3)
|
||||
.atZone(zoneId).toInstant)) === "1000-01-01 01:02:03")
|
||||
assert(formatter.format(java.sql.Timestamp.valueOf("1000-01-01 01:02:03")) ===
|
||||
"1000-01-01 01:02:03")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -80,13 +80,10 @@ object HiveResult {
|
|||
def toHiveString(a: (Any, DataType), nested: Boolean = false): String = a match {
|
||||
case (null, _) => if (nested) "null" else "NULL"
|
||||
case (b, BooleanType) => b.toString
|
||||
case (d: Date, DateType) => dateFormatter.format(DateTimeUtils.fromJavaDate(d))
|
||||
case (ld: LocalDate, DateType) =>
|
||||
dateFormatter.format(DateTimeUtils.localDateToDays(ld))
|
||||
case (t: Timestamp, TimestampType) =>
|
||||
timestampFormatter.format(DateTimeUtils.fromJavaTimestamp(t))
|
||||
case (i: Instant, TimestampType) =>
|
||||
timestampFormatter.format(DateTimeUtils.instantToMicros(i))
|
||||
case (d: Date, DateType) => dateFormatter.format(d)
|
||||
case (ld: LocalDate, DateType) => dateFormatter.format(ld)
|
||||
case (t: Timestamp, TimestampType) => timestampFormatter.format(t)
|
||||
case (i: Instant, TimestampType) => timestampFormatter.format(i)
|
||||
case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
|
||||
case (decimal: java.math.BigDecimal, DecimalType()) => decimal.toPlainString
|
||||
case (n, _: NumericType) => n.toString
|
||||
|
|
|
@ -584,7 +584,7 @@ select make_date(-44, 3, 15)
|
|||
-- !query schema
|
||||
struct<make_date(-44, 3, 15):date>
|
||||
-- !query output
|
||||
-0044-03-15
|
||||
0045-03-15
|
||||
|
||||
|
||||
-- !query
|
||||
|
|
Loading…
Reference in a new issue