spark-instrumented-optimizer/sql/catalyst
Maxim Gekk 4e50f0291f [SPARK-30323][SQL] Support filters pushdown in CSV datasource
### What changes were proposed in this pull request?

In the PR, I propose to support pushed down filters in CSV datasource. The reason of pushing a filter up to `UnivocityParser` is to apply the filter as soon as all its attributes become available i.e. converted from CSV fields to desired values according to the schema. This allows to skip conversions of other values if the filter returns `false`. This can improve performance when pushed filters are highly selective and conversion of CSV string fields to desired values are comparably expensive ( for example, conversion to `TIMESTAMP` values).

Here are details of the implementation:
- `UnivocityParser.convert()` converts parsed CSV tokens one-by-one sequentially starting from index 0 up to `parsedSchema.length - 1`. At current index `i`, it applies filters that refer to attributes at row fields indexes `0..i`. If any filter returns `false`, it skips conversions of other input tokens.
- Pushed filters are converted to expressions. The expressions are bound to row positions according to `requiredSchema`. The expressions are compiled to predicates via generating Java code.
- To be able to apply predicates to partially initialized rows, the predicates are grouped, and combined via the `And` expression. Final predicate at index `N` can refer to row fields at the positions `0..N`, and can be applied to a row even if other fields at the positions `N+1..requiredSchema.lenght-1` are not set.

### Why are the changes needed?
The changes improve performance on synthetic benchmarks more **than 9 times** (on JDK 8 & 11):
```
OpenJDK 64-Bit Server VM 11.0.5+10 on Mac OS X 10.15.2
Intel(R) Core(TM) i7-4850HQ CPU  2.30GHz
Filters pushdown:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
w/o filters                                       11889          11945          52          0.0      118893.1       1.0X
pushdown disabled                                 11790          11860         115          0.0      117902.3       1.0X
w/ filters                                         1240           1278          33          0.1       12400.8       9.6X
```

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
- Added new test suite `CSVFiltersSuite`
- Added tests to `CSVSuite` and `UnivocityParserSuite`

Closes #26973 from MaxGekk/csv-filters-pushdown.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-01-16 13:10:08 +09:00
..
benchmarks [SPARK-29300][TESTS] Compare catalyst and avro module benchmark in JDK8/11 2019-09-30 17:59:43 -07:00
src [SPARK-30323][SQL] Support filters pushdown in CSV datasource 2020-01-16 13:10:08 +09:00
pom.xml [INFRA] Reverts commit 56dcd79 and c216ef1 2019-12-16 19:57:44 -07:00