spark-instrumented-optimizer/sql
Liang-Chi Hsieh 19af298bb6 [SPARK-15639] [SPARK-16321] [SQL] Push down filter at RowGroups level for parquet reader
## What changes were proposed in this pull request?

The base class `SpecificParquetRecordReaderBase` used for vectorized parquet reader will try to get pushed-down filters from the given configuration. This pushed-down filters are used for RowGroups-level filtering. However, we don't set up the filters to push down into the configuration. In other words, the filters are not actually pushed down to do RowGroups-level filtering. This patch is to fix this and tries to set up the filters for pushing down to configuration for the reader.

The benchmark that excludes the time of writing Parquet file:

    test("Benchmark for Parquet") {
      val N = 500 << 12
        withParquetTable((0 until N).map(i => (101, i)), "t") {
          val benchmark = new Benchmark("Parquet reader", N)
          benchmark.addCase("reading Parquet file", 10) { iter =>
            sql("SELECT _1 FROM t where t._1 < 100").collect()
          }
          benchmark.run()
      }
    }

`withParquetTable` in default will run tests for vectorized reader non-vectorized readers. I only let it run vectorized reader.

When we set the block size of parquet as 1024 to have multiple row groups. The benchmark is:

Before this patch:

The retrieved row groups: 8063

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
    Intel(R) Core(TM) i7-5557U CPU  3.10GHz
    Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    reading Parquet file                           825 / 1233          2.5         402.6       1.0X

After this patch:

The retrieved row groups: 0

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
    Intel(R) Core(TM) i7-5557U CPU  3.10GHz
    Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    reading Parquet file                           306 /  503          6.7         149.6       1.0X

Next, I run the benchmark for non-pushdown case using the same benchmark code but with disabled pushdown configuration. This time the parquet block size is default value.

Before this patch:

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
    Intel(R) Core(TM) i7-5557U CPU  3.10GHz
    Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    reading Parquet file                           136 /  238         15.0          66.5       1.0X

After this patch:

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
    Intel(R) Core(TM) i7-5557U CPU  3.10GHz
    Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    reading Parquet file                           124 /  193         16.5          60.7       1.0X

For non-pushdown case, from the results, I think this patch doesn't affect normal code path.

I've manually output the `totalRowCount` in `SpecificParquetRecordReaderBase` to see if this patch actually filter the row-groups. When running the above benchmark:

After this patch:
    `totalRowCount = 0`

Before this patch:
    `totalRowCount = 1024000`

## How was this patch tested?
Existing tests should be passed.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #13701 from viirya/vectorized-reader-push-down-filter2.
2016-08-10 10:03:55 -07:00
..
catalyst [SPARK-10601][SQL] Support MINUS set operator 2016-08-10 10:31:30 +02:00
core [SPARK-15639] [SPARK-16321] [SQL] Push down filter at RowGroups level for parquet reader 2016-08-10 10:03:55 -07:00
hive [SPARK-16866][SQL] Infrastructure for file-based SQL end-to-end tests 2016-08-10 17:17:21 +08:00
hive-thriftserver [SPARK-16563][SQL] fix spark sql thrift server FetchResults bug 2016-08-08 18:00:04 -07:00
README.md [SPARK-16557][SQL] Remove stale doc in sql/README.md 2016-07-14 19:24:42 -07:00

Spark SQL

This module provides support for executing relational queries expressed in either SQL or the DataFrame/Dataset API.

Spark SQL is broken up into four subprojects:

  • Catalyst (sql/catalyst) - An implementation-agnostic framework for manipulating trees of relational operators and expressions.
  • Execution (sql/core) - A query planner / execution engine for translating Catalyst's logical query plans into Spark RDDs. This component also includes a new public interface, SQLContext, that allows users to execute SQL or LINQ statements against existing RDDs and Parquet files.
  • Hive Support (sql/hive) - Includes an extension of SQLContext called HiveContext that allows users to write queries using a subset of HiveQL and access data from a Hive Metastore using Hive SerDes. There are also wrappers that allows users to run queries that include Hive UDFs, UDAFs, and UDTFs.
  • HiveServer and CLI support (sql/hive-thriftserver) - Includes support for the SQL CLI (bin/spark-sql) and a HiveServer2 (for JDBC/ODBC) compatible server.