[SPARK-36489][SQL] Aggregate functions over no grouping keys, on tables with a single bucket, return multiple rows

### What changes were proposed in this pull request?

This PR fixes a bug in `DisableUnnecessaryBucketedScan`.
When an aggregate function runs without any grouping keys over a table with a single bucket, multiple rows are returned instead of one.
This happens because the single-bucket scan reports one partition, so the aggregate's `AllTuples` distribution requirement is already satisfied and no `Exchange` is planned; the rule then disables the bucketed scan anyway, the scan starts producing multiple partitions, and the exchange-free aggregate returns one row per partition.
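
A minimal repro sketch, condensed from the new test below (table name and values are illustrative; the config key is the one behind `SQLConf.AUTO_BUCKETED_SCAN_ENABLED`):

```scala
// Illustrative spark-shell repro (assumes an active session named `spark`).
// Enable the auto bucketed scan optimization that triggers the bug.
spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "true")

// A bucketed table with a single bucket: the scan reports one partition,
// so the no-grouping-key aggregate's AllTuples requirement is satisfied
// and no Exchange is planned.
spark.sql("CREATE TABLE t1 (id BIGINT) USING PARQUET CLUSTERED BY (id) INTO 1 BUCKETS")
spark.sql("INSERT INTO t1 VALUES (1)")
spark.sql("INSERT INTO t1 VALUES (2)")

// Before this fix, DisableUnnecessaryBucketedScan dropped the bucketed scan,
// the scan produced one partition per file, and the exchange-free aggregate
// returned one row per partition instead of a single row.
spark.sql("SELECT SUM(id) FROM t1").show()
```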

### Why are the changes needed?

This is a bug fix: an aggregate over no grouping keys must return exactly one row.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a new test in `DisableUnnecessaryBucketedScanSuite`.

Closes #33711 from IonutBoicuAms/fix-bug-disableunnecessarybucketedscan.

Authored-by: IonutBoicuAms <ionut.boicu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 2b665751d9)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

`DisableUnnecessaryBucketedScan.scala`:

```diff
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.execution.bucketing

-import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashClusteredDistribution}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
@@ -120,7 +120,7 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] {
   private def hasInterestingPartition(plan: SparkPlan): Boolean = {
     plan.requiredChildDistribution.exists {
-      case _: ClusteredDistribution | _: HashClusteredDistribution => true
+      case _: ClusteredDistribution | _: HashClusteredDistribution | AllTuples => true
       case _ => false
     }
   }
```
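
For context on why `AllTuples` belongs in this match: a final aggregate with no grouping keys requires all rows in a single partition. A simplified sketch of `BaseAggregateExec.requiredChildDistribution`, paraphrased from the Spark source and not part of this diff:

```scala
// Simplified: with no grouping keys (Some(Nil)), the aggregate demands
// AllTuples from its child, so hasInterestingPartition must treat AllTuples
// as "interesting" and keep the bucketed scan that satisfies it.
override def requiredChildDistribution: List[Distribution] = {
  requiredChildDistributionExpressions match {
    case Some(exprs) if exprs.isEmpty => AllTuples :: Nil
    case Some(exprs) => ClusteredDistribution(exprs) :: Nil
    case None => UnspecifiedDistribution :: Nil
  }
}
```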

`DisableUnnecessaryBucketedScanSuite.scala`:

```diff
@@ -258,4 +258,30 @@ abstract class DisableUnnecessaryBucketedScanSuite
       }
     }
   }
+
+  test("Aggregates with no groupby over tables having 1 BUCKET, return multiple rows") {
+    withTable("t1") {
+      withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
+        sql(
+          """
+            |CREATE TABLE t1 (`id` BIGINT, `event_date` DATE)
+            |USING PARQUET
+            |CLUSTERED BY (id)
+            |INTO 1 BUCKETS
+            |""".stripMargin)
+        sql(
+          """
+            |INSERT INTO TABLE t1 VALUES(1.23, cast("2021-07-07" as date))
+            |""".stripMargin)
+        sql(
+          """
+            |INSERT INTO TABLE t1 VALUES(2.28, cast("2021-08-08" as date))
+            |""".stripMargin)
+        val df = spark.sql("select sum(id) from t1 where id is not null")
+        assert(df.count == 1)
+        checkDisableBucketedScan(query = "SELECT SUM(id) FROM t1 WHERE id is not null",
+          expectedNumScanWithAutoScanEnabled = 1, expectedNumScanWithAutoScanDisabled = 1)
+      }
+    }
+  }
 }
```
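
To run just this suite locally, the standard Spark test invocation should suffice, e.g. `build/sbt "sql/testOnly *DisableUnnecessaryBucketedScanSuite"` (the command is illustrative and not part of this commit).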