[SPARK-23802][SQL] PropagateEmptyRelation can leave query plan in unresolved state

## What changes were proposed in this pull request?

Add cast to nulls introduced by PropagateEmptyRelation so in cases they're part of coalesce they will not break its type checking rules

## How was this patch tested?

Added unit test

Author: Robert Kruszewski <robertk@palantir.com>

Closes #20914 from robert3005/rk/propagate-empty-fix.
This commit is contained in:
Robert Kruszewski 2018-04-03 17:25:54 -07:00 committed by gatorsmile
parent 359375eff7
commit 5cfd5fabcd
2 changed files with 25 additions and 9 deletions

View file

@ -17,10 +17,12 @@
package org.apache.spark.sql.catalyst.optimizer package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis.CastSupport
import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.internal.SQLConf
/** /**
* Collapse plans consisting empty local relations generated by [[PruneFilters]]. * Collapse plans consisting empty local relations generated by [[PruneFilters]].
@ -32,7 +34,7 @@ import org.apache.spark.sql.catalyst.rules._
* - Aggregate with all empty children and at least one grouping expression. * - Aggregate with all empty children and at least one grouping expression.
* - Generate(Explode) with all empty children. Others like Hive UDTF may return results. * - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
*/ */
object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper { object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper with CastSupport {
private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match { private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
case p: LocalRelation => p.data.isEmpty case p: LocalRelation => p.data.isEmpty
case _ => false case _ => false
@ -43,7 +45,9 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
// Construct a project list from plan's output, while the value is always NULL. // Construct a project list from plan's output, while the value is always NULL.
private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] = private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
plan.output.map{ a => Alias(Literal(null), a.name)(a.exprId) } plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) }
override def conf: SQLConf = SQLConf.get
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case p: Union if p.children.forall(isEmptyLocalRelation) => case p: Union if p.children.forall(isEmptyLocalRelation) =>

View file

@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.{IntegerType, StructType}
class PropagateEmptyRelationSuite extends PlanTest { class PropagateEmptyRelationSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] { object Optimize extends RuleExecutor[LogicalPlan] {
@ -37,7 +37,8 @@ class PropagateEmptyRelationSuite extends PlanTest {
ReplaceIntersectWithSemiJoin, ReplaceIntersectWithSemiJoin,
PushDownPredicate, PushDownPredicate,
PruneFilters, PruneFilters,
PropagateEmptyRelation) :: Nil PropagateEmptyRelation,
CollapseProject) :: Nil
} }
object OptimizeWithoutPropagateEmptyRelation extends RuleExecutor[LogicalPlan] { object OptimizeWithoutPropagateEmptyRelation extends RuleExecutor[LogicalPlan] {
@ -48,7 +49,8 @@ class PropagateEmptyRelationSuite extends PlanTest {
ReplaceExceptWithAntiJoin, ReplaceExceptWithAntiJoin,
ReplaceIntersectWithSemiJoin, ReplaceIntersectWithSemiJoin,
PushDownPredicate, PushDownPredicate,
PruneFilters) :: Nil PruneFilters,
CollapseProject) :: Nil
} }
val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1))) val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1)))
@ -79,9 +81,11 @@ class PropagateEmptyRelationSuite extends PlanTest {
(true, false, Inner, Some(LocalRelation('a.int, 'b.int))), (true, false, Inner, Some(LocalRelation('a.int, 'b.int))),
(true, false, Cross, Some(LocalRelation('a.int, 'b.int))), (true, false, Cross, Some(LocalRelation('a.int, 'b.int))),
(true, false, LeftOuter, Some(Project(Seq('a, Literal(null).as('b)), testRelation1).analyze)), (true, false, LeftOuter,
Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)),
(true, false, RightOuter, Some(LocalRelation('a.int, 'b.int))), (true, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
(true, false, FullOuter, Some(Project(Seq('a, Literal(null).as('b)), testRelation1).analyze)), (true, false, FullOuter,
Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)),
(true, false, LeftAnti, Some(testRelation1)), (true, false, LeftAnti, Some(testRelation1)),
(true, false, LeftSemi, Some(LocalRelation('a.int))), (true, false, LeftSemi, Some(LocalRelation('a.int))),
@ -89,8 +93,9 @@ class PropagateEmptyRelationSuite extends PlanTest {
(false, true, Cross, Some(LocalRelation('a.int, 'b.int))), (false, true, Cross, Some(LocalRelation('a.int, 'b.int))),
(false, true, LeftOuter, Some(LocalRelation('a.int, 'b.int))), (false, true, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
(false, true, RightOuter, (false, true, RightOuter,
Some(Project(Seq(Literal(null).as('a), 'b), testRelation2).analyze)), Some(Project(Seq(Literal(null).cast(IntegerType).as('a), 'b), testRelation2).analyze)),
(false, true, FullOuter, Some(Project(Seq(Literal(null).as('a), 'b), testRelation2).analyze)), (false, true, FullOuter,
Some(Project(Seq(Literal(null).cast(IntegerType).as('a), 'b), testRelation2).analyze)),
(false, true, LeftAnti, Some(LocalRelation('a.int))), (false, true, LeftAnti, Some(LocalRelation('a.int))),
(false, true, LeftSemi, Some(LocalRelation('a.int))), (false, true, LeftSemi, Some(LocalRelation('a.int))),
@ -209,4 +214,11 @@ class PropagateEmptyRelationSuite extends PlanTest {
comparePlans(optimized, correctAnswer) comparePlans(optimized, correctAnswer)
} }
test("propagate empty relation keeps the plan resolved") {
val query = testRelation1.join(
LocalRelation('a.int, 'b.int), UsingJoin(FullOuter, "a" :: Nil), None)
val optimized = Optimize.execute(query.analyze)
assert(optimized.resolved)
}
} }