From f80be4187ed4956fdd65c01698b91ca4a27bcc08 Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Mon, 21 Jun 2021 22:10:49 +0900 Subject: [PATCH] [SPARK-34565][SQL] Collapse Window nodes with Project between them ### What changes were proposed in this pull request? Extend the `CollapseWindow` rule to collapse `Window` nodes that have a `Project` between them. ### Why are the changes needed? The analyzer will turn a `dataset.withColumn("colName", expressionWithWindowFunction)` method call into a `Project - Window - Project` chain in the logical plan. When this method is called multiple times in a row, the projects can block the `Window` nodes from being collapsed by the current `CollapseWindow` rule. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests (UT) added to `CollapseWindowSuite`. Closes #31677 from tanelk/SPARK-34565_collapse_windows. Lead-authored-by: tanel.kiis@gmail.com Co-authored-by: Tanel Kiis Signed-off-by: Takeshi Yamamuro --- .../sql/catalyst/optimizer/Optimizer.scala | 25 +++++++--- .../optimizer/CollapseWindowSuite.scala | 50 ++++++++++++++++++- 2 files changed, 68 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d1010778c7..c49ab6fb53 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -950,15 +950,28 @@ object OptimizeWindowFunctions extends Rule[LogicalPlan] { * independent and are of the same window function type, collapse into the parent. 
*/ object CollapseWindow extends Rule[LogicalPlan] { + private def windowsCompatible(w1: Window, w2: Window): Boolean = { + w1.partitionSpec == w2.partitionSpec && + w1.orderSpec == w2.orderSpec && + w1.references.intersect(w2.windowOutputSet).isEmpty && + w1.windowExpressions.nonEmpty && w2.windowExpressions.nonEmpty && + // This assumes Window contains the same type of window expressions. This is ensured + // by ExtractWindowFunctions. + WindowFunctionType.functionType(w1.windowExpressions.head) == + WindowFunctionType.functionType(w2.windowExpressions.head) + } + def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( _.containsPattern(WINDOW), ruleId) { - case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild)) - if ps1 == ps2 && os1 == os2 && w1.references.intersect(w2.windowOutputSet).isEmpty && - we1.nonEmpty && we2.nonEmpty && - // This assumes Window contains the same type of window expressions. This is ensured - // by ExtractWindowFunctions. - WindowFunctionType.functionType(we1.head) == WindowFunctionType.functionType(we2.head) => + case w1 @ Window(we1, _, _, w2 @ Window(we2, _, _, grandChild)) + if windowsCompatible(w1, w2) => w1.copy(windowExpressions = we2 ++ we1, child = grandChild) + + case w1 @ Window(we1, _, _, Project(pl, w2 @ Window(we2, _, _, grandChild))) + if windowsCompatible(w1, w2) && w1.references.subsetOf(grandChild.outputSet) => + Project( + pl ++ w1.windowOutputSet, + w1.copy(windowExpressions = we2 ++ we1, child = grandChild)) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala index 3b3b4907ee..2d9b6c3cec 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala @@ -27,7 +27,8 @@ class 
CollapseWindowSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("CollapseWindow", FixedPoint(10), - CollapseWindow) :: Nil + CollapseWindow, + CollapseProject) :: Nil } val testRelation = LocalRelation('a.double, 'b.double, 'c.string) @@ -100,4 +101,51 @@ class CollapseWindowSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + + test("SPARK-34565: collapse two windows with the same partition/order " + + "and a Project between them") { + + val query = testRelation + .window(Seq(min(a).as("_we0")), partitionSpec1, orderSpec1) + .select($"a", $"b", $"c", $"_we0" as "min_a") + .window(Seq(max(a).as("_we1")), partitionSpec1, orderSpec1) + .select($"a", $"b", $"c", $"min_a", $"_we1" as "max_a") + .window(Seq(sum(b).as("_we2")), partitionSpec1, orderSpec1) + .select($"a", $"b", $"c", $"min_a", $"max_a", $"_we2" as "sum_b") + .window(Seq(avg(b).as("_we3")), partitionSpec1, orderSpec1) + .select($"a", $"b", $"c", $"min_a", $"max_a", $"sum_b", $"_we3" as "avg_b") + .analyze + + val optimized = Optimize.execute(query) + assert(query.output === optimized.output) + + val correctAnswer = testRelation + .window(Seq( + min(a).as("_we0"), + max(a).as("_we1"), + sum(b).as("_we2"), + avg(b).as("_we3") + ), partitionSpec1, orderSpec1) + .select( + a, b, c, + $"_we0" as "min_a", $"_we1" as "max_a", $"_we2" as "sum_b", $"_we3" as "avg_b") + .analyze + + comparePlans(optimized, correctAnswer) + } + + test("SPARK-34565: do not collapse two windows if project between them " + + "generates an input column") { + + val query = testRelation + .window(Seq(min(a).as("min_a")), partitionSpec1, orderSpec1) + .select($"a", $"b", $"c", $"min_a", ($"a" + $"b").as("d")) + .window(Seq(max($"d").as("max_d")), partitionSpec1, orderSpec1) + .analyze + + val optimized = Optimize.execute(query) + assert(query.output === optimized.output) + + comparePlans(optimized, query) + } }