[SPARK-33270][SQL] Return SQL schema instead of Catalog string from the SchemaOfJson
expression
### What changes were proposed in this pull request? Return schema in SQL format instead of Catalog string from the `SchemaOfJson` expression. ### Why are the changes needed? In some cases, `from_json()` cannot parse schemas returned by `schema_of_json`, for instance, when JSON fields have spaces (gaps). Such fields will be quoted after the changes, and can be parsed by `from_json()`. Here is an example: ```scala val in = Seq("""{"a b": 1}""").toDS() in.select(from_json('value, schema_of_json("""{"a b": 100}""")) as "parsed") ``` raises the exception: ``` == SQL == struct<a b:bigint> ------^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:263) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:130) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseTableSchema(ParseDriver.scala:76) at org.apache.spark.sql.types.DataType$.fromDDL(DataType.scala:131) at org.apache.spark.sql.catalyst.expressions.ExprUtils$.evalTypeExpr(ExprUtils.scala:33) at org.apache.spark.sql.catalyst.expressions.JsonToStructs.<init>(jsonExpressions.scala:537) at org.apache.spark.sql.functions$.from_json(functions.scala:4141) ``` ### Does this PR introduce _any_ user-facing change? Yes. For example, `schema_of_json` for the input `{"col":0}`. Before: `struct<col:bigint>` After: ``STRUCT<`col`: BIGINT>`` ### How was this patch tested? By existing test suites `JsonFunctionsSuite` and `JsonExpressionsSuite`. Closes #30172 from MaxGekk/schema_of_json-sql-schema. Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This commit is contained in:
parent
c592ae6ed8
commit
9d5e48ea95
|
@ -1717,9 +1717,9 @@ test_that("column functions", {
|
||||||
|
|
||||||
df <- as.DataFrame(list(list("col" = "1")))
|
df <- as.DataFrame(list(list("col" = "1")))
|
||||||
c <- collect(select(df, schema_of_json('{"name":"Bob"}')))
|
c <- collect(select(df, schema_of_json('{"name":"Bob"}')))
|
||||||
expect_equal(c[[1]], "struct<name:string>")
|
expect_equal(c[[1]], "STRUCT<`name`: STRING>")
|
||||||
c <- collect(select(df, schema_of_json(lit('{"name":"Bob"}'))))
|
c <- collect(select(df, schema_of_json(lit('{"name":"Bob"}'))))
|
||||||
expect_equal(c[[1]], "struct<name:string>")
|
expect_equal(c[[1]], "STRUCT<`name`: STRING>")
|
||||||
|
|
||||||
# Test to_json() supports arrays of primitive types and arrays
|
# Test to_json() supports arrays of primitive types and arrays
|
||||||
df <- sql("SELECT array(19, 42, 70) as age")
|
df <- sql("SELECT array(19, 42, 70) as age")
|
||||||
|
|
|
@ -50,6 +50,8 @@ license: |
|
||||||
|
|
||||||
- In Spark 3.1, loading and saving of timestamps from/to parquet files fails if the timestamps are before 1900-01-01 00:00:00Z, and loaded (saved) as the INT96 type. In Spark 3.0, the actions don't fail but might lead to shifting of the input timestamps due to rebasing from/to Julian to/from Proleptic Gregorian calendar. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.parquet.int96RebaseModeInRead` or/and `spark.sql.legacy.parquet.int96RebaseModeInWrite` to `LEGACY`.
|
- In Spark 3.1, loading and saving of timestamps from/to parquet files fails if the timestamps are before 1900-01-01 00:00:00Z, and loaded (saved) as the INT96 type. In Spark 3.0, the actions don't fail but might lead to shifting of the input timestamps due to rebasing from/to Julian to/from Proleptic Gregorian calendar. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.parquet.int96RebaseModeInRead` or/and `spark.sql.legacy.parquet.int96RebaseModeInWrite` to `LEGACY`.
|
||||||
|
|
||||||
|
- In Spark 3.1, the `schema_of_json` function returns the schema in the SQL format in which field names are quoted. In Spark 3.0, the function returns a catalog string without field quoting and in lower case.
|
||||||
|
|
||||||
## Upgrading from Spark SQL 3.0 to 3.0.1
|
## Upgrading from Spark SQL 3.0 to 3.0.1
|
||||||
|
|
||||||
- In Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Since version 3.0.1, the timestamp type inference is disabled by default. Set the JSON option `inferTimestamp` to `true` to enable such type inference.
|
- In Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Since version 3.0.1, the timestamp type inference is disabled by default. Set the JSON option `inferTimestamp` to `true` to enable such type inference.
|
||||||
|
|
|
@ -2937,10 +2937,10 @@ def schema_of_json(json, options={}):
|
||||||
|
|
||||||
>>> df = spark.range(1)
|
>>> df = spark.range(1)
|
||||||
>>> df.select(schema_of_json(lit('{"a": 0}')).alias("json")).collect()
|
>>> df.select(schema_of_json(lit('{"a": 0}')).alias("json")).collect()
|
||||||
[Row(json='struct<a:bigint>')]
|
[Row(json='STRUCT<`a`: BIGINT>')]
|
||||||
>>> schema = schema_of_json('{a: 1}', {'allowUnquotedFieldNames':'true'})
|
>>> schema = schema_of_json('{a: 1}', {'allowUnquotedFieldNames':'true'})
|
||||||
>>> df.select(schema.alias("json")).collect()
|
>>> df.select(schema.alias("json")).collect()
|
||||||
[Row(json='struct<a:bigint>')]
|
[Row(json='STRUCT<`a`: BIGINT>')]
|
||||||
"""
|
"""
|
||||||
if isinstance(json, str):
|
if isinstance(json, str):
|
||||||
col = _create_column_from_literal(json)
|
col = _create_column_from_literal(json)
|
||||||
|
|
|
@ -741,9 +741,9 @@ case class StructsToJson(
|
||||||
examples = """
|
examples = """
|
||||||
Examples:
|
Examples:
|
||||||
> SELECT _FUNC_('[{"col":0}]');
|
> SELECT _FUNC_('[{"col":0}]');
|
||||||
array<struct<col:bigint>>
|
ARRAY<STRUCT<`col`: BIGINT>>
|
||||||
> SELECT _FUNC_('[{"col":01}]', map('allowNumericLeadingZeros', 'true'));
|
> SELECT _FUNC_('[{"col":01}]', map('allowNumericLeadingZeros', 'true'));
|
||||||
array<struct<col:bigint>>
|
ARRAY<STRUCT<`col`: BIGINT>>
|
||||||
""",
|
""",
|
||||||
group = "json_funcs",
|
group = "json_funcs",
|
||||||
since = "2.4.0")
|
since = "2.4.0")
|
||||||
|
@ -801,7 +801,7 @@ case class SchemaOfJson(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
UTF8String.fromString(dt.catalogString)
|
UTF8String.fromString(dt.sql)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def prettyName: String = "schema_of_json"
|
override def prettyName: String = "schema_of_json"
|
||||||
|
|
|
@ -735,17 +735,17 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
|
||||||
|
|
||||||
test("SPARK-24709: infer schema of json strings") {
|
test("SPARK-24709: infer schema of json strings") {
|
||||||
checkEvaluation(new SchemaOfJson(Literal.create("""{"col":0}""")),
|
checkEvaluation(new SchemaOfJson(Literal.create("""{"col":0}""")),
|
||||||
"struct<col:bigint>")
|
"STRUCT<`col`: BIGINT>")
|
||||||
checkEvaluation(
|
checkEvaluation(
|
||||||
new SchemaOfJson(Literal.create("""{"col0":["a"], "col1": {"col2": "b"}}""")),
|
new SchemaOfJson(Literal.create("""{"col0":["a"], "col1": {"col2": "b"}}""")),
|
||||||
"struct<col0:array<string>,col1:struct<col2:string>>")
|
"STRUCT<`col0`: ARRAY<STRING>, `col1`: STRUCT<`col2`: STRING>>")
|
||||||
}
|
}
|
||||||
|
|
||||||
test("infer schema of JSON strings by using options") {
|
test("infer schema of JSON strings by using options") {
|
||||||
checkEvaluation(
|
checkEvaluation(
|
||||||
new SchemaOfJson(Literal.create("""{"col":01}"""),
|
new SchemaOfJson(Literal.create("""{"col":01}"""),
|
||||||
CreateMap(Seq(Literal.create("allowNumericLeadingZeros"), Literal.create("true")))),
|
CreateMap(Seq(Literal.create("allowNumericLeadingZeros"), Literal.create("true")))),
|
||||||
"struct<col:bigint>")
|
"STRUCT<`col`: BIGINT>")
|
||||||
}
|
}
|
||||||
|
|
||||||
test("parse date with locale") {
|
test("parse date with locale") {
|
||||||
|
@ -810,7 +810,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
|
||||||
}
|
}
|
||||||
|
|
||||||
Seq("en-US", "ko-KR", "ru-RU", "de-DE").foreach {
|
Seq("en-US", "ko-KR", "ru-RU", "de-DE").foreach {
|
||||||
checkDecimalInfer(_, """struct<d:decimal(7,3)>""")
|
checkDecimalInfer(_, """STRUCT<`d`: DECIMAL(7,3)>""")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -213,7 +213,7 @@ select schema_of_json('{"c1":0, "c2":[1]}')
|
||||||
-- !query schema
|
-- !query schema
|
||||||
struct<schema_of_json({"c1":0, "c2":[1]}):string>
|
struct<schema_of_json({"c1":0, "c2":[1]}):string>
|
||||||
-- !query output
|
-- !query output
|
||||||
struct<c1:bigint,c2:array<bigint>>
|
STRUCT<`c1`: BIGINT, `c2`: ARRAY<BIGINT>>
|
||||||
|
|
||||||
|
|
||||||
-- !query
|
-- !query
|
||||||
|
@ -352,7 +352,7 @@ select schema_of_json('{"c1":1}', map('primitivesAsString', 'true'))
|
||||||
-- !query schema
|
-- !query schema
|
||||||
struct<schema_of_json({"c1":1}):string>
|
struct<schema_of_json({"c1":1}):string>
|
||||||
-- !query output
|
-- !query output
|
||||||
struct<c1:string>
|
STRUCT<`c1`: STRING>
|
||||||
|
|
||||||
|
|
||||||
-- !query
|
-- !query
|
||||||
|
@ -360,7 +360,7 @@ select schema_of_json('{"c1":01, "c2":0.1}', map('allowNumericLeadingZeros', 'tr
|
||||||
-- !query schema
|
-- !query schema
|
||||||
struct<schema_of_json({"c1":01, "c2":0.1}):string>
|
struct<schema_of_json({"c1":01, "c2":0.1}):string>
|
||||||
-- !query output
|
-- !query output
|
||||||
struct<c1:bigint,c2:decimal(1,1)>
|
STRUCT<`c1`: BIGINT, `c2`: DECIMAL(1,1)>
|
||||||
|
|
||||||
|
|
||||||
-- !query
|
-- !query
|
||||||
|
|
|
@ -411,7 +411,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
|
||||||
test("infers schemas using options") {
|
test("infers schemas using options") {
|
||||||
val df = spark.range(1)
|
val df = spark.range(1)
|
||||||
.select(schema_of_json(lit("{a:1}"), Map("allowUnquotedFieldNames" -> "true").asJava))
|
.select(schema_of_json(lit("{a:1}"), Map("allowUnquotedFieldNames" -> "true").asJava))
|
||||||
checkAnswer(df, Seq(Row("struct<a:bigint>")))
|
checkAnswer(df, Seq(Row("STRUCT<`a`: BIGINT>")))
|
||||||
}
|
}
|
||||||
|
|
||||||
test("from_json - array of primitive types") {
|
test("from_json - array of primitive types") {
|
||||||
|
@ -684,14 +684,14 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
|
||||||
val input = regexp_replace(lit("""{"item_id": 1, "item_price": 0.1}"""), "item_", "")
|
val input = regexp_replace(lit("""{"item_id": 1, "item_price": 0.1}"""), "item_", "")
|
||||||
checkAnswer(
|
checkAnswer(
|
||||||
spark.range(1).select(schema_of_json(input)),
|
spark.range(1).select(schema_of_json(input)),
|
||||||
Seq(Row("struct<id:bigint,price:double>")))
|
Seq(Row("STRUCT<`id`: BIGINT, `price`: DOUBLE>")))
|
||||||
}
|
}
|
||||||
|
|
||||||
test("SPARK-31065: schema_of_json - null and empty strings as strings") {
|
test("SPARK-31065: schema_of_json - null and empty strings as strings") {
|
||||||
Seq("""{"id": null}""", """{"id": ""}""").foreach { input =>
|
Seq("""{"id": null}""", """{"id": ""}""").foreach { input =>
|
||||||
checkAnswer(
|
checkAnswer(
|
||||||
spark.range(1).select(schema_of_json(input)),
|
spark.range(1).select(schema_of_json(input)),
|
||||||
Seq(Row("struct<id:string>")))
|
Seq(Row("STRUCT<`id`: STRING>")))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -703,7 +703,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
|
||||||
schema_of_json(
|
schema_of_json(
|
||||||
lit("""{"id": "a", "drop": {"drop": null}}"""),
|
lit("""{"id": "a", "drop": {"drop": null}}"""),
|
||||||
options.asJava)),
|
options.asJava)),
|
||||||
Seq(Row("struct<id:string>")))
|
Seq(Row("STRUCT<`id`: STRING>")))
|
||||||
|
|
||||||
// Array of structs
|
// Array of structs
|
||||||
checkAnswer(
|
checkAnswer(
|
||||||
|
@ -711,7 +711,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
|
||||||
schema_of_json(
|
schema_of_json(
|
||||||
lit("""[{"id": "a", "drop": {"drop": null}}]"""),
|
lit("""[{"id": "a", "drop": {"drop": null}}]"""),
|
||||||
options.asJava)),
|
options.asJava)),
|
||||||
Seq(Row("array<struct<id:string>>")))
|
Seq(Row("ARRAY<STRUCT<`id`: STRING>>")))
|
||||||
|
|
||||||
// Other types are not affected.
|
// Other types are not affected.
|
||||||
checkAnswer(
|
checkAnswer(
|
||||||
|
@ -719,7 +719,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
|
||||||
schema_of_json(
|
schema_of_json(
|
||||||
lit("""null"""),
|
lit("""null"""),
|
||||||
options.asJava)),
|
options.asJava)),
|
||||||
Seq(Row("string")))
|
Seq(Row("STRING")))
|
||||||
}
|
}
|
||||||
|
|
||||||
test("optional datetime parser does not affect json time formatting") {
|
test("optional datetime parser does not affect json time formatting") {
|
||||||
|
@ -747,4 +747,11 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
|
||||||
val df4 = Seq("""{"c2": [19]}""").toDF("c0")
|
val df4 = Seq("""{"c2": [19]}""").toDF("c0")
|
||||||
checkAnswer(df4.select(from_json($"c0", MapType(StringType, st))), Row(null))
|
checkAnswer(df4.select(from_json($"c0", MapType(StringType, st))), Row(null))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("SPARK-33270: infers schema for JSON field with spaces and pass them to from_json") {
|
||||||
|
val in = Seq("""{"a b": 1}""").toDS()
|
||||||
|
val out = in.select(from_json('value, schema_of_json("""{"a b": 100}""")) as "parsed")
|
||||||
|
val expected = new StructType().add("parsed", new StructType().add("a b", LongType))
|
||||||
|
assert(out.schema == expected)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue