start of PushDownPredicate's second case.
parent
7bc91c8402
commit
54c84e7ca7
|
@ -126,7 +126,19 @@ object SparkMethods
|
|||
def expressionIsDeterministic(e: Expression) =
|
||||
e.deterministic
|
||||
|
||||
def combineFilters(fc: Expression, nf: Filter, nc: Expression, grandChild: LogicalPlan) =
|
||||
def namedExpressionsAreDeterministic(nes: Seq[NamedExpression]): Boolean =
|
||||
nes.forall(_.deterministic)
|
||||
|
||||
def canPushThroughCondition(plan: LogicalPlan, condition: Expression): Boolean =
|
||||
{
|
||||
val attributes = plan.outputSet
|
||||
!condition.exists {
|
||||
case s: SubqueryExpression => s.plan.outputSet.intersect(attributes).nonEmpty
|
||||
case _ => false
|
||||
}
|
||||
}
|
||||
|
||||
def combineFiltersApplyLocally(fc: Expression, nf: Filter, nc: Expression, grandChild: LogicalPlan): LogicalPlan =
|
||||
{
|
||||
val (combineCandidates, nonDeterministic) =
|
||||
splitConjunctivePredicates(fc).partition(_.deterministic)
|
||||
|
@ -139,4 +151,10 @@ object SparkMethods
|
|||
}
|
||||
nonDeterministic.reduceOption(And).map(c => Filter(c, mergedFilter)).getOrElse(mergedFilter)
|
||||
}
|
||||
}
|
||||
|
||||
def pushPredicateThroughNonJoinApplyLocallyOne(condition: Expression, project: Project, fields: Seq[NamedExpression], grandChild: LogicalPlan ): LogicalPlan =
|
||||
{
|
||||
val aliasMap = getAliasMap(project)
|
||||
project.copy(child = Filter(replaceAlias(condition, getAliasMap(project)), grandChild))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -117,13 +117,29 @@ object Catalyst extends HardcodedDefinition
|
|||
Type.Native("Expression")
|
||||
)
|
||||
|
||||
Function("combineFilters", Type.AST("LogicalPlan"))(
|
||||
Function("namedExpressionsAreDeterministic", Type.Bool)(
|
||||
Type.Array(Type.Native("NamedExpression"))
|
||||
)
|
||||
|
||||
Function("canPushThroughCondition", Type.Bool)(
|
||||
Type.AST("LogicalPlan"),
|
||||
Type.Native("Expression"),
|
||||
)
|
||||
|
||||
Function("combineFiltersApplyLocally", Type.AST("LogicalPlan"))(
|
||||
Type.Native("Expression"),
|
||||
Type.Node("Filter"),
|
||||
Type.Native("Expression"),
|
||||
Type.AST("LogicalPlan"),
|
||||
)
|
||||
|
||||
Function("pushPredicateThroughNonJoinApplyLocallyOne", Type.AST("LogicalPlan"))(
|
||||
Type.Native("Expression"),
|
||||
Type.Node("Project"),
|
||||
Type.Array(Type.Native("NamedExpression")),
|
||||
Type.AST("LogicalPlan"),
|
||||
)
|
||||
|
||||
Global("JoinHint.NONE", Type.Native("JoinHint"))
|
||||
Global("RightOuter", Type.Native("JoinType"))
|
||||
Global("LeftOuter", Type.Native("JoinType"))
|
||||
|
@ -421,11 +437,35 @@ object Catalyst extends HardcodedDefinition
|
|||
Apply("expressionIsDeterministic")(Ref("nc"))
|
||||
)
|
||||
)(
|
||||
Apply("combineFilters")(
|
||||
Apply("combineFiltersApplyLocally")(
|
||||
Ref("fc"),
|
||||
Ref("nf"),
|
||||
Ref("nc"),
|
||||
Ref("grandChild"),
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
Rule("PushDownPredicates-2-1", "LogicalPlan")(
|
||||
Match("Filter")(
|
||||
Bind("condition"),
|
||||
Bind("project", Match("Project")(
|
||||
Bind("fields"),
|
||||
Bind("grandChild"),
|
||||
))
|
||||
) and Test(
|
||||
Apply("namedExpressionsAreDeterministic")(
|
||||
Ref("fields"),
|
||||
) and Apply("canPushThroughCondition")(
|
||||
Ref("grandChild"),
|
||||
Ref("condition"),
|
||||
)
|
||||
)
|
||||
)(
|
||||
Apply("pushPredicateThroughNonJoinApplyLocallyOne")(
|
||||
Ref("condition"),
|
||||
Ref("project"),
|
||||
Ref("fields"),
|
||||
Ref("grandChild"),
|
||||
)
|
||||
)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue