[SPARK-17699] Support for parsing JSON string columns

Spark SQL has great support for reading text files that contain JSON data. However, in many cases the JSON data is just one column amongst others. This is particularly true when reading from sources such as Kafka. This PR adds a new function `from_json` that converts a string column into a nested `StructType` with a user-specified schema.

Example usage:
```scala
val df = Seq("""{"a": 1}""").toDS()
val schema = new StructType().add("a", IntegerType)

df.select(from_json($"value", schema) as 'json) // => [json: <a: int>]
```

This PR adds support for Java, Scala, and Python. I leveraged our existing JSON parsing support by moving it into Catalyst (so that we could define expressions using it). I left SQL out for now, because I'm not sure how users would specify a schema.
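The snippet above shows the simplest overload. For reference, here is a minimal sketch of the other overloads added by this PR: one that takes a map of parsing options (the same options accepted by the JSON data source; `allowSingleQuotes` below is only an example) and the Java-friendly one that takes the schema serialized as a JSON string. It assumes a spark-shell-style session with `spark.implicits._` in scope, like the example above.

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}

val df = Seq("""{"a": 1}""").toDS()
val schema = new StructType().add("a", IntegerType)

// Overload taking parsing options; any option of the JSON data source may be passed.
df.select(from_json($"value", schema, Map("allowSingleQuotes" -> "true")) as 'json)

// Java-friendly overload: the schema is passed serialized as a JSON string.
df.select(
  from_json($"value", schema.json, new java.util.HashMap[String, String]()) as 'json)
```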

Author: Michael Armbrust <michael@databricks.com>

Closes #15274 from marmbrus/jsonParser.
Michael Armbrust authored on 2016-09-29 13:01:10 -07:00; committed by Yin Huai
parent 027dea8f29
commit fe33121a53
19 changed files with 198 additions and 23 deletions

View file

@@ -1706,6 +1706,29 @@ def json_tuple(col, *fields):
return Column(jc)
@since(2.1)
def from_json(col, schema, options={}):
"""
Parses a column containing a JSON string into a [[StructType]] with the
specified schema. Returns `null`, in the case of an unparseable string.
:param col: string column in json format
:param schema: a StructType to use when parsing the json column
:param options: options to control parsing. accepts the same options as the json datasource
>>> from pyspark.sql.types import *
>>> data = [(1, '''{"a": 1}''')]
>>> schema = StructType([StructField("a", IntegerType())])
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(from_json(df.value, schema).alias("json")).collect()
[Row(json=Row(a=1))]
"""
sc = SparkContext._active_spark_context
jc = sc._jvm.functions.from_json(_to_java_column(col), schema.json(), options)
return Column(jc)
@since(1.5)
def size(col):
"""

View file

@@ -23,10 +23,12 @@ import scala.util.parsing.combinator.RegexParsers
import com.fasterxml.jackson.core._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException}
import org.apache.spark.sql.catalyst.util.ParseModes
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -467,3 +469,28 @@ case class JsonTuple(children: Seq[Expression])
}
}
/**
* Converts a json input string to a [[StructType]] with the specified schema.
*/
case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
extends Expression with CodegenFallback with ExpectsInputTypes {
override def nullable: Boolean = true
@transient
lazy val parser =
new JacksonParser(
schema,
"invalid", // Not used since we force fail fast. Invalid rows will be set to `null`.
new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
override def dataType: DataType = schema
override def children: Seq[Expression] = child :: Nil
override def eval(input: InternalRow): Any = {
try parser.parse(child.eval(input).toString).head catch {
case _: SparkSQLJsonProcessingException => null
}
}
override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
}
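For context, a minimal sketch of how this new expression behaves when evaluated directly (it mirrors the `checkEvaluation` cases added to `JsonExpressionsSuite` further down): because the parser is forced into FAILFAST mode, a malformed record raises `SparkSQLJsonProcessingException`, which `eval` catches and converts to `null`.

```scala
import org.apache.spark.sql.catalyst.expressions.{JsonToStruct, Literal}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(StructField("a", IntegerType) :: Nil)

// A well-formed record evaluates to an InternalRow holding the parsed values.
JsonToStruct(schema, Map.empty, Literal("""{"a": 1}""")).eval()  // => [1]

// A malformed record fails inside the parser; eval catches the exception and returns null.
JsonToStruct(schema, Map.empty, Literal("""{"a" 1}""")).eval()   // => null
```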

View file

@@ -15,16 +15,16 @@
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.json
package org.apache.spark.sql.catalyst.json
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
/**
* Options for the JSON data source.
* Options for parsing JSON data into Spark SQL rows.
*
* Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
*/

View file

@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.json
package org.apache.spark.sql.catalyst.json
import java.io.ByteArrayOutputStream
@@ -28,19 +28,22 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
/**
* Constructs a parser for a given schema that translates a json string to an [[InternalRow]].
*/
class JacksonParser(
schema: StructType,
columnNameOfCorruptRecord: String,
options: JSONOptions) extends Logging {
import JacksonUtils._
import ParseModes._
import com.fasterxml.jackson.core.JsonToken._
// A `ValueConverter` is responsible for converting a value from `JsonParser`
@@ -65,7 +68,7 @@ class JacksonParser(
private def failedRecord(record: String): Seq[InternalRow] = {
// create a row even if no corrupt record column is present
if (options.failFast) {
throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record")
}
if (options.dropMalformed) {
if (!isWarningPrintedForMalformedRecord) {

View file

@@ -15,11 +15,11 @@
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.json
package org.apache.spark.sql.catalyst.json
import com.fasterxml.jackson.core.{JsonParser, JsonToken}
private object JacksonUtils {
object JacksonUtils {
/**
* Advance the parser until a null or a specific token is found
*/

View file

@@ -15,15 +15,15 @@
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources
package org.apache.spark.sql.catalyst.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}
import org.apache.hadoop.io.compress._
import org.apache.spark.util.Utils
private[datasources] object CompressionCodecs {
object CompressionCodecs {
private val shortCompressionCodecNames = Map(
"none" -> null,
"uncompressed" -> null,

View file

@@ -15,9 +15,9 @@
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources
package org.apache.spark.sql.catalyst.util
private[datasources] object ParseModes {
object ParseModes {
val PERMISSIVE_MODE = "PERMISSIVE"
val DROP_MALFORMED_MODE = "DROPMALFORMED"
val FAIL_FAST_MODE = "FAILFAST"

View file

@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.ParseModes
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -317,4 +319,28 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil),
InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc"))))
}
test("from_json") {
val jsonData = """{"a": 1}"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStruct(schema, Map.empty, Literal(jsonData)),
InternalRow.fromSeq(1 :: Nil)
)
}
test("from_json - invalid data") {
val jsonData = """{"a" 1}"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStruct(schema, Map.empty, Literal(jsonData)),
null
)
// Other modes should still return `null`.
checkEvaluation(
JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData)),
null
)
}
}

View file

@@ -21,14 +21,15 @@ import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.spark.Partition
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.datasources.json.InferSchema
import org.apache.spark.sql.types.StructType
/**

View file

@@ -29,6 +29,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

View file

@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
private[csv] class CSVOptions(@transient private val parameters: Map[String, String])
extends Logging with Serializable {

View file

@@ -23,7 +23,8 @@ import com.fasterxml.jackson.core._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

View file

@@ -21,8 +21,9 @@ import java.io.Writer
import com.fasterxml.jackson.core._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
import org.apache.spark.sql.types._

View file

@@ -32,6 +32,8 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType

View file

@@ -28,6 +28,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}

View file

@@ -17,6 +17,7 @@
package org.apache.spark.sql
import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.runtime.universe.{typeTag, TypeTag}
import scala.util.Try
@@ -2818,6 +2819,63 @@ object functions {
JsonTuple(json.expr +: fields.map(Literal.apply))
}
/**
* (Scala-specific) Parses a column containing a JSON string into a [[StructType]] with the
* specified schema. Returns `null`, in the case of an unparseable string.
*
* @param schema the schema to use when parsing the json string
* @param options options to control how the json is parsed. accepts the same options as the
* json data source.
* @param e a string column containing JSON data.
*
* @group collection_funcs
* @since 2.1.0
*/
def from_json(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
JsonToStruct(schema, options, e.expr)
}
/**
* (Java-specific) Parses a column containing a JSON string into a [[StructType]] with the
* specified schema. Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string
* @param options options to control how the json is parsed. accepts the same options as the
* json data source.
*
* @group collection_funcs
* @since 2.1.0
*/
def from_json(e: Column, schema: StructType, options: java.util.Map[String, String]): Column =
from_json(e, schema, options.asScala.toMap)
/**
* Parses a column containing a JSON string into a [[StructType]] with the specified schema.
* Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string
*
* @group collection_funcs
* @since 2.1.0
*/
def from_json(e: Column, schema: StructType): Column =
from_json(e, schema, Map.empty[String, String])
/**
* Parses a column containing a JSON string into a [[StructType]] with the specified schema.
* Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string, provided as a json string
*
* @group collection_funcs
* @since 2.1.0
*/
def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options)
/**
* Returns length of array or map.
*

View file

@@ -17,7 +17,9 @@
package org.apache.spark.sql
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructType}
class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -94,4 +96,31 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
checkAnswer(expr, expected)
}
test("json_parser") {
val df = Seq("""{"a": 1}""").toDS()
val schema = new StructType().add("a", IntegerType)
checkAnswer(
df.select(from_json($"value", schema)),
Row(Row(1)) :: Nil)
}
test("json_parser missing columns") {
val df = Seq("""{"a": 1}""").toDS()
val schema = new StructType().add("b", IntegerType)
checkAnswer(
df.select(from_json($"value", schema)),
Row(Row(null)) :: Nil)
}
test("json_parser invalid json") {
val df = Seq("""{"a" 1}""").toDS()
val schema = new StructType().add("a", IntegerType)
checkAnswer(
df.select(from_json($"value", schema)),
Row(null) :: Nil)
}
}

View file

@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.json
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.test.SharedSQLContext
/**

View file

@@ -26,9 +26,10 @@ import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType