From 7d0a3ef4ced9684457ad6c5924c58b95249419e1 Mon Sep 17 00:00:00 2001
From: Jen-Ming Chung
Date: Tue, 12 Sep 2017 22:47:12 +0900
Subject: [PATCH] [SPARK-21610][SQL][FOLLOWUP] Corrupt records are not handled
 properly when creating a dataframe from a file

## What changes were proposed in this pull request?

When the `requiredSchema` only contains `_corrupt_record`, the derived `actualSchema` is empty and `_corrupt_record` is null for every row. This PR detects this situation and raises an exception with a reasonable workaround message so that users know what happened and how to fix the query.

## How was this patch tested?

Added unit test in `CSVSuite`.

Author: Jen-Ming Chung

Closes #19199 from jmchung/SPARK-21610-FOLLOWUP.
---
 .../datasources/csv/CSVFileFormat.scala      | 14 +++++++
 .../datasources/json/JsonFileFormat.scala    |  2 +-
 .../execution/datasources/csv/CSVSuite.scala | 42 +++++++++++++++++++
 3 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index a99bdfee5d..e20977a4ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -109,6 +109,20 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       }
     }
 
+    if (requiredSchema.length == 1 &&
+      requiredSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
+      throw new AnalysisException(
+        "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
+          "referenced columns only include the internal corrupt record column\n" +
+          s"(named _corrupt_record by default). For example:\n" +
+          "spark.read.schema(schema).csv(file).filter($\"_corrupt_record\".isNotNull).count()\n" +
+          "and spark.read.schema(schema).csv(file).select(\"_corrupt_record\").show().\n" +
+          "Instead, you can cache or save the parsed results and then send the same query.\n" +
+          "For example, val df = spark.read.schema(schema).csv(file).cache() and then\n" +
+          "df.filter($\"_corrupt_record\".isNotNull).count()."
+      )
+    }
+
     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
       val parser = new UnivocityParser(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index b5ed6e4636..0862c746ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -118,7 +118,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       throw new AnalysisException(
         "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
           "referenced columns only include the internal corrupt record column\n" +
-          s"(named ${parsedOptions.columnNameOfCorruptRecord} by default). For example:\n" +
+          s"(named _corrupt_record by default). For example:\n" +
           "spark.read.schema(schema).json(file).filter($\"_corrupt_record\".isNotNull).count()\n" +
           "and spark.read.schema(schema).json(file).select(\"_corrupt_record\").show().\n" +
           "Instead, you can cache or save the parsed results and then send the same query.\n" +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index be89141151..e439699605 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1203,4 +1203,46 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       .csv(Seq("a").toDS())
     checkAnswer(df, Row("a", null, "a"))
   }
+
+  test("SPARK-21610: Corrupt records are not handled properly when creating a dataframe " +
+    "from a file") {
+    val columnNameOfCorruptRecord = "_corrupt_record"
+    val schema = new StructType()
+      .add("a", IntegerType)
+      .add("b", TimestampType)
+      .add(columnNameOfCorruptRecord, StringType)
+    // negative cases
+    val msg = intercept[AnalysisException] {
+      spark
+        .read
+        .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+        .schema(schema)
+        .csv(testFile(valueMalformedFile))
+        .select(columnNameOfCorruptRecord)
+        .collect()
+    }.getMessage
+    assert(msg.contains("only include the internal corrupt record column"))
+    intercept[org.apache.spark.sql.catalyst.errors.TreeNodeException[_]] {
+      spark
+        .read
+        .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+        .schema(schema)
+        .csv(testFile(valueMalformedFile))
+        .filter($"_corrupt_record".isNotNull)
+        .count()
+    }
+    // workaround
+    val df = spark
+      .read
+      .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+      .schema(schema)
+      .csv(testFile(valueMalformedFile))
+      .cache()
+    assert(df.filter($"_corrupt_record".isNotNull).count() == 1)
+    assert(df.filter($"_corrupt_record".isNull).count() == 1)
+    checkAnswer(
+      df.select(columnNameOfCorruptRecord),
+      Row("0,2013-111-11 12:13:14") :: Row(null) :: Nil
+    )
+  }
 }