[SPARK-31261][SQL] Avoid NPE when reading bad CSV input with `columnNameOfCorruptRecord`
specified
### What changes were proposed in this pull request?
SPARK-25387 avoids an NPE for bad CSV input, but when reading bad CSV input with `columnNameOfCorruptRecord` specified, `getCurrentInput` is called and it still throws an NPE.

### Why are the changes needed?
Bug fix.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Added a test.

Closes #28029 from wzhfy/corrupt_column_npe.

Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This commit is contained in:
parent
34c7476cb5
commit
791d2ba346
|
@ -101,7 +101,8 @@ class UnivocityParser(
|
|||
|
||||
// Retrieve the raw record string.
// NOTE(review): univocity's currentParsedContent() can return null for
// malformed input (SPARK-31261); propagate null rather than dereferencing it.
private def getCurrentInput: UTF8String = {
  Option(tokenizer.getContext.currentParsedContent())
    .map(content => UTF8String.fromString(content.stripLineEnd))
    .orNull
}
|
||||
|
||||
// This parser first picks some tokens from the input tokens, according to the required schema,
|
||||
|
|
|
@ -1897,6 +1897,20 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa
|
|||
assert(spark.read.csv(input).collect().toSet == Set(Row()))
|
||||
}
|
||||
|
||||
test("SPARK-31261: bad csv input with `columnNameCorruptRecord` should not cause NPE") {
  // Schema declaring an explicit corrupt-record column.
  val schema = StructType(Seq(
    StructField("a", IntegerType),
    StructField("_corrupt_record", StringType)))
  // Bytes that the CSV tokenizer cannot parse into valid fields.
  val malformed = spark.createDataset(Seq("\u0000\u0000\u0001234"))

  // With the corrupt-record column configured, the bad line must yield
  // a row of nulls instead of throwing an NPE.
  val parsed = spark.read
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .schema(schema)
    .csv(malformed)
  checkAnswer(parsed, Row(null, null))
  // Schema-less read of the same input still produces an empty row.
  assert(spark.read.csv(malformed).collect().toSet == Set(Row()))
}
|
||||
|
||||
test("field names of inferred schema shouldn't compare to the first row") {
|
||||
val input = Seq("1,2").toDS()
|
||||
val df = spark.read.option("enforceSchema", false).csv(input)
|
||||
|
|
Loading…
Reference in a new issue