[SPARK-34565][SQL] Collapse Window nodes with Project between them

### What changes were proposed in this pull request?

Extend the `CollapseWindow` rule to collapse `Window` nodes, that have `Project` between them.

### Why are the changes needed?

The analyzer will turn a `dataset.withColumn("colName", expressionWithWindowFunction)` method call to a `Project - Window - Project` chain in the logical plan. When this method is called multiple times in a row, then the projects can block the `Window` nodes from being collapsed by the current `CollapseWindow` rule.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

Closes #31677 from tanelk/SPARK-34565_collapse_windows.

Lead-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Co-authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
This commit is contained in:
tanel.kiis@gmail.com 2021-06-21 22:10:49 +09:00 committed by Takeshi Yamamuro
parent 37ef7bb98c
commit f80be4187e
2 changed files with 68 additions and 7 deletions

View file

@ -950,15 +950,28 @@ object OptimizeWindowFunctions extends Rule[LogicalPlan] {
* independent and are of the same window function type, collapse into the parent.
*/
object CollapseWindow extends Rule[LogicalPlan] {
private def windowsCompatible(w1: Window, w2: Window): Boolean = {
w1.partitionSpec == w2.partitionSpec &&
w1.orderSpec == w2.orderSpec &&
w1.references.intersect(w2.windowOutputSet).isEmpty &&
w1.windowExpressions.nonEmpty && w2.windowExpressions.nonEmpty &&
// This assumes Window contains the same type of window expressions. This is ensured
// by ExtractWindowFunctions.
WindowFunctionType.functionType(w1.windowExpressions.head) ==
WindowFunctionType.functionType(w2.windowExpressions.head)
}
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
_.containsPattern(WINDOW), ruleId) {
case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild))
if ps1 == ps2 && os1 == os2 && w1.references.intersect(w2.windowOutputSet).isEmpty &&
we1.nonEmpty && we2.nonEmpty &&
// This assumes Window contains the same type of window expressions. This is ensured
// by ExtractWindowFunctions.
WindowFunctionType.functionType(we1.head) == WindowFunctionType.functionType(we2.head) =>
case w1 @ Window(we1, _, _, w2 @ Window(we2, _, _, grandChild))
if windowsCompatible(w1, w2) =>
w1.copy(windowExpressions = we2 ++ we1, child = grandChild)
case w1 @ Window(we1, _, _, Project(pl, w2 @ Window(we2, _, _, grandChild)))
if windowsCompatible(w1, w2) && w1.references.subsetOf(grandChild.outputSet) =>
Project(
pl ++ w1.windowOutputSet,
w1.copy(windowExpressions = we2 ++ we1, child = grandChild))
}
}

View file

@ -27,7 +27,8 @@ class CollapseWindowSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("CollapseWindow", FixedPoint(10),
CollapseWindow) :: Nil
CollapseWindow,
CollapseProject) :: Nil
}
val testRelation = LocalRelation('a.double, 'b.double, 'c.string)
@ -100,4 +101,51 @@ class CollapseWindowSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
test("SPARK-34565: collapse two windows with the same partition/order " +
"and a Project between them") {
val query = testRelation
.window(Seq(min(a).as("_we0")), partitionSpec1, orderSpec1)
.select($"a", $"b", $"c", $"_we0" as "min_a")
.window(Seq(max(a).as("_we1")), partitionSpec1, orderSpec1)
.select($"a", $"b", $"c", $"min_a", $"_we1" as "max_a")
.window(Seq(sum(b).as("_we2")), partitionSpec1, orderSpec1)
.select($"a", $"b", $"c", $"min_a", $"max_a", $"_we2" as "sum_b")
.window(Seq(avg(b).as("_we3")), partitionSpec1, orderSpec1)
.select($"a", $"b", $"c", $"min_a", $"max_a", $"sum_b", $"_we3" as "avg_b")
.analyze
val optimized = Optimize.execute(query)
assert(query.output === optimized.output)
val correctAnswer = testRelation
.window(Seq(
min(a).as("_we0"),
max(a).as("_we1"),
sum(b).as("_we2"),
avg(b).as("_we3")
), partitionSpec1, orderSpec1)
.select(
a, b, c,
$"_we0" as "min_a", $"_we1" as "max_a", $"_we2" as "sum_b", $"_we3" as "avg_b")
.analyze
comparePlans(optimized, correctAnswer)
}
test("SPARK-34565: do not collapse two windows if project between them " +
"generates an input column") {
val query = testRelation
.window(Seq(min(a).as("min_a")), partitionSpec1, orderSpec1)
.select($"a", $"b", $"c", $"min_a", ($"a" + $"b").as("d"))
.window(Seq(max($"d").as("max_d")), partitionSpec1, orderSpec1)
.analyze
val optimized = Optimize.execute(query)
assert(query.output === optimized.output)
comparePlans(optimized, query)
}
}