From a884daad805a701494e87393dc307937472a985d Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 1 Apr 2016 13:03:27 -0700
Subject: [PATCH] [SPARK-14191][SQL] Remove invalid Expand operator constraints

The `Expand` operator currently uses its child plan's constraints as its own
valid constraints (i.e., the base of its constraints). This is not correct,
because `Expand` sets its group-by attributes to null values, so the
nullability of these attributes should be true.

E.g., for an `Expand` operator like:

    val input = LocalRelation('a.int, 'b.int, 'c.int)
      .where('c.attr > 10 && 'a.attr < 5 && 'b.attr > 2)

    Expand(
      Seq(
        Seq('c, Literal.create(null, StringType), 1),
        Seq('c, 'a, 2)),
      Seq('c, 'a, 'gid.int),
      Project(Seq('a, 'c), input))

The filtered input has the constraints `IsNotNull('a)`, `IsNotNull('b)` and
`IsNotNull('c)`; after the `Project`, `IsNotNull('a)` and `IsNotNull('c)`
remain. But the `Expand` should not have `IsNotNull('a)` in its constraints,
because its first projection replaces 'a with a null literal.

This PR is the first step for this issue: it removes the now-invalid
constraints of the `Expand` operator.

A test is added to `ConstraintPropagationSuite`.

Author: Liang-Chi Hsieh
Author: Michael Armbrust

Closes #11995 from viirya/fix-expand-constraints.
---
 .../plans/logical/basicOperators.scala      |  5 +++-
 .../plans/ConstraintPropagationSuite.scala  | 27 +++++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 09c200fa83..a18efc90ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -519,7 +519,6 @@ case class Expand(
     projections: Seq[Seq[Expression]],
     output: Seq[Attribute],
     child: LogicalPlan) extends UnaryNode {
-
   override def references: AttributeSet =
     AttributeSet(projections.flatten.flatMap(_.references))
 
@@ -527,6 +526,10 @@ case class Expand(
     val sizeInBytes = super.statistics.sizeInBytes * projections.length
     Statistics(sizeInBytes = sizeInBytes)
   }
+
+  // This operator can reuse attributes (for example, making them null when doing a rollup), so
+  // the constraints of the child may no longer be valid.
+  override protected def validConstraints: Set[Expression] = Set.empty[Expression]
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index 5cbb889f8e..49c1353efb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -88,6 +88,33 @@ class ConstraintPropagationSuite extends SparkFunSuite {
         IsNotNull(resolveColumn(aliasedRelation.analyze, "a")))))
   }
 
+  test("propagating constraints in expand") {
+    val tr = LocalRelation('a.int, 'b.int, 'c.int)
+
+    assert(tr.analyze.constraints.isEmpty)
+
+    // We add IsNotNull constraints for 'a, 'b and 'c into LocalRelation
+    // by creating notNullRelation.
+    val notNullRelation = tr.where('c.attr > 10 && 'a.attr < 5 && 'b.attr > 2)
+    verifyConstraints(notNullRelation.analyze.constraints,
+      ExpressionSet(Seq(resolveColumn(notNullRelation.analyze, "c") > 10,
+        IsNotNull(resolveColumn(notNullRelation.analyze, "c")),
+        resolveColumn(notNullRelation.analyze, "a") < 5,
+        IsNotNull(resolveColumn(notNullRelation.analyze, "a")),
+        resolveColumn(notNullRelation.analyze, "b") > 2,
+        IsNotNull(resolveColumn(notNullRelation.analyze, "b")))))
+
+    val expand = Expand(
+      Seq(
+        Seq('c, Literal.create(null, StringType), 1),
+        Seq('c, 'a, 2)),
+      Seq('c, 'a, 'gid.int),
+      Project(Seq('a, 'c),
+        notNullRelation))
+    verifyConstraints(expand.analyze.constraints,
+      ExpressionSet(Seq.empty[Expression]))
+  }
+
   test("propagating constraints in aliases") {
     val tr = LocalRelation('a.int, 'b.string, 'c.int)
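
Background (not part of the commit): `Expand` is the operator behind
GROUPING SETS / ROLLUP / CUBE aggregation, which emits extra rows with some
grouping columns nulled out; that is why the child's IsNotNull constraints
cannot be assumed to hold on its output. A minimal sketch of the user-visible
effect, assuming a Spark shell where a `SparkSession` is in scope as `spark`
(the data and names below are illustrative, not from the patch):

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // The filter guarantees that 'a is non-null on the input relation.
    val df = Seq((1, 10), (2, 20), (3, 30)).toDF("a", "b").where($"a" > 0)

    // rollup("a") is planned with an Expand that adds a grand-total row in
    // which 'a is null, so IsNotNull('a) must not be propagated as a
    // constraint on Expand's output.
    df.rollup($"a").agg(sum($"b")).show()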