[SPARK-23877][SQL][FOLLOWUP] use PhysicalOperation to simplify the handling of Project and Filter over partitioned relation
## What changes were proposed in this pull request? A followup of https://github.com/apache/spark/pull/20988 `PhysicalOperation` can collect Project and Filters over a certain plan and substitute the alias with the original attributes in the bottom plan. We can use it in `OptimizeMetadataOnlyQuery` rule to handle the Project and Filter over partitioned relation. ## How was this patch tested? existing test Author: Wenchen Fan <wenchen@databricks.com> Closes #21111 from cloud-fan/refactor.
This commit is contained in:
parent
c3a86faa53
commit
f70f46d1e5
|
@ -43,6 +43,12 @@ object LocalRelation {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Logical plan node for scanning data from a local collection.
|
||||
*
|
||||
* @param data The local collection holding the data. It doesn't need to be sent to executors
|
||||
* and then doesn't need to be serializable.
|
||||
*/
|
||||
case class LocalRelation(
|
||||
output: Seq[Attribute],
|
||||
data: Seq[InternalRow] = Nil,
|
||||
|
|
|
@ -25,6 +25,9 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
|
|||
|
||||
/**
|
||||
* Physical plan node for scanning data from a local collection.
|
||||
*
|
||||
* `Seq` may not be serializable and ideally we should not send `rows` and `unsafeRows`
|
||||
* to the executors. Thus marking them as transient.
|
||||
*/
|
||||
case class LocalTableScanExec(
|
||||
output: Seq[Attribute],
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow
|
|||
import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
import org.apache.spark.sql.catalyst.expressions.aggregate._
|
||||
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
|
||||
import org.apache.spark.sql.catalyst.plans.logical._
|
||||
import org.apache.spark.sql.catalyst.rules.Rule
|
||||
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
|
||||
|
@ -49,9 +50,13 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
|
|||
}
|
||||
|
||||
plan.transform {
|
||||
case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(_, attrs, filters, rel)) =>
|
||||
case a @ Aggregate(_, aggExprs, child @ PhysicalOperation(
|
||||
projectList, filters, PartitionedRelation(partAttrs, rel))) =>
|
||||
// We only apply this optimization when only partitioned attributes are scanned.
|
||||
if (a.references.subsetOf(attrs)) {
|
||||
if (AttributeSet((projectList ++ filters).flatMap(_.references)).subsetOf(partAttrs)) {
|
||||
// The project list and filters all only refer to partition attributes, which means the
|
||||
// the Aggregator operator can also only refer to partition attributes, and filters are
|
||||
// all partition filters. This is a metadata only query we can optimize.
|
||||
val aggFunctions = aggExprs.flatMap(_.collect {
|
||||
case agg: AggregateExpression => agg
|
||||
})
|
||||
|
@ -102,7 +107,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
|
|||
partFilters: Seq[Expression]): LogicalPlan = {
|
||||
// this logic comes from PruneFileSourcePartitions. it ensures that the filter names match the
|
||||
// relation's schema. PartitionedRelation ensures that the filters only reference partition cols
|
||||
val relFilters = partFilters.map { e =>
|
||||
val normalizedFilters = partFilters.map { e =>
|
||||
e transform {
|
||||
case a: AttributeReference =>
|
||||
a.withName(relation.output.find(_.semanticEquals(a)).get.name)
|
||||
|
@ -114,11 +119,8 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
|
|||
relation match {
|
||||
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, isStreaming) =>
|
||||
val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
|
||||
val partitionData = fsRelation.location.listFiles(relFilters, Nil)
|
||||
// partition data may be a stream, which can cause serialization to hit stack level too
|
||||
// deep exceptions because it is a recursive structure in memory. converting to array
|
||||
// avoids the problem.
|
||||
LocalRelation(partAttrs, partitionData.map(_.values).toArray, isStreaming)
|
||||
val partitionData = fsRelation.location.listFiles(normalizedFilters, Nil)
|
||||
LocalRelation(partAttrs, partitionData.map(_.values), isStreaming)
|
||||
|
||||
case relation: HiveTableRelation =>
|
||||
val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
|
||||
|
@ -127,7 +129,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
|
|||
val timeZoneId = caseInsensitiveProperties.get(DateTimeUtils.TIMEZONE_OPTION)
|
||||
.getOrElse(SQLConf.get.sessionLocalTimeZone)
|
||||
val partitions = if (partFilters.nonEmpty) {
|
||||
catalog.listPartitionsByFilter(relation.tableMeta.identifier, relFilters)
|
||||
catalog.listPartitionsByFilter(relation.tableMeta.identifier, normalizedFilters)
|
||||
} else {
|
||||
catalog.listPartitions(relation.tableMeta.identifier)
|
||||
}
|
||||
|
@ -137,10 +139,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
|
|||
Cast(Literal(p.spec(attr.name)), attr.dataType, Option(timeZoneId)).eval()
|
||||
})
|
||||
}
|
||||
// partition data may be a stream, which can cause serialization to hit stack level too
|
||||
// deep exceptions because it is a recursive structure in memory. converting to array
|
||||
// avoids the problem.
|
||||
LocalRelation(partAttrs, partitionData.toArray)
|
||||
LocalRelation(partAttrs, partitionData)
|
||||
|
||||
case _ =>
|
||||
throw new IllegalStateException(s"unrecognized table scan node: $relation, " +
|
||||
|
@ -151,44 +150,21 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
|
|||
|
||||
/**
|
||||
* A pattern that finds the partitioned table relation node inside the given plan, and returns a
|
||||
* pair of the partition attributes, partition filters, and the table relation node.
|
||||
*
|
||||
* It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with
|
||||
* deterministic expressions, and returns result after reaching the partitioned table relation
|
||||
* node.
|
||||
* pair of the partition attributes and the table relation node.
|
||||
*/
|
||||
object PartitionedRelation extends PredicateHelper {
|
||||
|
||||
def unapply(
|
||||
plan: LogicalPlan): Option[(AttributeSet, AttributeSet, Seq[Expression], LogicalPlan)] = {
|
||||
def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = {
|
||||
plan match {
|
||||
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
|
||||
if fsRelation.partitionSchema.nonEmpty =>
|
||||
if fsRelation.partitionSchema.nonEmpty =>
|
||||
val partAttrs = AttributeSet(getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l))
|
||||
Some((partAttrs, partAttrs, Nil, l))
|
||||
Some((partAttrs, l))
|
||||
|
||||
case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
|
||||
val partAttrs = AttributeSet(
|
||||
getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation))
|
||||
Some((partAttrs, partAttrs, Nil, relation))
|
||||
|
||||
case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
|
||||
unapply(child).flatMap { case (partAttrs, attrs, filters, relation) =>
|
||||
if (p.references.subsetOf(attrs)) {
|
||||
Some((partAttrs, p.outputSet, filters, relation))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
case f @ Filter(condition, child) if condition.deterministic =>
|
||||
unapply(child).flatMap { case (partAttrs, attrs, filters, relation) =>
|
||||
if (f.references.subsetOf(partAttrs)) {
|
||||
Some((partAttrs, attrs, splitConjunctivePredicates(condition) ++ filters, relation))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
Some((partAttrs, relation))
|
||||
|
||||
case _ => None
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.spark.sql.QueryTest
|
|||
import org.apache.spark.sql.catalyst.expressions.NamedExpression
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Filter, Project, SubqueryAlias}
|
||||
import org.apache.spark.sql.hive.test.TestHiveSingleton
|
||||
import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_METADATA_ONLY
|
||||
import org.apache.spark.sql.test.SQLTestUtils
|
||||
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
|
||||
|
||||
|
@ -32,13 +33,22 @@ class OptimizeHiveMetadataOnlyQuerySuite extends QueryTest with TestHiveSingleto
|
|||
|
||||
import spark.implicits._
|
||||
|
||||
before {
|
||||
override def beforeAll(): Unit = {
|
||||
super.beforeAll()
|
||||
sql("CREATE TABLE metadata_only (id bigint, data string) PARTITIONED BY (part int)")
|
||||
(0 to 10).foreach(p => sql(s"ALTER TABLE metadata_only ADD PARTITION (part=$p)"))
|
||||
}
|
||||
|
||||
override protected def afterAll(): Unit = {
|
||||
try {
|
||||
sql("DROP TABLE IF EXISTS metadata_only")
|
||||
} finally {
|
||||
super.afterAll()
|
||||
}
|
||||
}
|
||||
|
||||
test("SPARK-23877: validate metadata-only query pushes filters to metastore") {
|
||||
withTable("metadata_only") {
|
||||
withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
|
||||
val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount
|
||||
|
||||
// verify the number of matching partitions
|
||||
|
@ -50,7 +60,7 @@ class OptimizeHiveMetadataOnlyQuerySuite extends QueryTest with TestHiveSingleto
|
|||
}
|
||||
|
||||
test("SPARK-23877: filter on projected expression") {
|
||||
withTable("metadata_only") {
|
||||
withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
|
||||
val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount
|
||||
|
||||
// verify the matching partitions
|
||||
|
|
Loading…
Reference in a new issue