[SPARK-34952][SQL][FOLLOWUP] Normalize pushed down aggregate col name and group by col name

### What changes were proposed in this pull request?
Normalize pushed-down aggregate column names and group-by column names before pushing the aggregates to the data source.

### Why are the changes needed?
To correctly handle case-sensitive column names when matching aggregate and group-by expressions against the relation's output.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Modified an existing test in `JDBCV2Suite` to use mixed-case column names (e.g. `SaLaRY`, `DePt`).

Closes #33739 from huaxingao/normalize.

Authored-by: Huaxin Gao <huaxin_gao@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 3f8ec0dae4)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
Huaxin Gao 2021-08-13 22:31:21 -07:00 committed by Dongjoon Hyun
parent c898a940e2
commit ede1d1e9a7
2 changed files with 9 additions and 5 deletions

View file

@ -93,8 +93,12 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
agg
}
}
val pushedAggregates = PushDownUtils
.pushAggregates(sHolder.builder, aggregates, groupingExpressions)
val normalizedAggregates = DataSourceStrategy.normalizeExprs(
aggregates, sHolder.relation.output).asInstanceOf[Seq[AggregateExpression]]
val normalizedGroupingExpressions = DataSourceStrategy.normalizeExprs(
groupingExpressions, sHolder.relation.output)
val pushedAggregates = PushDownUtils.pushAggregates(
sHolder.builder, normalizedAggregates, normalizedGroupingExpressions)
if (pushedAggregates.isEmpty) {
aggNode // return original plan node
} else {
@ -115,7 +119,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
// scalastyle:on
val newOutput = scan.readSchema().toAttributes
assert(newOutput.length == groupingExpressions.length + aggregates.length)
val groupAttrs = groupingExpressions.zip(newOutput).map {
val groupAttrs = normalizedGroupingExpressions.zip(newOutput).map {
case (a: Attribute, b: Attribute) => b.withExprId(a.exprId)
case (_, b) => b
}

View file

@ -239,8 +239,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
}
test("scan with aggregate push-down: MAX MIN with filter and group by") {
val df = sql("select MAX(SALARY), MIN(BONUS) FROM h2.test.employee where dept > 0" +
" group by DEPT")
val df = sql("select MAX(SaLaRY), MIN(BONUS) FROM h2.test.employee where dept > 0" +
" group by DePt")
val filters = df.queryExecution.optimizedPlan.collect {
case f: Filter => f
}