spark-instrumented-optimizer/sql
Takeshi Yamamuro 95073fb62b [SPARK-29008][SQL] Define an individual method for each common subexpression in HashAggregateExec
### What changes were proposed in this pull request?

This pr proposes to define an individual method for each common subexpression in HashAggregateExec. In the current master, the common subexpr elimination code in HashAggregateExec is expanded in a single method; 4664a082c2/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala (L397)

The method size can be too big for JIT compilation, so I believe splitting it is beneficial for performance. For example, in a query `SELECT SUM(a + b), AVG(a + b + c) FROM VALUES (1, 1, 1) t(a, b, c)`,

the current master generates;
```
/* 098 */   private void agg_doConsume_0(InternalRow localtablescan_row_0, int agg_expr_0_0, int agg_expr_1_0, int agg_expr_2_0) throws java.io.IOException {
/* 099 */     // do aggregate
/* 100 */     // common sub-expressions
/* 101 */     int agg_value_6 = -1;
/* 102 */
/* 103 */     agg_value_6 = agg_expr_0_0 + agg_expr_1_0;
/* 104 */
/* 105 */     int agg_value_5 = -1;
/* 106 */
/* 107 */     agg_value_5 = agg_value_6 + agg_expr_2_0;
/* 108 */     boolean agg_isNull_4 = false;
/* 109 */     long agg_value_4 = -1L;
/* 110 */     if (!false) {
/* 111 */       agg_value_4 = (long) agg_value_5;
/* 112 */     }
/* 113 */     int agg_value_10 = -1;
/* 114 */
/* 115 */     agg_value_10 = agg_expr_0_0 + agg_expr_1_0;
/* 116 */     // evaluate aggregate functions and update aggregation buffers
/* 117 */     agg_doAggregate_sum_0(agg_value_10);
/* 118 */     agg_doAggregate_avg_0(agg_value_4, agg_isNull_4);
/* 119 */
/* 120 */   }
```

On the other hand, this pr generates;
```
/* 121 */   private void agg_doConsume_0(InternalRow localtablescan_row_0, int agg_expr_0_0, int agg_expr_1_0, int agg_expr_2_0) throws java.io.IOException {
/* 122 */     // do aggregate
/* 123 */     // common sub-expressions
/* 124 */     long agg_subExprValue_0 = agg_subExpr_0(agg_expr_2_0, agg_expr_0_0, agg_expr_1_0);
/* 125 */     int agg_subExprValue_1 = agg_subExpr_1(agg_expr_0_0, agg_expr_1_0);
/* 126 */     // evaluate aggregate functions and update aggregation buffers
/* 127 */     agg_doAggregate_sum_0(agg_subExprValue_1);
/* 128 */     agg_doAggregate_avg_0(agg_subExprValue_0);
/* 129 */
/* 130 */   }
```

I run some micro benchmarks for this pr;
```
(base) maropu~:$system_profiler SPHardwareDataType
Hardware:
    Hardware Overview:
      Processor Name: Intel Core i5
      Processor Speed: 2 GHz
      Number of Processors: 1
      Total Number of Cores: 2
      L2 Cache (per Core): 256 KB
      L3 Cache: 4 MB
      Memory: 8 GB

(base) maropu~:$java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)

(base) maropu~:$ /bin/spark-shell --master=local[1] --conf spark.driver.memory=8g --conf spark.sql.shurtitions=1 -v

val numCols = 40
val colExprs = "id AS key" +: (0 until numCols).map { i => s"id AS _c$i" }
spark.range(3000000).selectExpr(colExprs: _*).createOrReplaceTempView("t")

val aggExprs = (2 until numCols).map { i =>
  (0 until i).map(d => s"_c$d")
    .mkString("AVG(", " + ", ")")
}

// Drops the time of a first run then pick that of a second run
timer { sql(s"SELECT ${aggExprs.mkString(", ")} FROM t").write.format("noop").save() }

// the master
maxCodeGen: 12957
Elapsed time: 36.309858661s

// this pr
maxCodeGen=4184
Elapsed time: 2.399490285s
```

### Why are the changes needed?

To avoid the too-long-function issue in JVMs.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added tests in `WholeStageCodegenSuite`

Closes #25710 from maropu/SplitSubexpr.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2019-09-17 11:09:55 +09:00
..
catalyst [SPARK-29008][SQL] Define an individual method for each common subexpression in HashAggregateExec 2019-09-17 11:09:55 +09:00
core [SPARK-29008][SQL] Define an individual method for each common subexpression in HashAggregateExec 2019-09-17 11:09:55 +09:00
hive [SPARK-26929][SQL] fix table owner use user instead of principal when create table through spark-sql or beeline 2019-09-16 11:07:50 -07:00
hive-thriftserver [SPARK-29056] ThriftServerSessionPage displays 1970/01/01 finish and close time when unset 2019-09-13 09:13:57 -07:00
create-docs.sh
gen-sql-markdown.py [SPARK-27328][SQL] Add 'deprecated' in ExpressionDescription for extended usage and SQL doc 2019-04-09 13:49:42 +08:00
mkdocs.yml
README.md [SPARK-28980][CORE][SQL][STREAMING][MLLIB] Remove most items deprecated in Spark 2.2.0 or earlier, for Spark 3 2019-09-09 10:19:40 -05:00

Spark SQL

This module provides support for executing relational queries expressed in either SQL or the DataFrame/Dataset API.

Spark SQL is broken up into four subprojects:

  • Catalyst (sql/catalyst) - An implementation-agnostic framework for manipulating trees of relational operators and expressions.
  • Execution (sql/core) - A query planner / execution engine for translating Catalyst's logical query plans into Spark RDDs. This component also includes a new public interface, SQLContext, that allows users to execute SQL or LINQ statements against existing RDDs and Parquet files.
  • Hive Support (sql/hive) - Includes extensions that allow users to write queries using a subset of HiveQL and access data from a Hive Metastore using Hive SerDes. There are also wrappers that allow users to run queries that include Hive UDFs, UDAFs, and UDTFs.
  • HiveServer and CLI support (sql/hive-thriftserver) - Includes support for the SQL CLI (bin/spark-sql) and a HiveServer2 (for JDBC/ODBC) compatible server.

Running ./sql/create-docs.sh generates SQL documentation for built-in functions under sql/site.