[SQL] Improve column pruning in the optimizer.
Author: Michael Armbrust <michael@databricks.com> Closes #378 from marmbrus/columnPruning and squashes the following commits: 779da56 [Michael Armbrust] More consistent naming. 1a4e9ea [Michael Armbrust] More comments. 2f4e7b9 [Michael Armbrust] Improve column pruning in the optimizer.
This commit is contained in:
parent
930b70f052
commit
f99401a630
|
@ -33,7 +33,56 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
|
|||
Batch("Filter Pushdown", Once,
|
||||
CombineFilters,
|
||||
PushPredicateThroughProject,
|
||||
PushPredicateThroughInnerJoin) :: Nil
|
||||
PushPredicateThroughInnerJoin,
|
||||
ColumnPruning) :: Nil
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to eliminate the reading of unneeded columns from the query plan using the following
|
||||
* transformations:
|
||||
*
|
||||
* - Inserting Projections beneath the following operators:
|
||||
* - Aggregate
|
||||
* - Project <- Join
|
||||
* - Collapse adjacent projections, performing alias substitution.
|
||||
*/
|
||||
object ColumnPruning extends Rule[LogicalPlan] {
|
||||
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
|
||||
case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
|
||||
// Project away references that are not needed to calculate the required aggregates.
|
||||
a.copy(child = Project(a.references.toSeq, child))
|
||||
|
||||
case Project(projectList, Join(left, right, joinType, condition)) =>
|
||||
// Collect the list of off references required either above or to evaluate the condition.
|
||||
val allReferences: Set[Attribute] =
|
||||
projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)
|
||||
/** Applies a projection when the child is producing unnecessary attributes */
|
||||
def prunedChild(c: LogicalPlan) =
|
||||
if ((allReferences.filter(c.outputSet.contains) -- c.outputSet).nonEmpty) {
|
||||
Project(allReferences.filter(c.outputSet.contains).toSeq, c)
|
||||
} else {
|
||||
c
|
||||
}
|
||||
|
||||
Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))
|
||||
|
||||
case Project(projectList1, Project(projectList2, child)) =>
|
||||
// Create a map of Aliases to their values from the child projection.
|
||||
// e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
|
||||
val aliasMap = projectList2.collect {
|
||||
case a @ Alias(e, _) => (a.toAttribute: Expression, a)
|
||||
}.toMap
|
||||
|
||||
// Substitute any attributes that are produced by the child projection, so that we safely
|
||||
// eliminate it.
|
||||
// e.g., 'SELECT c + 1 FROM (SELECT a + b AS C ...' produces 'SELECT a + b + 1 ...'
|
||||
// TODO: Fix TransformBase to avoid the cast below.
|
||||
val substitutedProjection = projectList1.map(_.transform {
|
||||
case a if aliasMap.contains(a) => aliasMap(a)
|
||||
}).asInstanceOf[Seq[NamedExpression]]
|
||||
|
||||
Project(substitutedProjection, child)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -127,7 +127,7 @@ case class Aggregate(
|
|||
extends UnaryNode {
|
||||
|
||||
def output = aggregateExpressions.map(_.toAttribute)
|
||||
def references = child.references
|
||||
def references = (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet
|
||||
}
|
||||
|
||||
case class Limit(limit: Expression, child: LogicalPlan) extends UnaryNode {
|
||||
|
|
Loading…
Reference in a new issue