[SPARK-29277][SQL] Add early DSv2 filter and projection pushdown

### What changes were proposed in this pull request?

This adds a new rule, `V2ScanRelationPushDown`, that pushes filters and projections into a new `DataSourceV2ScanRelation` in the optimizer. That scan is then used when converting to a physical scan node. The new relation correctly reports stats based on the scan.
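As a rough illustration (not from this patch), one way to observe the new behavior from a test: after optimization, the v2 relation should have been replaced by a `DataSourceV2ScanRelation` whose stats come from the pushed-down scan rather than a full table scan. The table name and session below are hypothetical.

```scala
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

// Hypothetical v2 table; assumes an active SparkSession `spark`.
val df = spark.table("testcat.ns.t").select("id").filter("id > 10")

// After this change, the optimized plan contains the scan relation, not DataSourceV2Relation.
val scanRelations = df.queryExecution.optimizedPlan.collect {
  case r: DataSourceV2ScanRelation => r
}
assert(scanRelations.nonEmpty)

// Stats are computed from the created Scan, so they reflect the pushed filters/projection.
scanRelations.foreach(r => println(r.computeStats().sizeInBytes))
```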

To run scan pushdown before rules that use stats, this adds a new optimizer override, `earlyScanPushDownRules`, and an early pushdown batch in the optimizer that runs before cost-based join reordering. The other early pushdown rule, `PruneFileSourcePartitions`, is moved into this early pushdown rule set.
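A minimal sketch of how the new hook is intended to be used (the rule below is a hypothetical placeholder; in this patch, `SparkOptimizer` wires `SchemaPruning`, `PruneFileSourcePartitions`, and `V2ScanRelationPushDown` into it):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical early-pushdown rule; anything returned by `earlyScanPushDownRules`
// runs in the new "Early Filter and Projection Push-Down" batch, before join reordering,
// so every later batch sees post-pushdown stats.
object MyEarlyPushDown extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan // no-op placeholder
}

// In an Optimizer subclass:
//   override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = MyEarlyPushDown :: Nil
```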

This also moves the pushdown helper methods from `DataSourceV2Strategy` into a new utility class, `PushDownUtils`.

### Why are the changes needed?

This is needed for DSv2 sources to supply stats for cost-based rules in the optimizer.
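For context, a hedged sketch of how a DSv2 source supplies those stats: a `Scan` can implement `SupportsReportStatistics`, and after this change the estimates are picked up by `DataSourceV2ScanRelation.computeStats()` early enough for cost-based rules such as join reordering to see them. The class and values below are illustrative.

```scala
import java.util.OptionalLong

import org.apache.spark.sql.connector.read.{Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.types.StructType

// Hypothetical scan that reports size/row-count estimates to the optimizer.
class MyScan(schema: StructType, estimatedRows: Long, estimatedBytes: Long)
  extends Scan with SupportsReportStatistics {

  override def readSchema(): StructType = schema

  override def estimateStatistics(): Statistics = new Statistics {
    override def sizeInBytes(): OptionalLong = OptionalLong.of(estimatedBytes)
    override def numRows(): OptionalLong = OptionalLong.of(estimatedRows)
  }

  // toBatch() etc. are omitted here; a real source would implement them.
}
```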

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

This updates the stats implementation of `DataSourceV2Relation` to throw when running in test mode, so tests will fail if stats are accessed on a v2 relation before early pushdown has run.
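For illustration only, a sketch of what the guard means in practice (assuming `spark.testing` is set and `df` reads from a DSv2 source; names are hypothetical):

```scala
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Before optimization, the plan still contains the plain v2 relation.
val relation = df.queryExecution.analyzed.collectFirst {
  case r: DataSourceV2Relation => r
}.get

// With this change, calling computeStats() here throws
// IllegalStateException("BUG: computeStats called before pushdown on DSv2 relation: ...")
// in test mode instead of silently returning full-table-scan stats.
relation.computeStats()
```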

Closes #25955 from rdblue/move-v2-pushdown.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Ryan Blue <blue@apache.org>
Ryan Blue 2019-10-30 18:07:34 -07:00
parent 8207c835b4
commit cfc80d0eb1
17 changed files with 304 additions and 161 deletions


@@ -681,10 +681,18 @@ class Analyzer(
.map(v2Relation => i.copy(table = v2Relation))
.getOrElse(i)
case desc @ DescribeTable(u: UnresolvedV2Relation, _) =>
CatalogV2Util.loadRelation(u.catalog, u.tableName)
.map(rel => desc.copy(table = rel))
.getOrElse(desc)
case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) =>
CatalogV2Util.loadRelation(u.catalog, u.tableName)
.map(rel => alter.copy(table = rel))
.getOrElse(alter)
case u: UnresolvedV2Relation =>
CatalogV2Util.loadTable(u.catalog, u.tableName).map { table =>
DataSourceV2Relation.create(table)
}.getOrElse(u)
CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u)
}
}


@@ -104,6 +104,20 @@ trait CheckAnalysis extends PredicateHelper {
case u: UnresolvedV2Relation =>
u.failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")
case AlterTable(_, _, u: UnresolvedV2Relation, _) if isView(u.originalNameParts) =>
u.failAnalysis(
s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.")
case AlterTable(_, _, u: UnresolvedV2Relation, _) =>
failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")
case DescribeTable(u: UnresolvedV2Relation, _) if isView(u.originalNameParts) =>
u.failAnalysis(
s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.")
case DescribeTable(u: UnresolvedV2Relation, _) =>
failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")
case operator: LogicalPlan =>
// Check argument data types of higher-order functions downwards first.
// If the arguments of the higher-order functions are resolved but the type check fails,


@@ -119,7 +119,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
rulesWithoutInferFiltersFromConstraints: _*) :: Nil
}
(Batch("Eliminate Distinct", Once, EliminateDistinct) ::
val batches = (Batch("Eliminate Distinct", Once, EliminateDistinct) ::
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
// However, because we also use the analyzer to canonicalized queries (for view definition),
@@ -170,6 +170,10 @@ abstract class Optimizer(catalogManager: CatalogManager)
RemoveLiteralFromGroupExpressions,
RemoveRepetitionFromGroupExpressions) :: Nil ++
operatorOptimizationBatch) :+
// This batch pushes filters and projections into scan nodes. Before this batch, the logical
// plan may contain nodes that do not report stats. Anything that uses stats must run after
// this batch.
Batch("Early Filter and Projection Push-Down", Once, earlyScanPushDownRules: _*) :+
// Since join costs in AQP can change between multiple runs, there is no reason that we have an
// idempotence enforcement on this batch. We thus make it FixedPoint(1) instead of Once.
Batch("Join Reorder", FixedPoint(1),
@@ -196,6 +200,9 @@ abstract class Optimizer(catalogManager: CatalogManager)
RemoveNoopOperators) :+
// This batch must be executed after the `RewriteSubquery` batch, which creates joins.
Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers)
// remove any batches with no rules. this may happen when subclasses do not add optional rules.
batches.filter(_.rules.nonEmpty)
}
/**
@@ -253,6 +260,11 @@ abstract class Optimizer(catalogManager: CatalogManager)
*/
def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil
/**
* Override to provide additional rules for early projection and filter pushdown to scans.
*/
def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil
/**
* Returns (defaultBatches - (excludedRules - nonExcludableRules)), the rule batches that
* eventually run in the Optimizer.


@@ -271,7 +271,7 @@ case class ShowNamespaces(
*/
case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Command {
override def children: Seq[LogicalPlan] = Seq(table)
override lazy val resolved: Boolean = table.resolved
override def output: Seq[Attribute] = DescribeTableSchema.describeTableAttributes()
}
@@ -313,9 +313,7 @@ case class AlterTable(
table: NamedRelation,
changes: Seq[TableChange]) extends Command {
override def children: Seq[LogicalPlan] = Seq(table)
override lazy val resolved: Boolean = childrenResolved && {
override lazy val resolved: Boolean = table.resolved && {
changes.forall {
case add: AddColumn =>
add.fieldNames match {


@@ -24,9 +24,10 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation}
import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation}
import org.apache.spark.sql.catalyst.plans.logical.AlterTable
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
private[sql] object CatalogV2Util {
@@ -224,6 +225,10 @@ private[sql] object CatalogV2Util {
case _: NoSuchNamespaceException => None
}
def loadRelation(catalog: CatalogPlugin, ident: Identifier): Option[NamedRelation] = {
loadTable(catalog, ident).map(DataSourceV2Relation.create)
}
def isSessionCatalog(catalog: CatalogPlugin): Boolean = {
catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)
}


@@ -26,6 +26,7 @@ import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, Statistics => V2S
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
import org.apache.spark.sql.connector.write.WriteBuilder
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils
/**
* A logical plan representing a data source v2 table.
@@ -50,12 +51,53 @@ case class DataSourceV2Relation(
s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
}
def newScanBuilder(): ScanBuilder = {
table.asReadable.newScanBuilder(options)
override def computeStats(): Statistics = {
if (Utils.isTesting) {
// when testing, throw an exception if this computeStats method is called because stats should
// not be accessed before pushing the projection and filters to create a scan. otherwise, the
// stats are not accurate because they are based on a full table scan of all columns.
throw new IllegalStateException(
s"BUG: computeStats called before pushdown on DSv2 relation: $name")
} else {
// when not testing, return stats because bad stats are better than failing a query
table.asReadable.newScanBuilder(options) match {
case r: SupportsReportStatistics =>
val statistics = r.estimateStatistics()
DataSourceV2Relation.transformV2Stats(statistics, None, conf.defaultSizeInBytes)
case _ =>
Statistics(sizeInBytes = conf.defaultSizeInBytes)
}
}
}
override def newInstance(): DataSourceV2Relation = {
copy(output = output.map(_.newInstance()))
}
}
/**
* A logical plan for a DSv2 table with a scan already created.
*
* This is used in the optimizer to push filters and projection down before conversion to physical
* plan. This ensures that the stats that are used by the optimizer account for the filters and
* projection that will be pushed down.
*
* @param table a DSv2 [[Table]]
* @param scan a DSv2 [[Scan]]
* @param output the output attributes of this relation
*/
case class DataSourceV2ScanRelation(
table: Table,
scan: Scan,
output: Seq[AttributeReference]) extends LeafNode with NamedRelation {
override def name: String = table.name()
override def simpleString(maxFields: Int): String = {
s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
}
override def computeStats(): Statistics = {
val scan = newScanBuilder().build()
scan match {
case r: SupportsReportStatistics =>
val statistics = r.estimateStatistics()
@@ -64,10 +106,6 @@ case class DataSourceV2Relation(
Statistics(sizeInBytes = conf.defaultSizeInBytes)
}
}
override def newInstance(): DataSourceV2Relation = {
copy(output = output.map(_.newInstance()))
}
}
/**


@@ -51,7 +51,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileTable}
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.execution.stat.StatFunctions
import org.apache.spark.sql.internal.SQLConf
@@ -3218,7 +3218,7 @@ class Dataset[T] private[sql](
fr.inputFiles
case r: HiveTableRelation =>
r.tableMeta.storage.locationUri.map(_.toString).toArray
case DataSourceV2Relation(table: FileTable, _, _) =>
case DataSourceV2ScanRelation(table: FileTable, _, _) =>
table.fileIndex.inputFiles
}.flatten
files.toSet.toArray


@@ -20,10 +20,13 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.ExperimentalMethods
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning}
import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
import org.apache.spark.sql.execution.datasources.SchemaPruning
import org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown
import org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate, ExtractPythonUDFFromAggregate, ExtractPythonUDFs}
class SparkOptimizer(
@@ -32,10 +35,12 @@ class SparkOptimizer(
experimentalMethods: ExperimentalMethods)
extends Optimizer(catalogManager) {
override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
// TODO: move SchemaPruning into catalyst
SchemaPruning :: PruneFileSourcePartitions :: V2ScanRelationPushDown :: Nil
override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
Batch("Schema Pruning", Once, SchemaPruning) :+
Batch("PartitionPruning", Once,
PartitionPruning,
OptimizeSubqueries) :+
@@ -64,7 +69,8 @@ class SparkOptimizer(
override def nonExcludableRules: Seq[String] = super.nonExcludableRules :+
ExtractPythonUDFFromJoinCondition.ruleName :+
ExtractPythonUDFFromAggregate.ruleName :+ ExtractGroupingPythonUDFFromAggregate.ruleName :+
ExtractPythonUDFs.ruleName
ExtractPythonUDFs.ruleName :+
V2ScanRelationPushDown.ruleName
/**
* Optimization batches that are executed before the regular optimization batches (also before


@@ -18,127 +18,30 @@
package org.apache.spark.sql.execution.datasources.v2
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables}
import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.sources
import org.apache.spark.sql.util.CaseInsensitiveStringMap
object DataSourceV2Strategy extends Strategy with PredicateHelper {
/**
* Pushes down filters to the data source reader
*
* @return pushed filter and post-scan filters.
*/
private def pushFilters(
scanBuilder: ScanBuilder,
filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
scanBuilder match {
case r: SupportsPushDownFilters =>
// A map from translated data source leaf node filters to original catalyst filter
// expressions. For a `And`/`Or` predicate, it is possible that the predicate is partially
// pushed down. This map can be used to construct a catalyst filter expression from the
// input filter, or a superset(partial push down filter) of the input filter.
val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression]
val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter]
// Catalyst filter expression that can't be translated to data source filters.
val untranslatableExprs = mutable.ArrayBuffer.empty[Expression]
for (filterExpr <- filters) {
val translated =
DataSourceStrategy.translateFilterWithMapping(filterExpr, Some(translatedFilterToExpr))
if (translated.isEmpty) {
untranslatableExprs += filterExpr
} else {
translatedFilters += translated.get
}
}
// Data source filters that need to be evaluated again after scanning. which means
// the data source cannot guarantee the rows returned can pass these filters.
// As a result we must return it so Spark can plan an extra filter operator.
val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter =>
DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
}
// The filters which are marked as pushed to this data source
val pushedFilters = r.pushedFilters().map { filter =>
DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
}
(pushedFilters, untranslatableExprs ++ postScanFilters)
case _ => (Nil, filters)
}
}
/**
* Applies column pruning to the data source, w.r.t. the references of the given expressions.
*
* @return the created `ScanConfig`(since column pruning is the last step of operator pushdown),
* and new output attributes after column pruning.
*/
// TODO: nested column pruning.
private def pruneColumns(
scanBuilder: ScanBuilder,
relation: DataSourceV2Relation,
exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
scanBuilder match {
case r: SupportsPushDownRequiredColumns =>
val requiredColumns = AttributeSet(exprs.flatMap(_.references))
val neededOutput = relation.output.filter(requiredColumns.contains)
if (neededOutput != relation.output) {
r.pruneColumns(neededOutput.toStructType)
val scan = r.build()
val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap
scan -> scan.readSchema().toAttributes.map {
// We have to keep the attribute id during transformation.
a => a.withExprId(nameToAttr(a.name).exprId)
}
} else {
r.build() -> relation.output
}
case _ => scanBuilder.build() -> relation.output
}
}
import DataSourceV2Implicits._
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
val scanBuilder = relation.newScanBuilder()
case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) =>
// projection and filters were already pushed down in the optimizer.
// this uses PhysicalOperation to get the projection and ensure that if the batch scan does
// not support columnar, a projection is added to convert the rows to UnsafeRow.
val batchExec = BatchScanExec(relation.output, relation.scan)
val (withSubquery, withoutSubquery) = filters.partition(SubqueryExpression.hasSubquery)
val normalizedFilters = DataSourceStrategy.normalizeFilters(
withoutSubquery, relation.output)
// `pushedFilters` will be pushed down and evaluated in the underlying data sources.
// `postScanFilters` need to be evaluated after the scan.
// `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
val (pushedFilters, postScanFiltersWithoutSubquery) =
pushFilters(scanBuilder, normalizedFilters)
val postScanFilters = postScanFiltersWithoutSubquery ++ withSubquery
val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters)
logInfo(
s"""
|Pushing operators to ${relation.name}
|Pushed Filters: ${pushedFilters.mkString(", ")}
|Post-Scan Filters: ${postScanFilters.mkString(",")}
|Output: ${output.mkString(", ")}
""".stripMargin)
val batchExec = BatchScanExec(output, scan)
val filterCondition = postScanFilters.reduceLeftOption(And)
val filterCondition = filters.reduceLeftOption(And)
val withFilter = filterCondition.map(FilterExec(_, batchExec)).getOrElse(batchExec)
val withProjection = if (withFilter.output != project || !batchExec.supportsColumnar) {
@@ -254,19 +157,19 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
OverwritePartitionsDynamicExec(
r.table.asWritable, writeOptions.asOptions, planLater(query)) :: Nil
case DeleteFromTable(r: DataSourceV2Relation, condition) =>
case DeleteFromTable(DataSourceV2ScanRelation(table, _, output), condition) =>
if (condition.exists(SubqueryExpression.hasSubquery)) {
throw new AnalysisException(
s"Delete by condition with subquery is not supported: $condition")
}
// fail if any filter cannot be converted. correctness depends on removing all matching data.
val filters = DataSourceStrategy.normalizeFilters(condition.toSeq, r.output)
val filters = DataSourceStrategy.normalizeFilters(condition.toSeq, output)
.flatMap(splitConjunctivePredicates(_).map {
f => DataSourceStrategy.translateFilter(f).getOrElse(
throw new AnalysisException(s"Exec update failed:" +
s" cannot translate expression to source filter: $f"))
}).toArray
DeleteFromTableExec(r.table.asDeletable, filters) :: Nil
DeleteFromTableExec(table.asDeletable, filters) :: Nil
case WriteToContinuousDataSource(writer, query) =>
WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
@@ -283,8 +186,8 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
Nil
}
case desc @ DescribeTable(r: DataSourceV2Relation, isExtended) =>
DescribeTableExec(desc.output, r.table, isExtended) :: Nil
case desc @ DescribeTable(DataSourceV2Relation(table, _, _), isExtended) =>
DescribeTableExec(desc.output, table, isExtended) :: Nil
case DropTable(catalog, ident, ifExists) =>
DropTableExec(catalog, ident, ifExists) :: Nil


@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2
import scala.collection.mutable
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, PredicateHelper}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources
object PushDownUtils extends PredicateHelper {
/**
* Pushes down filters to the data source reader
*
* @return pushed filter and post-scan filters.
*/
def pushFilters(
scanBuilder: ScanBuilder,
filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
scanBuilder match {
case r: SupportsPushDownFilters =>
// A map from translated data source leaf node filters to original catalyst filter
// expressions. For a `And`/`Or` predicate, it is possible that the predicate is partially
// pushed down. This map can be used to construct a catalyst filter expression from the
// input filter, or a superset(partial push down filter) of the input filter.
val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression]
val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter]
// Catalyst filter expression that can't be translated to data source filters.
val untranslatableExprs = mutable.ArrayBuffer.empty[Expression]
for (filterExpr <- filters) {
val translated =
DataSourceStrategy.translateFilterWithMapping(filterExpr, Some(translatedFilterToExpr))
if (translated.isEmpty) {
untranslatableExprs += filterExpr
} else {
translatedFilters += translated.get
}
}
// Data source filters that need to be evaluated again after scanning. which means
// the data source cannot guarantee the rows returned can pass these filters.
// As a result we must return it so Spark can plan an extra filter operator.
val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter =>
DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
}
// The filters which are marked as pushed to this data source
val pushedFilters = r.pushedFilters().map { filter =>
DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
}
(pushedFilters, untranslatableExprs ++ postScanFilters)
case _ => (Nil, filters)
}
}
/**
* Applies column pruning to the data source, w.r.t. the references of the given expressions.
*
* @return the created `ScanConfig`(since column pruning is the last step of operator pushdown),
* and new output attributes after column pruning.
*/
// TODO: nested column pruning.
def pruneColumns(
scanBuilder: ScanBuilder,
relation: DataSourceV2Relation,
exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
scanBuilder match {
case r: SupportsPushDownRequiredColumns =>
val requiredColumns = AttributeSet(exprs.flatMap(_.references))
val neededOutput = relation.output.filter(requiredColumns.contains)
if (neededOutput != relation.output) {
r.pruneColumns(neededOutput.toStructType)
val scan = r.build()
val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap
scan -> scan.readSchema().toAttributes.map {
// We have to keep the attribute id during transformation.
a => a.withExprId(nameToAttr(a.name).exprId)
}
} else {
r.build() -> relation.output
}
case _ => scanBuilder.build() -> relation.output
}
}
}


@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.sql.catalyst.expressions.{And, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
object V2ScanRelationPushDown extends Rule[LogicalPlan] {
import DataSourceV2Implicits._
override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options)
val (withSubquery, withoutSubquery) = filters.partition(SubqueryExpression.hasSubquery)
val normalizedFilters = DataSourceStrategy.normalizeFilters(
withoutSubquery, relation.output)
// `pushedFilters` will be pushed down and evaluated in the underlying data sources.
// `postScanFilters` need to be evaluated after the scan.
// `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters(
scanBuilder, normalizedFilters)
val postScanFilters = postScanFiltersWithoutSubquery ++ withSubquery
val (scan, output) = PushDownUtils.pruneColumns(
scanBuilder, relation, project ++ postScanFilters)
logInfo(
s"""
|Pushing operators to ${relation.name}
|Pushed Filters: ${pushedFilters.mkString(", ")}
|Post-Scan Filters: ${postScanFilters.mkString(",")}
|Output: ${output.mkString(", ")}
""".stripMargin)
val scanRelation = DataSourceV2ScanRelation(relation.table, scan, output)
val filterCondition = postScanFilters.reduceLeftOption(And)
val withFilter = filterCondition.map(Filter(_, scanRelation)).getOrElse(scanRelation)
val withProjection = if (withFilter.output != project) {
Project(project, withFilter)
} else {
withFilter
}
withProjection
}
}


@@ -17,7 +17,7 @@
package org.apache.spark.sql
import java.io.{File, FilenameFilter, FileNotFoundException}
import java.io.{File, FileNotFoundException}
import java.nio.file.{Files, StandardOpenOption}
import java.util.Locale
@@ -27,9 +27,9 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT, NullData, NullUDT}
import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.functions._
@@ -664,7 +664,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
dir.delete()
spark.range(1000).write.orc(dir.toString)
val df = spark.read.orc(dir.toString)
assert(df.queryExecution.logical.stats.sizeInBytes === BigInt(getLocalDirSize(dir)))
assert(df.queryExecution.optimizedPlan.stats.sizeInBytes === BigInt(getLocalDirSize(dir)))
}
}
}
@@ -720,7 +720,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
.option("path", paths.head.getCanonicalPath)
.parquet(paths(1).getCanonicalPath, paths(2).getCanonicalPath)
df.queryExecution.optimizedPlan match {
case PhysicalOperation(_, _, DataSourceV2Relation(table: ParquetTable, _, _)) =>
case PhysicalOperation(_, _, DataSourceV2ScanRelation(table: ParquetTable, _, _)) =>
assert(table.paths.toSet == paths.map(_.getCanonicalPath).toSet)
case _ =>
throw new AnalysisException("Can not match ParquetTable in the query.")


@@ -32,7 +32,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapabil
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.connector.read.partitioning.{ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, DataSourceV2ScanRelation}
import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.functions._
@@ -195,7 +195,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession {
withClue(cls.getName) {
val df = spark.read.format(cls.getName).load()
val logical = df.queryExecution.optimizedPlan.collect {
case d: DataSourceV2Relation => d
case d: DataSourceV2ScanRelation => d
}.head
val statics = logical.computeStats()
@@ -332,7 +332,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession {
test("SPARK-23315: get output from canonicalized data source v2 related plans") {
def checkCanonicalizedOutput(
df: DataFrame, logicalNumOutput: Int, physicalNumOutput: Int): Unit = {
val logical = df.queryExecution.optimizedPlan.collect {
val logical = df.queryExecution.logical.collect {
case d: DataSourceV2Relation => d
}.head
assert(logical.canonicalized.output.length == logicalNumOutput)
@@ -356,7 +356,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession {
.read
.option(optionName, false)
.format(classOf[DataSourceV2WithSessionConfig].getName).load()
val options = df.queryExecution.optimizedPlan.collectFirst {
val options = df.queryExecution.logical.collectFirst {
case d: DataSourceV2Relation => d.options
}.get
assert(options.get(optionName) === "false")


@@ -27,9 +27,9 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{Attribute, Predicate}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileBasedDataSourceTest}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
import org.apache.spark.sql.execution.datasources.FileBasedDataSourceTest
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
@@ -119,11 +119,8 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor
query.queryExecution.optimizedPlan match {
case PhysicalOperation(_, filters,
DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
DataSourceV2ScanRelation(_, OrcScan(_, _, _, _, _, _, _, pushedFilters), _)) =>
assert(filters.nonEmpty, "No filter is analyzed from the given query")
val scanBuilder = orcTable.newScanBuilder(options)
scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
val pushedFilters = scanBuilder.pushedFilters()
if (noneSupported) {
assert(pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters")
} else {


@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.datasources.orc
import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.execution.datasources.SchemaPruningSuite
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec


@@ -33,9 +33,8 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.orc.OrcFilters
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
@@ -1484,12 +1483,10 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
query.queryExecution.optimizedPlan.collectFirst {
case PhysicalOperation(_, filters,
DataSourceV2Relation(parquetTable: ParquetTable, _, options)) =>
DataSourceV2ScanRelation(_, scan: ParquetScan, _)) =>
assert(filters.nonEmpty, "No filter is analyzed from the given query")
val scanBuilder = parquetTable.newScanBuilder(options)
val sourceFilters = filters.flatMap(DataSourceStrategy.translateFilter).toArray
scanBuilder.pushFilters(sourceFilters)
val pushedFilters = scanBuilder.pushedFilters()
val pushedFilters = scan.pushedFilters
assert(pushedFilters.nonEmpty, "No filter is pushed down")
val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
val parquetFilters = createParquetFilters(schema)


@@ -29,9 +29,8 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -55,11 +54,8 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
query.queryExecution.optimizedPlan match {
case PhysicalOperation(_, filters,
DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
DataSourceV2ScanRelation(_, OrcScan(_, _, _, _, _, _, _, pushedFilters), _)) =>
assert(filters.nonEmpty, "No filter is analyzed from the given query")
val scanBuilder = orcTable.newScanBuilder(options)
scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
val pushedFilters = scanBuilder.pushedFilters()
assert(pushedFilters.nonEmpty, "No filter is pushed down")
val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters)
assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pushedFilters")