[SPARK-16434][SQL] Avoid per-record type dispatch in JSON when reading
## What changes were proposed in this pull request? Currently, `JacksonParser.parse` is doing type-based dispatch for each row to convert the tokens to appropriate values for Spark. It might not have to be done like this because the schema is already kept. So, appropriate converters can be created first according to the schema once, and then apply them to each row. This PR corrects `JacksonParser` so that it creates all converters for the schema once and then applies them to each row rather than type dispatching for every row. Benchmark was proceeded with the codes below: #### Parser tests **Before** ```scala test("Benchmark for JSON converter") { val N = 500 << 8 val row = """{"struct":{"field1": true, "field2": 92233720368547758070}, "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]}, "arrayOfString":["str1", "str2"], "arrayOfInteger":[1, 2147483647, -2147483648], "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808], "arrayOfBigInteger":[922337203685477580700, -922337203685477580800], "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308], "arrayOfBoolean":[true, false, true], "arrayOfNull":[null, null, null, null], "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}], "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]], "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]] }""" val data = List.fill(N)(row) val dummyOption = new JSONOptions(Map.empty[String, String]) val schema = InferSchema.infer(spark.sparkContext.parallelize(Seq(row)), "", dummyOption) val factory = new JsonFactory() val benchmark = new Benchmark("JSON converter", N) benchmark.addCase("convert JSON file", 10) { _ => data.foreach { input => val parser = factory.createParser(input) parser.nextToken() JacksonParser.convertRootField(factory, parser, schema) } } benchmark.run() } ``` ``` JSON converter: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ convert JSON file 1697 / 1807 0.1 13256.9 1.0X ``` **After** ```scala test("Benchmark for JSON converter") { val N = 500 << 8 val row = """{"struct":{"field1": true, "field2": 92233720368547758070}, "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]}, "arrayOfString":["str1", "str2"], "arrayOfInteger":[1, 2147483647, -2147483648], "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808], "arrayOfBigInteger":[922337203685477580700, -922337203685477580800], "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308], "arrayOfBoolean":[true, false, true], "arrayOfNull":[null, null, null, null], "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}], "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]], "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]] }""" val data = List.fill(N)(row) val dummyOption = new JSONOptions(Map.empty[String, String], new SQLConf()) val schema = InferSchema.infer(spark.sparkContext.parallelize(Seq(row)), dummyOption) val benchmark = new Benchmark("JSON converter", N) benchmark.addCase("convert JSON file", 10) { _ => val parser = new JacksonParser(schema, dummyOption) data.foreach { input => parser.parse(input) } } benchmark.run() } ``` ``` JSON converter: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ convert JSON file 1401 / 1461 0.1 10947.4 1.0X ``` It seems parsing time is improved by roughly ~20% #### End-to-End test ```scala test("Benchmark for JSON reader") { val N = 500 << 8 val row = """{"struct":{"field1": true, "field2": 92233720368547758070}, "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]}, "arrayOfString":["str1", "str2"], "arrayOfInteger":[1, 2147483647, -2147483648], "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808], "arrayOfBigInteger":[922337203685477580700, -922337203685477580800], "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308], "arrayOfBoolean":[true, false, true], "arrayOfNull":[null, null, null, null], "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}], "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]], "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]] }""" val df = spark.sqlContext.read.json(spark.sparkContext.parallelize(List.fill(N)(row))) withTempPath { path => df.write.format("json").save(path.getCanonicalPath) val benchmark = new Benchmark("JSON reader", N) benchmark.addCase("reading JSON file", 10) { _ => spark.read.format("json").load(path.getCanonicalPath).collect() } benchmark.run() } } ``` **Before** ``` JSON reader: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ reading JSON file 6485 / 6924 0.0 50665.0 1.0X ``` **After** ``` JSON reader: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ reading JSON file 6350 / 6529 0.0 49609.3 1.0X ``` ## How was this patch tested? Existing test cases should cover this. Author: hyukjinkwon <gurwls223@gmail.com> Closes #14102 from HyukjinKwon/SPARK-16434.
This commit is contained in:
parent
7a9e25c383
commit
ac84fb64dd
|
@ -319,16 +319,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
|
|||
columnNameOfCorruptRecord,
|
||||
parsedOptions)
|
||||
}
|
||||
val parsed = jsonRDD.mapPartitions { iter =>
|
||||
val parser = new JacksonParser(schema, columnNameOfCorruptRecord, parsedOptions)
|
||||
iter.flatMap(parser.parse)
|
||||
}
|
||||
|
||||
Dataset.ofRows(
|
||||
sparkSession,
|
||||
LogicalRDD(
|
||||
schema.toAttributes,
|
||||
JacksonParser.parse(
|
||||
jsonRDD,
|
||||
schema,
|
||||
columnNameOfCorruptRecord,
|
||||
parsedOptions))(sparkSession))
|
||||
LogicalRDD(schema.toAttributes, parsed)(sparkSession))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -37,7 +37,7 @@ private[sql] object InferSchema {
|
|||
*/
|
||||
def infer(
|
||||
json: RDD[String],
|
||||
columnNameOfCorruptRecords: String,
|
||||
columnNameOfCorruptRecord: String,
|
||||
configOptions: JSONOptions): StructType = {
|
||||
require(configOptions.samplingRatio > 0,
|
||||
s"samplingRatio (${configOptions.samplingRatio}) should be greater than 0")
|
||||
|
@ -60,13 +60,13 @@ private[sql] object InferSchema {
|
|||
}
|
||||
} catch {
|
||||
case _: JsonParseException if shouldHandleCorruptRecord =>
|
||||
Some(StructType(Seq(StructField(columnNameOfCorruptRecords, StringType))))
|
||||
Some(StructType(Seq(StructField(columnNameOfCorruptRecord, StringType))))
|
||||
case _: JsonParseException =>
|
||||
None
|
||||
}
|
||||
}
|
||||
}.fold(StructType(Seq()))(
|
||||
compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord))
|
||||
compatibleRootType(columnNameOfCorruptRecord, shouldHandleCorruptRecord))
|
||||
|
||||
canonicalizeType(rootType) match {
|
||||
case Some(st: StructType) => st
|
||||
|
|
|
@ -24,7 +24,6 @@ import scala.collection.mutable.ArrayBuffer
|
|||
import com.fasterxml.jackson.core._
|
||||
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
import org.apache.spark.sql.catalyst.util._
|
||||
|
@ -35,101 +34,126 @@ import org.apache.spark.util.Utils
|
|||
|
||||
private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
|
||||
|
||||
object JacksonParser extends Logging {
|
||||
|
||||
def parse(
|
||||
input: RDD[String],
|
||||
class JacksonParser(
|
||||
schema: StructType,
|
||||
columnNameOfCorruptRecords: String,
|
||||
configOptions: JSONOptions): RDD[InternalRow] = {
|
||||
columnNameOfCorruptRecord: String,
|
||||
options: JSONOptions) extends Logging {
|
||||
|
||||
input.mapPartitions { iter =>
|
||||
parseJson(iter, schema, columnNameOfCorruptRecords, configOptions)
|
||||
import com.fasterxml.jackson.core.JsonToken._
|
||||
|
||||
// A `ValueConverter` is responsible for converting a value from `JsonParser`
|
||||
// to a value in a field for `InternalRow`.
|
||||
private type ValueConverter = (JsonParser) => Any
|
||||
|
||||
// `ValueConverter`s for the root schema for all fields in the schema
|
||||
private val rootConverter: ValueConverter = makeRootConverter(schema)
|
||||
|
||||
private val factory = new JsonFactory()
|
||||
options.setJacksonOptions(factory)
|
||||
|
||||
/**
|
||||
* This function deals with the cases it fails to parse. This function will be called
|
||||
* when exceptions are caught during converting. This functions also deals with `mode` option.
|
||||
*/
|
||||
private def failedRecord(record: String): Seq[InternalRow] = {
|
||||
// create a row even if no corrupt record column is present
|
||||
if (options.failFast) {
|
||||
throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
|
||||
}
|
||||
if (options.dropMalformed) {
|
||||
logWarning(s"Dropping malformed line: $record")
|
||||
Nil
|
||||
} else {
|
||||
val row = new GenericMutableRow(schema.length)
|
||||
for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
|
||||
require(schema(corruptIndex).dataType == StringType)
|
||||
row.update(corruptIndex, UTF8String.fromString(record))
|
||||
}
|
||||
Seq(row)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the current token (and related children) according to a desired schema
|
||||
* This is a wrapper for the method `convertField()` to handle a row wrapped
|
||||
* with an array.
|
||||
* Create a converter which converts the JSON documents held by the `JsonParser`
|
||||
* to a value according to a desired schema. This is a wrapper for the method
|
||||
* `makeConverter()` to handle a row wrapped with an array.
|
||||
*/
|
||||
def convertRootField(
|
||||
factory: JsonFactory,
|
||||
parser: JsonParser,
|
||||
schema: DataType): Any = {
|
||||
import com.fasterxml.jackson.core.JsonToken._
|
||||
(parser.getCurrentToken, schema) match {
|
||||
case (START_ARRAY, st: StructType) =>
|
||||
def makeRootConverter(dataType: DataType): ValueConverter = dataType match {
|
||||
case st: StructType =>
|
||||
val elementConverter = makeConverter(st)
|
||||
val fieldConverters = st.map(_.dataType).map(makeConverter)
|
||||
(parser: JsonParser) => parseJsonToken(parser, dataType) {
|
||||
case START_OBJECT => convertObject(parser, st, fieldConverters)
|
||||
// SPARK-3308: support reading top level JSON arrays and take every element
|
||||
// in such an array as a row
|
||||
convertArray(factory, parser, st)
|
||||
//
|
||||
// For example, we support, the JSON data as below:
|
||||
//
|
||||
// [{"a":"str_a_1"}]
|
||||
// [{"a":"str_a_2"}, {"b":"str_b_3"}]
|
||||
//
|
||||
// resulting in:
|
||||
//
|
||||
// List([str_a_1,null])
|
||||
// List([str_a_2,null], [null,str_b_3])
|
||||
//
|
||||
case START_ARRAY => convertArray(parser, elementConverter)
|
||||
}
|
||||
|
||||
case (START_OBJECT, ArrayType(st, _)) =>
|
||||
case ArrayType(st: StructType, _) =>
|
||||
val elementConverter = makeConverter(st)
|
||||
val fieldConverters = st.map(_.dataType).map(makeConverter)
|
||||
(parser: JsonParser) => parseJsonToken(parser, dataType) {
|
||||
// the business end of SPARK-3308:
|
||||
// when an object is found but an array is requested just wrap it in a list
|
||||
convertField(factory, parser, st) :: Nil
|
||||
|
||||
case _ =>
|
||||
convertField(factory, parser, schema)
|
||||
}
|
||||
// when an object is found but an array is requested just wrap it in a list.
|
||||
// This is being wrapped in `JacksonParser.parse`.
|
||||
case START_OBJECT => convertObject(parser, st, fieldConverters)
|
||||
case START_ARRAY => convertArray(parser, elementConverter)
|
||||
}
|
||||
|
||||
private def convertField(
|
||||
factory: JsonFactory,
|
||||
parser: JsonParser,
|
||||
schema: DataType): Any = {
|
||||
import com.fasterxml.jackson.core.JsonToken._
|
||||
(parser.getCurrentToken, schema) match {
|
||||
case (null | VALUE_NULL, _) =>
|
||||
null
|
||||
|
||||
case (FIELD_NAME, _) =>
|
||||
parser.nextToken()
|
||||
convertField(factory, parser, schema)
|
||||
|
||||
case (VALUE_STRING, StringType) =>
|
||||
UTF8String.fromString(parser.getText)
|
||||
|
||||
case (VALUE_STRING, _) if parser.getTextLength < 1 =>
|
||||
// guard the non string type
|
||||
null
|
||||
|
||||
case (VALUE_STRING, BinaryType) =>
|
||||
parser.getBinaryValue
|
||||
|
||||
case (VALUE_STRING, DateType) =>
|
||||
val stringValue = parser.getText
|
||||
if (stringValue.contains("-")) {
|
||||
// The format of this string will probably be "yyyy-mm-dd".
|
||||
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
|
||||
} else {
|
||||
// In Spark 1.5.0, we store the data as number of days since epoch in string.
|
||||
// So, we just convert it to Int.
|
||||
stringValue.toInt
|
||||
case _ => makeConverter(dataType)
|
||||
}
|
||||
|
||||
case (VALUE_STRING, TimestampType) =>
|
||||
// This one will lose microseconds parts.
|
||||
// See https://issues.apache.org/jira/browse/SPARK-10681.
|
||||
DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
|
||||
|
||||
case (VALUE_NUMBER_INT, TimestampType) =>
|
||||
parser.getLongValue * 1000000L
|
||||
|
||||
case (_, StringType) =>
|
||||
val writer = new ByteArrayOutputStream()
|
||||
Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
|
||||
generator => generator.copyCurrentStructure(parser)
|
||||
/**
|
||||
* Create a converter which converts the JSON documents held by the `JsonParser`
|
||||
* to a value according to a desired schema.
|
||||
*/
|
||||
private def makeConverter(dataType: DataType): ValueConverter = dataType match {
|
||||
case BooleanType =>
|
||||
(parser: JsonParser) => parseJsonToken(parser, dataType) {
|
||||
case VALUE_TRUE => true
|
||||
case VALUE_FALSE => false
|
||||
}
|
||||
UTF8String.fromBytes(writer.toByteArray)
|
||||
|
||||
case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, FloatType) =>
|
||||
case ByteType =>
|
||||
(parser: JsonParser) => parseJsonToken(parser, dataType) {
|
||||
case VALUE_NUMBER_INT => parser.getByteValue
|
||||
}
|
||||
|
||||
case ShortType =>
|
||||
(parser: JsonParser) => parseJsonToken(parser, dataType) {
|
||||
case VALUE_NUMBER_INT => parser.getShortValue
|
||||
}
|
||||
|
||||
case IntegerType =>
|
||||
(parser: JsonParser) => parseJsonToken(parser, dataType) {
|
||||
case VALUE_NUMBER_INT => parser.getIntValue
|
||||
}
|
||||
|
||||
case LongType =>
|
||||
(parser: JsonParser) => parseJsonToken(parser, dataType) {
|
||||
case VALUE_NUMBER_INT => parser.getLongValue
|
||||
}
|
||||
|
||||
case FloatType =>
|
||||
(parser: JsonParser) => parseJsonToken(parser, dataType) {
|
||||
case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
|
||||
parser.getFloatValue
|
||||
|
||||
case (VALUE_STRING, FloatType) =>
|
||||
case VALUE_STRING =>
|
||||
// Special case handling for NaN and Infinity.
|
||||
val value = parser.getText
|
||||
val lowerCaseValue = value.toLowerCase()
|
||||
val lowerCaseValue = value.toLowerCase
|
||||
if (lowerCaseValue.equals("nan") ||
|
||||
lowerCaseValue.equals("infinity") ||
|
||||
lowerCaseValue.equals("-infinity") ||
|
||||
|
@ -139,14 +163,17 @@ object JacksonParser extends Logging {
|
|||
} else {
|
||||
throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.")
|
||||
}
|
||||
}
|
||||
|
||||
case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, DoubleType) =>
|
||||
case DoubleType =>
|
||||
(parser: JsonParser) => parseJsonToken(parser, dataType) {
|
||||
case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
|
||||
parser.getDoubleValue
|
||||
|
||||
case (VALUE_STRING, DoubleType) =>
|
||||
case VALUE_STRING =>
|
||||
// Special case handling for NaN and Infinity.
|
||||
val value = parser.getText
|
||||
val lowerCaseValue = value.toLowerCase()
|
||||
val lowerCaseValue = value.toLowerCase
|
||||
if (lowerCaseValue.equals("nan") ||
|
||||
lowerCaseValue.equals("infinity") ||
|
||||
lowerCaseValue.equals("-infinity") ||
|
||||
|
@ -156,63 +183,140 @@ object JacksonParser extends Logging {
|
|||
} else {
|
||||
throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.")
|
||||
}
|
||||
}
|
||||
|
||||
case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, dt: DecimalType) =>
|
||||
case StringType =>
|
||||
(parser: JsonParser) => parseJsonToken(parser, dataType) {
|
||||
case VALUE_STRING =>
|
||||
UTF8String.fromString(parser.getText)
|
||||
|
||||
case _ =>
|
||||
// Note that it always tries to convert the data as string without the case of failure.
|
||||
val writer = new ByteArrayOutputStream()
|
||||
Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
|
||||
generator => generator.copyCurrentStructure(parser)
|
||||
}
|
||||
UTF8String.fromBytes(writer.toByteArray)
|
||||
}
|
||||
|
||||
case TimestampType =>
|
||||
(parser: JsonParser) => parseJsonToken(parser, dataType) {
|
||||
case VALUE_STRING =>
|
||||
// This one will lose microseconds parts.
|
||||
// See https://issues.apache.org/jira/browse/SPARK-10681.
|
||||
DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
|
||||
|
||||
case VALUE_NUMBER_INT =>
|
||||
parser.getLongValue * 1000000L
|
||||
}
|
||||
|
||||
case DateType =>
|
||||
(parser: JsonParser) => parseJsonToken(parser, dataType) {
|
||||
case VALUE_STRING =>
|
||||
val stringValue = parser.getText
|
||||
if (stringValue.contains("-")) {
|
||||
// The format of this string will probably be "yyyy-mm-dd".
|
||||
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
|
||||
} else {
|
||||
// In Spark 1.5.0, we store the data as number of days since epoch in string.
|
||||
// So, we just convert it to Int.
|
||||
stringValue.toInt
|
||||
}
|
||||
}
|
||||
|
||||
case BinaryType =>
|
||||
(parser: JsonParser) => parseJsonToken(parser, dataType) {
|
||||
case VALUE_STRING => parser.getBinaryValue
|
||||
}
|
||||
|
||||
case dt: DecimalType =>
|
||||
(parser: JsonParser) => parseJsonToken(parser, dataType) {
|
||||
case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) =>
|
||||
Decimal(parser.getDecimalValue, dt.precision, dt.scale)
|
||||
}
|
||||
|
||||
case (VALUE_NUMBER_INT, ByteType) =>
|
||||
parser.getByteValue
|
||||
case st: StructType =>
|
||||
val fieldConverters = st.map(_.dataType).map(makeConverter)
|
||||
(parser: JsonParser) => parseJsonToken(parser, dataType) {
|
||||
case START_OBJECT => convertObject(parser, st, fieldConverters)
|
||||
}
|
||||
|
||||
case (VALUE_NUMBER_INT, ShortType) =>
|
||||
parser.getShortValue
|
||||
case at: ArrayType =>
|
||||
val elementConverter = makeConverter(at.elementType)
|
||||
(parser: JsonParser) => parseJsonToken(parser, dataType) {
|
||||
case START_ARRAY => convertArray(parser, elementConverter)
|
||||
}
|
||||
|
||||
case (VALUE_NUMBER_INT, IntegerType) =>
|
||||
parser.getIntValue
|
||||
case mt: MapType =>
|
||||
val valueConverter = makeConverter(mt.valueType)
|
||||
(parser: JsonParser) => parseJsonToken(parser, dataType) {
|
||||
case START_OBJECT => convertMap(parser, valueConverter)
|
||||
}
|
||||
|
||||
case (VALUE_NUMBER_INT, LongType) =>
|
||||
parser.getLongValue
|
||||
case udt: UserDefinedType[_] =>
|
||||
makeConverter(udt.sqlType)
|
||||
|
||||
case (VALUE_TRUE, BooleanType) =>
|
||||
true
|
||||
case _ =>
|
||||
(parser: JsonParser) =>
|
||||
// Here, we pass empty `PartialFunction` so that this case can be
|
||||
// handled as a failed conversion. It will throw an exception as
|
||||
// long as the value is not null.
|
||||
parseJsonToken(parser, dataType)(PartialFunction.empty[JsonToken, Any])
|
||||
}
|
||||
|
||||
case (VALUE_FALSE, BooleanType) =>
|
||||
false
|
||||
/**
|
||||
* This method skips `FIELD_NAME`s at the beginning, and handles nulls ahead before trying
|
||||
* to parse the JSON token using given function `f`. If the `f` failed to parse and convert the
|
||||
* token, call `failedConversion` to handle the token.
|
||||
*/
|
||||
private def parseJsonToken(
|
||||
parser: JsonParser,
|
||||
dataType: DataType)(f: PartialFunction[JsonToken, Any]): Any = {
|
||||
parser.getCurrentToken match {
|
||||
case FIELD_NAME =>
|
||||
// There are useless FIELD_NAMEs between START_OBJECT and END_OBJECT tokens
|
||||
parser.nextToken()
|
||||
parseJsonToken(parser, dataType)(f)
|
||||
|
||||
case (START_OBJECT, st: StructType) =>
|
||||
convertObject(factory, parser, st)
|
||||
case null | VALUE_NULL => null
|
||||
|
||||
case (START_ARRAY, ArrayType(st, _)) =>
|
||||
convertArray(factory, parser, st)
|
||||
|
||||
case (START_OBJECT, MapType(StringType, kt, _)) =>
|
||||
convertMap(factory, parser, kt)
|
||||
|
||||
case (_, udt: UserDefinedType[_]) =>
|
||||
convertField(factory, parser, udt.sqlType)
|
||||
|
||||
case (token, dataType) =>
|
||||
// We cannot parse this token based on the given data type. So, we throw a
|
||||
// SparkSQLJsonProcessingException and this exception will be caught by
|
||||
// parseJson method.
|
||||
throw new SparkSQLJsonProcessingException(
|
||||
s"Failed to parse a value for data type $dataType (current token: $token).")
|
||||
case other => f.applyOrElse(other, failedConversion(parser, dataType))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This function throws an exception for failed conversion, but returns null for empty string,
|
||||
* to guard the non string types.
|
||||
*/
|
||||
private def failedConversion(
|
||||
parser: JsonParser,
|
||||
dataType: DataType): PartialFunction[JsonToken, Any] = {
|
||||
case VALUE_STRING if parser.getTextLength < 1 =>
|
||||
// If conversion is failed, this produces `null` rather than throwing exception.
|
||||
// This will protect the mismatch of types.
|
||||
null
|
||||
|
||||
case token =>
|
||||
// We cannot parse this token based on the given data type. So, we throw a
|
||||
// SparkSQLJsonProcessingException and this exception will be caught by
|
||||
// `parse` method.
|
||||
throw new SparkSQLJsonProcessingException(
|
||||
s"Failed to parse a value for data type $dataType (current token: $token).")
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse an object from the token stream into a new Row representing the schema.
|
||||
*
|
||||
* Fields in the json that are not defined in the requested schema will be dropped.
|
||||
*/
|
||||
private def convertObject(
|
||||
factory: JsonFactory,
|
||||
parser: JsonParser,
|
||||
schema: StructType): InternalRow = {
|
||||
schema: StructType,
|
||||
fieldConverters: Seq[ValueConverter]): InternalRow = {
|
||||
val row = new GenericMutableRow(schema.length)
|
||||
while (nextUntil(parser, JsonToken.END_OBJECT)) {
|
||||
schema.getFieldIndex(parser.getCurrentName) match {
|
||||
case Some(index) =>
|
||||
row.update(index, convertField(factory, parser, schema(index).dataType))
|
||||
row.update(index, fieldConverters(index).apply(parser))
|
||||
|
||||
case None =>
|
||||
parser.skipChildren()
|
||||
|
@ -223,87 +327,65 @@ object JacksonParser extends Logging {
|
|||
}
|
||||
|
||||
/**
|
||||
* Parse an object as a Map, preserving all fields
|
||||
* Parse an object as a Map, preserving all fields.
|
||||
*/
|
||||
private def convertMap(
|
||||
factory: JsonFactory,
|
||||
parser: JsonParser,
|
||||
valueType: DataType): MapData = {
|
||||
fieldConverter: ValueConverter): MapData = {
|
||||
val keys = ArrayBuffer.empty[UTF8String]
|
||||
val values = ArrayBuffer.empty[Any]
|
||||
while (nextUntil(parser, JsonToken.END_OBJECT)) {
|
||||
keys += UTF8String.fromString(parser.getCurrentName)
|
||||
values += convertField(factory, parser, valueType)
|
||||
values += fieldConverter.apply(parser)
|
||||
}
|
||||
|
||||
ArrayBasedMapData(keys.toArray, values.toArray)
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse an object as a Array.
|
||||
*/
|
||||
private def convertArray(
|
||||
factory: JsonFactory,
|
||||
parser: JsonParser,
|
||||
elementType: DataType): ArrayData = {
|
||||
fieldConverter: ValueConverter): ArrayData = {
|
||||
val values = ArrayBuffer.empty[Any]
|
||||
while (nextUntil(parser, JsonToken.END_ARRAY)) {
|
||||
values += convertField(factory, parser, elementType)
|
||||
values += fieldConverter.apply(parser)
|
||||
}
|
||||
|
||||
new GenericArrayData(values.toArray)
|
||||
}
|
||||
|
||||
def parseJson(
|
||||
input: Iterator[String],
|
||||
schema: StructType,
|
||||
columnNameOfCorruptRecords: String,
|
||||
configOptions: JSONOptions): Iterator[InternalRow] = {
|
||||
|
||||
def failedRecord(record: String): Seq[InternalRow] = {
|
||||
// create a row even if no corrupt record column is present
|
||||
if (configOptions.failFast) {
|
||||
throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
|
||||
}
|
||||
if (configOptions.dropMalformed) {
|
||||
logWarning(s"Dropping malformed line: $record")
|
||||
Nil
|
||||
} else {
|
||||
val row = new GenericMutableRow(schema.length)
|
||||
for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
|
||||
require(schema(corruptIndex).dataType == StringType)
|
||||
row.update(corruptIndex, UTF8String.fromString(record))
|
||||
}
|
||||
Seq(row)
|
||||
}
|
||||
}
|
||||
|
||||
val factory = new JsonFactory()
|
||||
configOptions.setJacksonOptions(factory)
|
||||
|
||||
input.flatMap { record =>
|
||||
if (record.trim.isEmpty) {
|
||||
/**
|
||||
* Parse the string JSON input to the set of [[InternalRow]]s.
|
||||
*/
|
||||
def parse(input: String): Seq[InternalRow] = {
|
||||
if (input.trim.isEmpty) {
|
||||
Nil
|
||||
} else {
|
||||
try {
|
||||
Utils.tryWithResource(factory.createParser(record)) { parser =>
|
||||
Utils.tryWithResource(factory.createParser(input)) { parser =>
|
||||
parser.nextToken()
|
||||
|
||||
convertRootField(factory, parser, schema) match {
|
||||
case null => failedRecord(record)
|
||||
rootConverter.apply(parser) match {
|
||||
case null => failedRecord(input)
|
||||
case row: InternalRow => row :: Nil
|
||||
case array: ArrayData =>
|
||||
// Here, as we support reading top level JSON arrays and take every element
|
||||
// in such an array as a row, this case is possible.
|
||||
if (array.numElements() == 0) {
|
||||
Nil
|
||||
} else {
|
||||
array.toArray[InternalRow](schema)
|
||||
}
|
||||
case _ =>
|
||||
failedRecord(record)
|
||||
failedRecord(input)
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
case _: JsonProcessingException =>
|
||||
failedRecord(record)
|
||||
failedRecord(input)
|
||||
case _: SparkSQLJsonProcessingException =>
|
||||
failedRecord(record)
|
||||
}
|
||||
failedRecord(input)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -106,12 +106,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
|
|||
|
||||
(file: PartitionedFile) => {
|
||||
val lines = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map(_.toString)
|
||||
|
||||
JacksonParser.parseJson(
|
||||
lines,
|
||||
requiredSchema,
|
||||
columnNameOfCorruptRecord,
|
||||
parsedOptions)
|
||||
val parser = new JacksonParser(requiredSchema, columnNameOfCorruptRecord, parsedOptions)
|
||||
lines.flatMap(parser.parse)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -61,9 +61,14 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
|
|||
generator.flush()
|
||||
}
|
||||
|
||||
Utils.tryWithResource(factory.createParser(writer.toString)) { parser =>
|
||||
parser.nextToken()
|
||||
JacksonParser.convertRootField(factory, parser, dataType)
|
||||
val dummyOption = new JSONOptions(Map.empty[String, String])
|
||||
val dummySchema = StructType(Seq.empty)
|
||||
val parser = new JacksonParser(dummySchema, "", dummyOption)
|
||||
|
||||
Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser =>
|
||||
jsonParser.nextToken()
|
||||
val converter = parser.makeRootConverter(dataType)
|
||||
converter.apply(jsonParser)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue