spark-instrumented-optimizer/sql
Liang-Chi Hsieh 77c7e91e02 [SPARK-28445][SQL][PYTHON] Fix error when PythonUDF is used in both group by and aggregate expression
## What changes were proposed in this pull request?

When PythonUDF is used in group by, and it is also in aggregate expression, like

```
SELECT pyUDF(a + 1), COUNT(b) FROM testData GROUP BY pyUDF(a + 1)
```

It causes analysis exception in `CheckAnalysis`, like
```
org.apache.spark.sql.AnalysisException: expression 'testdata.`a`' is neither present in the group by, nor is it an aggregate function.
```

First, `CheckAnalysis` can't check semantic equality between PythonUDFs.
Second, even we make it possible, runtime exception will be thrown

```
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF1#8615
...
Cause: java.lang.RuntimeException: Couldn't find pythonUDF1#8615 in [cast(pythonUDF0#8614 as int)#8617,count(b#8599)#8607L]
```

The cause is, `ExtractPythonUDFs` extracts both PythonUDFs in group by and aggregate expression. The PythonUDFs are two different aliases now in the logical aggregate. In runtime, we can't bind the resulting expression in aggregate to its grouping and aggregate attributes.

This patch proposes a rule `ExtractGroupingPythonUDFFromAggregate` to extract PythonUDFs in group by and evaluate them before aggregate. We replace the group by PythonUDF in aggregate expression with aliased result.

The query plan of query `SELECT pyUDF(a + 1), pyUDF(COUNT(b)) FROM testData GROUP BY pyUDF(a + 1)`, like

```
== Optimized Logical Plan ==
Project [CAST(pyUDF(cast((a + 1) as string)) AS INT)#8608, cast(pythonUDF0#8616 as bigint) AS CAST(pyUDF(cast(count(b) as string)) AS BIGINT)#8610L]
+- BatchEvalPython [pyUDF(cast(agg#8613L as string))], [pythonUDF0#8616]
   +- Aggregate [cast(groupingPythonUDF#8614 as int)], [cast(groupingPythonUDF#8614 as int) AS CAST(pyUDF(cast((a + 1) as string)) AS INT)#8608, count(b#8599) AS agg#8613L]
      +- Project [pythonUDF0#8615 AS groupingPythonUDF#8614, b#8599]
         +- BatchEvalPython [pyUDF(cast((a#8598 + 1) as string))], [pythonUDF0#8615]
            +- LocalRelation [a#8598, b#8599]

== Physical Plan ==
*(3) Project [CAST(pyUDF(cast((a + 1) as string)) AS INT)#8608, cast(pythonUDF0#8616 as bigint) AS CAST(pyUDF(cast(count(b) as string)) AS BIGINT)#8610L]
+- BatchEvalPython [pyUDF(cast(agg#8613L as string))], [pythonUDF0#8616]
   +- *(2) HashAggregate(keys=[cast(groupingPythonUDF#8614 as int)#8617], functions=[count(b#8599)], output=[CAST(pyUDF(cast((a + 1) as string)) AS INT)#8608, agg#8613L])
      +- Exchange hashpartitioning(cast(groupingPythonUDF#8614 as int)#8617, 5), true
         +- *(1) HashAggregate(keys=[cast(groupingPythonUDF#8614 as int) AS cast(groupingPythonUDF#8614 as int)#8617], functions=[partial_count(b#8599)], output=[cast(groupingPythonUDF#8614 as int)#8617, count#8619L])
            +- *(1) Project [pythonUDF0#8615 AS groupingPythonUDF#8614, b#8599]
               +- BatchEvalPython [pyUDF(cast((a#8598 + 1) as string))], [pythonUDF0#8615]
                  +- LocalTableScan [a#8598, b#8599]
```

## How was this patch tested?

Added tests.

Closes #25215 from viirya/SPARK-28445.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-08-02 19:47:29 +09:00
..
catalyst [SPARK-28445][SQL][PYTHON] Fix error when PythonUDF is used in both group by and aggregate expression 2019-08-02 19:47:29 +09:00
core [SPARK-28445][SQL][PYTHON] Fix error when PythonUDF is used in both group by and aggregate expression 2019-08-02 19:47:29 +09:00
hive [SPARK-25474][SQL] Support spark.sql.statistics.fallBackToHdfs in data source tables 2019-07-28 15:35:37 -07:00
hive-thriftserver [SPARK-28549][BUILD][CORE][SQL] Use text.StringEscapeUtils instead lang3.StringEscapeUtils 2019-07-29 11:45:29 +09:00
create-docs.sh
gen-sql-markdown.py [SPARK-27328][SQL] Add 'deprecated' in ExpressionDescription for extended usage and SQL doc 2019-04-09 13:49:42 +08:00
mkdocs.yml
README.md [SPARK-28473][DOC] Stylistic consistency of build command in README 2019-07-23 16:29:46 -07:00

Spark SQL

This module provides support for executing relational queries expressed in either SQL or the DataFrame/Dataset API.

Spark SQL is broken up into four subprojects:

  • Catalyst (sql/catalyst) - An implementation-agnostic framework for manipulating trees of relational operators and expressions.
  • Execution (sql/core) - A query planner / execution engine for translating Catalyst's logical query plans into Spark RDDs. This component also includes a new public interface, SQLContext, that allows users to execute SQL or LINQ statements against existing RDDs and Parquet files.
  • Hive Support (sql/hive) - Includes an extension of SQLContext called HiveContext that allows users to write queries using a subset of HiveQL and access data from a Hive Metastore using Hive SerDes. There are also wrappers that allow users to run queries that include Hive UDFs, UDAFs, and UDTFs.
  • HiveServer and CLI support (sql/hive-thriftserver) - Includes support for the SQL CLI (bin/spark-sql) and a HiveServer2 (for JDBC/ODBC) compatible server.

Running ./sql/create-docs.sh generates SQL documentation for built-in functions under sql/site.