[SPARK-21610][SQL][FOLLOWUP] Corrupt records are not handled properly when creating a dataframe from a file

## What changes were proposed in this pull request?

When the `requiredSchema` only contains `_corrupt_record`, the derived `actualSchema` is empty and `_corrupt_record` is null for all rows. This PR detects that situation and raises an exception with a reasonable workaround message so that users know what happened and how to fix the query.
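
For illustration, a minimal sketch of the disallowed query pattern and the suggested workaround from the error message (assuming a CSV file at `file` and a user-provided `schema` that includes the `_corrupt_record` column):

```scala
import spark.implicits._

// Disallowed since Spark 2.3: the query references only the internal corrupt
// record column, so the parse would run against an empty actualSchema and
// _corrupt_record would be null for every row.
spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()

// Workaround: cache (or save) the fully parsed result first, then run the
// same query against the cached DataFrame.
val df = spark.read.schema(schema).csv(file).cache()
df.filter($"_corrupt_record".isNotNull).count()
```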

## How was this patch tested?

Added unit test in `CSVSuite`.

Author: Jen-Ming Chung <jenmingisme@gmail.com>

Closes #19199 from jmchung/SPARK-21610-FOLLOWUP.
Jen-Ming Chung 2017-09-12 22:47:12 +09:00 committed by hyukjinkwon
parent dd78167585
commit 7d0a3ef4ce
3 changed files with 57 additions and 1 deletion


@@ -109,6 +109,20 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
      }
    }
    if (requiredSchema.length == 1 &&
      requiredSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
      throw new AnalysisException(
        "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
          "referenced columns only include the internal corrupt record column\n" +
          s"(named _corrupt_record by default). For example:\n" +
          "spark.read.schema(schema).csv(file).filter($\"_corrupt_record\".isNotNull).count()\n" +
          "and spark.read.schema(schema).csv(file).select(\"_corrupt_record\").show().\n" +
          "Instead, you can cache or save the parsed results and then send the same query.\n" +
          "For example, val df = spark.read.schema(schema).csv(file).cache() and then\n" +
          "df.filter($\"_corrupt_record\".isNotNull).count()."
      )
    }

    (file: PartitionedFile) => {
      val conf = broadcastedHadoopConf.value.value
      val parser = new UnivocityParser(


@@ -118,7 +118,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
      throw new AnalysisException(
        "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
          "referenced columns only include the internal corrupt record column\n" +
          s"(named ${parsedOptions.columnNameOfCorruptRecord} by default). For example:\n" +
          s"(named _corrupt_record by default). For example:\n" +
          "spark.read.schema(schema).json(file).filter($\"_corrupt_record\".isNotNull).count()\n" +
          "and spark.read.schema(schema).json(file).select(\"_corrupt_record\").show().\n" +
          "Instead, you can cache or save the parsed results and then send the same query.\n" +

@@ -1203,4 +1203,46 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
      .csv(Seq("a").toDS())
    checkAnswer(df, Row("a", null, "a"))
  }

  test("SPARK-21610: Corrupt records are not handled properly when creating a dataframe " +
    "from a file") {
    val columnNameOfCorruptRecord = "_corrupt_record"
    val schema = new StructType()
      .add("a", IntegerType)
      .add("b", TimestampType)
      .add(columnNameOfCorruptRecord, StringType)
    // negative cases
    val msg = intercept[AnalysisException] {
      spark
        .read
        .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
        .schema(schema)
        .csv(testFile(valueMalformedFile))
        .select(columnNameOfCorruptRecord)
        .collect()
    }.getMessage
    assert(msg.contains("only include the internal corrupt record column"))
    intercept[org.apache.spark.sql.catalyst.errors.TreeNodeException[_]] {
      spark
        .read
        .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
        .schema(schema)
        .csv(testFile(valueMalformedFile))
        .filter($"_corrupt_record".isNotNull)
        .count()
    }
    // workaround
    val df = spark
      .read
      .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
      .schema(schema)
      .csv(testFile(valueMalformedFile))
      .cache()
    assert(df.filter($"_corrupt_record".isNotNull).count() == 1)
    assert(df.filter($"_corrupt_record".isNull).count() == 1)
    checkAnswer(
      df.select(columnNameOfCorruptRecord),
      Row("0,2013-111-11 12:13:14") :: Row(null) :: Nil
    )
  }
}