# [SPARK-26378][SQL] Restore performance of queries against wide CSV/JSON tables

Commit `7781c6fd73` by Bruce Robbins
## What changes were proposed in this pull request?

After [recent changes](11e5f1bcd4) to CSV parsing to return partial results for bad CSV records, queries against wide CSV tables slowed considerably. That change caused every row to be recreated, even when the associated input record had no parsing issues and the user specified no corrupt record field in the schema.

The change to FailureSafeParser.scala impacted queries against wide JSON tables as well.

In this PR, I propose that a row be recreated only if columns need to be shifted due to the existence of a corrupt record field in the user-supplied schema. Otherwise, the code should use the row as-is (for CSV input, it will contain values for the columns that could be converted and nulls for the columns that could not).
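
As a rough illustration of the idea only (a minimal sketch, not the actual FailureSafeParser code; the class and method names below are invented for this example):

<pre>
// A minimal sketch of the approach, NOT the actual FailureSafeParser code.
// `PartialResultHandler` and `toResultRow` are invented names for illustration.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types.StructType

class PartialResultHandler(schema: StructType, columnNameOfCorruptRecord: String) {
  // Computed once per parser, not once per row: position of the corrupt-record
  // column in the user-supplied schema, if the user declared one at all.
  private val corruptIndex: Option[Int] =
    Some(schema.fieldNames.indexOf(columnNameOfCorruptRecord)).filter(_ >= 0)

  def toResultRow(parsed: InternalRow): InternalRow = corruptIndex match {
    case Some(idx) =>
      // A corrupt-record column exists: copy the parsed values into a new row,
      // shifting them around the corrupt column (left null here; the real
      // parser fills it in only for records that failed to parse).
      val result = new GenericInternalRow(schema.length)
      var from = 0
      (0 until schema.length).foreach { to =>
        if (to != idx) {
          result.update(to, parsed.get(from, schema(to).dataType))
          from += 1
        }
      }
      result
    case None =>
      // No corrupt-record column: reuse the parsed row as-is. It already holds
      // nulls for any columns that could not be converted.
      parsed
  }
}
</pre>

Skipping the per-row copy in the common case (no corrupt-record column) is what removes the allocation and copying overhead that hit wide schemas hardest.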

See the benchmarks below. The CSV benchmark for 1000 columns went from 120144 ms to 89069 ms, a savings of 25% (this only brings the cost back down to baseline levels; again, see the benchmarks below).

Similarly, the JSON benchmark for 1000 columns (added in this PR) went from 109621 ms to 80871 ms, also a savings of 25%.
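
For reference, those savings follow directly from the timings quoted above: (120144 - 89069) / 120144 ≈ 0.26 and (109621 - 80871) / 109621 ≈ 0.26, i.e. roughly a quarter of the runtime in each case.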

Still, partial results functionality is preserved:

<pre>
bash-3.2$ cat test2.csv
"hello",1999-08-01,"last"
"there","bad date","field"
"again","2017-11-22","in file"
bash-3.2$ bin/spark-shell
...etc...
scala> val df = spark.read.schema("a string, b date, c string").csv("test2.csv")
df: org.apache.spark.sql.DataFrame = [a: string, b: date ... 1 more field]
scala> df.show
+-----+----------+-------+
|    a|         b|      c|
+-----+----------+-------+
|hello|1999-08-01|   last|
|there|      null|  field|
|again|2017-11-22|in file|
+-----+----------+-------+
scala> val df = spark.read.schema("badRecord string, a string, b date, c string").
     | option("columnNameOfCorruptRecord", "badRecord").
     | csv("test2.csv")
df: org.apache.spark.sql.DataFrame = [badRecord: string, a: string ... 2 more fields]
scala> df.show
+--------------------+-----+----------+-------+
|           badRecord|    a|         b|      c|
+--------------------+-----+----------+-------+
|                null|hello|1999-08-01|   last|
|"there","bad date...|there|      null|  field|
|                null|again|2017-11-22|in file|
+--------------------+-----+----------+-------+
scala>
</pre>
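
In the first case the unparsable date in the second record simply becomes a null in column `b`; in the second case, where a corrupt-record column is configured, the original record text is additionally preserved in `badRecord`.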

### CSVBenchmark Benchmarks:

baseline = commit before partial results change
PR = this PR
master = master branch

[baseline_CSVBenchmark-results.txt](https://github.com/apache/spark/files/2697109/baseline_CSVBenchmark-results.txt)
[pr_CSVBenchmark-results.txt](https://github.com/apache/spark/files/2697110/pr_CSVBenchmark-results.txt)
[master_CSVBenchmark-results.txt](https://github.com/apache/spark/files/2697111/master_CSVBenchmark-results.txt)

### JSONBenchmark Benchmarks:

baseline = commit before partial results change
PR = this PR
master = master branch

[baseline_JSONBenchmark-results.txt](https://github.com/apache/spark/files/2711040/baseline_JSONBenchmark-results.txt)
[pr_JSONBenchmark-results.txt](https://github.com/apache/spark/files/2711041/pr_JSONBenchmark-results.txt)
[master_JSONBenchmark-results.txt](https://github.com/apache/spark/files/2711042/master_JSONBenchmark-results.txt)
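
For anyone wanting to reproduce these numbers, the result files are normally regenerated with Spark's standard benchmark runner. A sketch of the invocation is below; the fully qualified class names are assumptions, so check the benchmark sources under `sql/core/src/test/scala` for the authoritative paths:

<pre>
# Regenerate sql/core/benchmarks/CSVBenchmark-results.txt (class name assumed)
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.datasources.csv.CSVBenchmark"
# Regenerate sql/core/benchmarks/JSONBenchmark-results.txt (class name assumed)
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.datasources.json.JSONBenchmark"
</pre>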

## How was this patch tested?

- All SQL unit tests.
- Added 2 CSV benchmarks.
- Python core and SQL tests.

Closes #23336 from bersprockets/csv-wide-row-opt2.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2019-01-30 15:15:29 +08:00