spark-instrumented-optimizer/sql
Cheng Su 2ff0032e01 [SPARK-34796][SQL] Initialize counter variable for LIMIT code-gen in doProduce()
### What changes were proposed in this pull request?

This PR is to fix the LIMIT code-gen bug in https://issues.apache.org/jira/browse/SPARK-34796, where the counter variable from `BaseLimitExec` is not initialized but used in code-gen. This is because the limit counter variable will be used in upstream operators (LIMIT's child plan, e.g. `ColumnarToRowExec` operator for early termination), but in the same stage, there can be some operators doing the shortcut and not calling `BaseLimitExec`'s `doConsume()`, e.g. [HashJoin.codegenInner](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala#L402). So if we have query that `LocalLimit - BroadcastHashJoin - FileScan` in the same stage, the whole stage code-gen compilation will be failed.

Here is an example:

```
  test("failed limit query") {
    withTable("left_table", "empty_right_table", "output_table") {
      spark.range(5).toDF("k").write.saveAsTable("left_table")
      spark.range(0).toDF("k").write.saveAsTable("empty_right_table")

      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
        spark.sql("CREATE TABLE output_table (k INT) USING parquet")
        spark.sql(
          s"""
             |INSERT INTO TABLE output_table
             |SELECT t1.k FROM left_table t1
             |JOIN empty_right_table t2
             |ON t1.k = t2.k
             |LIMIT 3
             |""".stripMargin)
      }
    }
  }
```

Query plan:

```
Execute InsertIntoHadoopFsRelationCommand file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table, false, Parquet, Map(path -> file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table), Append, CatalogTable(
Database: default
Table: output_table
Created Time: Thu Mar 18 21:46:26 PDT 2021
Last Access: UNKNOWN
Created By: Spark 3.2.0-SNAPSHOT
Type: MANAGED
Provider: parquet
Location: file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table
Schema: root
 |-- k: integer (nullable = true)
), org.apache.spark.sql.execution.datasources.InMemoryFileIndexb25d08b, [k]
+- *(3) Project [ansi_cast(k#228L as int) AS k#231]
   +- *(3) GlobalLimit 3
      +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#179]
         +- *(2) LocalLimit 3
            +- *(2) Project [k#228L]
               +- *(2) BroadcastHashJoin [k#228L], [k#229L], Inner, BuildRight, false
                  :- *(2) Filter isnotnull(k#228L)
                  :  +- *(2) ColumnarToRow
                  :     +- FileScan parquet default.left_table[k#228L] Batched: true, DataFilters: [isnotnull(k#228L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint>
                  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#173]
                     +- *(1) Filter isnotnull(k#229L)
                        +- *(1) ColumnarToRow
                           +- FileScan parquet default.empty_right_table[k#229L] Batched: true, DataFilters: [isnotnull(k#229L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint>
```

Codegen failure - https://gist.github.com/c21/ea760c75b546d903247582be656d9d66 .

The uninitialized variable `_limit_counter_1` from `LocalLimitExec` is referenced in `ColumnarToRowExec`, but `BroadcastHashJoinExec` does not call `LocalLimitExec.doConsume()` to initialize the counter variable.

The fix is to move the counter variable initialization to `doProduce()`, as in whole stage code-gen framework, `doProduce()` will definitely be called if upstream operators `doProduce()`/`doConsume()` is called.

Note: this only happens in AQE disabled case, because we have an AQE optimization rule [EliminateUnnecessaryJoin](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala#L69) to change the whole query to an empty `LocalRelation` if inner join broadcast side is empty with AQE enabled.

### Why are the changes needed?

Fix query failure.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test in `SQLQuerySuite.scala`.

Closes #31892 from c21/limit-fix.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-03-20 11:20:52 +09:00
..
catalyst [SPARK-33122][SQL] Remove redundant aggregates in the Optimzier 2021-03-20 11:16:39 +09:00
core [SPARK-34796][SQL] Initialize counter variable for LIMIT code-gen in doProduce() 2021-03-20 11:20:52 +09:00
hive [SPARK-34772][SQL] RebaseDateTime loadRebaseRecords should use Spark classloader instead of context 2021-03-19 12:51:43 +08:00
hive-thriftserver [SPARK-34558][SQL][FOLLOWUP] Use final Hadoop conf to instantiate FileSystem in SharedState 2021-03-19 22:02:15 +08:00
create-docs.sh [SPARK-34010][SQL][DODCS] Use python3 instead of python in SQL documentation build 2021-01-05 19:48:10 +09:00
gen-sql-api-docs.py [SPARK-34747][SQL][DOCS] Add virtual operators to the built-in function document 2021-03-19 10:19:26 +09:00
gen-sql-config-docs.py [SPARK-31550][SQL][DOCS] Set nondeterministic configurations with general meanings in sql configuration doc 2020-04-27 17:08:52 +09:00
gen-sql-functions-docs.py [SPARK-31562][SQL] Update ExpressionDescription for substring, current_date, and current_timestamp 2020-04-26 11:46:52 -07:00
mkdocs.yml [SPARK-30731] Update deprecated Mkdocs option 2020-02-19 17:28:58 +09:00
README.md [SPARK-30510][SQL][DOCS] Publicly document Spark SQL configuration options 2020-02-09 19:20:47 +09:00

Spark SQL

This module provides support for executing relational queries expressed in either SQL or the DataFrame/Dataset API.

Spark SQL is broken up into four subprojects:

  • Catalyst (sql/catalyst) - An implementation-agnostic framework for manipulating trees of relational operators and expressions.
  • Execution (sql/core) - A query planner / execution engine for translating Catalyst's logical query plans into Spark RDDs. This component also includes a new public interface, SQLContext, that allows users to execute SQL or LINQ statements against existing RDDs and Parquet files.
  • Hive Support (sql/hive) - Includes extensions that allow users to write queries using a subset of HiveQL and access data from a Hive Metastore using Hive SerDes. There are also wrappers that allow users to run queries that include Hive UDFs, UDAFs, and UDTFs.
  • HiveServer and CLI support (sql/hive-thriftserver) - Includes support for the SQL CLI (bin/spark-sql) and a HiveServer2 (for JDBC/ODBC) compatible server.

Running ./sql/create-docs.sh generates SQL documentation for built-in functions under sql/site, and SQL configuration documentation that gets included as part of configuration.md in the main docs directory.