[SPARK-36489][SQL] Aggregate functions over no grouping keys, on tables with a single bucket, return multiple rows
### What changes were proposed in this pull request? This PR fixes a bug in `DisableUnnecessaryBucketedScan`. When running any aggregate function, without any grouping keys, on a table with a single bucket, multiple rows are returned. This happens because the aggregate function satisfies the `AllTuples` distribution, no `Exchange` will be planned, and the bucketed scan will be disabled. ### Why are the changes needed? Bug fixing. Aggregates over no grouping keys should return a single row. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added new test in `DisableUnnecessaryBucketedScanSuite`. Closes #33711 from IonutBoicuAms/fix-bug-disableunnecessarybucketedscan. Authored-by: IonutBoicuAms <ionut.boicu@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
00a4364f38
commit
2b665751d9
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
package org.apache.spark.sql.execution.bucketing
|
package org.apache.spark.sql.execution.bucketing
|
||||||
|
|
||||||
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashClusteredDistribution}
|
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashClusteredDistribution}
|
||||||
import org.apache.spark.sql.catalyst.rules.Rule
|
import org.apache.spark.sql.catalyst.rules.Rule
|
||||||
import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SortExec, SparkPlan}
|
import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SortExec, SparkPlan}
|
||||||
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
|
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
|
||||||
|
@ -120,7 +120,7 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] {
|
||||||
|
|
||||||
private def hasInterestingPartition(plan: SparkPlan): Boolean = {
|
private def hasInterestingPartition(plan: SparkPlan): Boolean = {
|
||||||
plan.requiredChildDistribution.exists {
|
plan.requiredChildDistribution.exists {
|
||||||
case _: ClusteredDistribution | _: HashClusteredDistribution => true
|
case _: ClusteredDistribution | _: HashClusteredDistribution | AllTuples => true
|
||||||
case _ => false
|
case _ => false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -258,4 +258,30 @@ abstract class DisableUnnecessaryBucketedScanSuite
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("Aggregates with no groupby over tables having 1 BUCKET, return multiple rows") {
|
||||||
|
withTable("t1") {
|
||||||
|
withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
|
||||||
|
sql(
|
||||||
|
"""
|
||||||
|
|CREATE TABLE t1 (`id` BIGINT, `event_date` DATE)
|
||||||
|
|USING PARQUET
|
||||||
|
|CLUSTERED BY (id)
|
||||||
|
|INTO 1 BUCKETS
|
||||||
|
|""".stripMargin)
|
||||||
|
sql(
|
||||||
|
"""
|
||||||
|
|INSERT INTO TABLE t1 VALUES(1.23, cast("2021-07-07" as date))
|
||||||
|
|""".stripMargin)
|
||||||
|
sql(
|
||||||
|
"""
|
||||||
|
|INSERT INTO TABLE t1 VALUES(2.28, cast("2021-08-08" as date))
|
||||||
|
|""".stripMargin)
|
||||||
|
val df = spark.sql("select sum(id) from t1 where id is not null")
|
||||||
|
assert(df.count == 1)
|
||||||
|
checkDisableBucketedScan(query = "SELECT SUM(id) FROM t1 WHERE id is not null",
|
||||||
|
expectedNumScanWithAutoScanEnabled = 1, expectedNumScanWithAutoScanDisabled = 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue