[SPARK-12727][SQL] support SQL generation for aggregate with multi-distinct
## What changes were proposed in this pull request? This PR adds SQL generation support for aggregates with multi-distinct, by simply moving the `DistinctAggregationRewriter` rule to the optimizer. More discussion is needed, as this breaks an important contract: an analyzed plan should be able to run without optimization. However, the `ComputeCurrentTime` rule has kind of broken it already, and I think maybe we should add a new phase for this kind of rule, because strictly speaking such rules don't belong to analysis and are coupled with the physical plan implementation. ## How was this patch tested? Existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes #11579 from cloud-fan/distinct.
This commit is contained in:
parent
ad3c9a9730
commit
46881b4ea2
|
@ -91,7 +91,6 @@ class Analyzer(
|
|||
ExtractWindowExpressions ::
|
||||
GlobalAggregates ::
|
||||
ResolveAggregateFunctions ::
|
||||
DistinctAggregationRewriter(conf) ::
|
||||
HiveTypeCoercion.typeCoercionRules ++
|
||||
extendedResolutionRules : _*),
|
||||
Batch("Nondeterministic", Once,
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package org.apache.spark.sql.catalyst.analysis
|
||||
|
||||
import org.apache.spark.sql.catalyst.CatalystConf
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Complete}
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPlan}
|
||||
|
@ -100,13 +99,10 @@ import org.apache.spark.sql.types.IntegerType
|
|||
* we could improve this in the current rule by applying more advanced expression canonicalization
|
||||
* techniques.
|
||||
*/
|
||||
case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalPlan] {
|
||||
object DistinctAggregationRewriter extends Rule[LogicalPlan] {
|
||||
|
||||
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
|
||||
case p if !p.resolved => p
|
||||
// We need to wait until this Aggregate operator is resolved.
|
||||
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
|
||||
case a: Aggregate => rewrite(a)
|
||||
case p => p
|
||||
}
|
||||
|
||||
def rewrite(a: Aggregate): Aggregate = {
|
||||
|
|
|
@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
|
|||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable.HashSet
|
||||
|
||||
import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubqueryAliases}
|
||||
import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases}
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
import org.apache.spark.sql.catalyst.expressions.aggregate._
|
||||
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
|
||||
|
@ -42,7 +42,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
|
|||
// we do not eliminate subqueries or compute current time in the analyzer.
|
||||
Batch("Finish Analysis", Once,
|
||||
EliminateSubqueryAliases,
|
||||
ComputeCurrentTime) ::
|
||||
ComputeCurrentTime,
|
||||
DistinctAggregationRewriter) ::
|
||||
//////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Optimizer rules start here
|
||||
//////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -139,7 +139,6 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
|
|||
""".stripMargin)
|
||||
}
|
||||
|
||||
|
||||
test("intersect") {
|
||||
checkHiveQl("SELECT * FROM t0 INTERSECT SELECT * FROM t0")
|
||||
}
|
||||
|
@ -367,9 +366,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
|
|||
checkHiveQl("SELECT * FROM parquet_t0 TABLESAMPLE(0.1 PERCENT) WHERE 1=0")
|
||||
}
|
||||
|
||||
// TODO Enable this
|
||||
// Query plans transformed by DistinctAggregationRewriter are not recognized yet
|
||||
ignore("multi-distinct columns") {
|
||||
test("multi-distinct columns") {
|
||||
checkHiveQl("SELECT a, COUNT(DISTINCT b), COUNT(DISTINCT c), SUM(d) FROM parquet_t2 GROUP BY a")
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue