spark-instrumented-optimizer/sql/core/benchmarks/CSVBenchmark-jdk11-results.txt
Maxim Gekk 4e50f0291f [SPARK-30323][SQL] Support filters pushdown in CSV datasource
### What changes were proposed in this pull request?

In the PR, I propose to support pushed down filters in CSV datasource. The reason for pushing a filter down to `UnivocityParser` is to apply the filter as soon as all of its attributes become available, i.e. have been converted from CSV fields to the desired values according to the schema. This makes it possible to skip the conversion of the remaining values if the filter returns `false`. This can improve performance when the pushed filters are highly selective and converting CSV string fields to the desired values is comparatively expensive (for example, conversion to `TIMESTAMP` values).

Here are details of the implementation:
- `UnivocityParser.convert()` converts parsed CSV tokens one by one, sequentially, from index 0 up to `parsedSchema.length - 1`. At the current index `i`, it applies the filters that refer to attributes at row field indexes `0..i`. If any filter returns `false`, it skips the conversion of the remaining input tokens.
- Pushed filters are converted to expressions. The expressions are bound to row positions according to `requiredSchema`. The expressions are compiled to predicates via generating Java code.
- To be able to apply predicates to partially initialized rows, the predicates are grouped and combined via the `And` expression. The final predicate at index `N` can refer to row fields at the positions `0..N`, and can be applied to a row even if the other fields at the positions `N+1..requiredSchema.length-1` are not set.
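
The steps above can be illustrated with a small standalone sketch. It is written in Python for brevity and is not the Spark implementation: the names `group_by_last_index` and `convert`, the `(field indexes, predicate)` representation of a filter, and the three-column schema are all hypothetical, and the real code compiles Catalyst expressions to Java predicates instead of using lambdas.

```python
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

Predicate = Callable[[List[Any]], bool]
# Hypothetical model of a pushed filter: (row field indexes it reads, predicate).
Filter = Tuple[Sequence[int], Predicate]


def group_by_last_index(filters: Sequence[Filter], n_fields: int) -> List[Optional[Predicate]]:
    """For each index i, AND together the filters whose highest referenced field is i."""
    buckets: Dict[int, List[Predicate]] = {}
    for refs, pred in filters:
        # A filter becomes applicable once its right-most field has been converted.
        buckets.setdefault(max(refs, default=0), []).append(pred)
    grouped: List[Optional[Predicate]] = []
    for i in range(n_fields):
        preds = tuple(buckets.get(i, ()))
        # Bind preds as a default argument so each lambda keeps its own group.
        grouped.append((lambda row, ps=preds: all(p(row) for p in ps)) if preds else None)
    return grouped


def convert(tokens: Sequence[str],
            converters: Sequence[Callable[[str], Any]],
            grouped: Sequence[Optional[Predicate]],
            stats: Dict[str, int]) -> Optional[List[Any]]:
    """Convert tokens left to right; stop as soon as a grouped predicate fails."""
    row: List[Any] = [None] * len(tokens)
    for i, token in enumerate(tokens):
        row[i] = converters[i](token)
        stats["conversions"] += 1
        pred = grouped[i]
        if pred is not None and not pred(row):
            return None  # row is filtered out; fields i+1.. are never converted
    return row


# Hypothetical schema: id INT, name STRING, score DOUBLE; one filter: id > 1.
converters = [int, str, float]
grouped = group_by_last_index([([0], lambda r: r[0] > 1)], n_fields=3)
stats = {"conversions": 0}
rows = [convert(t, converters, grouped, stats)
        for t in (["1", "a", "0.5"], ["2", "b", "1.5"])]
```

In this toy run the first record is rejected after a single conversion, which is the effect the PR relies on when the skipped conversions are expensive.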

### Why are the changes needed?
The changes improve performance on synthetic benchmarks by more than **9 times** (on JDK 8 & 11):
```
OpenJDK 64-Bit Server VM 11.0.5+10 on Mac OS X 10.15.2
Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
Filters pushdown:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
w/o filters                                       11889          11945          52          0.0      118893.1       1.0X
pushdown disabled                                 11790          11860         115          0.0      117902.3       1.0X
w/ filters                                         1240           1278          33          0.1       12400.8       9.6X
```
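
As a quick check of how the `Relative` and `Per Row(ns)` columns relate to `Best Time(ms)`, the arithmetic can be sketched as below. The formulas and the row count of 100,000 are assumptions inferred from the numbers in the table, not taken from the benchmark source, and the helper names are hypothetical.

```python
# Best Time(ms) values copied from the table above.
best_ms = {"w/o filters": 11889, "pushdown disabled": 11790, "w/ filters": 1240}

ROWS = 100_000  # assumption: inferred from Per Row(ns) ~= Best Time(ms) * 1e6 / rows


def relative(baseline_best_ms: float, case_best_ms: float) -> float:
    """Speed-up of a case over the first (baseline) case, based on best times."""
    return baseline_best_ms / case_best_ms


def per_row_ns(case_best_ms: float, rows: int) -> float:
    """Average time per row in nanoseconds, derived from the best time."""
    return case_best_ms * 1_000_000 / rows


baseline = best_ms["w/o filters"]
summary = {
    name: (round(relative(baseline, ms), 1), per_row_ns(ms, ROWS))
    for name, ms in best_ms.items()
}
```

The per-row values computed this way differ slightly from the table in the last digits, presumably because the table's best times are rounded to whole milliseconds.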

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
- Added new test suite `CSVFiltersSuite`
- Added tests to `CSVSuite` and `UnivocityParserSuite`

Closes #26973 from MaxGekk/csv-filters-pushdown.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-01-16 13:10:08 +09:00

================================================================================================
Benchmark to measure CSV read/write performance
================================================================================================
OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Parsing quoted values:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
One quoted string                                 44297          44515         373          0.0      885948.7       1.0X
OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Wide rows with 1000 columns:              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Select 1000 columns                              196720         197783        1560          0.0      196719.8       1.0X
Select 100 columns                                46691          46861         219          0.0       46691.4       4.2X
Select one column                                 36811          36922         111          0.0       36811.3       5.3X
count()                                            8520           8610         106          0.1        8520.5      23.1X
Select 100 columns, one bad input field           67914          67994         136          0.0       67914.0       2.9X
Select 100 columns, corrupt record field          77272          77445         214          0.0       77272.0       2.5X
OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Count a dataset with 10 columns:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Select 10 columns + count()                       25965          26054         103          0.4        2596.5       1.0X
Select 1 column + count()                         18591          18666          91          0.5        1859.1       1.4X
count()                                            6102           6119          18          1.6         610.2       4.3X
OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Write dates and timestamps:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Create a dataset of timestamps                     2142           2161          17          4.7         214.2       1.0X
to_csv(timestamp)                                 14744          14950         182          0.7        1474.4       0.1X
write timestamps to files                         12078          12202         175          0.8        1207.8       0.2X
Create a dataset of dates                          2275           2291          18          4.4         227.5       0.9X
to_csv(date)                                      11407          11464          51          0.9        1140.7       0.2X
write dates to files                               7638           7702          90          1.3         763.8       0.3X
OpenJDK 64-Bit Server VM 11.0.5+10-post-Ubuntu-0ubuntu1.118.04 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Read dates and timestamps:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
read timestamp text from files                     2578           2590          10          3.9         257.8       1.0X
read timestamps from files                        60103          60694         512          0.2        6010.3       0.0X
infer timestamps from files                      107871         108268         351          0.1       10787.1       0.0X
read date text from files                          2306           2310           4          4.3         230.6       1.1X
read date from files                              47415          47657         367          0.2        4741.5       0.1X
infer date from files                             35261          35447         164          0.3        3526.1       0.1X
timestamp strings                                  3045           3056          11          3.3         304.5       0.8X
parse timestamps from Dataset[String]             62221          63173         849          0.2        6222.1       0.0X
infer timestamps from Dataset[String]            118838         119629         697          0.1       11883.8       0.0X
date strings                                       3459           3481          19          2.9         345.9       0.7X
parse dates from Dataset[String]                  51026          51447         503          0.2        5102.6       0.1X
from_csv(timestamp)                               60738          61818         936          0.2        6073.8       0.0X
from_csv(date)                                    46012          46278         370          0.2        4601.2       0.1X
OpenJDK 64-Bit Server VM 11.0.5+10 on Mac OS X 10.15.2
Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
Filters pushdown:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
w/o filters                                       11889          11945          52          0.0      118893.1       1.0X
pushdown disabled                                 11790          11860         115          0.0      117902.3       1.0X
w/ filters                                         1240           1278          33          0.1       12400.8       9.6X