[SPARK-2366] [SQL] Add column pruning for the right side of LeftSemi join.

The right side of `LeftSemi` join needs columns only used in join condition.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #1301 from ueshin/issues/SPARK-2366 and squashes the following commits:

7677a39 [Takuya UESHIN] Update comments.
786d3a0 [Takuya UESHIN] Rename method name.
e0957b1 [Takuya UESHIN] Add column pruning for the right side of LeftSemi join.
This commit is contained in:
Takuya UESHIN 2014-07-05 11:48:08 -07:00 committed by Michael Armbrust
parent 42f3abd529
commit 3da8df939e

View file

@ -52,6 +52,7 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
* - Inserting Projections beneath the following operators:
* - Aggregate
* - Project <- Join
* - LeftSemiJoin
* - Collapse adjacent projections, performing alias substitution.
*/
object ColumnPruning extends Rule[LogicalPlan] {
@ -62,19 +63,22 @@ object ColumnPruning extends Rule[LogicalPlan] {
// Eliminate unneeded attributes from either side of a Join.
case Project(projectList, Join(left, right, joinType, condition)) =>
// Collect the list of off references required either above or to evaluate the condition.
// Collect the list of all references required either above or to evaluate the condition.
val allReferences: Set[Attribute] =
projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)
/** Applies a projection only when the child is producing unnecessary attributes */
def prunedChild(c: LogicalPlan) =
if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
Project(allReferences.filter(c.outputSet.contains).toSeq, c)
} else {
c
}
def pruneJoinChild(c: LogicalPlan) = prunedChild(c, allReferences)
Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))
Project(projectList, Join(pruneJoinChild(left), pruneJoinChild(right), joinType, condition))
// Eliminate unneeded attributes from right side of a LeftSemiJoin.
case Join(left, right, LeftSemi, condition) =>
// Collect the list of all references required to evaluate the condition.
val allReferences: Set[Attribute] =
condition.map(_.references).getOrElse(Set.empty)
Join(left, prunedChild(right, allReferences), LeftSemi, condition)
// Combine adjacent Projects.
case Project(projectList1, Project(projectList2, child)) =>
@ -97,6 +101,14 @@ object ColumnPruning extends Rule[LogicalPlan] {
// Eliminate no-op Projects
case Project(projectList, child) if child.output == projectList => child
}
/** Applies a projection only when the child is producing unnecessary attributes */
private def prunedChild(c: LogicalPlan, allReferences: Set[Attribute]) =
if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
Project(allReferences.filter(c.outputSet.contains).toSeq, c)
} else {
c
}
}
/**