[SPARK-32614][SQL] Don't apply comment processing if 'comment' unset for CSV

### What changes were proposed in this pull request?
Spark's CSV source can optionally ignore lines starting with a comment char. Some code paths check to see if it's set before applying comment logic (i.e. not set to default of `\0`), but many do not, including the one that passes the option to Univocity. This means that rows beginning with a null char were being treated as comments even when 'disabled'.

### Why are the changes needed?
To avoid dropping rows that start with a null char when this is not requested or intended. See JIRA for an example.

### Does this PR introduce _any_ user-facing change?
Nothing beyond the effect of the bug fix.

### How was this patch tested?
Existing tests plus new test case.

Closes #29516 from srowen/SPARK-32614.

Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
Commit: a9d4e60a90 (parent: 2feab4ef4f)
dev/deps/spark-deps-hadoop-2.7-hive-1.2
@@ -200,7 +200,7 @@ stream/2.9.6//stream-2.9.6.jar
 stringtemplate/3.2.1//stringtemplate-3.2.1.jar
 super-csv/2.2.0//super-csv-2.2.0.jar
 threeten-extra/1.5.0//threeten-extra-1.5.0.jar
-univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar
+univocity-parsers/2.9.0//univocity-parsers-2.9.0.jar
 xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
 xercesImpl/2.12.0//xercesImpl-2.12.0.jar
 xml-apis/1.4.01//xml-apis-1.4.01.jar
|
|
|
@@ -213,7 +213,7 @@ stream/2.9.6//stream-2.9.6.jar
 super-csv/2.2.0//super-csv-2.2.0.jar
 threeten-extra/1.5.0//threeten-extra-1.5.0.jar
 transaction-api/1.1//transaction-api-1.1.jar
-univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar
+univocity-parsers/2.9.0//univocity-parsers-2.9.0.jar
 velocity/1.5//velocity-1.5.jar
 xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
 xercesImpl/2.12.0//xercesImpl-2.12.0.jar
|
|
|
@@ -229,7 +229,7 @@ super-csv/2.2.0//super-csv-2.2.0.jar
 threeten-extra/1.5.0//threeten-extra-1.5.0.jar
 token-provider/1.0.1//token-provider-1.0.1.jar
 transaction-api/1.1//transaction-api-1.1.jar
-univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar
+univocity-parsers/2.9.0//univocity-parsers-2.9.0.jar
 velocity/1.5//velocity-1.5.jar
 woodstox-core/5.0.3//woodstox-core-5.0.3.jar
 xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
|
|
pom.xml (2 changed lines)
@@ -2348,7 +2348,7 @@
       <dependency>
         <groupId>com.univocity</groupId>
         <artifactId>univocity-parsers</artifactId>
-        <version>2.8.3</version>
+        <version>2.9.0</version>
       </dependency>
       <dependency>
         <groupId>org.apache.hive</groupId>
|
|
|
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtils.scala
@@ -25,8 +25,13 @@ object CSVExprUtils {
    * This is currently being used in CSV reading path and CSV schema inference.
    */
   def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
-    iter.filter { line =>
-      line.trim.nonEmpty && !line.startsWith(options.comment.toString)
+    if (options.isCommentSet) {
+      val commentPrefix = options.comment.toString
+      iter.filter { line =>
+        line.trim.nonEmpty && !line.startsWith(commentPrefix)
+      }
+    } else {
+      iter.filter(_.trim.nonEmpty)
     }
   }

@@ -34,7 +39,7 @@ object CSVExprUtils {
     if (options.isCommentSet) {
       val commentPrefix = options.comment.toString
       iter.dropWhile { line =>
-        line.trim.isEmpty || line.trim.startsWith(commentPrefix)
+        line.trim.isEmpty || line.startsWith(commentPrefix)
       }
     } else {
       iter.dropWhile(_.trim.isEmpty)
|
|
|
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -220,7 +220,9 @@ class CSVOptions(
     format.setQuote(quote)
     format.setQuoteEscape(escape)
     charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
-    format.setComment(comment)
+    if (isCommentSet) {
+      format.setComment(comment)
+    }
     lineSeparatorInWrite.foreach(format.setLineSeparator)

     writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
@@ -242,7 +244,11 @@ class CSVOptions(
     format.setQuoteEscape(escape)
     lineSeparator.foreach(format.setLineSeparator)
     charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
-    format.setComment(comment)
+    if (isCommentSet) {
+      format.setComment(comment)
+    } else {
+      settings.setCommentProcessingEnabled(false)
+    }

     settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead)
     settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceInRead)
|
|
|
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1902,25 +1902,26 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa

   test("SPARK-25387: bad input should not cause NPE") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
-    val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
+    val input = spark.createDataset(Seq("\u0001\u0000\u0001234"))

     checkAnswer(spark.read.schema(schema).csv(input), Row(null))
     checkAnswer(spark.read.option("multiLine", true).schema(schema).csv(input), Row(null))
-    assert(spark.read.csv(input).collect().toSet == Set(Row()))
+    assert(spark.read.schema(schema).csv(input).collect().toSet == Set(Row(null)))
   }

   test("SPARK-31261: bad csv input with `columnNameCorruptRecord` should not cause NPE") {
     val schema = StructType(
       StructField("a", IntegerType) :: StructField("_corrupt_record", StringType) :: Nil)
-    val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
+    val input = spark.createDataset(Seq("\u0001\u0000\u0001234"))

     checkAnswer(
       spark.read
         .option("columnNameOfCorruptRecord", "_corrupt_record")
         .schema(schema)
         .csv(input),
-      Row(null, null))
-    assert(spark.read.csv(input).collect().toSet == Set(Row()))
+      Row(null, "\u0001\u0000\u0001234"))
+    assert(spark.read.schema(schema).csv(input).collect().toSet ==
+      Set(Row(null, "\u0001\u0000\u0001234")))
   }

   test("field names of inferred schema shouldn't compare to the first row") {

@@ -2366,6 +2367,17 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa
     }
   }

+  test("SPARK-32614: don't treat rows starting with null char as comment") {
+    withTempPath { path =>
+      Seq("\u0000foo", "bar", "baz").toDS.write.text(path.getCanonicalPath)
+      val df = spark.read.format("csv")
+        .option("header", "false")
+        .option("inferSchema", "true")
+        .load(path.getCanonicalPath)
+      assert(df.count() == 3)
+    }
+  }
+
   test("case sensitivity of filters references") {
     Seq(true, false).foreach { filterPushdown =>
       withSQLConf(SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> filterPushdown.toString) {
|
Loading…
Reference in a new issue