From 2b665751d9c7e4fb07ea18ce6611328e24f3dce9 Mon Sep 17 00:00:00 2001
From: IonutBoicuAms
Date: Thu, 12 Aug 2021 15:22:38 +0800
Subject: [PATCH] [SPARK-36489][SQL] Aggregate functions over no grouping keys,
 on tables with a single bucket, return multiple rows

### What changes were proposed in this pull request?

This PR fixes a bug in `DisableUnnecessaryBucketedScan`. Running an aggregate function with no grouping keys over a table with a single bucket returned multiple rows instead of one. The chain of events: a single-bucket scan produces one partition, so the aggregate's required `AllTuples` distribution is already satisfied and no `Exchange` is planned. `DisableUnnecessaryBucketedScan` then disabled the bucketed scan anyway, because `hasInterestingPartition` did not treat `AllTuples` as an interesting requirement. The non-bucketed scan reads the table's files as multiple partitions, and with no shuffle to bring them back together, the final aggregation emits one row per partition. The fix adds `AllTuples` to `hasInterestingPartition`, so the bucketed scan is kept under a global aggregate.
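For illustration (not part of the patch), a minimal reproduction sketch: it assumes a `spark-shell` session on a build without this fix, and the table name `t1_repro` is made up for the example. Each `INSERT` writes a separate file into the single bucket, so the disabled bucketed scan sees multiple partitions:

```scala
// Reproduction sketch: assumes spark-shell on a Spark build without this fix.
// The table name `t1_repro` is hypothetical.
spark.sql(
  "CREATE TABLE t1_repro (id BIGINT) USING PARQUET CLUSTERED BY (id) INTO 1 BUCKETS")
// Two separate INSERTs write two files into the single bucket.
spark.sql("INSERT INTO t1_repro VALUES (1)")
spark.sql("INSERT INTO t1_repro VALUES (2)")
// Expected: one row with value 3. Before the fix: multiple rows (one per
// partition), because the bucketed scan was disabled and no Exchange had
// been planned under the global aggregate.
spark.sql("SELECT SUM(id) FROM t1_repro").show()
```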
### Why are the changes needed?

Bug fixing. An aggregate over no grouping keys should return a single row.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a new test in `DisableUnnecessaryBucketedScanSuite`.

Closes #33711 from IonutBoicuAms/fix-bug-disableunnecessarybucketedscan.

Authored-by: IonutBoicuAms
Signed-off-by: Wenchen Fan
---
 .../DisableUnnecessaryBucketedScan.scala      |  4 +--
 .../DisableUnnecessaryBucketedScanSuite.scala | 26 +++++++++++++++++++
 2 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
index 98bcab2a83..5bd70c61fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.bucketing
 
-import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashClusteredDistribution}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
@@ -120,7 +120,7 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] {
 
   private def hasInterestingPartition(plan: SparkPlan): Boolean = {
     plan.requiredChildDistribution.exists {
-      case _: ClusteredDistribution | _: HashClusteredDistribution => true
+      case _: ClusteredDistribution | _: HashClusteredDistribution | AllTuples => true
       case _ => false
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
index 1a19824a31..737cffc42f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
@@ -258,4 +258,30 @@ abstract class DisableUnnecessaryBucketedScanSuite
       }
     }
   }
+
+  test("SPARK-36489: aggregates with no group-by keys over a single-bucket table return one row") {
+    withTable("t1") {
+      withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
+        sql(
+          """
+            |CREATE TABLE t1 (`id` BIGINT, `event_date` DATE)
+            |USING PARQUET
+            |CLUSTERED BY (id)
+            |INTO 1 BUCKETS
+            |""".stripMargin)
+        sql(
+          """
+            |INSERT INTO TABLE t1 VALUES(1.23, cast("2021-07-07" as date))
+            |""".stripMargin)
+        sql(
+          """
+            |INSERT INTO TABLE t1 VALUES(2.28, cast("2021-08-08" as date))
+            |""".stripMargin)
+        val df = spark.sql("SELECT SUM(id) FROM t1 WHERE id IS NOT NULL")
+        assert(df.count == 1)
+        checkDisableBucketedScan(query = "SELECT SUM(id) FROM t1 WHERE id IS NOT NULL",
+          expectedNumScanWithAutoScanEnabled = 1, expectedNumScanWithAutoScanDisabled = 1)
+      }
+    }
+  }
 }