[SPARK-26246][SQL] Inferring TimestampType from JSON
## What changes were proposed in this pull request? The `JsonInferSchema` class is extended to support `TimestampType` inference from string fields in JSON input: - If the `prefersDecimal` option is set to `true`, it tries to infer decimal type from the string field. - If decimal type inference fails or `prefersDecimal` is disabled, `JsonInferSchema` tries to infer `TimestampType`. - If timestamp type inference fails, `StringType` is returned as the inferred type. ## How was this patch tested? Added a new test suite - `JsonInferSchemaSuite` - to check date and timestamp type inference from JSON using `JsonInferSchema` directly. A few tests were added to `JsonSuite` to check type merging and roundtrip behavior. These changes were tested by `JsonSuite`, `JsonExpressionsSuite` and `JsonFunctionsSuite` as well. Closes #23201 from MaxGekk/json-infer-time. Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com> Co-authored-by: Maxim Gekk <max.gekk@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
This commit is contained in:
parent
86100df54b
commit
d72571e51d
|
@ -28,7 +28,7 @@ import org.apache.spark.rdd.RDD
|
||||||
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
|
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
|
||||||
import org.apache.spark.sql.catalyst.expressions.ExprUtils
|
import org.apache.spark.sql.catalyst.expressions.ExprUtils
|
||||||
import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
|
import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
|
||||||
import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}
|
import org.apache.spark.sql.catalyst.util._
|
||||||
import org.apache.spark.sql.internal.SQLConf
|
import org.apache.spark.sql.internal.SQLConf
|
||||||
import org.apache.spark.sql.types._
|
import org.apache.spark.sql.types._
|
||||||
import org.apache.spark.util.Utils
|
import org.apache.spark.util.Utils
|
||||||
|
@ -37,6 +37,12 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
|
||||||
|
|
||||||
private val decimalParser = ExprUtils.getDecimalParser(options.locale)
|
private val decimalParser = ExprUtils.getDecimalParser(options.locale)
|
||||||
|
|
||||||
|
@transient
|
||||||
|
private lazy val timestampFormatter = TimestampFormatter(
|
||||||
|
options.timestampFormat,
|
||||||
|
options.timeZone,
|
||||||
|
options.locale)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Infer the type of a collection of json records in three stages:
|
* Infer the type of a collection of json records in three stages:
|
||||||
* 1. Infer the type of each record
|
* 1. Infer the type of each record
|
||||||
|
@ -115,13 +121,19 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
|
||||||
// record fields' types have been combined.
|
// record fields' types have been combined.
|
||||||
NullType
|
NullType
|
||||||
|
|
||||||
case VALUE_STRING if options.prefersDecimal =>
|
case VALUE_STRING =>
|
||||||
|
val field = parser.getText
|
||||||
val decimalTry = allCatch opt {
|
val decimalTry = allCatch opt {
|
||||||
val bigDecimal = decimalParser(parser.getText)
|
val bigDecimal = decimalParser(field)
|
||||||
DecimalType(bigDecimal.precision, bigDecimal.scale)
|
DecimalType(bigDecimal.precision, bigDecimal.scale)
|
||||||
}
|
}
|
||||||
decimalTry.getOrElse(StringType)
|
if (options.prefersDecimal && decimalTry.isDefined) {
|
||||||
case VALUE_STRING => StringType
|
decimalTry.get
|
||||||
|
} else if ((allCatch opt timestampFormatter.parse(field)).isDefined) {
|
||||||
|
TimestampType
|
||||||
|
} else {
|
||||||
|
StringType
|
||||||
|
}
|
||||||
|
|
||||||
case START_OBJECT =>
|
case START_OBJECT =>
|
||||||
val builder = Array.newBuilder[StructField]
|
val builder = Array.newBuilder[StructField]
|
||||||
|
|
|
@ -0,0 +1,102 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.spark.sql.catalyst.json
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonFactory
|
||||||
|
|
||||||
|
import org.apache.spark.SparkFunSuite
|
||||||
|
import org.apache.spark.sql.catalyst.plans.SQLHelper
|
||||||
|
import org.apache.spark.sql.internal.SQLConf
|
||||||
|
import org.apache.spark.sql.types._
|
||||||
|
|
||||||
|
class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper {

  /**
   * Infers the schema of a single-field JSON record `{"a": ...}` using `JsonInferSchema`
   * configured with `options`, and asserts the field "a" is inferred as `dt`.
   *
   * @param options JSON datasource options (e.g. "timestampFormat", "prefersDecimal")
   * @param json a one-object JSON document with a single field named "a"
   * @param dt the expected inferred type of field "a"
   */
  def checkType(options: Map[String, String], json: String, dt: DataType): Unit = {
    val jsonOptions = new JSONOptions(options, "UTC", "")
    val inferSchema = new JsonInferSchema(jsonOptions)
    val factory = new JsonFactory()
    jsonOptions.setJacksonOptions(factory)
    val parser = CreateJacksonParser.string(factory, json)
    // Position the parser on the first token (START_OBJECT) before inferring.
    parser.nextToken()
    val expectedType = StructType(Seq(StructField("a", dt, true)))

    assert(inferSchema.inferField(parser) === expectedType)
  }

  /** Asserts a string field matching the timestamp `pattern` is inferred as TimestampType. */
  def checkTimestampType(pattern: String, json: String): Unit = {
    checkType(Map("timestampFormat" -> pattern), json, TimestampType)
  }

  test("inferring timestamp type") {
    // Run with both the legacy and the new time parser implementations.
    Seq(true, false).foreach { legacyParser =>
      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
        checkTimestampType("yyyy", """{"a": "2018"}""")
        checkTimestampType("yyyy=MM", """{"a": "2018=12"}""")
        checkTimestampType("yyyy MM dd", """{"a": "2018 12 02"}""")
        checkTimestampType(
          "yyyy-MM-dd'T'HH:mm:ss.SSS",
          """{"a": "2018-12-02T21:04:00.123"}""")
        checkTimestampType(
          "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX",
          """{"a": "2018-12-02T21:04:00.123567+01:00"}""")
      }
    }
  }

  test("prefer decimals over timestamps") {
    // When prefersDecimal is enabled and the string parses as a decimal,
    // DecimalType wins even though the string also matches the timestamp format.
    Seq(true, false).foreach { legacyParser =>
      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
        checkType(
          options = Map(
            "prefersDecimal" -> "true",
            "timestampFormat" -> "yyyyMMdd.HHmmssSSS"
          ),
          json = """{"a": "20181202.210400123"}""",
          dt = DecimalType(17, 9)
        )
      }
    }
  }

  test("skip decimal type inferring") {
    // With prefersDecimal disabled, the same value falls through to timestamp inference.
    Seq(true, false).foreach { legacyParser =>
      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
        checkType(
          options = Map(
            "prefersDecimal" -> "false",
            "timestampFormat" -> "yyyyMMdd.HHmmssSSS"
          ),
          json = """{"a": "20181202.210400123"}""",
          dt = TimestampType
        )
      }
    }
  }

  test("fallback to string type") {
    // A value matching neither decimal nor the configured timestamp pattern
    // is inferred as plain StringType.
    Seq(true, false).foreach { legacyParser =>
      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
        checkType(
          options = Map("timestampFormat" -> "yyyy,MM,dd.HHmmssSSS"),
          json = """{"a": "20181202.210400123"}""",
          dt = StringType
        )
      }
    }
  }
}
|
|
@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
|
||||||
import org.apache.spark.sql.internal.SQLConf
|
import org.apache.spark.sql.internal.SQLConf
|
||||||
import org.apache.spark.sql.test.SharedSQLContext
|
import org.apache.spark.sql.test.SharedSQLContext
|
||||||
import org.apache.spark.sql.types._
|
import org.apache.spark.sql.types._
|
||||||
|
import org.apache.spark.sql.types.StructType.fromDDL
|
||||||
import org.apache.spark.util.Utils
|
import org.apache.spark.util.Utils
|
||||||
|
|
||||||
class TestFileFilter extends PathFilter {
|
class TestFileFilter extends PathFilter {
|
||||||
|
@ -2589,4 +2590,55 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
|
||||||
Row(null, Array(0, 1, 2), "abc", """{"a":"-","b":[0, 1, 2],"c":"abc"}""") ::
|
Row(null, Array(0, 1, 2), "abc", """{"a":"-","b":[0, 1, 2],"c":"abc"}""") ::
|
||||||
Row(0.1, null, "def", """{"a":0.1,"b":{},"c":"def"}""") :: Nil)
|
Row(0.1, null, "def", """{"a":0.1,"b":{},"c":"def"}""") :: Nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("inferring timestamp type") {
  // Run with both the legacy and the new time parser implementations.
  Seq(true, false).foreach { legacyParser =>
    withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
      // Helper: infer the schema of the given JSON records via the datasource reader.
      def schemaOf(jsons: String*): StructType = spark.read.json(jsons.toDS).schema

      // Two parseable timestamps merge to a timestamp column.
      assert(schemaOf(
        """{"a":"2018-12-17T10:11:12.123-01:00"}""",
        """{"a":"2018-12-16T22:23:24.123-02:00"}""") === fromDDL("a timestamp"))

      // Mixing a timestamp with a number or a non-timestamp string degrades to string.
      assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":1}""")
        === fromDDL("a string"))
      assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":"123"}""")
        === fromDDL("a string"))

      // Nulls do not affect the inferred timestamp type, regardless of record order.
      assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":null}""")
        === fromDDL("a timestamp"))
      assert(schemaOf("""{"a":null}""", """{"a":"2018-12-17T10:11:12.123-01:00"}""")
        === fromDDL("a timestamp"))
    }
  }
}
|
||||||
|
|
||||||
|
test("roundtrip for timestamp type inferring") {
  // Run with both the legacy and the new time parser implementations.
  Seq(true, false).foreach { legacyParser =>
    withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
      val customSchema = new StructType().add("date", TimestampType)
      withTempDir { dir =>
        val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
        // Reading with a custom timestampFormat should infer TimestampType.
        val timestampsWithFormat = spark.read
          .option("timestampFormat", "dd/MM/yyyy HH:mm")
          .json(datesRecords)
        assert(timestampsWithFormat.schema === customSchema)

        // Write the data out with a different timestamp format and timezone ...
        timestampsWithFormat.write
          .format("json")
          .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
          .option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
          .save(timestampsWithFormatPath)

        // ... then read it back with the matching options.
        val readBack = spark.read
          .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
          .option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
          .json(timestampsWithFormatPath)

        // The roundtrip must preserve both the inferred schema and the values.
        assert(readBack.schema === customSchema)
        checkAnswer(readBack, timestampsWithFormat)
      }
    }
  }
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue