From ede1d1e9a7a6498f09b3d14704432b2603a2951f Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Fri, 13 Aug 2021 22:31:21 -0700
Subject: [PATCH] [SPARK-34952][SQL][FOLLOWUP] Normalize pushed down aggregate
 col name and group by col name

### What changes were proposed in this pull request?
Normalize pushed down aggregate col names and group by col names ...

### Why are the changes needed?
To handle case-sensitive col names.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Modified an existing test.

Closes #33739 from huaxingao/normalize.

Authored-by: Huaxin Gao
Signed-off-by: Dongjoon Hyun
(cherry picked from commit 3f8ec0dae4ddfd7ee55370dad5d44d03a9f10387)
Signed-off-by: Dongjoon Hyun
---
 .../datasources/v2/V2ScanRelationPushDown.scala       | 10 +++++++---
 .../scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala |  4 ++--
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index ab5a0feb62..8b253da3d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -93,8 +93,12 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
           agg
         }
       }
-      val pushedAggregates = PushDownUtils
-        .pushAggregates(sHolder.builder, aggregates, groupingExpressions)
+      val normalizedAggregates = DataSourceStrategy.normalizeExprs(
+        aggregates, sHolder.relation.output).asInstanceOf[Seq[AggregateExpression]]
+      val normalizedGroupingExpressions = DataSourceStrategy.normalizeExprs(
+        groupingExpressions, sHolder.relation.output)
+      val pushedAggregates = PushDownUtils.pushAggregates(
+        sHolder.builder, normalizedAggregates, normalizedGroupingExpressions)
       if (pushedAggregates.isEmpty) {
         aggNode // return original plan node
       } else {
@@ -115,7 +119,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
         // scalastyle:on
         val newOutput = scan.readSchema().toAttributes
         assert(newOutput.length == groupingExpressions.length + aggregates.length)
-        val groupAttrs = groupingExpressions.zip(newOutput).map {
+        val groupAttrs = normalizedGroupingExpressions.zip(newOutput).map {
           case (a: Attribute, b: Attribute) => b.withExprId(a.exprId)
           case (_, b) => b
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 37bc35210e..526dad91e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -239,8 +239,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
   }
 
   test("scan with aggregate push-down: MAX MIN with filter and group by") {
-    val df = sql("select MAX(SALARY), MIN(BONUS) FROM h2.test.employee where dept > 0" +
-      " group by DEPT")
+    val df = sql("select MAX(SaLaRY), MIN(BONUS) FROM h2.test.employee where dept > 0" +
+      " group by DePt")
     val filters = df.queryExecution.optimizedPlan.collect {
       case f: Filter => f
     }
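
Note (illustrative sketch, not part of the patch): the self-contained Scala
program below shows what case-insensitive name normalization buys us here.
Attr, ColRef, Max, Min, and the normalize helper are hypothetical stand-ins
invented for this example; the actual change relies on
DataSourceStrategy.normalizeExprs over Spark Expression trees resolved against
sHolder.relation.output.

// A minimal sketch of case-insensitive column-name normalization.
// Attr/ColRef/Max/Min are simplified stand-ins, NOT Spark's classes.
object NormalizeSketch {
  // A column as the relation declares it (canonical casing).
  final case class Attr(name: String)

  sealed trait Expr
  final case class ColRef(name: String) extends Expr // column as the query spells it
  final case class Max(child: Expr) extends Expr
  final case class Min(child: Expr) extends Expr

  // Rewrite every column reference to the exact casing declared by the
  // relation's output, matching names case-insensitively.
  def normalize(exprs: Seq[Expr], output: Seq[Attr]): Seq[Expr] = {
    def fix(e: Expr): Expr = e match {
      case ColRef(n) =>
        output.find(_.name.equalsIgnoreCase(n))
          .map(a => ColRef(a.name))
          .getOrElse(ColRef(n)) // leave unresolved names untouched
      case Max(c) => Max(fix(c))
      case Min(c) => Min(fix(c))
    }
    exprs.map(fix)
  }

  def main(args: Array[String]): Unit = {
    val output = Seq(Attr("SALARY"), Attr("BONUS"), Attr("DEPT"))
    // From the test: "MAX(SaLaRY) ... group by DePt" before normalization.
    val aggs = Seq(Max(ColRef("SaLaRY")), Min(ColRef("BONUS")))
    val groupBy = Seq(ColRef("DePt"))
    println(normalize(aggs, output))    // List(Max(ColRef(SALARY)), Min(ColRef(BONUS)))
    println(normalize(groupBy, output)) // List(ColRef(DEPT))
  }
}

Without such normalization, a pushed-down aggregate built from the query's
spelling ("SaLaRY", "DePt") would not match the column names the data source
reports, which is exactly what the modified JDBCV2Suite test exercises.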