diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index c8de53a17a..8abc6fcacd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -130,11 +130,12 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
-      val parser = new UnivocityParser(
-        StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
-        StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
-        parsedOptions)
-      val schema = if (columnPruning) requiredSchema else dataSchema
+      val actualDataSchema = StructType(
+        dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+      val actualRequiredSchema = StructType(
+        requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+      val parser = new UnivocityParser(actualDataSchema, actualRequiredSchema, parsedOptions)
+      val schema = if (columnPruning) actualRequiredSchema else actualDataSchema
       val isStartOfFile = file.start == 0
       val headerChecker = new CSVHeaderChecker(
         schema, parsedOptions, source = s"CSV file: ${file.filePath}", isStartOfFile)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
index 28e310489c..828594ffb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
@@ -48,11 +48,15 @@ case class CSVPartitionReaderFactory(
 
   override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
     val conf = broadcastedConf.value.value
+    val actualDataSchema = StructType(
+      dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+    val actualReadDataSchema = StructType(
+      readDataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
     val parser = new UnivocityParser(
-      StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
-      StructType(readDataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
+      actualDataSchema,
+      actualReadDataSchema,
       parsedOptions)
-    val schema = if (columnPruning) readDataSchema else dataSchema
+    val schema = if (columnPruning) actualReadDataSchema else actualDataSchema
     val isStartOfFile = file.start == 0
     val headerChecker = new CSVHeaderChecker(
       schema, parsedOptions, source = s"CSV file: ${file.filePath}", isStartOfFile)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 90deade2a5..2e7d682a3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2059,4 +2059,30 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       .option("inferSchema", "true")
       .csv(Seq("1,2").toDS).schema.head.dataType === StringType)
   }
+
+  test("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord") {
+    Seq("csv", "").foreach { reader =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> reader) {
+        withTempPath { path =>
+          val df = Seq(("0", "2013-111-11")).toDF("a", "b")
+          df.write
+            .option("header", "true")
+            .csv(path.getAbsolutePath)
+
+          val schema = StructType.fromDDL("a int, b date")
+          val columnNameOfCorruptRecord = "_unparsed"
+          val schemaWithCorrField = schema.add(columnNameOfCorruptRecord, StringType)
+          val readDF = spark
+            .read
+            .option("mode", "Permissive")
+            .option("header", "true")
+            .option("enforceSchema", false)
+            .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+            .schema(schemaWithCorrField)
+            .csv(path.getAbsoluteFile.toString)
+          checkAnswer(readDF, Row(0, null, "0,2013-111-11") :: Nil)
+        }
+      }
+    }
+  }
 }