[SPARK-27873][SQL] columnNameOfCorruptRecord should not be checked with column names in CSV header when disabling enforceSchema

## What changes were proposed in this pull request?

If we want to keep corrupt records when reading CSV, we add a new column to the schema, named by the `columnNameOfCorruptRecord` option. This column does not actually exist in the CSV header, however, so when `enforceSchema` is disabled, `CSVHeaderChecker` throws an exception complaining that the number of columns in the CSV header does not match the number of fields in the schema.
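
For context, here is a minimal reproduction sketch (the file path and the `_corrupt` column name are illustrative, not taken from this patch):

```scala
import org.apache.spark.sql.types.{StringType, StructType}

// The schema carries a corrupt-record column that, by design, has no
// counterpart in the CSV header "a,b".
val schema = StructType.fromDDL("a INT, b DATE").add("_corrupt", StringType)

// Before this patch, CSVHeaderChecker counted "_corrupt" as an expected
// header column and failed: the header has 2 columns, the schema has 3.
val df = spark.read
  .option("header", "true")
  .option("enforceSchema", "false")
  .option("columnNameOfCorruptRecord", "_corrupt")
  .schema(schema)
  .csv("/tmp/example.csv")
```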

## How was this patch tested?

Added a test.

Closes #24757 from viirya/SPARK-27873.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
Parent: f5317f10b2
Commit: 2a88fffacb
3 changed files with 39 additions and 8 deletions


@@ -130,11 +130,12 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
-      val parser = new UnivocityParser(
-        StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
-        StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
-        parsedOptions)
-      val schema = if (columnPruning) requiredSchema else dataSchema
+      val actualDataSchema = StructType(
+        dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+      val actualRequiredSchema = StructType(
+        requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+      val parser = new UnivocityParser(actualDataSchema, actualRequiredSchema, parsedOptions)
+      val schema = if (columnPruning) actualRequiredSchema else actualDataSchema
       val isStartOfFile = file.start == 0
       val headerChecker = new CSVHeaderChecker(
         schema, parsedOptions, source = s"CSV file: ${file.filePath}", isStartOfFile)
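
Why the filter matters: `CSVHeaderChecker` validates the file's physical header against whatever schema it is handed, so that schema must contain only columns that can actually appear in the file. A rough sketch of the kind of count check involved (illustrative only, not the actual `CSVHeaderChecker` source):

```scala
import org.apache.spark.sql.types.StructType

// Illustrative column-count check: a schema that still carries the
// corrupt-record column has one more field than the CSV header, so this
// would throw even for a perfectly well-formed file.
def checkColumnCount(schema: StructType, headerColumns: Array[String]): Unit = {
  if (headerColumns.length != schema.size) {
    throw new IllegalArgumentException(
      s"Number of columns in CSV header (${headerColumns.length}) " +
        s"does not match the schema (${schema.size})")
  }
}
```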


@@ -48,11 +48,15 @@ case class CSVPartitionReaderFactory(
   override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
     val conf = broadcastedConf.value.value
+    val actualDataSchema = StructType(
+      dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+    val actualReadDataSchema = StructType(
+      readDataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
     val parser = new UnivocityParser(
-      StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
-      StructType(readDataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
+      actualDataSchema,
+      actualReadDataSchema,
       parsedOptions)
-    val schema = if (columnPruning) readDataSchema else dataSchema
+    val schema = if (columnPruning) actualReadDataSchema else actualDataSchema
     val isStartOfFile = file.start == 0
     val headerChecker = new CSVHeaderChecker(
       schema, parsedOptions, source = s"CSV file: ${file.filePath}", isStartOfFile)


@@ -2059,4 +2059,30 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       .option("inferSchema", "true")
       .csv(Seq("1,2").toDS).schema.head.dataType === StringType)
   }
+
+  test("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord") {
+    Seq("csv", "").foreach { reader =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> reader) {
+        withTempPath { path =>
+          val df = Seq(("0", "2013-111-11")).toDF("a", "b")
+          df.write
+            .option("header", "true")
+            .csv(path.getAbsolutePath)
+
+          val schema = StructType.fromDDL("a int, b date")
+          val columnNameOfCorruptRecord = "_unparsed"
+          val schemaWithCorrField = schema.add(columnNameOfCorruptRecord, StringType)
+          val readDF = spark
+            .read
+            .option("mode", "Permissive")
+            .option("header", "true")
+            .option("enforceSchema", false)
+            .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+            .schema(schemaWithCorrField)
+            .csv(path.getAbsoluteFile.toString)
+          checkAnswer(readDF, Row(0, null, "0,2013-111-11") :: Nil)
+        }
+      }
+    }
+  }
 }
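
As a usage note, the corrupt-record column can then be used to isolate malformed rows (a sketch reusing the `_unparsed` name and data from the test above; the rendered output is inferred from the `checkAnswer` expectation):

```scala
// Malformed rows keep their raw text in the corrupt-record column, while
// fields that failed to parse (the date here) come back as null.
val corruptRows = readDF.filter(readDF("_unparsed").isNotNull)
corruptRows.show(truncate = false)
// +---+----+-------------+
// |a  |b   |_unparsed    |
// +---+----+-------------+
// |0  |null|0,2013-111-11|
// +---+----+-------------+
```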