[SPARK-19967][SQL] Add from_json in FunctionRegistry
## What changes were proposed in this pull request? This pr added entries in `FunctionRegistry` and supported `from_json` in SQL. ## How was this patch tested? Added tests in `JsonFunctionsSuite` and `SQLQueryTestSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #17320 from maropu/SPARK-19967.
This commit is contained in:
parent
bfdeea5c68
commit
7de66bae58
|
@ -426,6 +426,7 @@ object FunctionRegistry {
|
|||
|
||||
// json
|
||||
expression[StructToJson]("to_json"),
|
||||
expression[JsonToStruct]("from_json"),
|
||||
|
||||
// Cast aliases (SPARK-16730)
|
||||
castAlias("boolean", BooleanType),
|
||||
|
|
|
@ -26,6 +26,7 @@ import com.fasterxml.jackson.core._
|
|||
import org.apache.spark.sql.AnalysisException
|
||||
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
|
||||
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
|
||||
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
import org.apache.spark.sql.catalyst.json._
|
||||
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, ParseModes}
|
||||
|
@ -483,6 +484,17 @@ case class JsonTuple(children: Seq[Expression])
|
|||
/**
|
||||
* Converts an json input string to a [[StructType]] or [[ArrayType]] with the specified schema.
|
||||
*/
|
||||
// scalastyle:off line.size.limit
|
||||
@ExpressionDescription(
|
||||
usage = "_FUNC_(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.",
|
||||
extended = """
|
||||
Examples:
|
||||
> SELECT _FUNC_('{"a":1, "b":0.8}', 'a INT, b DOUBLE');
|
||||
{"a":1, "b":0.8}
|
||||
> SELECT _FUNC_('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
|
||||
{"time":"2015-08-26 00:00:00.0"}
|
||||
""")
|
||||
// scalastyle:on line.size.limit
|
||||
case class JsonToStruct(
|
||||
schema: DataType,
|
||||
options: Map[String, String],
|
||||
|
@ -494,6 +506,21 @@ case class JsonToStruct(
|
|||
def this(schema: DataType, options: Map[String, String], child: Expression) =
|
||||
this(schema, options, child, None)
|
||||
|
||||
// Used in `FunctionRegistry`
|
||||
def this(child: Expression, schema: Expression) =
|
||||
this(
|
||||
schema = JsonExprUtils.validateSchemaLiteral(schema),
|
||||
options = Map.empty[String, String],
|
||||
child = child,
|
||||
timeZoneId = None)
|
||||
|
||||
def this(child: Expression, schema: Expression, options: Expression) =
|
||||
this(
|
||||
schema = JsonExprUtils.validateSchemaLiteral(schema),
|
||||
options = JsonExprUtils.convertToMapData(options),
|
||||
child = child,
|
||||
timeZoneId = None)
|
||||
|
||||
override def checkInputDataTypes(): TypeCheckResult = schema match {
|
||||
case _: StructType | ArrayType(_: StructType, _) =>
|
||||
super.checkInputDataTypes()
|
||||
|
@ -589,7 +616,7 @@ case class StructToJson(
|
|||
def this(child: Expression) = this(Map.empty, child, None)
|
||||
def this(child: Expression, options: Expression) =
|
||||
this(
|
||||
options = StructToJson.convertToMapData(options),
|
||||
options = JsonExprUtils.convertToMapData(options),
|
||||
child = child,
|
||||
timeZoneId = None)
|
||||
|
||||
|
@ -634,7 +661,12 @@ case class StructToJson(
|
|||
override def inputTypes: Seq[AbstractDataType] = StructType :: Nil
|
||||
}
|
||||
|
||||
object StructToJson {
|
||||
object JsonExprUtils {
|
||||
|
||||
def validateSchemaLiteral(exp: Expression): StructType = exp match {
|
||||
case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString)
|
||||
case e => throw new AnalysisException(s"Expected a string literal instead of $e")
|
||||
}
|
||||
|
||||
def convertToMapData(exp: Expression): Map[String, String] = exp match {
|
||||
case m: CreateMap
|
||||
|
|
|
@ -5,4 +5,17 @@ select to_json(named_struct('a', 1, 'b', 2));
|
|||
select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
|
||||
-- Check if errors handled
|
||||
select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'));
|
||||
select to_json(named_struct('a', 1, 'b', 2), map('mode', 1));
|
||||
select to_json();
|
||||
|
||||
-- from_json
|
||||
describe function from_json;
|
||||
describe function extended from_json;
|
||||
select from_json('{"a":1}', 'a INT');
|
||||
select from_json('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
|
||||
-- Check if errors handled
|
||||
select from_json('{"a":1}', 1);
|
||||
select from_json('{"a":1}', 'a InvalidType');
|
||||
select from_json('{"a":1}', 'a INT', named_struct('mode', 'PERMISSIVE'));
|
||||
select from_json('{"a":1}', 'a INT', map('mode', 1));
|
||||
select from_json();
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
-- Automatically generated by SQLQueryTestSuite
|
||||
-- Number of queries: 6
|
||||
-- Number of queries: 16
|
||||
|
||||
|
||||
-- !query 0
|
||||
|
@ -55,9 +55,112 @@ Must use a map() function for options;; line 1 pos 7
|
|||
|
||||
|
||||
-- !query 5
|
||||
select to_json()
|
||||
select to_json(named_struct('a', 1, 'b', 2), map('mode', 1))
|
||||
-- !query 5 schema
|
||||
struct<>
|
||||
-- !query 5 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7
|
||||
|
||||
|
||||
-- !query 6
|
||||
select to_json()
|
||||
-- !query 6 schema
|
||||
struct<>
|
||||
-- !query 6 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
Invalid number of arguments for function to_json; line 1 pos 7
|
||||
|
||||
|
||||
-- !query 7
|
||||
describe function from_json
|
||||
-- !query 7 schema
|
||||
struct<function_desc:string>
|
||||
-- !query 7 output
|
||||
Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct
|
||||
Function: from_json
|
||||
Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.
|
||||
|
||||
|
||||
-- !query 8
|
||||
describe function extended from_json
|
||||
-- !query 8 schema
|
||||
struct<function_desc:string>
|
||||
-- !query 8 output
|
||||
Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct
|
||||
Extended Usage:
|
||||
Examples:
|
||||
> SELECT from_json('{"a":1, "b":0.8}', 'a INT, b DOUBLE');
|
||||
{"a":1, "b":0.8}
|
||||
> SELECT from_json('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
|
||||
{"time":"2015-08-26 00:00:00.0"}
|
||||
|
||||
Function: from_json
|
||||
Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.
|
||||
|
||||
|
||||
-- !query 9
|
||||
select from_json('{"a":1}', 'a INT')
|
||||
-- !query 9 schema
|
||||
struct<jsontostruct({"a":1}):struct<a:int>>
|
||||
-- !query 9 output
|
||||
{"a":1}
|
||||
|
||||
|
||||
-- !query 10
|
||||
select from_json('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'))
|
||||
-- !query 10 schema
|
||||
struct<jsontostruct({"time":"26/08/2015"}):struct<time:timestamp>>
|
||||
-- !query 10 output
|
||||
{"time":2015-08-26 00:00:00.0}
|
||||
|
||||
|
||||
-- !query 11
|
||||
select from_json('{"a":1}', 1)
|
||||
-- !query 11 schema
|
||||
struct<>
|
||||
-- !query 11 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
Expected a string literal instead of 1;; line 1 pos 7
|
||||
|
||||
|
||||
-- !query 12
|
||||
select from_json('{"a":1}', 'a InvalidType')
|
||||
-- !query 12 schema
|
||||
struct<>
|
||||
-- !query 12 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
|
||||
DataType invalidtype() is not supported.(line 1, pos 2)
|
||||
|
||||
== SQL ==
|
||||
a InvalidType
|
||||
--^^^
|
||||
; line 1 pos 7
|
||||
|
||||
|
||||
-- !query 13
|
||||
select from_json('{"a":1}', 'a INT', named_struct('mode', 'PERMISSIVE'))
|
||||
-- !query 13 schema
|
||||
struct<>
|
||||
-- !query 13 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
Must use a map() function for options;; line 1 pos 7
|
||||
|
||||
|
||||
-- !query 14
|
||||
select from_json('{"a":1}', 'a INT', map('mode', 1))
|
||||
-- !query 14 schema
|
||||
struct<>
|
||||
-- !query 14 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7
|
||||
|
||||
|
||||
-- !query 15
|
||||
select from_json()
|
||||
-- !query 15 schema
|
||||
struct<>
|
||||
-- !query 15 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
Invalid number of arguments for function from_json; line 1 pos 7
|
||||
|
|
|
@ -220,4 +220,40 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
|
|||
assert(errMsg2.getMessage.startsWith(
|
||||
"A type of keys and values in map() must be string, but got"))
|
||||
}
|
||||
|
||||
test("SPARK-19967 Support from_json in SQL") {
|
||||
val df1 = Seq("""{"a": 1}""").toDS()
|
||||
checkAnswer(
|
||||
df1.selectExpr("from_json(value, 'a INT')"),
|
||||
Row(Row(1)) :: Nil)
|
||||
|
||||
val df2 = Seq("""{"c0": "a", "c1": 1, "c2": {"c20": 3.8, "c21": 8}}""").toDS()
|
||||
checkAnswer(
|
||||
df2.selectExpr("from_json(value, 'c0 STRING, c1 INT, c2 STRUCT<c20: DOUBLE, c21: INT>')"),
|
||||
Row(Row("a", 1, Row(3.8, 8))) :: Nil)
|
||||
|
||||
val df3 = Seq("""{"time": "26/08/2015 18:00"}""").toDS()
|
||||
checkAnswer(
|
||||
df3.selectExpr(
|
||||
"from_json(value, 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy HH:mm'))"),
|
||||
Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0"))))
|
||||
|
||||
val errMsg1 = intercept[AnalysisException] {
|
||||
df3.selectExpr("from_json(value, 1)")
|
||||
}
|
||||
assert(errMsg1.getMessage.startsWith("Expected a string literal instead of"))
|
||||
val errMsg2 = intercept[AnalysisException] {
|
||||
df3.selectExpr("""from_json(value, 'time InvalidType')""")
|
||||
}
|
||||
assert(errMsg2.getMessage.contains("DataType invalidtype() is not supported"))
|
||||
val errMsg3 = intercept[AnalysisException] {
|
||||
df3.selectExpr("from_json(value, 'time Timestamp', named_struct('a', 1))")
|
||||
}
|
||||
assert(errMsg3.getMessage.startsWith("Must use a map() function for options"))
|
||||
val errMsg4 = intercept[AnalysisException] {
|
||||
df3.selectExpr("from_json(value, 'time Timestamp', map('a', 1))")
|
||||
}
|
||||
assert(errMsg4.getMessage.startsWith(
|
||||
"A type of keys and values in map() must be string, but got"))
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue