[SPARK-19849][SQL] Support ArrayType in to_json to produce JSON array
## What changes were proposed in this pull request?

This PR proposes to support an array of struct type in `to_json` as below:

```scala
import org.apache.spark.sql.functions._

val df = Seq(Tuple1(Tuple1(1) :: Nil)).toDF("a")
df.select(to_json($"a").as("json")).show()
```

```
+----------+
|      json|
+----------+
|[{"_1":1}]|
+----------+
```

Currently, it throws an exception as below (a newline manually inserted for readability):

```
org.apache.spark.sql.AnalysisException: cannot resolve 'structtojson(`array`)' due to data type mismatch:
structtojson requires that the expression is a struct expression.;;
```

This allows the roundtrip with `from_json` as below:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val df = Seq("""[{"a":1}, {"a":2}]""").toDF("json").select(from_json($"json", schema).as("array"))
df.show()

// Read back.
df.select(to_json($"array").as("json")).show()
```

```
+----------+
|     array|
+----------+
|[[1], [2]]|
+----------+

+-----------------+
|             json|
+-----------------+
|[{"a":1},{"a":2}]|
+-----------------+
```

Also, this PR proposes to rename `StructToJson` to `StructsToJson` and `JsonToStruct` to `JsonToStructs`.

## How was this patch tested?

Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite` for Scala, doctest for Python and test in `test_sparkSQL.R` for R.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17192 from HyukjinKwon/SPARK-19849.
This commit is contained in:
parent
990af630d0
commit
0cdcf91145
|
@ -1795,10 +1795,10 @@ setMethod("to_date",
|
|||
|
||||
#' to_json
|
||||
#'
|
||||
#' Converts a column containing a \code{structType} into a Column of JSON string.
|
||||
#' Resolving the Column can fail if an unsupported type is encountered.
|
||||
#' Converts a column containing a \code{structType} or array of \code{structType} into a Column
|
||||
#' of JSON string. Resolving the Column can fail if an unsupported type is encountered.
|
||||
#'
|
||||
#' @param x Column containing the struct
|
||||
#' @param x Column containing the struct or array of the structs
|
||||
#' @param ... additional named properties to control how it is converted, accepts the same options
|
||||
#' as the JSON data source.
|
||||
#'
|
||||
|
@ -1809,8 +1809,13 @@ setMethod("to_date",
|
|||
#' @export
|
||||
#' @examples
|
||||
#' \dontrun{
|
||||
#' to_json(df$t, dateFormat = 'dd/MM/yyyy')
|
||||
#' select(df, to_json(df$t))
|
||||
#' # Converts a struct into a JSON object
|
||||
#' df <- sql("SELECT named_struct('date', cast('2000-01-01' as date)) as d")
|
||||
#' select(df, to_json(df$d, dateFormat = 'dd/MM/yyyy'))
|
||||
#'
|
||||
#' # Converts an array of structs into a JSON array
|
||||
#' df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
|
||||
#' select(df, to_json(df$people))
|
||||
#'}
|
||||
#' @note to_json since 2.2.0
|
||||
setMethod("to_json", signature(x = "Column"),
|
||||
|
@ -2433,7 +2438,8 @@ setMethod("date_format", signature(y = "Column", x = "character"),
|
|||
#' from_json
|
||||
#'
|
||||
#' Parses a column containing a JSON string into a Column of \code{structType} with the specified
|
||||
#' \code{schema}. If the string is unparseable, the Column will contains the value NA.
|
||||
#' \code{schema} or array of \code{structType} if \code{asJsonArray} is set to \code{TRUE}.
|
||||
#' If the string is unparseable, the Column will contain the value NA.
|
||||
#'
|
||||
#' @param x Column containing the JSON string.
|
||||
#' @param schema a structType object to use as the schema to use when parsing the JSON string.
|
||||
|
|
|
@ -1340,6 +1340,10 @@ test_that("column functions", {
|
|||
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)
|
||||
|
||||
# Test to_json(), from_json()
|
||||
df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
|
||||
j <- collect(select(df, alias(to_json(df$people), "json")))
|
||||
expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")
|
||||
|
||||
df <- read.json(mapTypeJsonPath)
|
||||
j <- collect(select(df, alias(to_json(df$info), "json")))
|
||||
expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
|
||||
|
|
|
@ -1774,10 +1774,11 @@ def json_tuple(col, *fields):
|
|||
def from_json(col, schema, options={}):
|
||||
"""
|
||||
Parses a column containing a JSON string into a [[StructType]] or [[ArrayType]]
|
||||
with the specified schema. Returns `null`, in the case of an unparseable string.
|
||||
of [[StructType]]s with the specified schema. Returns `null`, in the case of an unparseable
|
||||
string.
|
||||
|
||||
:param col: string column in json format
|
||||
:param schema: a StructType or ArrayType to use when parsing the json column
|
||||
:param schema: a StructType or ArrayType of StructType to use when parsing the json column
|
||||
:param options: options to control parsing. accepts the same options as the json datasource
|
||||
|
||||
>>> from pyspark.sql.types import *
|
||||
|
@ -1802,10 +1803,10 @@ def from_json(col, schema, options={}):
|
|||
@since(2.1)
|
||||
def to_json(col, options={}):
|
||||
"""
|
||||
Converts a column containing a [[StructType]] into a JSON string. Throws an exception,
|
||||
in the case of an unsupported type.
|
||||
Converts a column containing a [[StructType]] or [[ArrayType]] of [[StructType]]s into a
|
||||
JSON string. Throws an exception, in the case of an unsupported type.
|
||||
|
||||
:param col: name of column containing the struct
|
||||
:param col: name of column containing the struct or array of the structs
|
||||
:param options: options to control converting. accepts the same options as the json datasource
|
||||
|
||||
>>> from pyspark.sql import Row
|
||||
|
@ -1814,6 +1815,10 @@ def to_json(col, options={}):
|
|||
>>> df = spark.createDataFrame(data, ("key", "value"))
|
||||
>>> df.select(to_json(df.value).alias("json")).collect()
|
||||
[Row(json=u'{"age":2,"name":"Alice"}')]
|
||||
>>> data = [(1, [Row(name='Alice', age=2), Row(name='Bob', age=3)])]
|
||||
>>> df = spark.createDataFrame(data, ("key", "value"))
|
||||
>>> df.select(to_json(df.value).alias("json")).collect()
|
||||
[Row(json=u'[{"age":2,"name":"Alice"},{"age":3,"name":"Bob"}]')]
|
||||
"""
|
||||
|
||||
sc = SparkContext._active_spark_context
|
||||
|
|
|
@ -425,8 +425,8 @@ object FunctionRegistry {
|
|||
expression[BitwiseXor]("^"),
|
||||
|
||||
// json
|
||||
expression[StructToJson]("to_json"),
|
||||
expression[JsonToStruct]("from_json"),
|
||||
expression[StructsToJson]("to_json"),
|
||||
expression[JsonToStructs]("from_json"),
|
||||
|
||||
// Cast aliases (SPARK-16730)
|
||||
castAlias("boolean", BooleanType),
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
|
|||
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
import org.apache.spark.sql.catalyst.json._
|
||||
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, ParseModes}
|
||||
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, ParseModes}
|
||||
import org.apache.spark.sql.types._
|
||||
import org.apache.spark.unsafe.types.UTF8String
|
||||
import org.apache.spark.util.Utils
|
||||
|
@ -482,7 +482,8 @@ case class JsonTuple(children: Seq[Expression])
|
|||
}
|
||||
|
||||
/**
|
||||
* Converts an json input string to a [[StructType]] or [[ArrayType]] with the specified schema.
|
||||
* Converts a json input string to a [[StructType]] or [[ArrayType]] of [[StructType]]s
|
||||
* with the specified schema.
|
||||
*/
|
||||
// scalastyle:off line.size.limit
|
||||
@ExpressionDescription(
|
||||
|
@ -495,7 +496,7 @@ case class JsonTuple(children: Seq[Expression])
|
|||
{"time":"2015-08-26 00:00:00.0"}
|
||||
""")
|
||||
// scalastyle:on line.size.limit
|
||||
case class JsonToStruct(
|
||||
case class JsonToStructs(
|
||||
schema: DataType,
|
||||
options: Map[String, String],
|
||||
child: Expression,
|
||||
|
@ -590,7 +591,7 @@ case class JsonToStruct(
|
|||
}
|
||||
|
||||
/**
|
||||
* Converts a [[StructType]] to a json output string.
|
||||
* Converts a [[StructType]] or [[ArrayType]] of [[StructType]]s to a json output string.
|
||||
*/
|
||||
// scalastyle:off line.size.limit
|
||||
@ExpressionDescription(
|
||||
|
@ -601,9 +602,11 @@ case class JsonToStruct(
|
|||
{"a":1,"b":2}
|
||||
> SELECT _FUNC_(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
|
||||
{"time":"26/08/2015"}
|
||||
> SELECT _FUNC_(array(named_struct('a', 1, 'b', 2));
|
||||
[{"a":1,"b":2}]
|
||||
""")
|
||||
// scalastyle:on line.size.limit
|
||||
case class StructToJson(
|
||||
case class StructsToJson(
|
||||
options: Map[String, String],
|
||||
child: Expression,
|
||||
timeZoneId: Option[String] = None)
|
||||
|
@ -624,41 +627,58 @@ case class StructToJson(
|
|||
lazy val writer = new CharArrayWriter()
|
||||
|
||||
@transient
|
||||
lazy val gen =
|
||||
new JacksonGenerator(
|
||||
child.dataType.asInstanceOf[StructType],
|
||||
writer,
|
||||
new JSONOptions(options, timeZoneId.get))
|
||||
lazy val gen = new JacksonGenerator(
|
||||
rowSchema, writer, new JSONOptions(options, timeZoneId.get))
|
||||
|
||||
@transient
|
||||
lazy val rowSchema = child.dataType match {
|
||||
case st: StructType => st
|
||||
case ArrayType(st: StructType, _) => st
|
||||
}
|
||||
|
||||
// This converts rows to the JSON output according to the given schema.
|
||||
@transient
|
||||
lazy val converter: Any => UTF8String = {
|
||||
def getAndReset(): UTF8String = {
|
||||
gen.flush()
|
||||
val json = writer.toString
|
||||
writer.reset()
|
||||
UTF8String.fromString(json)
|
||||
}
|
||||
|
||||
child.dataType match {
|
||||
case _: StructType =>
|
||||
(row: Any) =>
|
||||
gen.write(row.asInstanceOf[InternalRow])
|
||||
getAndReset()
|
||||
case ArrayType(_: StructType, _) =>
|
||||
(arr: Any) =>
|
||||
gen.write(arr.asInstanceOf[ArrayData])
|
||||
getAndReset()
|
||||
}
|
||||
}
|
||||
|
||||
override def dataType: DataType = StringType
|
||||
|
||||
override def checkInputDataTypes(): TypeCheckResult = {
|
||||
if (StructType.acceptsType(child.dataType)) {
|
||||
override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
|
||||
case _: StructType | ArrayType(_: StructType, _) =>
|
||||
try {
|
||||
JacksonUtils.verifySchema(child.dataType.asInstanceOf[StructType])
|
||||
JacksonUtils.verifySchema(rowSchema)
|
||||
TypeCheckResult.TypeCheckSuccess
|
||||
} catch {
|
||||
case e: UnsupportedOperationException =>
|
||||
TypeCheckResult.TypeCheckFailure(e.getMessage)
|
||||
}
|
||||
} else {
|
||||
TypeCheckResult.TypeCheckFailure(
|
||||
s"$prettyName requires that the expression is a struct expression.")
|
||||
}
|
||||
case _ => TypeCheckResult.TypeCheckFailure(
|
||||
s"Input type ${child.dataType.simpleString} must be a struct or array of structs.")
|
||||
}
|
||||
|
||||
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
|
||||
copy(timeZoneId = Option(timeZoneId))
|
||||
|
||||
override def nullSafeEval(row: Any): Any = {
|
||||
gen.write(row.asInstanceOf[InternalRow])
|
||||
gen.flush()
|
||||
val json = writer.toString
|
||||
writer.reset()
|
||||
UTF8String.fromString(json)
|
||||
}
|
||||
override def nullSafeEval(value: Any): Any = converter(value)
|
||||
|
||||
override def inputTypes: Seq[AbstractDataType] = StructType :: Nil
|
||||
override def inputTypes: Seq[AbstractDataType] = TypeCollection(ArrayType, StructType) :: Nil
|
||||
}
|
||||
|
||||
object JsonExprUtils {
|
||||
|
|
|
@ -37,6 +37,10 @@ private[sql] class JacksonGenerator(
|
|||
|
||||
// `ValueWriter`s for all fields of the schema
|
||||
private val rootFieldWriters: Array[ValueWriter] = schema.map(_.dataType).map(makeWriter).toArray
|
||||
// `ValueWriter` for array data storing rows of the schema.
|
||||
private val arrElementWriter: ValueWriter = (arr: SpecializedGetters, i: Int) => {
|
||||
writeObject(writeFields(arr.getStruct(i, schema.length), schema, rootFieldWriters))
|
||||
}
|
||||
|
||||
private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
|
||||
|
||||
|
@ -185,17 +189,18 @@ private[sql] class JacksonGenerator(
|
|||
def flush(): Unit = gen.flush()
|
||||
|
||||
/**
|
||||
* Transforms a single InternalRow to JSON using Jackson
|
||||
* Transforms a single `InternalRow` to JSON object using Jackson
|
||||
*
|
||||
* @param row The row to convert
|
||||
*/
|
||||
def write(row: InternalRow): Unit = {
|
||||
writeObject {
|
||||
writeFields(row, schema, rootFieldWriters)
|
||||
}
|
||||
}
|
||||
def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters))
|
||||
|
||||
def writeLineEnding(): Unit = {
|
||||
gen.writeRaw('\n')
|
||||
}
|
||||
/**
|
||||
* Transforms multiple `InternalRow`s to JSON array using Jackson
|
||||
*
|
||||
* @param array The array of rows to convert
|
||||
*/
|
||||
def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter))
|
||||
|
||||
def writeLineEnding(): Unit = gen.writeRaw('\n')
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ import java.util.Calendar
|
|||
|
||||
import org.apache.spark.SparkFunSuite
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, ParseModes}
|
||||
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, ParseModes}
|
||||
import org.apache.spark.sql.types._
|
||||
import org.apache.spark.unsafe.types.UTF8String
|
||||
|
||||
|
@ -352,7 +352,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
|
|||
val jsonData = """{"a": 1}"""
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
checkEvaluation(
|
||||
JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
|
||||
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
|
||||
InternalRow(1)
|
||||
)
|
||||
}
|
||||
|
@ -361,13 +361,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
|
|||
val jsonData = """{"a" 1}"""
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
checkEvaluation(
|
||||
JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
|
||||
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
|
||||
null
|
||||
)
|
||||
|
||||
// Other modes should still return `null`.
|
||||
checkEvaluation(
|
||||
JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId),
|
||||
JsonToStructs(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId),
|
||||
null
|
||||
)
|
||||
}
|
||||
|
@ -376,62 +376,62 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
|
|||
val input = """[{"a": 1}, {"a": 2}]"""
|
||||
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
|
||||
val output = InternalRow(1) :: InternalRow(2) :: Nil
|
||||
checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
|
||||
}
|
||||
|
||||
test("from_json - input=object, schema=array, output=array of single row") {
|
||||
val input = """{"a": 1}"""
|
||||
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
|
||||
val output = InternalRow(1) :: Nil
|
||||
checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
|
||||
}
|
||||
|
||||
test("from_json - input=empty array, schema=array, output=empty array") {
|
||||
val input = "[ ]"
|
||||
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
|
||||
val output = Nil
|
||||
checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
|
||||
}
|
||||
|
||||
test("from_json - input=empty object, schema=array, output=array of single row with null") {
|
||||
val input = "{ }"
|
||||
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
|
||||
val output = InternalRow(null) :: Nil
|
||||
checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
|
||||
}
|
||||
|
||||
test("from_json - input=array of single object, schema=struct, output=single row") {
|
||||
val input = """[{"a": 1}]"""
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
val output = InternalRow(1)
|
||||
checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
|
||||
}
|
||||
|
||||
test("from_json - input=array, schema=struct, output=null") {
|
||||
val input = """[{"a": 1}, {"a": 2}]"""
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
val output = null
|
||||
checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
|
||||
}
|
||||
|
||||
test("from_json - input=empty array, schema=struct, output=null") {
|
||||
val input = """[]"""
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
val output = null
|
||||
checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
|
||||
}
|
||||
|
||||
test("from_json - input=empty object, schema=struct, output=single row with null") {
|
||||
val input = """{ }"""
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
val output = InternalRow(null)
|
||||
checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
|
||||
}
|
||||
|
||||
test("from_json null input column") {
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
checkEvaluation(
|
||||
JsonToStruct(schema, Map.empty, Literal.create(null, StringType), gmtId),
|
||||
JsonToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId),
|
||||
null
|
||||
)
|
||||
}
|
||||
|
@ -444,14 +444,14 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
|
|||
c.set(2016, 0, 1, 0, 0, 0)
|
||||
c.set(Calendar.MILLISECOND, 123)
|
||||
checkEvaluation(
|
||||
JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId),
|
||||
JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId),
|
||||
InternalRow(c.getTimeInMillis * 1000L)
|
||||
)
|
||||
// The result doesn't change because the json string includes timezone string ("Z" here),
|
||||
// which means the string represents the timestamp string in the timezone regardless of
|
||||
// the timeZoneId parameter.
|
||||
checkEvaluation(
|
||||
JsonToStruct(schema, Map.empty, Literal(jsonData1), Option("PST")),
|
||||
JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST")),
|
||||
InternalRow(c.getTimeInMillis * 1000L)
|
||||
)
|
||||
|
||||
|
@ -461,7 +461,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
|
|||
c.set(2016, 0, 1, 0, 0, 0)
|
||||
c.set(Calendar.MILLISECOND, 0)
|
||||
checkEvaluation(
|
||||
JsonToStruct(
|
||||
JsonToStructs(
|
||||
schema,
|
||||
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
|
||||
Literal(jsonData2),
|
||||
|
@ -469,7 +469,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
|
|||
InternalRow(c.getTimeInMillis * 1000L)
|
||||
)
|
||||
checkEvaluation(
|
||||
JsonToStruct(
|
||||
JsonToStructs(
|
||||
schema,
|
||||
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
|
||||
DateTimeUtils.TIMEZONE_OPTION -> tz.getID),
|
||||
|
@ -483,25 +483,52 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
|
|||
test("SPARK-19543: from_json empty input column") {
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
checkEvaluation(
|
||||
JsonToStruct(schema, Map.empty, Literal.create(" ", StringType), gmtId),
|
||||
JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
|
||||
null
|
||||
)
|
||||
}
|
||||
|
||||
test("to_json") {
|
||||
test("to_json - struct") {
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
val struct = Literal.create(create_row(1), schema)
|
||||
checkEvaluation(
|
||||
StructToJson(Map.empty, struct, gmtId),
|
||||
StructsToJson(Map.empty, struct, gmtId),
|
||||
"""{"a":1}"""
|
||||
)
|
||||
}
|
||||
|
||||
test("to_json - array") {
|
||||
val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
|
||||
val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: Nil)
|
||||
val output = """[{"a":1},{"a":2}]"""
|
||||
checkEvaluation(
|
||||
StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId),
|
||||
output)
|
||||
}
|
||||
|
||||
test("to_json - array with single empty row") {
|
||||
val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
|
||||
val input = new GenericArrayData(InternalRow(null) :: Nil)
|
||||
val output = """[{}]"""
|
||||
checkEvaluation(
|
||||
StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId),
|
||||
output)
|
||||
}
|
||||
|
||||
test("to_json - empty array") {
|
||||
val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
|
||||
val input = new GenericArrayData(Nil)
|
||||
val output = """[]"""
|
||||
checkEvaluation(
|
||||
StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId),
|
||||
output)
|
||||
}
|
||||
|
||||
test("to_json null input column") {
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
val struct = Literal.create(null, schema)
|
||||
checkEvaluation(
|
||||
StructToJson(Map.empty, struct, gmtId),
|
||||
StructsToJson(Map.empty, struct, gmtId),
|
||||
null
|
||||
)
|
||||
}
|
||||
|
@ -514,16 +541,16 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
|
|||
val struct = Literal.create(create_row(c.getTimeInMillis * 1000L), schema)
|
||||
|
||||
checkEvaluation(
|
||||
StructToJson(Map.empty, struct, gmtId),
|
||||
StructsToJson(Map.empty, struct, gmtId),
|
||||
"""{"t":"2016-01-01T00:00:00.000Z"}"""
|
||||
)
|
||||
checkEvaluation(
|
||||
StructToJson(Map.empty, struct, Option("PST")),
|
||||
StructsToJson(Map.empty, struct, Option("PST")),
|
||||
"""{"t":"2015-12-31T16:00:00.000-08:00"}"""
|
||||
)
|
||||
|
||||
checkEvaluation(
|
||||
StructToJson(
|
||||
StructsToJson(
|
||||
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
|
||||
DateTimeUtils.TIMEZONE_OPTION -> gmtId.get),
|
||||
struct,
|
||||
|
@ -531,7 +558,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
|
|||
"""{"t":"2016-01-01T00:00:00"}"""
|
||||
)
|
||||
checkEvaluation(
|
||||
StructToJson(
|
||||
StructsToJson(
|
||||
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
|
||||
DateTimeUtils.TIMEZONE_OPTION -> "PST"),
|
||||
struct,
|
||||
|
|
|
@ -2978,7 +2978,8 @@ object functions {
|
|||
|
||||
/**
|
||||
* (Scala-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
|
||||
* with the specified schema. Returns `null`, in the case of an unparseable string.
|
||||
* of `StructType`s with the specified schema. Returns `null`, in the case of an unparseable
|
||||
* string.
|
||||
*
|
||||
* @param e a string column containing JSON data.
|
||||
* @param schema the schema to use when parsing the json string
|
||||
|
@ -2989,7 +2990,7 @@ object functions {
|
|||
* @since 2.2.0
|
||||
*/
|
||||
def from_json(e: Column, schema: DataType, options: Map[String, String]): Column = withExpr {
|
||||
JsonToStruct(schema, options, e.expr)
|
||||
JsonToStructs(schema, options, e.expr)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -3009,7 +3010,8 @@ object functions {
|
|||
|
||||
/**
|
||||
* (Java-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
|
||||
* with the specified schema. Returns `null`, in the case of an unparseable string.
|
||||
* of `StructType`s with the specified schema. Returns `null`, in the case of an unparseable
|
||||
* string.
|
||||
*
|
||||
* @param e a string column containing JSON data.
|
||||
* @param schema the schema to use when parsing the json string
|
||||
|
@ -3036,7 +3038,7 @@ object functions {
|
|||
from_json(e, schema, Map.empty[String, String])
|
||||
|
||||
/**
|
||||
* Parses a column containing a JSON string into a `StructType` or `ArrayType`
|
||||
* Parses a column containing a JSON string into a `StructType` or `ArrayType` of `StructType`s
|
||||
* with the specified schema. Returns `null`, in the case of an unparseable string.
|
||||
*
|
||||
* @param e a string column containing JSON data.
|
||||
|
@ -3049,7 +3051,7 @@ object functions {
|
|||
from_json(e, schema, Map.empty[String, String])
|
||||
|
||||
/**
|
||||
* Parses a column containing a JSON string into a `StructType` or `ArrayType`
|
||||
* Parses a column containing a JSON string into a `StructType` or `ArrayType` of `StructType`s
|
||||
* with the specified schema. Returns `null`, in the case of an unparseable string.
|
||||
*
|
||||
* @param e a string column containing JSON data.
|
||||
|
@ -3062,10 +3064,11 @@ object functions {
|
|||
from_json(e, DataType.fromJson(schema), options)
|
||||
|
||||
/**
|
||||
* (Scala-specific) Converts a column containing a `StructType` into a JSON string with the
|
||||
* specified schema. Throws an exception, in the case of an unsupported type.
|
||||
* (Scala-specific) Converts a column containing a `StructType` or `ArrayType` of `StructType`s
|
||||
* into a JSON string with the specified schema. Throws an exception, in the case of an
|
||||
* unsupported type.
|
||||
*
|
||||
* @param e a struct column.
|
||||
* @param e a column containing a struct or array of the structs.
|
||||
* @param options options to control how the struct column is converted into a json string.
|
||||
* accepts the same options as the json data source.
|
||||
*
|
||||
|
@ -3073,14 +3076,15 @@ object functions {
|
|||
* @since 2.1.0
|
||||
*/
|
||||
def to_json(e: Column, options: Map[String, String]): Column = withExpr {
|
||||
StructToJson(options, e.expr)
|
||||
StructsToJson(options, e.expr)
|
||||
}
|
||||
|
||||
/**
|
||||
* (Java-specific) Converts a column containing a `StructType` into a JSON string with the
|
||||
* specified schema. Throws an exception, in the case of an unsupported type.
|
||||
* (Java-specific) Converts a column containing a `StructType` or `ArrayType` of `StructType`s
|
||||
* into a JSON string with the specified schema. Throws an exception, in the case of an
|
||||
* unsupported type.
|
||||
*
|
||||
* @param e a struct column.
|
||||
* @param e a column containing a struct or array of the structs.
|
||||
* @param options options to control how the struct column is converted into a json string.
|
||||
* accepts the same options as the json data source.
|
||||
*
|
||||
|
@ -3091,10 +3095,10 @@ object functions {
|
|||
to_json(e, options.asScala.toMap)
|
||||
|
||||
/**
|
||||
* Converts a column containing a `StructType` into a JSON string with the
|
||||
* specified schema. Throws an exception, in the case of an unsupported type.
|
||||
* Converts a column containing a `StructType` or `ArrayType` of `StructType`s into a JSON string
|
||||
* with the specified schema. Throws an exception, in the case of an unsupported type.
|
||||
*
|
||||
* @param e a struct column.
|
||||
* @param e a column containing a struct or array of the structs.
|
||||
*
|
||||
* @group collection_funcs
|
||||
* @since 2.1.0
|
||||
|
|
|
@ -3,6 +3,7 @@ describe function to_json;
|
|||
describe function extended to_json;
|
||||
select to_json(named_struct('a', 1, 'b', 2));
|
||||
select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
|
||||
select to_json(array(named_struct('a', 1, 'b', 2)));
|
||||
-- Check if errors handled
|
||||
select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'));
|
||||
select to_json(named_struct('a', 1, 'b', 2), map('mode', 1));
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
-- Automatically generated by SQLQueryTestSuite
|
||||
-- Number of queries: 16
|
||||
-- Number of queries: 17
|
||||
|
||||
|
||||
-- !query 0
|
||||
|
@ -7,7 +7,7 @@ describe function to_json
|
|||
-- !query 0 schema
|
||||
struct<function_desc:string>
|
||||
-- !query 0 output
|
||||
Class: org.apache.spark.sql.catalyst.expressions.StructToJson
|
||||
Class: org.apache.spark.sql.catalyst.expressions.StructsToJson
|
||||
Function: to_json
|
||||
Usage: to_json(expr[, options]) - Returns a json string with a given struct value
|
||||
|
||||
|
@ -17,13 +17,15 @@ describe function extended to_json
|
|||
-- !query 1 schema
|
||||
struct<function_desc:string>
|
||||
-- !query 1 output
|
||||
Class: org.apache.spark.sql.catalyst.expressions.StructToJson
|
||||
Class: org.apache.spark.sql.catalyst.expressions.StructsToJson
|
||||
Extended Usage:
|
||||
Examples:
|
||||
> SELECT to_json(named_struct('a', 1, 'b', 2));
|
||||
{"a":1,"b":2}
|
||||
> SELECT to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
|
||||
{"time":"26/08/2015"}
|
||||
> SELECT to_json(array(named_struct('a', 1, 'b', 2));
|
||||
[{"a":1,"b":2}]
|
||||
|
||||
Function: to_json
|
||||
Usage: to_json(expr[, options]) - Returns a json string with a given struct value
|
||||
|
@ -32,7 +34,7 @@ Usage: to_json(expr[, options]) - Returns a json string with a given struct valu
|
|||
-- !query 2
|
||||
select to_json(named_struct('a', 1, 'b', 2))
|
||||
-- !query 2 schema
|
||||
struct<structtojson(named_struct(a, 1, b, 2)):string>
|
||||
struct<structstojson(named_struct(a, 1, b, 2)):string>
|
||||
-- !query 2 output
|
||||
{"a":1,"b":2}
|
||||
|
||||
|
@ -40,54 +42,62 @@ struct<structtojson(named_struct(a, 1, b, 2)):string>
|
|||
-- !query 3
|
||||
select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'))
|
||||
-- !query 3 schema
|
||||
struct<structtojson(named_struct(time, to_timestamp('2015-08-26', 'yyyy-MM-dd'))):string>
|
||||
struct<structstojson(named_struct(time, to_timestamp('2015-08-26', 'yyyy-MM-dd'))):string>
|
||||
-- !query 3 output
|
||||
{"time":"26/08/2015"}
|
||||
|
||||
|
||||
-- !query 4
|
||||
select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'))
|
||||
select to_json(array(named_struct('a', 1, 'b', 2)))
|
||||
-- !query 4 schema
|
||||
struct<>
|
||||
struct<structstojson(array(named_struct(a, 1, b, 2))):string>
|
||||
-- !query 4 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
Must use a map() function for options;; line 1 pos 7
|
||||
[{"a":1,"b":2}]
|
||||
|
||||
|
||||
-- !query 5
|
||||
select to_json(named_struct('a', 1, 'b', 2), map('mode', 1))
|
||||
select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'))
|
||||
-- !query 5 schema
|
||||
struct<>
|
||||
-- !query 5 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7
|
||||
Must use a map() function for options;; line 1 pos 7
|
||||
|
||||
|
||||
-- !query 6
|
||||
select to_json()
|
||||
select to_json(named_struct('a', 1, 'b', 2), map('mode', 1))
|
||||
-- !query 6 schema
|
||||
struct<>
|
||||
-- !query 6 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
Invalid number of arguments for function to_json; line 1 pos 7
|
||||
A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7
|
||||
|
||||
|
||||
-- !query 7
|
||||
describe function from_json
|
||||
select to_json()
|
||||
-- !query 7 schema
|
||||
struct<function_desc:string>
|
||||
struct<>
|
||||
-- !query 7 output
|
||||
Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct
|
||||
org.apache.spark.sql.AnalysisException
|
||||
Invalid number of arguments for function to_json; line 1 pos 7
|
||||
|
||||
|
||||
-- !query 8
|
||||
describe function from_json
|
||||
-- !query 8 schema
|
||||
struct<function_desc:string>
|
||||
-- !query 8 output
|
||||
Class: org.apache.spark.sql.catalyst.expressions.JsonToStructs
|
||||
Function: from_json
|
||||
Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.
|
||||
|
||||
|
||||
-- !query 8
|
||||
-- !query 9
|
||||
describe function extended from_json
|
||||
-- !query 8 schema
|
||||
-- !query 9 schema
|
||||
struct<function_desc:string>
|
||||
-- !query 8 output
|
||||
Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct
|
||||
-- !query 9 output
|
||||
Class: org.apache.spark.sql.catalyst.expressions.JsonToStructs
|
||||
Extended Usage:
|
||||
Examples:
|
||||
> SELECT from_json('{"a":1, "b":0.8}', 'a INT, b DOUBLE');
|
||||
|
@ -99,36 +109,36 @@ Function: from_json
|
|||
Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.
|
||||
|
||||
|
||||
-- !query 9
|
||||
-- !query 10
|
||||
select from_json('{"a":1}', 'a INT')
|
||||
-- !query 9 schema
|
||||
struct<jsontostruct({"a":1}):struct<a:int>>
|
||||
-- !query 9 output
|
||||
-- !query 10 schema
|
||||
struct<jsontostructs({"a":1}):struct<a:int>>
|
||||
-- !query 10 output
|
||||
{"a":1}
|
||||
|
||||
|
||||
-- !query 10
|
||||
-- !query 11
|
||||
select from_json('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'))
|
||||
-- !query 10 schema
|
||||
struct<jsontostruct({"time":"26/08/2015"}):struct<time:timestamp>>
|
||||
-- !query 10 output
|
||||
-- !query 11 schema
|
||||
struct<jsontostructs({"time":"26/08/2015"}):struct<time:timestamp>>
|
||||
-- !query 11 output
|
||||
{"time":2015-08-26 00:00:00.0}
|
||||
|
||||
|
||||
-- !query 11
|
||||
-- !query 12
|
||||
select from_json('{"a":1}', 1)
|
||||
-- !query 11 schema
|
||||
-- !query 12 schema
|
||||
struct<>
|
||||
-- !query 11 output
|
||||
-- !query 12 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
Expected a string literal instead of 1;; line 1 pos 7
|
||||
|
||||
|
||||
-- !query 12
|
||||
-- !query 13
|
||||
select from_json('{"a":1}', 'a InvalidType')
|
||||
-- !query 12 schema
|
||||
-- !query 13 schema
|
||||
struct<>
|
||||
-- !query 12 output
|
||||
-- !query 13 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
|
||||
DataType invalidtype() is not supported.(line 1, pos 2)
|
||||
|
@ -139,28 +149,28 @@ a InvalidType
|
|||
; line 1 pos 7
|
||||
|
||||
|
||||
-- !query 13
|
||||
select from_json('{"a":1}', 'a INT', named_struct('mode', 'PERMISSIVE'))
|
||||
-- !query 13 schema
|
||||
struct<>
|
||||
-- !query 13 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
Must use a map() function for options;; line 1 pos 7
|
||||
|
||||
|
||||
-- !query 14
|
||||
select from_json('{"a":1}', 'a INT', map('mode', 1))
|
||||
select from_json('{"a":1}', 'a INT', named_struct('mode', 'PERMISSIVE'))
|
||||
-- !query 14 schema
|
||||
struct<>
|
||||
-- !query 14 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7
|
||||
Must use a map() function for options;; line 1 pos 7
|
||||
|
||||
|
||||
-- !query 15
|
||||
select from_json()
|
||||
select from_json('{"a":1}', 'a INT', map('mode', 1))
|
||||
-- !query 15 schema
|
||||
struct<>
|
||||
-- !query 15 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7
|
||||
|
||||
|
||||
-- !query 16
|
||||
select from_json()
|
||||
-- !query 16 schema
|
||||
struct<>
|
||||
-- !query 16 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
Invalid number of arguments for function from_json; line 1 pos 7
|
||||
|
|
|
@ -156,7 +156,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
|
|||
Row(Seq(Row(1, "a"), Row(2, null), Row(null, null))))
|
||||
}
|
||||
|
||||
test("to_json") {
|
||||
test("to_json - struct") {
|
||||
val df = Seq(Tuple1(Tuple1(1))).toDF("a")
|
||||
|
||||
checkAnswer(
|
||||
|
@ -164,6 +164,14 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
|
|||
Row("""{"_1":1}""") :: Nil)
|
||||
}
|
||||
|
||||
test("to_json - array") {
|
||||
val df = Seq(Tuple1(Tuple1(1) :: Nil)).toDF("a")
|
||||
|
||||
checkAnswer(
|
||||
df.select(to_json($"a")),
|
||||
Row("""[{"_1":1}]""") :: Nil)
|
||||
}
|
||||
|
||||
test("to_json with option") {
|
||||
val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a")
|
||||
val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm")
|
||||
|
@ -184,7 +192,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
|
|||
"Unable to convert column a of type calendarinterval to JSON."))
|
||||
}
|
||||
|
||||
test("roundtrip in to_json and from_json") {
|
||||
test("roundtrip in to_json and from_json - struct") {
|
||||
val dfOne = Seq(Tuple1(Tuple1(1)), Tuple1(null)).toDF("struct")
|
||||
val schemaOne = dfOne.schema(0).dataType.asInstanceOf[StructType]
|
||||
val readBackOne = dfOne.select(to_json($"struct").as("json"))
|
||||
|
@ -198,6 +206,20 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
|
|||
checkAnswer(dfTwo, readBackTwo)
|
||||
}
|
||||
|
||||
test("roundtrip in to_json and from_json - array") {
|
||||
val dfOne = Seq(Tuple1(Tuple1(1) :: Nil), Tuple1(null :: Nil)).toDF("array")
|
||||
val schemaOne = dfOne.schema(0).dataType
|
||||
val readBackOne = dfOne.select(to_json($"array").as("json"))
|
||||
.select(from_json($"json", schemaOne).as("array"))
|
||||
checkAnswer(dfOne, readBackOne)
|
||||
|
||||
val dfTwo = Seq(Some("""[{"a":1}]"""), None).toDF("json")
|
||||
val schemaTwo = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
|
||||
val readBackTwo = dfTwo.select(from_json($"json", schemaTwo).as("array"))
|
||||
.select(to_json($"array").as("json"))
|
||||
checkAnswer(dfTwo, readBackTwo)
|
||||
}
|
||||
|
||||
test("SPARK-19637 Support to_json in SQL") {
|
||||
val df1 = Seq(Tuple1(Tuple1(1))).toDF("a")
|
||||
checkAnswer(
|
||||
|
|
Loading…
Reference in a new issue