c1f160e097
### What changes were proposed in this pull request?

In this PR, I propose to support pushed-down filters in the JSON datasource. The reason for pushing a filter to `JacksonParser` is to apply the filter as soon as all of its attributes become available, i.e. once they have been converted from JSON field values to the desired values according to the schema. This allows skipping the parsing of the rest of the JSON record, and the conversion of its other values, if the filter returns `false`. This can improve performance when the pushed filters are highly selective and the conversion of JSON string fields to the desired values is comparatively expensive (for example, the conversion to `TIMESTAMP` values).

The main idea behind `JsonFilters` is to group the pushed-down filters by their references, convert the grouped filters to expressions, and then compile them to predicates. The predicates are indexed by schema field positions. Each predicate has a state with a reference counter of the row fields that are not set yet. As soon as the counter reaches `0`, the predicate can be applied to the row because all its dependencies have been set. Before a new row is processed, the predicate's reference counter is reset to the total number of predicate references (dependencies in a row).

The common code shared between `CSVFilters` and `JsonFilters` is moved to the `StructFilters` class and its companion object.

### Why are the changes needed?

The changes improve performance on synthetic benchmarks by up to **27 times** on JDK 8 and **25 times** on JDK 11:

```
OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 2.50GHz
Filters pushdown:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
w/o filters                                       25230          25255          22          0.0      252299.6       1.0X
pushdown disabled                                 25248          25282          33          0.0      252475.6       1.0X
w/ filters                                          905            911           8          0.1        9047.9      27.9X
```

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

- Added new test suites `JsonFiltersSuite` and `JacksonParserSuite`.
- By new end-to-end and case sensitivity tests in `JsonSuite`.
- By `CSVFiltersSuite`, `UnivocityParserSuite` and `CSVSuite`.
- Re-running `CSVBenchmark` and `JsonBenchmark` using Amazon EC2:

| Item | Description |
| ---- | ----|
| Region | us-west-2 (Oregon) |
| Instance | r3.xlarge (spot instance) |
| AMI | ami-06f2f779464715dc5 (ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1) |
| Java | OpenJDK8/11 installed by `sudo add-apt-repository ppa:openjdk-r/ppa` & `sudo apt install openjdk-11-jdk` |

and `./dev/run-benchmarks`:

```python
#!/usr/bin/env python3

import os
from sparktestsupport.shellutils import run_cmd

# (sbt project, benchmark main class) pairs to run.
benchmarks = [
    ['sql/test', 'org.apache.spark.sql.execution.datasources.csv.CSVBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.datasources.json.JsonBenchmark']
]

# Make the benchmarks write their results to the benchmark result files.
print('Set SPARK_GENERATE_BENCHMARK_FILES=1')
os.environ['SPARK_GENERATE_BENCHMARK_FILES'] = '1'

for b in benchmarks:
    print("Run benchmark: %s" % b[1])
    run_cmd(['build/sbt', '%s:runMain %s' % (b[0], b[1])])
```

Closes #27366 from MaxGekk/json-filters-pushdown.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
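
The reference-counting idea described in the first section can be illustrated with a small, self-contained sketch. This is not the Spark implementation: it is written in Python rather than Scala, and the names `CountedPredicate`, `ToyFilters` and `parse_record` are hypothetical. It assumes a flat record given as a dict of raw strings, one converter per schema field, and filters given as pairs of referenced field names and a Python function over the partially filled row.

```python
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

Row = List[Any]
Filter = Tuple[Sequence[str], Callable[[Row], bool]]


class CountedPredicate:
    """A predicate plus a counter of its referenced row fields that are not set yet."""

    def __init__(self, num_refs: int, fn: Callable[[Row], bool]) -> None:
        self.total_refs = num_refs
        self.ref_counter = num_refs
        self.fn = fn

    def reset(self) -> None:
        self.ref_counter = self.total_refs


class ToyFilters:
    """Hypothetical stand-in for the grouping and indexing described above."""

    def __init__(self, schema: Sequence[str], filters: Sequence[Filter]) -> None:
        # Group filters by the set of fields they reference.
        grouped: Dict[frozenset, List[Callable[[Row], bool]]] = {}
        for refs, fn in filters:
            grouped.setdefault(frozenset(refs), []).append(fn)
        # Combine each group with AND and index it by schema field position.
        self.by_position: List[List[CountedPredicate]] = [[] for _ in schema]
        for refs, fns in grouped.items():
            pred = CountedPredicate(len(refs), lambda row, fns=fns: all(f(row) for f in fns))
            for name in refs:
                self.by_position[list(schema).index(name)].append(pred)

    def reset(self) -> None:
        # Called before each new row.
        for preds in self.by_position:
            for pred in preds:
                pred.reset()

    def skip_row(self, row: Row, index: int) -> bool:
        """Call right after row[index] is set; True means the row can be dropped."""
        for pred in self.by_position[index]:
            pred.ref_counter -= 1
            # Apply the predicate only once all of its dependencies are set.
            if pred.ref_counter == 0 and not pred.fn(row):
                return True
        return False


def parse_record(record: Dict[str, str], schema: Sequence[str],
                 converters: Sequence[Callable[[str], Any]],
                 filters: ToyFilters) -> Optional[Row]:
    filters.reset()
    row: Row = [None] * len(schema)
    for name, raw in record.items():  # fields in the order they appear in the record
        if name not in schema:
            continue
        i = list(schema).index(name)
        row[i] = converters[i](raw)
        if filters.skip_row(row, i):
            return None  # stop early: the remaining fields are never converted
    return row


conversions = {"ts": 0}


def to_ts(s: str) -> datetime:
    conversions["ts"] += 1  # count the "expensive" conversions
    return datetime.fromisoformat(s)


schema = ["id", "ts"]
converters = [int, to_ts]
filters = ToyFilters(schema, [(["id"], lambda row: row[0] > 1)])
kept = parse_record({"id": "2", "ts": "2020-01-01T00:00:00"}, schema, converters, filters)
dropped = parse_record({"id": "1", "ts": "2020-01-01T00:00:00"}, schema, converters, filters)
```

In this sketch the second record is rejected as soon as `id` is converted, so its `ts` value is never parsed. A predicate that references several fields is applied only after the last of them has been set.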

| Name |
| ---- |
| benchmarks |
| src |
| v1.2/src |
| v2.3/src |
| pom.xml |