[SPARK-25952][SQL] Passing actual schema to JacksonParser

## What changes were proposed in this pull request?

The PR fixes an issue when the corrupt record column specified via `spark.sql.columnNameOfCorruptRecord` or JSON options `columnNameOfCorruptRecord` is propagated to JacksonParser, and returned row breaks an assumption in `FailureSafeParser` that the row must contain only actual data. The issue is fixed by passing actual schema without the corrupt record field into `JacksonParser`.

## How was this patch tested?

Added a test with the corrupt record column in the middle of user's schema.

Closes #22958 from MaxGekk/from_json-corrupt-record-schema.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
This commit is contained in:
Maxim Gekk 2018-11-08 14:48:23 +08:00 committed by hyukjinkwon
parent d68f3a726f
commit 17449a2e6b
2 changed files with 21 additions and 6 deletions

View file

@ -569,14 +569,16 @@ case class JsonToStructs(
throw new IllegalArgumentException(s"from_json() doesn't support the ${mode.name} mode. " +
s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.")
}
val rawParser = new JacksonParser(nullableSchema, parsedOptions, allowArrayAsStructs = false)
val createParser = CreateJacksonParser.utf8String _
val parserSchema = nullableSchema match {
case s: StructType => s
case other => StructType(StructField("value", other) :: Nil)
val (parserSchema, actualSchema) = nullableSchema match {
case s: StructType =>
(s, StructType(s.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)))
case other =>
(StructType(StructField("value", other) :: Nil), other)
}
val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = false)
val createParser = CreateJacksonParser.utf8String _
new FailureSafeParser[UTF8String](
input => rawParser.parse(input, createParser, identity[UTF8String]),
mode,

View file

@ -578,4 +578,17 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
"Acceptable modes are PERMISSIVE and FAILFAST."))
}
}
test("corrupt record column in the middle") {
val schema = new StructType()
.add("a", IntegerType)
.add("_unparsed", StringType)
.add("b", IntegerType)
val badRec = """{"a" 1, "b": 11}"""
val df = Seq(badRec, """{"a": 2, "b": 12}""").toDS()
checkAnswer(
df.select(from_json($"value", schema, Map("columnNameOfCorruptRecord" -> "_unparsed"))),
Row(Row(null, badRec, null)) :: Row(Row(2, null, 12)) :: Nil)
}
}