[SPARK-26243][SQL] Use java.time API for parsing timestamps and dates from JSON

## What changes were proposed in this pull request?

In the PR, I propose to switch to the **java.time API** for parsing timestamps and dates from JSON inputs with microsecond precision. The SQL config `spark.sql.legacy.timeParser.enabled` allows switching back to the previous behavior, which uses `java.text.SimpleDateFormat`/`FastDateFormat` for parsing/generating timestamps/dates.
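A minimal sketch of the user-facing behavior (not part of the patch itself; the sample JSON, schema, and session setup are illustrative only):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("json-time-parser").getOrCreate()
import spark.implicits._

val ds = Seq("""{"ts": "2018-12-02T10:11:12.001234"}""").toDS()

// With the java.time-based parser, the pattern can carry microsecond precision (SSSSSS).
val parsed = spark.read
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
  .schema("ts TIMESTAMP")
  .json(ds)
parsed.show(truncate = false)

// Switch back to the Spark 2.4 behavior (SimpleDateFormat/FastDateFormat).
spark.conf.set("spark.sql.legacy.timeParser.enabled", "true")
```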

## How was this patch tested?

It was tested by `JsonExpressionsSuite`, `JsonFunctionsSuite` and `JsonSuite`.

Closes #23196 from MaxGekk/json-time-parser.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Maxim Gekk 2018-12-16 09:32:13 +08:00 committed by Wenchen Fan
parent 860f4497f2
commit 8a27952cdb
12 changed files with 415 additions and 328 deletions


@ -35,7 +35,7 @@ displayTitle: Spark SQL Upgrading Guide
- Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0.
- Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
- Since Spark 3.0, CSV/JSON datasources use java.time API for parsing and generating CSV/JSON content. In Spark version 2.4 and earlier, java.text.SimpleDateFormat is used for the same purpose with fallbacks to the parsing mechanisms of Spark 2.0 and 1.x. For example, `2018-12-08 10:39:21.123` with the pattern `yyyy-MM-dd'T'HH:mm:ss.SSS` cannot be parsed since Spark 3.0 because the timestamp does not match the pattern, but it can be parsed by earlier Spark versions due to a fallback to `Timestamp.valueOf`. To parse the same timestamp since Spark 3.0, the pattern should be `yyyy-MM-dd HH:mm:ss.SSS`. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
- In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, the returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.
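A REPL-style sketch of the java.time behavior the migration note describes; this uses plain `java.time.format.DateTimeFormatter`, not Spark's own formatter, so it is only an illustration of why the strict pattern match matters:

```scala
import java.time.LocalDateTime
import java.time.format.{DateTimeFormatter, DateTimeParseException}

val value = "2018-12-08 10:39:21.123"

// The literal 'T' in the pattern does not match the space in the input,
// so java.time rejects the value instead of falling back as Spark 2.4 did.
val mismatched = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS")
try LocalDateTime.parse(value, mismatched)
catch { case e: DateTimeParseException => println(s"rejected: ${e.getMessage}") }

// A pattern that actually matches the input parses fine.
val matching = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
println(LocalDateTime.parse(value, matching)) // 2018-12-08T10:39:21.123
```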


@ -22,13 +22,13 @@ import scala.util.control.Exception.allCatch
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.DateTimeFormatter
import org.apache.spark.sql.catalyst.util.TimestampFormatter
import org.apache.spark.sql.types._
class CSVInferSchema(val options: CSVOptions) extends Serializable {
@transient
private lazy val timeParser = DateTimeFormatter(
private lazy val timestampParser = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.locale)
@ -160,7 +160,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
private def tryParseTimestamp(field: String): DataType = {
// This case infers a custom `dataFormat` is set.
if ((allCatch opt timeParser.parse(field)).isDefined) {
if ((allCatch opt timestampParser.parse(field)).isDefined) {
TimestampType
} else {
tryParseBoolean(field)


@ -22,7 +22,7 @@ import java.io.Writer
import com.univocity.parsers.csv.CsvWriter
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter}
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
import org.apache.spark.sql.types._
class UnivocityGenerator(
@ -41,18 +41,18 @@ class UnivocityGenerator(
private val valueConverters: Array[ValueConverter] =
schema.map(_.dataType).map(makeConverter).toArray
private val timeFormatter = DateTimeFormatter(
private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
private def makeConverter(dataType: DataType): ValueConverter = dataType match {
case DateType =>
(row: InternalRow, ordinal: Int) => dateFormatter.format(row.getInt(ordinal))
case TimestampType =>
(row: InternalRow, ordinal: Int) => timeFormatter.format(row.getLong(ordinal))
(row: InternalRow, ordinal: Int) => timestampFormatter.format(row.getLong(ordinal))
case udt: UserDefinedType[_] => makeConverter(udt.sqlType)


@ -74,11 +74,11 @@ class UnivocityParser(
private val row = new GenericInternalRow(requiredSchema.length)
private val timeFormatter = DateTimeFormatter(
private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
// Retrieve the raw record string.
private def getCurrentInput: UTF8String = {
@ -158,7 +158,7 @@ class UnivocityParser(
}
case _: TimestampType => (d: String) =>
nullSafeDatum(d, name, nullable, options)(timeFormatter.parse)
nullSafeDatum(d, name, nullable, options)(timestampFormatter.parse)
case _: DateType => (d: String) =>
nullSafeDatum(d, name, nullable, options)(dateFormatter.parse)


@ -21,7 +21,6 @@ import java.nio.charset.{Charset, StandardCharsets}
import java.util.{Locale, TimeZone}
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util._
@ -82,13 +81,10 @@ private[sql] class JSONOptions(
val timeZone: TimeZone = DateTimeUtils.getTimeZone(
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
val dateFormat: FastDateFormat =
FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), locale)
val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd")
val timestampFormat: FastDateFormat =
FastDateFormat.getInstance(
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, locale)
val timestampFormat: String =
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)


@ -23,7 +23,7 @@ import com.fasterxml.jackson.core._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.types._
/**
@ -77,6 +77,12 @@ private[sql] class JacksonGenerator(
private val lineSeparator: String = options.lineSeparatorInWrite
private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
private def makeWriter(dataType: DataType): ValueWriter = dataType match {
case NullType =>
(row: SpecializedGetters, ordinal: Int) =>
@ -116,14 +122,12 @@ private[sql] class JacksonGenerator(
case TimestampType =>
(row: SpecializedGetters, ordinal: Int) =>
val timestampString =
options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
val timestampString = timestampFormatter.format(row.getLong(ordinal))
gen.writeString(timestampString)
case DateType =>
(row: SpecializedGetters, ordinal: Int) =>
val dateString =
options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
val dateString = dateFormatter.format(row.getInt(ordinal))
gen.writeString(dateString)
case BinaryType =>


@ -55,6 +55,12 @@ class JacksonParser(
private val factory = new JsonFactory()
options.setJacksonOptions(factory)
private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
/**
* Create a converter which converts the JSON documents held by the `JsonParser`
* to a value according to a desired schema. This is a wrapper for the method
@ -218,17 +224,7 @@ class JacksonParser(
case TimestampType =>
(parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
case VALUE_STRING if parser.getTextLength >= 1 =>
val stringValue = parser.getText
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.
Long.box {
Try(options.timestampFormat.parse(stringValue).getTime * 1000L)
.getOrElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
DateTimeUtils.stringToTime(stringValue).getTime * 1000L
}
}
timestampFormatter.parse(parser.getText)
case VALUE_NUMBER_INT =>
parser.getLongValue * 1000000L
@ -237,22 +233,7 @@ class JacksonParser(
case DateType =>
(parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
case VALUE_STRING if parser.getTextLength >= 1 =>
val stringValue = parser.getText
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.x
Int.box {
Try(DateTimeUtils.millisToDays(options.dateFormat.parse(stringValue).getTime))
.orElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(stringValue).getTime))
}
.getOrElse {
// In Spark 1.5.0, we store the data as number of days since epoch in string.
// So, we just convert it to Int.
stringValue.toInt
}
}
dateFormatter.parse(parser.getText)
}
case BinaryType =>


@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.util
import java.time._
import java.time.format.DateTimeFormatterBuilder
import java.time.temporal.{ChronoField, TemporalQueries}
import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
import java.util.{Locale, TimeZone}
import scala.util.Try
@ -28,31 +28,44 @@ import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.sql.internal.SQLConf
sealed trait DateTimeFormatter {
sealed trait TimestampFormatter {
def parse(s: String): Long // returns microseconds since epoch
def format(us: Long): String
}
class Iso8601DateTimeFormatter(
trait FormatterUtils {
protected def zoneId: ZoneId
protected def buildFormatter(
pattern: String,
locale: Locale): java.time.format.DateTimeFormatter = {
new DateTimeFormatterBuilder()
.appendPattern(pattern)
.parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
.parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
.parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
.toFormatter(locale)
}
protected def toInstantWithZoneId(temporalAccessor: TemporalAccessor): java.time.Instant = {
val localDateTime = LocalDateTime.from(temporalAccessor)
val zonedDateTime = ZonedDateTime.of(localDateTime, zoneId)
Instant.from(zonedDateTime)
}
}
class Iso8601TimestampFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends DateTimeFormatter {
val formatter = new DateTimeFormatterBuilder()
.appendPattern(pattern)
.parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
.parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
.parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
.toFormatter(locale)
locale: Locale) extends TimestampFormatter with FormatterUtils {
val zoneId = timeZone.toZoneId
val formatter = buildFormatter(pattern, locale)
def toInstant(s: String): Instant = {
val temporalAccessor = formatter.parse(s)
if (temporalAccessor.query(TemporalQueries.offset()) == null) {
val localDateTime = LocalDateTime.from(temporalAccessor)
val zonedDateTime = ZonedDateTime.of(localDateTime, timeZone.toZoneId)
Instant.from(zonedDateTime)
toInstantWithZoneId(temporalAccessor)
} else {
Instant.from(temporalAccessor)
}
@ -75,10 +88,10 @@ class Iso8601DateTimeFormatter(
}
}
class LegacyDateTimeFormatter(
class LegacyTimestampFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends DateTimeFormatter {
locale: Locale) extends TimestampFormatter {
val format = FastDateFormat.getInstance(pattern, timeZone, locale)
protected def toMillis(s: String): Long = format.parse(s).getTime
@ -90,21 +103,21 @@ class LegacyDateTimeFormatter(
}
}
class LegacyFallbackDateTimeFormatter(
class LegacyFallbackTimestampFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends LegacyDateTimeFormatter(pattern, timeZone, locale) {
locale: Locale) extends LegacyTimestampFormatter(pattern, timeZone, locale) {
override def toMillis(s: String): Long = {
Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime)
}
}
object DateTimeFormatter {
def apply(format: String, timeZone: TimeZone, locale: Locale): DateTimeFormatter = {
object TimestampFormatter {
def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
if (SQLConf.get.legacyTimeParserEnabled) {
new LegacyFallbackDateTimeFormatter(format, timeZone, locale)
new LegacyFallbackTimestampFormatter(format, timeZone, locale)
} else {
new Iso8601DateTimeFormatter(format, timeZone, locale)
new Iso8601TimestampFormatter(format, timeZone, locale)
}
}
}
@ -116,13 +129,19 @@ sealed trait DateFormatter {
class Iso8601DateFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends DateFormatter {
locale: Locale) extends DateFormatter with FormatterUtils {
val dateTimeFormatter = new Iso8601DateTimeFormatter(pattern, timeZone, locale)
val zoneId = ZoneId.of("UTC")
val formatter = buildFormatter(pattern, locale)
def toInstant(s: String): Instant = {
val temporalAccessor = formatter.parse(s)
toInstantWithZoneId(temporalAccessor)
}
override def parse(s: String): Int = {
val seconds = dateTimeFormatter.toInstant(s).getEpochSecond
val seconds = toInstant(s).getEpochSecond
val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY)
days.toInt
@ -130,15 +149,12 @@ class Iso8601DateFormatter(
override def format(days: Int): String = {
val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY)
dateTimeFormatter.formatter.withZone(timeZone.toZoneId).format(instant)
formatter.withZone(zoneId).format(instant)
}
}
class LegacyDateFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends DateFormatter {
val format = FastDateFormat.getInstance(pattern, timeZone, locale)
class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter {
val format = FastDateFormat.getInstance(pattern, locale)
def parse(s: String): Int = {
val milliseconds = format.parse(s).getTime
@ -153,8 +169,7 @@ class LegacyDateFormatter(
class LegacyFallbackDateFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends LegacyDateFormatter(pattern, timeZone, locale) {
locale: Locale) extends LegacyDateFormatter(pattern, locale) {
override def parse(s: String): Int = {
Try(super.parse(s)).orElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
@ -169,11 +184,11 @@ class LegacyFallbackDateFormatter(
}
object DateFormatter {
def apply(format: String, timeZone: TimeZone, locale: Locale): DateFormatter = {
def apply(format: String, locale: Locale): DateFormatter = {
if (SQLConf.get.legacyTimeParserEnabled) {
new LegacyFallbackDateFormatter(format, timeZone, locale)
new LegacyFallbackDateFormatter(format, locale)
} else {
new Iso8601DateFormatter(format, timeZone, locale)
new Iso8601DateFormatter(format, locale)
}
}
}
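A small usage sketch of the refactored formatters above, mirroring the new `DateTimestampFormatterSuite` (the concrete values are borrowed from that suite; it assumes the default, non-legacy parser is in effect):

```scala
import java.util.{Locale, TimeZone}
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}

// Timestamps are parsed to microseconds since the epoch.
val timestampFormatter = TimestampFormatter(
  "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
  TimeZone.getTimeZone("UTC"),
  Locale.US)
val micros = timestampFormatter.parse("2018-12-02T10:11:12.001234") // 1543745472001234L

// Dates are parsed to days since the epoch; DateFormatter no longer takes a TimeZone.
val dateFormatter = DateFormatter("yyyy-MM-dd", Locale.US)
val days = dateFormatter.parse("2018-12-02") // 17867
```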


@ -1,103 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.util
import java.util.{Locale, TimeZone}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter, DateTimeTestUtils}
class DateTimeFormatterSuite extends SparkFunSuite {
test("parsing dates using time zones") {
val localDate = "2018-12-02"
val expectedDays = Map(
"UTC" -> 17867,
"PST" -> 17867,
"CET" -> 17866,
"Africa/Dakar" -> 17867,
"America/Los_Angeles" -> 17867,
"Antarctica/Vostok" -> 17866,
"Asia/Hong_Kong" -> 17866,
"Europe/Amsterdam" -> 17866)
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US)
val daysSinceEpoch = formatter.parse(localDate)
assert(daysSinceEpoch === expectedDays(timeZone))
}
}
test("parsing timestamps using time zones") {
val localDate = "2018-12-02T10:11:12.001234"
val expectedMicros = Map(
"UTC" -> 1543745472001234L,
"PST" -> 1543774272001234L,
"CET" -> 1543741872001234L,
"Africa/Dakar" -> 1543745472001234L,
"America/Los_Angeles" -> 1543774272001234L,
"Antarctica/Vostok" -> 1543723872001234L,
"Asia/Hong_Kong" -> 1543716672001234L,
"Europe/Amsterdam" -> 1543741872001234L)
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
val formatter = DateTimeFormatter(
"yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
TimeZone.getTimeZone(timeZone),
Locale.US)
val microsSinceEpoch = formatter.parse(localDate)
assert(microsSinceEpoch === expectedMicros(timeZone))
}
}
test("format dates using time zones") {
val daysSinceEpoch = 17867
val expectedDate = Map(
"UTC" -> "2018-12-02",
"PST" -> "2018-12-01",
"CET" -> "2018-12-02",
"Africa/Dakar" -> "2018-12-02",
"America/Los_Angeles" -> "2018-12-01",
"Antarctica/Vostok" -> "2018-12-02",
"Asia/Hong_Kong" -> "2018-12-02",
"Europe/Amsterdam" -> "2018-12-02")
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US)
val date = formatter.format(daysSinceEpoch)
assert(date === expectedDate(timeZone))
}
}
test("format timestamps using time zones") {
val microsSinceEpoch = 1543745472001234L
val expectedTimestamp = Map(
"UTC" -> "2018-12-02T10:11:12.001234",
"PST" -> "2018-12-02T02:11:12.001234",
"CET" -> "2018-12-02T11:11:12.001234",
"Africa/Dakar" -> "2018-12-02T10:11:12.001234",
"America/Los_Angeles" -> "2018-12-02T02:11:12.001234",
"Antarctica/Vostok" -> "2018-12-02T16:11:12.001234",
"Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234",
"Europe/Amsterdam" -> "2018-12-02T11:11:12.001234")
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
val formatter = DateTimeFormatter(
"yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
TimeZone.getTimeZone(timeZone),
Locale.US)
val timestamp = formatter.format(microsSinceEpoch)
assert(timestamp === expectedTimestamp(timeZone))
}
}
}


@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.util
import java.util.{Locale, TimeZone}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf
class DateTimestampFormatterSuite extends SparkFunSuite with SQLHelper {
test("parsing dates") {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
val daysSinceEpoch = formatter.parse("2018-12-02")
assert(daysSinceEpoch === 17867)
}
}
}
test("parsing timestamps using time zones") {
val localDate = "2018-12-02T10:11:12.001234"
val expectedMicros = Map(
"UTC" -> 1543745472001234L,
"PST" -> 1543774272001234L,
"CET" -> 1543741872001234L,
"Africa/Dakar" -> 1543745472001234L,
"America/Los_Angeles" -> 1543774272001234L,
"Antarctica/Vostok" -> 1543723872001234L,
"Asia/Hong_Kong" -> 1543716672001234L,
"Europe/Amsterdam" -> 1543741872001234L)
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
val formatter = TimestampFormatter(
"yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
TimeZone.getTimeZone(timeZone),
Locale.US)
val microsSinceEpoch = formatter.parse(localDate)
assert(microsSinceEpoch === expectedMicros(timeZone))
}
}
test("format dates") {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
val date = formatter.format(17867)
assert(date === "2018-12-02")
}
}
}
test("format timestamps using time zones") {
val microsSinceEpoch = 1543745472001234L
val expectedTimestamp = Map(
"UTC" -> "2018-12-02T10:11:12.001234",
"PST" -> "2018-12-02T02:11:12.001234",
"CET" -> "2018-12-02T11:11:12.001234",
"Africa/Dakar" -> "2018-12-02T10:11:12.001234",
"America/Los_Angeles" -> "2018-12-02T02:11:12.001234",
"Antarctica/Vostok" -> "2018-12-02T16:11:12.001234",
"Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234",
"Europe/Amsterdam" -> "2018-12-02T11:11:12.001234")
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
val formatter = TimestampFormatter(
"yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
TimeZone.getTimeZone(timeZone),
Locale.US)
val timestamp = formatter.format(microsSinceEpoch)
assert(timestamp === expectedTimestamp(timeZone))
}
}
test("roundtrip timestamp -> micros -> timestamp using timezones") {
Seq(
-58710115316212000L,
-18926315945345679L,
-9463427405253013L,
-244000001L,
0L,
99628200102030L,
1543749753123456L,
2177456523456789L,
11858049903010203L).foreach { micros =>
DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US)
val timestamp = formatter.format(micros)
val parsed = formatter.parse(timestamp)
assert(micros === parsed)
}
}
}
test("roundtrip micros -> timestamp -> micros using timezones") {
Seq(
"0109-07-20T18:38:03.788000",
"1370-04-01T10:00:54.654321",
"1670-02-11T14:09:54.746987",
"1969-12-31T23:55:55.999999",
"1970-01-01T00:00:00.000000",
"1973-02-27T02:30:00.102030",
"2018-12-02T11:22:33.123456",
"2039-01-01T01:02:03.456789",
"2345-10-07T22:45:03.010203").foreach { timestamp =>
DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US)
val micros = formatter.parse(timestamp)
val formatted = formatter.format(micros)
assert(timestamp === formatted)
}
}
}
test("roundtrip date -> days -> date") {
Seq(
"0050-01-01",
"0953-02-02",
"1423-03-08",
"1969-12-31",
"1972-08-25",
"1975-09-26",
"2018-12-12",
"2038-01-01",
"5010-11-17").foreach { date =>
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
val days = formatter.parse(date)
val formatted = formatter.format(days)
assert(date === formatted)
}
}
}
}
test("roundtrip days -> date -> days") {
Seq(
-701265,
-371419,
-199722,
-1,
0,
967,
2094,
17877,
24837,
1110657).foreach { days =>
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
val date = formatter.format(days)
val parsed = formatter.parse(date)
assert(days === parsed)
}
}
}
}
}


@ -57,14 +57,17 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
val factory = new JsonFactory()
def enforceCorrectType(value: Any, dataType: DataType): Any = {
def enforceCorrectType(
value: Any,
dataType: DataType,
options: Map[String, String] = Map.empty): Any = {
val writer = new StringWriter()
Utils.tryWithResource(factory.createGenerator(writer)) { generator =>
generator.writeObject(value)
generator.flush()
}
val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
val dummyOption = new JSONOptions(options, SQLConf.get.sessionLocalTimeZone)
val dummySchema = StructType(Seq.empty)
val parser = new JacksonParser(dummySchema, dummyOption, allowArrayAsStructs = true)
@ -96,19 +99,27 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong * 1000L)),
enforceCorrectType(intNumber.toLong, TimestampType))
val strTime = "2014-09-30 12:34:56"
checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
enforceCorrectType(strTime, TimestampType))
checkTypePromotion(
expected = DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
enforceCorrectType(strTime, TimestampType,
Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss")))
val strDate = "2014-10-15"
checkTypePromotion(
DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
enforceCorrectType(ISO8601Time1, TimestampType))
enforceCorrectType(
ISO8601Time1,
TimestampType,
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SX")))
val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)),
enforceCorrectType(ISO8601Time2, TimestampType))
enforceCorrectType(
ISO8601Time2,
TimestampType,
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ssXXX")))
val ISO8601Date = "1970-01-01"
checkTypePromotion(DateTimeUtils.millisToDays(32400000),
@ -1440,103 +1451,105 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
test("backward compatibility") {
// This test we make sure our JSON support can read JSON data generated by previous version
// of Spark generated through toJSON method and JSON data source.
// The data is generated by the following program.
// Here are a few notes:
// - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13)
// in the JSON object.
// - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to
// JSON objects generated by those Spark versions (col17).
// - If the type is NullType, we do not write data out.
withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") {
// This test we make sure our JSON support can read JSON data generated by previous version
// of Spark generated through toJSON method and JSON data source.
// The data is generated by the following program.
// Here are a few notes:
// - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13)
// in the JSON object.
// - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to
// JSON objects generated by those Spark versions (col17).
// - If the type is NullType, we do not write data out.
// Create the schema.
val struct =
StructType(
StructField("f1", FloatType, true) ::
StructField("f2", ArrayType(BooleanType), true) :: Nil)
// Create the schema.
val struct =
StructType(
StructField("f1", FloatType, true) ::
StructField("f2", ArrayType(BooleanType), true) :: Nil)
val dataTypes =
Seq(
StringType, BinaryType, NullType, BooleanType,
ByteType, ShortType, IntegerType, LongType,
FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
DateType, TimestampType,
ArrayType(IntegerType), MapType(StringType, LongType), struct,
new TestUDT.MyDenseVectorUDT())
val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
StructField(s"col$index", dataType, nullable = true)
}
val schema = StructType(fields)
val dataTypes =
Seq(
StringType, BinaryType, NullType, BooleanType,
ByteType, ShortType, IntegerType, LongType,
FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
DateType, TimestampType,
ArrayType(IntegerType), MapType(StringType, LongType), struct,
new TestUDT.MyDenseVectorUDT())
val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
StructField(s"col$index", dataType, nullable = true)
}
val schema = StructType(fields)
val constantValues =
Seq(
"a string in binary".getBytes(StandardCharsets.UTF_8),
null,
true,
1.toByte,
2.toShort,
3,
Long.MaxValue,
0.25.toFloat,
0.75,
new java.math.BigDecimal(s"1234.23456"),
new java.math.BigDecimal(s"1.23456"),
java.sql.Date.valueOf("2015-01-01"),
java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
Seq(2, 3, 4),
Map("a string" -> 2000L),
Row(4.75.toFloat, Seq(false, true)),
new TestUDT.MyDenseVector(Array(0.25, 2.25, 4.25)))
val data =
Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil
val constantValues =
Seq(
"a string in binary".getBytes(StandardCharsets.UTF_8),
null,
true,
1.toByte,
2.toShort,
3,
Long.MaxValue,
0.25.toFloat,
0.75,
new java.math.BigDecimal(s"1234.23456"),
new java.math.BigDecimal(s"1.23456"),
java.sql.Date.valueOf("2015-01-01"),
java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
Seq(2, 3, 4),
Map("a string" -> 2000L),
Row(4.75.toFloat, Seq(false, true)),
new TestUDT.MyDenseVector(Array(0.25, 2.25, 4.25)))
val data =
Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil
// Data generated by previous versions.
// scalastyle:off
val existingJSONData =
// Data generated by previous versions.
// scalastyle:off
val existingJSONData =
"""{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
// scalastyle:on
"""{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
// scalastyle:on
// Generate data for the current version.
val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema)
withTempPath { path =>
df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
// Generate data for the current version.
val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema)
withTempPath { path =>
df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
// df.toJSON will convert internal rows to external rows first and then generate
// JSON objects. While, df.write.format("json") will write internal rows directly.
val allJSON =
// df.toJSON will convert internal rows to external rows first and then generate
// JSON objects. While, df.write.format("json") will write internal rows directly.
val allJSON =
existingJSONData ++
df.toJSON.collect() ++
sparkContext.textFile(path.getCanonicalPath).collect()
Utils.deleteRecursively(path)
sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
Utils.deleteRecursively(path)
sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
// Read data back with the schema specified.
val col0Values =
Seq(
"Spark 1.2.2",
"Spark 1.3.1",
"Spark 1.3.1",
"Spark 1.4.1",
"Spark 1.4.1",
"Spark 1.5.0",
"Spark 1.5.0",
"Spark " + spark.sparkContext.version,
"Spark " + spark.sparkContext.version)
val expectedResult = col0Values.map { v =>
Row.fromSeq(Seq(v) ++ constantValues)
// Read data back with the schema specified.
val col0Values =
Seq(
"Spark 1.2.2",
"Spark 1.3.1",
"Spark 1.3.1",
"Spark 1.4.1",
"Spark 1.4.1",
"Spark 1.5.0",
"Spark 1.5.0",
"Spark " + spark.sparkContext.version,
"Spark " + spark.sparkContext.version)
val expectedResult = col0Values.map { v =>
Row.fromSeq(Seq(v) ++ constantValues)
}
checkAnswer(
spark.read.format("json").schema(schema).load(path.getCanonicalPath),
expectedResult
)
}
checkAnswer(
spark.read.format("json").schema(schema).load(path.getCanonicalPath),
expectedResult
)
}
}


@ -18,6 +18,7 @@
package org.apache.spark.sql.sources
import java.io.File
import java.util.TimeZone
import scala.util.Random
@ -125,56 +126,62 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
} else {
Seq(false)
}
for (dataType <- supportedDataTypes) {
for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) {
val extraMessage = if (isParquetDataSource) {
s" with parquet.enable.dictionary = $parquetDictionaryEncodingEnabled"
} else {
""
}
logInfo(s"Testing $dataType data type$extraMessage")
val extraOptions = Map[String, String](
"parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString
)
withTempPath { file =>
val path = file.getCanonicalPath
val dataGenerator = RandomDataGenerator.forType(
dataType = dataType,
nullable = true,
new Random(System.nanoTime())
).getOrElse {
fail(s"Failed to create data generator for schema $dataType")
// TODO: Support new parser too, see SPARK-26374.
withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") {
for (dataType <- supportedDataTypes) {
for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) {
val extraMessage = if (isParquetDataSource) {
s" with parquet.enable.dictionary = $parquetDictionaryEncodingEnabled"
} else {
""
}
logInfo(s"Testing $dataType data type$extraMessage")
// Create a DF for the schema with random data. The index field is used to sort the
// DataFrame. This is a workaround for SPARK-10591.
val schema = new StructType()
.add("index", IntegerType, nullable = false)
.add("col", dataType, nullable = true)
val rdd =
spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
val extraOptions = Map[String, String](
"parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString
)
df.write
.mode("overwrite")
.format(dataSourceName)
.option("dataSchema", df.schema.json)
.options(extraOptions)
.save(path)
withTempPath { file =>
val path = file.getCanonicalPath
val loadedDF = spark
.read
.format(dataSourceName)
.option("dataSchema", df.schema.json)
.schema(df.schema)
.options(extraOptions)
.load(path)
.orderBy("index")
val seed = System.nanoTime()
withClue(s"Random data generated with the seed: ${seed}") {
val dataGenerator = RandomDataGenerator.forType(
dataType = dataType,
nullable = true,
new Random(seed)
).getOrElse {
fail(s"Failed to create data generator for schema $dataType")
}
checkAnswer(loadedDF, df)
// Create a DF for the schema with random data. The index field is used to sort the
// DataFrame. This is a workaround for SPARK-10591.
val schema = new StructType()
.add("index", IntegerType, nullable = false)
.add("col", dataType, nullable = true)
val rdd =
spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
df.write
.mode("overwrite")
.format(dataSourceName)
.option("dataSchema", df.schema.json)
.options(extraOptions)
.save(path)
val loadedDF = spark
.read
.format(dataSourceName)
.option("dataSchema", df.schema.json)
.schema(df.schema)
.options(extraOptions)
.load(path)
.orderBy("index")
checkAnswer(loadedDF, df)
}
}
}
}
}