From cd2342691d1182b14f6076f69793441d2aa03e85 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Mon, 23 Aug 2021 15:31:13 -0700 Subject: [PATCH] [SPARK-34952][SQL][FOLLOWUP] Move aggregates to a separate package ### What changes were proposed in this pull request? Add `aggregate` package under `sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions` and move all the aggregates (e.g. `Count`, `Max`, `Min`, etc.) there. ### Why are the changes needed? Right now these aggregates are under `sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions`. It looks OK now, but we plan to add a new `filter` package under `expressions` for all the DSV2 filters. It will look strange that filters have their own package, but aggregates don't. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests Closes #33815 from huaxingao/agg_package. Authored-by: Huaxin Gao Signed-off-by: Liang-Chi Hsieh --- .../expressions/{ => aggregate}/AggregateFunc.java | 7 ++++--- .../connector/expressions/{ => aggregate}/Aggregation.java | 7 ++++--- .../sql/connector/expressions/{ => aggregate}/Count.java | 3 ++- .../connector/expressions/{ => aggregate}/CountStar.java | 2 +- .../sql/connector/expressions/{ => aggregate}/Max.java | 3 ++- .../sql/connector/expressions/{ => aggregate}/Min.java | 3 ++- .../sql/connector/expressions/{ => aggregate}/Sum.java | 3 ++- .../sql/connector/read/SupportsPushDownAggregates.java | 2 +- .../apache/spark/sql/execution/DataSourceScanExec.scala | 2 +- .../sql/execution/datasources/DataSourceStrategy.scala | 3 ++- .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 2 +- .../spark/sql/execution/datasources/v2/PushDownUtils.scala | 3 ++- .../execution/datasources/v2/V2ScanRelationPushDown.scala | 2 +- .../execution/datasources/v2/jdbc/JDBCScanBuilder.scala | 2 +- 14 files changed, 26 insertions(+), 18 deletions(-) rename sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/{ => aggregate}/AggregateFunc.java (89%) rename sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/{ => aggregate}/Aggregation.java (91%) rename sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/{ => aggregate}/Count.java (92%) rename sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/{ => aggregate}/CountStar.java (94%) rename sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/{ => aggregate}/Max.java (91%) rename sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/{ => aggregate}/Min.java (91%) rename sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/{ => aggregate}/Sum.java (92%) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/AggregateFunc.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/AggregateFunc.java similarity index 89% rename from sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/AggregateFunc.java rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/AggregateFunc.java index eea8c3152e..6683f73f50 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/AggregateFunc.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/AggregateFunc.java @@ -15,12 +15,13 @@ * limitations under the License. */ -package org.apache.spark.sql.connector.expressions; - -import org.apache.spark.annotation.Evolving; +package org.apache.spark.sql.connector.expressions.aggregate; import java.io.Serializable; +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Expression; + /** * Base class of the Aggregate Functions. * diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Aggregation.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Aggregation.java similarity index 91% rename from sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Aggregation.java rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Aggregation.java index 8eb3491ea1..039252348d 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Aggregation.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Aggregation.java @@ -15,12 +15,13 @@ * limitations under the License. */ -package org.apache.spark.sql.connector.expressions; - -import org.apache.spark.annotation.Evolving; +package org.apache.spark.sql.connector.expressions.aggregate; import java.io.Serializable; +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.FieldReference; + /** * Aggregation in SQL statement. * diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Count.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Count.java similarity index 92% rename from sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Count.java rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Count.java index fecde71d56..14493a4339 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Count.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Count.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package org.apache.spark.sql.connector.expressions; +package org.apache.spark.sql.connector.expressions.aggregate; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.FieldReference; /** * An aggregate function that returns the number of the specific row in a group. diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/CountStar.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/CountStar.java similarity index 94% rename from sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/CountStar.java rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/CountStar.java index 8e799cd23e..f566ad164b 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/CountStar.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/CountStar.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.connector.expressions; +package org.apache.spark.sql.connector.expressions.aggregate; import org.apache.spark.annotation.Evolving; diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Max.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Max.java similarity index 91% rename from sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Max.java rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Max.java index 3ce45cae91..985fd80552 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Max.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Max.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package org.apache.spark.sql.connector.expressions; +package org.apache.spark.sql.connector.expressions.aggregate; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.FieldReference; /** * An aggregate function that returns the maximum value in a group. diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Min.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Min.java similarity index 91% rename from sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Min.java rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Min.java index 2449358f7c..7b7b557844 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Min.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Min.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package org.apache.spark.sql.connector.expressions; +package org.apache.spark.sql.connector.expressions.aggregate; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.FieldReference; /** * An aggregate function that returns the minimum value in a group. diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Sum.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Sum.java similarity index 92% rename from sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Sum.java rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Sum.java index 345194f27a..66ce436e70 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Sum.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Sum.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package org.apache.spark.sql.connector.expressions; +package org.apache.spark.sql.connector.expressions.aggregate; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.FieldReference; /** * An aggregate function that returns the summation of all the values in a group. diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownAggregates.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownAggregates.java index 8ec9a2597a..3e643b5493 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownAggregates.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownAggregates.java @@ -18,7 +18,7 @@ package org.apache.spark.sql.connector.read; import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.connector.expressions.Aggregation; +import org.apache.spark.sql.connector.expressions.aggregate.Aggregation; /** * A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 5d09d120e4..4f282edaf8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.connector.expressions.Aggregation +import org.apache.spark.sql.connector.expressions.aggregate.Aggregation import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 11d23f482f..7a5c343133 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -40,7 +40,8 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 import org.apache.spark.sql.connector.catalog.SupportsRead import org.apache.spark.sql.connector.catalog.TableCapability._ -import org.apache.spark.sql.connector.expressions.{AggregateFunc, Count, CountStar, FieldReference, Max, Min, Sum} +import org.apache.spark.sql.connector.expressions.FieldReference +import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Count, CountStar, Max, Min, Sum} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.{InSubqueryExec, RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.command._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index c575e95485..f26897d2d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -25,7 +25,7 @@ import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskCon import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.connector.expressions.{AggregateFunc, Count, CountStar, Max, Min, Sum} +import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Count, CountStar, Max, Min, Sum} import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala index 6eedeba4d0..acc6457418 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala @@ -22,7 +22,8 @@ import scala.collection.mutable import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, NamedExpression, PredicateHelper, SchemaPruning} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.util.CharVarcharUtils -import org.apache.spark.sql.connector.expressions.{Aggregation, FieldReference} +import org.apache.spark.sql.connector.expressions.FieldReference +import org.apache.spark.sql.connector.expressions.aggregate.Aggregation import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, SupportsPushDownRequiredColumns} import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns} import org.apache.spark.sql.execution.datasources.DataSourceStrategy diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 8b253da3d6..046155b55c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.planning.ScanOperation import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.expressions.Aggregation +import org.apache.spark.sql.connector.expressions.aggregate.Aggregation import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, V1Scan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala index 89fa6213f5..b0de7c015c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala @@ -20,7 +20,7 @@ import scala.util.control.NonFatal import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.expressions.Aggregation +import org.apache.spark.sql.connector.expressions.aggregate.Aggregation import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, SupportsPushDownRequiredColumns} import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCRDD, JDBCRelation}