[SPARK-26151][SQL] Return partial results for bad CSV records

## What changes were proposed in this pull request?

In the PR, I propose to change the behaviour of `UnivocityParser` and `FailureSafeParser` so that they return all fields that were parsed and converted to the expected types successfully, instead of just returning a row with all `null`s for a bad input in the `PERMISSIVE` mode. For example, for the CSV line `0,2013-111-11 12:13:14` and the DDL schema `a int, b timestamp`, the new result is `Row(0, null)`.

## How was this patch tested?

It was checked by existing tests from `CSVSuite` and `CsvFunctionsSuite`.

Closes #23120 from MaxGekk/failuresafe-partial-result.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Maxim Gekk 2018-12-03 18:25:38 +08:00 committed by Wenchen Fan
parent bfa3d32f77
commit 11e5f1bcd4
4 changed files with 27 additions and 29 deletions

View file

@@ -243,21 +243,24 @@ class UnivocityParser(
() => getPartialResult(),
new RuntimeException("Malformed CSV record"))
} else {
try {
// When the length of the returned tokens is identical to the length of the parsed schema,
// we just need to convert the tokens that correspond to the required columns.
var i = 0
while (i < requiredSchema.length) {
// When the length of the returned tokens is identical to the length of the parsed schema,
// we just need to convert the tokens that correspond to the required columns.
var badRecordException: Option[Throwable] = None
var i = 0
while (i < requiredSchema.length) {
try {
row(i) = valueConverters(i).apply(getToken(tokens, i))
i += 1
} catch {
case NonFatal(e) =>
badRecordException = badRecordException.orElse(Some(e))
}
i += 1
}
if (badRecordException.isEmpty) {
row
} catch {
case NonFatal(e) =>
// For corrupted records with the number of tokens same as the schema,
// CSV reader doesn't support partial results. All fields other than the field
// configured by `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => getCurrentInput, () => None, e)
} else {
throw BadRecordException(() => getCurrentInput, () => Some(row), badRecordException.get)
}
}
}

View file

@@ -33,26 +33,21 @@ class FailureSafeParser[IN](
private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
private val resultRow = new GenericInternalRow(schema.length)
private val nullResult = new GenericInternalRow(schema.length)
// This function takes 2 parameters: an optional partial result, and the bad record. If the given
// schema doesn't contain a field for corrupted record, we just return the partial result or a
// row with all fields null. If the given schema contains a field for corrupted record, we will
// set the bad record to this field, and set other fields according to the partial result or null.
private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
if (corruptFieldIndex.isDefined) {
(row, badRecord) => {
var i = 0
while (i < actualSchema.length) {
val from = actualSchema(i)
resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
i += 1
}
resultRow(corruptFieldIndex.get) = badRecord()
resultRow
(row, badRecord) => {
var i = 0
while (i < actualSchema.length) {
val from = actualSchema(i)
resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
i += 1
}
} else {
(row, _) => row.getOrElse(nullResult)
corruptFieldIndex.foreach(index => resultRow(index) = badRecord())
resultRow
}
}

View file

@@ -60,7 +60,7 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext {
"mode" -> "Permissive", "columnNameOfCorruptRecord" -> columnNameOfCorruptRecord)))
checkAnswer(df2, Seq(
Row(Row(null, null, "0,2013-111-11 12:13:14")),
Row(Row(0, null, "0,2013-111-11 12:13:14")),
Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null))))
}

View file

@@ -1116,7 +1116,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
.schema(schema)
.csv(testFile(valueMalformedFile))
checkAnswer(df1,
Row(null, null) ::
Row(0, null) ::
Row(1, java.sql.Date.valueOf("1983-08-04")) ::
Nil)
@@ -1131,7 +1131,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
.schema(schemaWithCorrField1)
.csv(testFile(valueMalformedFile))
checkAnswer(df2,
Row(null, null, "0,2013-111-11 12:13:14") ::
Row(0, null, "0,2013-111-11 12:13:14") ::
Row(1, java.sql.Date.valueOf("1983-08-04"), null) ::
Nil)
@@ -1148,7 +1148,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
.schema(schemaWithCorrField2)
.csv(testFile(valueMalformedFile))
checkAnswer(df3,
Row(null, "0,2013-111-11 12:13:14", null) ::
Row(0, "0,2013-111-11 12:13:14", null) ::
Row(1, null, java.sql.Date.valueOf("1983-08-04")) ::
Nil)