From cfc80d0eb18e1ec2866204da3500acd5f4dde2ea Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Wed, 30 Oct 2019 18:07:34 -0700
Subject: [PATCH] [SPARK-29277][SQL] Add early DSv2 filter and projection pushdown

### What changes were proposed in this pull request?

This adds a new rule, `V2ScanRelationPushDown`, to push filters and projections into a new `DataSourceV2ScanRelation` in the optimizer. That scan is then used when converting to a physical scan node. The new relation correctly reports stats based on the scan.

To run scan pushdown before rules where stats are used, this adds a new optimizer override, `earlyScanPushDownRules`, and a batch for early pushdown in the optimizer, before cost-based join reordering. The other early pushdown rule, `PruneFileSourcePartitions`, is moved into the early pushdown rule set.

This also moves the pushdown helper methods from `DataSourceV2Strategy` into a util class. A short illustrative sketch of the resulting plan shape is appended after the diff.

### Why are the changes needed?

This is needed for DSv2 sources to supply stats for cost-based rules in the optimizer.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

This updates the implementation of stats for `DataSourceV2Relation` so that tests will fail if stats are accessed before early pushdown for v2 relations.

Closes #25955 from rdblue/move-v2-pushdown.

Authored-by: Ryan Blue
Signed-off-by: Ryan Blue
---
 .../sql/catalyst/analysis/Analyzer.scala      | 14 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala | 14 ++
 .../sql/catalyst/optimizer/Optimizer.scala    | 14 +-
 .../catalyst/plans/logical/v2Commands.scala   |  6 +-
 .../sql/connector/catalog/CatalogV2Util.scala |  7 +-
 .../datasources/v2/DataSourceV2Relation.scala | 52 +++++++-
 .../scala/org/apache/spark/sql/Dataset.scala  |  4 +-
 .../spark/sql/execution/SparkOptimizer.scala  | 12 +-
 .../datasources/v2/DataSourceV2Strategy.scala | 121 ++----------------
 .../datasources/v2/PushDownUtils.scala        | 103 +++++++++++++++
 .../v2/V2ScanRelationPushDown.scala           | 66 ++++++++++
 .../spark/sql/FileBasedDataSourceSuite.scala  | 10 +-
 .../sql/connector/DataSourceV2Suite.scala     |  8 +-
 .../execution/datasources/orc/OrcTest.scala   | 11 +-
 .../orc/OrcV2SchemaPruningSuite.scala         |  2 +-
 .../parquet/ParquetFilterSuite.scala          | 11 +-
 .../datasources/orc/OrcFilterSuite.scala      | 10 +-
 17 files changed, 304 insertions(+), 161 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d92987887b..de8f3e2521 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -681,10 +681,18 @@ class Analyzer(
           .map(v2Relation => i.copy(table = v2Relation))
           .getOrElse(i)

+      case desc @ DescribeTable(u: UnresolvedV2Relation, _) =>
+        CatalogV2Util.loadRelation(u.catalog, u.tableName)
+          .map(rel => desc.copy(table = rel))
+          .getOrElse(desc)
+
+      case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) =>
+        CatalogV2Util.loadRelation(u.catalog, u.tableName)
+          .map(rel => alter.copy(table = rel))
+          .getOrElse(alter)
+
       case u: UnresolvedV2Relation =>
-        CatalogV2Util.loadTable(u.catalog, u.tableName).map { table =>
-          DataSourceV2Relation.create(table)
-        }.getOrElse(u)
+
CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index d9dc9ebbca..72612d1dc7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -104,6 +104,20 @@ trait CheckAnalysis extends PredicateHelper { case u: UnresolvedV2Relation => u.failAnalysis(s"Table not found: ${u.originalNameParts.quoted}") + case AlterTable(_, _, u: UnresolvedV2Relation, _) if isView(u.originalNameParts) => + u.failAnalysis( + s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.") + + case AlterTable(_, _, u: UnresolvedV2Relation, _) => + failAnalysis(s"Table not found: ${u.originalNameParts.quoted}") + + case DescribeTable(u: UnresolvedV2Relation, _) if isView(u.originalNameParts) => + u.failAnalysis( + s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.") + + case DescribeTable(u: UnresolvedV2Relation, _) => + failAnalysis(s"Table not found: ${u.originalNameParts.quoted}") + case operator: LogicalPlan => // Check argument data types of higher-order functions downwards first. // If the arguments of the higher-order functions are resolved but the type check fails, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index eab4c3efe4..85b65edbb6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -119,7 +119,7 @@ abstract class Optimizer(catalogManager: CatalogManager) rulesWithoutInferFiltersFromConstraints: _*) :: Nil } - (Batch("Eliminate Distinct", Once, EliminateDistinct) :: + val batches = (Batch("Eliminate Distinct", Once, EliminateDistinct) :: // Technically some of the rules in Finish Analysis are not optimizer rules and belong more // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime). // However, because we also use the analyzer to canonicalized queries (for view definition), @@ -170,6 +170,10 @@ abstract class Optimizer(catalogManager: CatalogManager) RemoveLiteralFromGroupExpressions, RemoveRepetitionFromGroupExpressions) :: Nil ++ operatorOptimizationBatch) :+ + // This batch pushes filters and projections into scan nodes. Before this batch, the logical + // plan may contain nodes that do not report stats. Anything that uses stats must run after + // this batch. + Batch("Early Filter and Projection Push-Down", Once, earlyScanPushDownRules: _*) :+ // Since join costs in AQP can change between multiple runs, there is no reason that we have an // idempotence enforcement on this batch. We thus make it FixedPoint(1) instead of Once. Batch("Join Reorder", FixedPoint(1), @@ -196,6 +200,9 @@ abstract class Optimizer(catalogManager: CatalogManager) RemoveNoopOperators) :+ // This batch must be executed after the `RewriteSubquery` batch, which creates joins. Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers) + + // remove any batches with no rules. this may happen when subclasses do not add optional rules. 
+ batches.filter(_.rules.nonEmpty) } /** @@ -253,6 +260,11 @@ abstract class Optimizer(catalogManager: CatalogManager) */ def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil + /** + * Override to provide additional rules for early projection and filter pushdown to scans. + */ + def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil + /** * Returns (defaultBatches - (excludedRules - nonExcludableRules)), the rule batches that * eventually run in the Optimizer. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index f587ee2928..3c625e9acb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -271,7 +271,7 @@ case class ShowNamespaces( */ case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Command { - override def children: Seq[LogicalPlan] = Seq(table) + override lazy val resolved: Boolean = table.resolved override def output: Seq[Attribute] = DescribeTableSchema.describeTableAttributes() } @@ -313,9 +313,7 @@ case class AlterTable( table: NamedRelation, changes: Seq[TableChange]) extends Command { - override def children: Seq[LogicalPlan] = Seq(table) - - override lazy val resolved: Boolean = childrenResolved && { + override lazy val resolved: Boolean = table.resolved && { changes.forall { case add: AddColumn => add.fieldNames match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 6d8c6f8456..0f313e7b96 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -24,9 +24,10 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation} +import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation} import org.apache.spark.sql.catalyst.plans.logical.AlterTable import org.apache.spark.sql.connector.catalog.TableChange._ +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} private[sql] object CatalogV2Util { @@ -224,6 +225,10 @@ private[sql] object CatalogV2Util { case _: NoSuchNamespaceException => None } + def loadRelation(catalog: CatalogPlugin, ident: Identifier): Option[NamedRelation] = { + loadTable(catalog, ident).map(DataSourceV2Relation.create) + } + def isSessionCatalog(catalog: CatalogPlugin): Boolean = { catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 7da502fc29..87d3419e81 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, Statistics => V2S import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream} import org.apache.spark.sql.connector.write.WriteBuilder import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.util.Utils /** * A logical plan representing a data source v2 table. @@ -50,12 +51,53 @@ case class DataSourceV2Relation( s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name" } - def newScanBuilder(): ScanBuilder = { - table.asReadable.newScanBuilder(options) + override def computeStats(): Statistics = { + if (Utils.isTesting) { + // when testing, throw an exception if this computeStats method is called because stats should + // not be accessed before pushing the projection and filters to create a scan. otherwise, the + // stats are not accurate because they are based on a full table scan of all columns. + throw new IllegalStateException( + s"BUG: computeStats called before pushdown on DSv2 relation: $name") + } else { + // when not testing, return stats because bad stats are better than failing a query + table.asReadable.newScanBuilder(options) match { + case r: SupportsReportStatistics => + val statistics = r.estimateStatistics() + DataSourceV2Relation.transformV2Stats(statistics, None, conf.defaultSizeInBytes) + case _ => + Statistics(sizeInBytes = conf.defaultSizeInBytes) + } + } + } + + override def newInstance(): DataSourceV2Relation = { + copy(output = output.map(_.newInstance())) + } +} + +/** + * A logical plan for a DSv2 table with a scan already created. + * + * This is used in the optimizer to push filters and projection down before conversion to physical + * plan. This ensures that the stats that are used by the optimizer account for the filters and + * projection that will be pushed down. 
+ * + * @param table a DSv2 [[Table]] + * @param scan a DSv2 [[Scan]] + * @param output the output attributes of this relation + */ +case class DataSourceV2ScanRelation( + table: Table, + scan: Scan, + output: Seq[AttributeReference]) extends LeafNode with NamedRelation { + + override def name: String = table.name() + + override def simpleString(maxFields: Int): String = { + s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name" } override def computeStats(): Statistics = { - val scan = newScanBuilder().build() scan match { case r: SupportsReportStatistics => val statistics = r.estimateStatistics() @@ -64,10 +106,6 @@ case class DataSourceV2Relation( Statistics(sizeInBytes = conf.defaultSizeInBytes) } } - - override def newInstance(): DataSourceV2Relation = { - copy(output = output.map(_.newInstance())) - } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 5f6e0a82be..607f495139 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -51,7 +51,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileTable} import org.apache.spark.sql.execution.python.EvaluatePython import org.apache.spark.sql.execution.stat.StatFunctions import org.apache.spark.sql.internal.SQLConf @@ -3218,7 +3218,7 @@ class Dataset[T] private[sql]( fr.inputFiles case r: HiveTableRelation => r.tableMeta.storage.locationUri.map(_.toString).toArray - case DataSourceV2Relation(table: FileTable, _, _) => + case DataSourceV2ScanRelation(table: FileTable, _, _) => table.fileIndex.inputFiles }.flatten files.toSet.toArray diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala index 4a8b56fadd..e65faefad5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala @@ -20,10 +20,13 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.ExperimentalMethods import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.optimizer._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning} import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions import org.apache.spark.sql.execution.datasources.SchemaPruning +import org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown import org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate, ExtractPythonUDFFromAggregate, ExtractPythonUDFs} class SparkOptimizer( @@ -32,10 +35,12 @@ class SparkOptimizer( experimentalMethods: ExperimentalMethods) extends Optimizer(catalogManager) { + override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = + // TODO: move SchemaPruning into catalyst + SchemaPruning :: 
PruneFileSourcePartitions :: V2ScanRelationPushDown :: Nil + override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+ Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+ - Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+ - Batch("Schema Pruning", Once, SchemaPruning) :+ Batch("PartitionPruning", Once, PartitionPruning, OptimizeSubqueries) :+ @@ -64,7 +69,8 @@ class SparkOptimizer( override def nonExcludableRules: Seq[String] = super.nonExcludableRules :+ ExtractPythonUDFFromJoinCondition.ruleName :+ ExtractPythonUDFFromAggregate.ruleName :+ ExtractGroupingPythonUDFFromAggregate.ruleName :+ - ExtractPythonUDFs.ruleName + ExtractPythonUDFs.ruleName :+ + V2ScanRelationPushDown.ruleName /** * Optimization batches that are executed before the regular optimization batches (also before diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 3041e9e82d..bc66c154b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -18,127 +18,30 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.JavaConverters._ -import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, Strategy} -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables} import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability} -import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns} import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} -import org.apache.spark.sql.sources import org.apache.spark.sql.util.CaseInsensitiveStringMap object DataSourceV2Strategy extends Strategy with PredicateHelper { - /** - * Pushes down filters to the data source reader - * - * @return pushed filter and post-scan filters. - */ - private def pushFilters( - scanBuilder: ScanBuilder, - filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = { - scanBuilder match { - case r: SupportsPushDownFilters => - // A map from translated data source leaf node filters to original catalyst filter - // expressions. For a `And`/`Or` predicate, it is possible that the predicate is partially - // pushed down. 
This map can be used to construct a catalyst filter expression from the - // input filter, or a superset(partial push down filter) of the input filter. - val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression] - val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter] - // Catalyst filter expression that can't be translated to data source filters. - val untranslatableExprs = mutable.ArrayBuffer.empty[Expression] - - for (filterExpr <- filters) { - val translated = - DataSourceStrategy.translateFilterWithMapping(filterExpr, Some(translatedFilterToExpr)) - if (translated.isEmpty) { - untranslatableExprs += filterExpr - } else { - translatedFilters += translated.get - } - } - - // Data source filters that need to be evaluated again after scanning. which means - // the data source cannot guarantee the rows returned can pass these filters. - // As a result we must return it so Spark can plan an extra filter operator. - val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter => - DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr) - } - // The filters which are marked as pushed to this data source - val pushedFilters = r.pushedFilters().map { filter => - DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr) - } - (pushedFilters, untranslatableExprs ++ postScanFilters) - - case _ => (Nil, filters) - } - } - - /** - * Applies column pruning to the data source, w.r.t. the references of the given expressions. - * - * @return the created `ScanConfig`(since column pruning is the last step of operator pushdown), - * and new output attributes after column pruning. - */ - // TODO: nested column pruning. - private def pruneColumns( - scanBuilder: ScanBuilder, - relation: DataSourceV2Relation, - exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = { - scanBuilder match { - case r: SupportsPushDownRequiredColumns => - val requiredColumns = AttributeSet(exprs.flatMap(_.references)) - val neededOutput = relation.output.filter(requiredColumns.contains) - if (neededOutput != relation.output) { - r.pruneColumns(neededOutput.toStructType) - val scan = r.build() - val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap - scan -> scan.readSchema().toAttributes.map { - // We have to keep the attribute id during transformation. - a => a.withExprId(nameToAttr(a.name).exprId) - } - } else { - r.build() -> relation.output - } - - case _ => scanBuilder.build() -> relation.output - } - } - import DataSourceV2Implicits._ override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case PhysicalOperation(project, filters, relation: DataSourceV2Relation) => - val scanBuilder = relation.newScanBuilder() + case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) => + // projection and filters were already pushed down in the optimizer. + // this uses PhysicalOperation to get the projection and ensure that if the batch scan does + // not support columnar, a projection is added to convert the rows to UnsafeRow. + val batchExec = BatchScanExec(relation.output, relation.scan) - val (withSubquery, withoutSubquery) = filters.partition(SubqueryExpression.hasSubquery) - val normalizedFilters = DataSourceStrategy.normalizeFilters( - withoutSubquery, relation.output) - - // `pushedFilters` will be pushed down and evaluated in the underlying data sources. - // `postScanFilters` need to be evaluated after the scan. - // `postScanFilters` and `pushedFilters` can overlap, e.g. 
the parquet row group filter. - val (pushedFilters, postScanFiltersWithoutSubquery) = - pushFilters(scanBuilder, normalizedFilters) - val postScanFilters = postScanFiltersWithoutSubquery ++ withSubquery - val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters) - logInfo( - s""" - |Pushing operators to ${relation.name} - |Pushed Filters: ${pushedFilters.mkString(", ")} - |Post-Scan Filters: ${postScanFilters.mkString(",")} - |Output: ${output.mkString(", ")} - """.stripMargin) - - val batchExec = BatchScanExec(output, scan) - - val filterCondition = postScanFilters.reduceLeftOption(And) + val filterCondition = filters.reduceLeftOption(And) val withFilter = filterCondition.map(FilterExec(_, batchExec)).getOrElse(batchExec) val withProjection = if (withFilter.output != project || !batchExec.supportsColumnar) { @@ -254,19 +157,19 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { OverwritePartitionsDynamicExec( r.table.asWritable, writeOptions.asOptions, planLater(query)) :: Nil - case DeleteFromTable(r: DataSourceV2Relation, condition) => + case DeleteFromTable(DataSourceV2ScanRelation(table, _, output), condition) => if (condition.exists(SubqueryExpression.hasSubquery)) { throw new AnalysisException( s"Delete by condition with subquery is not supported: $condition") } // fail if any filter cannot be converted. correctness depends on removing all matching data. - val filters = DataSourceStrategy.normalizeFilters(condition.toSeq, r.output) + val filters = DataSourceStrategy.normalizeFilters(condition.toSeq, output) .flatMap(splitConjunctivePredicates(_).map { f => DataSourceStrategy.translateFilter(f).getOrElse( throw new AnalysisException(s"Exec update failed:" + s" cannot translate expression to source filter: $f")) }).toArray - DeleteFromTableExec(r.table.asDeletable, filters) :: Nil + DeleteFromTableExec(table.asDeletable, filters) :: Nil case WriteToContinuousDataSource(writer, query) => WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil @@ -283,8 +186,8 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { Nil } - case desc @ DescribeTable(r: DataSourceV2Relation, isExtended) => - DescribeTableExec(desc.output, r.table, isExtended) :: Nil + case desc @ DescribeTable(DataSourceV2Relation(table, _, _), isExtended) => + DescribeTableExec(desc.output, table, isExtended) :: Nil case DropTable(catalog, ident, ifExists) => DropTableExec(catalog, ident, ifExists) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala new file mode 100644 index 0000000000..634ecfdf7e --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, PredicateHelper} +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources + +object PushDownUtils extends PredicateHelper { + /** + * Pushes down filters to the data source reader + * + * @return pushed filter and post-scan filters. + */ + def pushFilters( + scanBuilder: ScanBuilder, + filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = { + scanBuilder match { + case r: SupportsPushDownFilters => + // A map from translated data source leaf node filters to original catalyst filter + // expressions. For a `And`/`Or` predicate, it is possible that the predicate is partially + // pushed down. This map can be used to construct a catalyst filter expression from the + // input filter, or a superset(partial push down filter) of the input filter. + val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression] + val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter] + // Catalyst filter expression that can't be translated to data source filters. + val untranslatableExprs = mutable.ArrayBuffer.empty[Expression] + + for (filterExpr <- filters) { + val translated = + DataSourceStrategy.translateFilterWithMapping(filterExpr, Some(translatedFilterToExpr)) + if (translated.isEmpty) { + untranslatableExprs += filterExpr + } else { + translatedFilters += translated.get + } + } + + // Data source filters that need to be evaluated again after scanning. which means + // the data source cannot guarantee the rows returned can pass these filters. + // As a result we must return it so Spark can plan an extra filter operator. + val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter => + DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr) + } + // The filters which are marked as pushed to this data source + val pushedFilters = r.pushedFilters().map { filter => + DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr) + } + (pushedFilters, untranslatableExprs ++ postScanFilters) + + case _ => (Nil, filters) + } + } + + /** + * Applies column pruning to the data source, w.r.t. the references of the given expressions. + * + * @return the created `ScanConfig`(since column pruning is the last step of operator pushdown), + * and new output attributes after column pruning. + */ + // TODO: nested column pruning. 
+ def pruneColumns( + scanBuilder: ScanBuilder, + relation: DataSourceV2Relation, + exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = { + scanBuilder match { + case r: SupportsPushDownRequiredColumns => + val requiredColumns = AttributeSet(exprs.flatMap(_.references)) + val neededOutput = relation.output.filter(requiredColumns.contains) + if (neededOutput != relation.output) { + r.pruneColumns(neededOutput.toStructType) + val scan = r.build() + val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap + scan -> scan.readSchema().toAttributes.map { + // We have to keep the attribute id during transformation. + a => a.withExprId(nameToAttr(a.name).exprId) + } + } else { + r.build() -> relation.output + } + + case _ => scanBuilder.build() -> relation.output + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala new file mode 100644 index 0000000000..6aa8d98958 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.expressions.{And, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy + +object V2ScanRelationPushDown extends Rule[LogicalPlan] { + import DataSourceV2Implicits._ + + override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { + case PhysicalOperation(project, filters, relation: DataSourceV2Relation) => + val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options) + + val (withSubquery, withoutSubquery) = filters.partition(SubqueryExpression.hasSubquery) + val normalizedFilters = DataSourceStrategy.normalizeFilters( + withoutSubquery, relation.output) + + // `pushedFilters` will be pushed down and evaluated in the underlying data sources. + // `postScanFilters` need to be evaluated after the scan. + // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. 
+ val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters( + scanBuilder, normalizedFilters) + val postScanFilters = postScanFiltersWithoutSubquery ++ withSubquery + val (scan, output) = PushDownUtils.pruneColumns( + scanBuilder, relation, project ++ postScanFilters) + logInfo( + s""" + |Pushing operators to ${relation.name} + |Pushed Filters: ${pushedFilters.mkString(", ")} + |Post-Scan Filters: ${postScanFilters.mkString(",")} + |Output: ${output.mkString(", ")} + """.stripMargin) + + val scanRelation = DataSourceV2ScanRelation(relation.table, scan, output) + + val filterCondition = postScanFilters.reduceLeftOption(And) + val withFilter = filterCondition.map(Filter(_, scanRelation)).getOrElse(scanRelation) + + val withProjection = if (withFilter.output != project) { + Project(project, withFilter) + } else { + withFilter + } + + withProjection + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index d08f4b9066..a7f3e81904 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import java.io.{File, FilenameFilter, FileNotFoundException} +import java.io.{File, FileNotFoundException} import java.nio.file.{Files, StandardOpenOption} import java.util.Locale @@ -27,9 +27,9 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} -import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT, NullData, NullUDT} +import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.functions._ @@ -664,7 +664,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession { dir.delete() spark.range(1000).write.orc(dir.toString) val df = spark.read.orc(dir.toString) - assert(df.queryExecution.logical.stats.sizeInBytes === BigInt(getLocalDirSize(dir))) + assert(df.queryExecution.optimizedPlan.stats.sizeInBytes === BigInt(getLocalDirSize(dir))) } } } @@ -720,7 +720,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession { .option("path", paths.head.getCanonicalPath) .parquet(paths(1).getCanonicalPath, paths(2).getCanonicalPath) df.queryExecution.optimizedPlan match { - case PhysicalOperation(_, _, DataSourceV2Relation(table: ParquetTable, _, _)) => + case PhysicalOperation(_, _, DataSourceV2ScanRelation(table: ParquetTable, _, _)) => assert(table.paths.toSet == paths.map(_.getCanonicalPath).toSet) case _ => throw new AnalysisException("Can not match ParquetTable in the query.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 138bbc3f04..55c71c7d02 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -32,7 +32,7 @@ 
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapabil import org.apache.spark.sql.connector.catalog.TableCapability._ import org.apache.spark.sql.connector.read._ import org.apache.spark.sql.connector.read.partitioning.{ClusteredDistribution, Distribution, Partitioning} -import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation} +import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, DataSourceV2ScanRelation} import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec} import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector import org.apache.spark.sql.functions._ @@ -195,7 +195,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession { withClue(cls.getName) { val df = spark.read.format(cls.getName).load() val logical = df.queryExecution.optimizedPlan.collect { - case d: DataSourceV2Relation => d + case d: DataSourceV2ScanRelation => d }.head val statics = logical.computeStats() @@ -332,7 +332,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession { test("SPARK-23315: get output from canonicalized data source v2 related plans") { def checkCanonicalizedOutput( df: DataFrame, logicalNumOutput: Int, physicalNumOutput: Int): Unit = { - val logical = df.queryExecution.optimizedPlan.collect { + val logical = df.queryExecution.logical.collect { case d: DataSourceV2Relation => d }.head assert(logical.canonicalized.output.length == logicalNumOutput) @@ -356,7 +356,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession { .read .option(optionName, false) .format(classOf[DataSourceV2WithSessionConfig].getName).load() - val options = df.queryExecution.optimizedPlan.collectFirst { + val options = df.queryExecution.logical.collectFirst { case d: DataSourceV2Relation => d.options }.get assert(options.get(optionName) === "false") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala index adbd93dcb4..528c3474a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala @@ -27,9 +27,9 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{Attribute, Predicate} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileBasedDataSourceTest} -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable +import org.apache.spark.sql.execution.datasources.FileBasedDataSourceTest +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation +import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION @@ -119,11 +119,8 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor query.queryExecution.optimizedPlan match { case PhysicalOperation(_, filters, - DataSourceV2Relation(orcTable: OrcTable, _, options)) => + DataSourceV2ScanRelation(_, OrcScan(_, _, _, _, _, _, _, pushedFilters), _)) => assert(filters.nonEmpty, "No filter is analyzed from the given query") - val scanBuilder = orcTable.newScanBuilder(options) - 
scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray) - val pushedFilters = scanBuilder.pushedFilters() if (noneSupported) { assert(pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters") } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala index b626edf5dc..80cfbd6a02 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.orc import org.apache.spark.SparkConf -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.execution.datasources.SchemaPruningSuite import org.apache.spark.sql.execution.datasources.v2.BatchScanExec diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 9671866fe1..286bb1e920 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -33,9 +33,8 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation} -import org.apache.spark.sql.execution.datasources.orc.OrcFilters -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation +import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType @@ -1484,12 +1483,10 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { query.queryExecution.optimizedPlan.collectFirst { case PhysicalOperation(_, filters, - DataSourceV2Relation(parquetTable: ParquetTable, _, options)) => + DataSourceV2ScanRelation(_, scan: ParquetScan, _)) => assert(filters.nonEmpty, "No filter is analyzed from the given query") - val scanBuilder = parquetTable.newScanBuilder(options) val sourceFilters = filters.flatMap(DataSourceStrategy.translateFilter).toArray - scanBuilder.pushFilters(sourceFilters) - val pushedFilters = scanBuilder.pushedFilters() + val pushedFilters = scan.pushedFilters assert(pushedFilters.nonEmpty, "No filter is pushed down") val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema) val parquetFilters = createParquetFilters(schema) diff --git a/sql/core/v1.2.1/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v1.2.1/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index b1a907f9cb..80e330b3f2 100644 --- 
a/sql/core/v1.2.1/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/v1.2.1/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -29,9 +29,8 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation} -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation +import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -55,11 +54,8 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession { query.queryExecution.optimizedPlan match { case PhysicalOperation(_, filters, - DataSourceV2Relation(orcTable: OrcTable, _, options)) => + DataSourceV2ScanRelation(_, OrcScan(_, _, _, _, _, _, _, pushedFilters), _)) => assert(filters.nonEmpty, "No filter is analyzed from the given query") - val scanBuilder = orcTable.newScanBuilder(options) - scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray) - val pushedFilters = scanBuilder.pushedFilters() assert(pushedFilters.nonEmpty, "No filter is pushed down") val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters) assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pushedFilters")
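
The sketch below is illustrative only and is not part of the patch above. It shows how the new plan shape can be observed once this change is in place, assuming Spark 3.0 or later, a local `SparkSession`, an existing Parquet path (the path and column names are placeholders), and that Parquet is routed through the DataSource V2 path by clearing `spark.sql.sources.useV1SourceList`.

  // Hedged example, not from this patch: inspect the optimized plan after
  // early pushdown. The path "/tmp/people" and columns `name`/`age` are
  // placeholders.
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

  val spark = SparkSession.builder()
    .master("local[*]")
    // route parquet through DataSource V2 so a DataSourceV2Relation is planned
    .config("spark.sql.sources.useV1SourceList", "")
    .getOrCreate()
  import spark.implicits._

  val df = spark.read.parquet("/tmp/people")
    .where($"age" > 21)
    .select($"name")

  // After the "Early Filter and Projection Push-Down" batch, the optimized
  // plan holds a DataSourceV2ScanRelation with the scan already built.
  val scanRelation = df.queryExecution.optimizedPlan.collect {
    case r: DataSourceV2ScanRelation => r
  }.head

  // The scan's read schema contains only the columns needed after pushdown
  // (the projected column plus any columns referenced by post-scan filters).
  println(scanRelation.scan.readSchema())

  // Stats used by cost-based rules such as join reordering now come from this
  // scan instead of a full-table read of all columns.
  println(scanRelation.stats.sizeInBytes)

The tests touched by this patch (for example `DataSourceV2Suite` and `FileBasedDataSourceSuite`) rely on the same pattern: collect the `DataSourceV2ScanRelation` from the optimized plan and inspect its scan or stats.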