[SPARK-36020][SQL] Check logical link in remove redundant projects

### What changes were proposed in this pull request?

The `RemoveRedundantProjects` rule can sometimes conflict with the AQE broadcast threshold ([PR](https://github.com/apache/spark/pull/32391)). After removing a project, the physical-to-logical plan link can change, leaving a `Project` above a `LogicalQueryStage`. This breaks the AQE broadcast threshold, because the stats of that `Project` do not carry the `isRuntime = true` flag, so the normal broadcast threshold is still used.
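For intuition, AQE chooses between the two thresholds based on whether a node's statistics were refreshed at runtime. A paraphrased sketch of that selection, not the exact Spark code (the helper name here is illustrative):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.internal.SQLConf

// Paraphrased: if a node's stats come from finished query stages
// (isRuntime = true), the dedicated AQE threshold applies, falling back
// to the static one; otherwise only the static threshold is consulted.
def effectiveBroadcastThreshold(plan: LogicalPlan, conf: SQLConf): Long = {
  if (plan.stats.isRuntime) {
    conf.getConf(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD)
      .getOrElse(conf.autoBroadcastJoinThreshold)
  } else {
    conf.autoBroadcastJoinThreshold
  }
}
```

With the stale link, the `Project` above `LogicalQueryStage` reports non-runtime stats, so the static threshold wins even though accurate runtime sizes are available underneath.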

This PR updates `RemoveRedundantProjects` so that it does not remove a `ProjectExec` whose logical plan link differs from its child's.

### Why are the changes needed?

Make AQE broadcast threshold work in more cases.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes #33222 from cloud-fan/aqe2.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Wenchen Fan 2021-07-06 21:17:33 +08:00
parent 2fff060b6d
commit 6b3ab8262f
8 changed files with 208 additions and 153 deletions

@@ -48,10 +48,8 @@ object RemoveRedundantProjects extends Rule[SparkPlan] {
   private def removeProject(plan: SparkPlan, requireOrdering: Boolean): SparkPlan = {
     plan match {
       case p @ ProjectExec(_, child) =>
-        if (isRedundant(p, child, requireOrdering)) {
-          val newPlan = removeProject(child, requireOrdering)
-          newPlan.setLogicalLink(child.logicalLink.get)
-          newPlan
+        if (isRedundant(p, child, requireOrdering) && canRemove(p, child)) {
+          removeProject(child, requireOrdering)
         } else {
           p.mapChildren(removeProject(_, false))
         }
@@ -110,4 +108,11 @@ object RemoveRedundantProjects extends Rule[SparkPlan] {
       }
     }
   }
+
+  // SPARK-36020: Currently a project can only be removed if (1) its logical link is empty or (2)
+  // its logical link is the same as the child's logical link. This is to ensure the physical
+  // plan node can correctly map to its logical plan node in AQE.
+  private def canRemove(project: ProjectExec, child: SparkPlan): Boolean = {
+    project.logicalLink.isEmpty || project.logicalLink.exists(child.logicalLink.contains)
+  }
 }
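The guard's `Option` plumbing is compact; here is a minimal standalone sketch of the same predicate, with plain strings standing in for logical plans:

```scala
// Same shape as canRemove above, with strings in place of plans.
def canRemoveSketch(projectLink: Option[String], childLink: Option[String]): Boolean =
  projectLink.isEmpty || projectLink.exists(childLink.contains)

assert(canRemoveSketch(None, Some("x")))      // no link on the project: removable
assert(canRemoveSketch(Some("x"), Some("x"))) // links agree: removable
assert(!canRemoveSketch(Some("x"), Some("y")))// links differ: keep the project
assert(!canRemoveSketch(Some("x"), None))     // child has no link: keep the project
```

Four TPC-DS golden plan files (an explain.txt and simplified.txt pair for each of two plan variants) are regenerated accordingly: the `Project` between `Expand` and `BroadcastNestedLoopJoin` is now kept, and the operator ids above it shift by one.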

@@ -1,28 +1,29 @@
 == Physical Plan ==
-TakeOrderedAndProject (24)
-+- * HashAggregate (23)
-   +- Exchange (22)
-      +- * HashAggregate (21)
-         +- * Expand (20)
-            +- * BroadcastNestedLoopJoin Inner BuildRight (19)
-               :- * Project (15)
-               :  +- * SortMergeJoin Inner (14)
-               :     :- * Sort (8)
-               :     :  +- Exchange (7)
-               :     :     +- * Project (6)
-               :     :        +- * BroadcastHashJoin Inner BuildRight (5)
-               :     :           :- * Filter (3)
-               :     :           :  +- * ColumnarToRow (2)
-               :     :           :     +- Scan parquet default.inventory (1)
-               :     :           +- ReusedExchange (4)
-               :     +- * Sort (13)
-               :        +- Exchange (12)
-               :           +- * Filter (11)
-               :              +- * ColumnarToRow (10)
-               :                 +- Scan parquet default.item (9)
-               +- BroadcastExchange (18)
-                  +- * ColumnarToRow (17)
-                     +- Scan parquet default.warehouse (16)
+TakeOrderedAndProject (25)
++- * HashAggregate (24)
+   +- Exchange (23)
+      +- * HashAggregate (22)
+         +- * Expand (21)
+            +- * Project (20)
+               +- * BroadcastNestedLoopJoin Inner BuildRight (19)
+                  :- * Project (15)
+                  :  +- * SortMergeJoin Inner (14)
+                  :     :- * Sort (8)
+                  :     :  +- Exchange (7)
+                  :     :     +- * Project (6)
+                  :     :        +- * BroadcastHashJoin Inner BuildRight (5)
+                  :     :           :- * Filter (3)
+                  :     :           :  +- * ColumnarToRow (2)
+                  :     :           :     +- Scan parquet default.inventory (1)
+                  :     :           +- ReusedExchange (4)
+                  :     +- * Sort (13)
+                  :        +- Exchange (12)
+                  :           +- * Filter (11)
+                  :              +- * ColumnarToRow (10)
+                  :                 +- Scan parquet default.item (9)
+                  +- BroadcastExchange (18)
+                     +- * ColumnarToRow (17)
+                        +- Scan parquet default.warehouse (16)
 
 (1) Scan parquet default.inventory
@@ -40,7 +41,7 @@ Input [3]: [inv_item_sk#1, inv_quantity_on_hand#2, inv_date_sk#3]
 Input [3]: [inv_item_sk#1, inv_quantity_on_hand#2, inv_date_sk#3]
 Condition : isnotnull(inv_item_sk#1)
 
-(4) ReusedExchange [Reuses operator id: 29]
+(4) ReusedExchange [Reuses operator id: 30]
 Output [1]: [d_date_sk#5]
 
 (5) BroadcastHashJoin [codegen id : 2]
@@ -107,61 +108,65 @@ Arguments: IdentityBroadcastMode, [id=#13]
 (19) BroadcastNestedLoopJoin [codegen id : 7]
 Join condition: None
 
-(20) Expand [codegen id : 7]
+(20) Project [codegen id : 7]
+Output [5]: [inv_quantity_on_hand#2, i_product_name#11, i_brand#8, i_class#9, i_category#10]
 Input [5]: [inv_quantity_on_hand#2, i_brand#8, i_class#9, i_category#10, i_product_name#11]
+
+(21) Expand [codegen id : 7]
+Input [5]: [inv_quantity_on_hand#2, i_product_name#11, i_brand#8, i_class#9, i_category#10]
 Arguments: [[inv_quantity_on_hand#2, i_product_name#11, i_brand#8, i_class#9, i_category#10, 0], [inv_quantity_on_hand#2, i_product_name#11, i_brand#8, i_class#9, null, 1], [inv_quantity_on_hand#2, i_product_name#11, i_brand#8, null, null, 3], [inv_quantity_on_hand#2, i_product_name#11, null, null, null, 7], [inv_quantity_on_hand#2, null, null, null, null, 15]], [inv_quantity_on_hand#2, i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18]
 
-(21) HashAggregate [codegen id : 7]
+(22) HashAggregate [codegen id : 7]
 Input [6]: [inv_quantity_on_hand#2, i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18]
 Keys [5]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18]
 Functions [1]: [partial_avg(inv_quantity_on_hand#2)]
 Aggregate Attributes [2]: [sum#19, count#20]
 Results [7]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18, sum#21, count#22]
 
-(22) Exchange
+(23) Exchange
 Input [7]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18, sum#21, count#22]
 Arguments: hashpartitioning(i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18, 5), ENSURE_REQUIREMENTS, [id=#23]
 
-(23) HashAggregate [codegen id : 8]
+(24) HashAggregate [codegen id : 8]
 Input [7]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18, sum#21, count#22]
 Keys [5]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18]
 Functions [1]: [avg(inv_quantity_on_hand#2)]
 Aggregate Attributes [1]: [avg(inv_quantity_on_hand#2)#24]
 Results [5]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, avg(inv_quantity_on_hand#2)#24 AS qoh#25]
 
-(24) TakeOrderedAndProject
+(25) TakeOrderedAndProject
 Input [5]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, qoh#25]
 Arguments: 100, [qoh#25 ASC NULLS FIRST, i_product_name#14 ASC NULLS FIRST, i_brand#15 ASC NULLS FIRST, i_class#16 ASC NULLS FIRST, i_category#17 ASC NULLS FIRST], [i_product_name#14, i_brand#15, i_class#16, i_category#17, qoh#25]
 
 ===== Subqueries =====
 
 Subquery:1 Hosting operator id = 1 Hosting Expression = inv_date_sk#3 IN dynamicpruning#4
-BroadcastExchange (29)
-+- * Project (28)
-   +- * Filter (27)
-      +- * ColumnarToRow (26)
-         +- Scan parquet default.date_dim (25)
+BroadcastExchange (30)
++- * Project (29)
+   +- * Filter (28)
+      +- * ColumnarToRow (27)
+         +- Scan parquet default.date_dim (26)
 
-(25) Scan parquet default.date_dim
+(26) Scan parquet default.date_dim
 Output [2]: [d_date_sk#5, d_month_seq#26]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/date_dim]
 PushedFilters: [IsNotNull(d_month_seq), GreaterThanOrEqual(d_month_seq,1200), LessThanOrEqual(d_month_seq,1211), IsNotNull(d_date_sk)]
 ReadSchema: struct<d_date_sk:int,d_month_seq:int>
 
-(26) ColumnarToRow [codegen id : 1]
+(27) ColumnarToRow [codegen id : 1]
 Input [2]: [d_date_sk#5, d_month_seq#26]
 
-(27) Filter [codegen id : 1]
+(28) Filter [codegen id : 1]
 Input [2]: [d_date_sk#5, d_month_seq#26]
 Condition : (((isnotnull(d_month_seq#26) AND (d_month_seq#26 >= 1200)) AND (d_month_seq#26 <= 1211)) AND isnotnull(d_date_sk#5))
 
-(28) Project [codegen id : 1]
+(29) Project [codegen id : 1]
 Output [1]: [d_date_sk#5]
 Input [2]: [d_date_sk#5, d_month_seq#26]
 
-(29) BroadcastExchange
+(30) BroadcastExchange
 Input [1]: [d_date_sk#5]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#27]

@@ -6,44 +6,45 @@ TakeOrderedAndProject [qoh,i_product_name,i_brand,i_class,i_category]
           WholeStageCodegen (7)
             HashAggregate [i_product_name,i_brand,i_class,i_category,spark_grouping_id,inv_quantity_on_hand] [sum,count,sum,count]
               Expand [inv_quantity_on_hand,i_product_name,i_brand,i_class,i_category]
-                BroadcastNestedLoopJoin
-                  Project [inv_quantity_on_hand,i_brand,i_class,i_category,i_product_name]
-                    SortMergeJoin [inv_item_sk,i_item_sk]
-                      InputAdapter
-                        WholeStageCodegen (3)
-                          Sort [inv_item_sk]
-                            InputAdapter
-                              Exchange [inv_item_sk] #2
-                                WholeStageCodegen (2)
-                                  Project [inv_item_sk,inv_quantity_on_hand]
-                                    BroadcastHashJoin [inv_date_sk,d_date_sk]
-                                      Filter [inv_item_sk]
-                                        ColumnarToRow
-                                          InputAdapter
-                                            Scan parquet default.inventory [inv_item_sk,inv_quantity_on_hand,inv_date_sk]
-                                              SubqueryBroadcast [d_date_sk] #1
-                                                BroadcastExchange #3
-                                                  WholeStageCodegen (1)
-                                                    Project [d_date_sk]
-                                                      Filter [d_month_seq,d_date_sk]
-                                                        ColumnarToRow
-                                                          InputAdapter
-                                                            Scan parquet default.date_dim [d_date_sk,d_month_seq]
-                                      InputAdapter
-                                        ReusedExchange [d_date_sk] #3
-                      InputAdapter
-                        WholeStageCodegen (5)
-                          Sort [i_item_sk]
-                            InputAdapter
-                              Exchange [i_item_sk] #4
-                                WholeStageCodegen (4)
-                                  Filter [i_item_sk]
-                                    ColumnarToRow
-                                      InputAdapter
-                                        Scan parquet default.item [i_item_sk,i_brand,i_class,i_category,i_product_name]
-                  InputAdapter
-                    BroadcastExchange #5
-                      WholeStageCodegen (6)
-                        ColumnarToRow
-                          InputAdapter
-                            Scan parquet default.warehouse
+                Project [inv_quantity_on_hand,i_product_name,i_brand,i_class,i_category]
+                  BroadcastNestedLoopJoin
+                    Project [inv_quantity_on_hand,i_brand,i_class,i_category,i_product_name]
+                      SortMergeJoin [inv_item_sk,i_item_sk]
+                        InputAdapter
+                          WholeStageCodegen (3)
+                            Sort [inv_item_sk]
+                              InputAdapter
+                                Exchange [inv_item_sk] #2
+                                  WholeStageCodegen (2)
+                                    Project [inv_item_sk,inv_quantity_on_hand]
+                                      BroadcastHashJoin [inv_date_sk,d_date_sk]
+                                        Filter [inv_item_sk]
+                                          ColumnarToRow
+                                            InputAdapter
+                                              Scan parquet default.inventory [inv_item_sk,inv_quantity_on_hand,inv_date_sk]
+                                                SubqueryBroadcast [d_date_sk] #1
+                                                  BroadcastExchange #3
+                                                    WholeStageCodegen (1)
+                                                      Project [d_date_sk]
+                                                        Filter [d_month_seq,d_date_sk]
+                                                          ColumnarToRow
+                                                            InputAdapter
+                                                              Scan parquet default.date_dim [d_date_sk,d_month_seq]
+                                        InputAdapter
+                                          ReusedExchange [d_date_sk] #3
+                        InputAdapter
+                          WholeStageCodegen (5)
+                            Sort [i_item_sk]
+                              InputAdapter
+                                Exchange [i_item_sk] #4
+                                  WholeStageCodegen (4)
+                                    Filter [i_item_sk]
+                                      ColumnarToRow
+                                        InputAdapter
+                                          Scan parquet default.item [i_item_sk,i_brand,i_class,i_category,i_product_name]
+                    InputAdapter
+                      BroadcastExchange #5
+                        WholeStageCodegen (6)
+                          ColumnarToRow
+                            InputAdapter
+                              Scan parquet default.warehouse

@@ -1,25 +1,26 @@
 == Physical Plan ==
-TakeOrderedAndProject (21)
-+- * HashAggregate (20)
-   +- Exchange (19)
-      +- * HashAggregate (18)
-         +- * Expand (17)
-            +- * BroadcastNestedLoopJoin Inner BuildRight (16)
-               :- * Project (12)
-               :  +- * BroadcastHashJoin Inner BuildRight (11)
-               :     :- * Project (6)
-               :     :  +- * BroadcastHashJoin Inner BuildRight (5)
-               :     :     :- * Filter (3)
-               :     :     :  +- * ColumnarToRow (2)
-               :     :     :     +- Scan parquet default.inventory (1)
-               :     :     +- ReusedExchange (4)
-               :     +- BroadcastExchange (10)
-               :        +- * Filter (9)
-               :           +- * ColumnarToRow (8)
-               :              +- Scan parquet default.item (7)
-               +- BroadcastExchange (15)
-                  +- * ColumnarToRow (14)
-                     +- Scan parquet default.warehouse (13)
+TakeOrderedAndProject (22)
++- * HashAggregate (21)
+   +- Exchange (20)
+      +- * HashAggregate (19)
+         +- * Expand (18)
+            +- * Project (17)
+               +- * BroadcastNestedLoopJoin Inner BuildRight (16)
+                  :- * Project (12)
+                  :  +- * BroadcastHashJoin Inner BuildRight (11)
+                  :     :- * Project (6)
+                  :     :  +- * BroadcastHashJoin Inner BuildRight (5)
+                  :     :     :- * Filter (3)
+                  :     :     :  +- * ColumnarToRow (2)
+                  :     :     :     +- Scan parquet default.inventory (1)
+                  :     :     +- ReusedExchange (4)
+                  :     +- BroadcastExchange (10)
+                  :        +- * Filter (9)
+                  :           +- * ColumnarToRow (8)
+                  :              +- Scan parquet default.item (7)
+                  +- BroadcastExchange (15)
+                     +- * ColumnarToRow (14)
+                        +- Scan parquet default.warehouse (13)
 
 (1) Scan parquet default.inventory
@@ -37,7 +38,7 @@ Input [3]: [inv_item_sk#1, inv_quantity_on_hand#2, inv_date_sk#3]
 Input [3]: [inv_item_sk#1, inv_quantity_on_hand#2, inv_date_sk#3]
 Condition : isnotnull(inv_item_sk#1)
 
-(4) ReusedExchange [Reuses operator id: 26]
+(4) ReusedExchange [Reuses operator id: 27]
 Output [1]: [d_date_sk#5]
 
 (5) BroadcastHashJoin [codegen id : 4]
@@ -92,61 +93,65 @@ Arguments: IdentityBroadcastMode, [id=#12]
 (16) BroadcastNestedLoopJoin [codegen id : 4]
 Join condition: None
 
-(17) Expand [codegen id : 4]
+(17) Project [codegen id : 4]
+Output [5]: [inv_quantity_on_hand#2, i_product_name#10, i_brand#7, i_class#8, i_category#9]
 Input [5]: [inv_quantity_on_hand#2, i_brand#7, i_class#8, i_category#9, i_product_name#10]
+
+(18) Expand [codegen id : 4]
+Input [5]: [inv_quantity_on_hand#2, i_product_name#10, i_brand#7, i_class#8, i_category#9]
 Arguments: [[inv_quantity_on_hand#2, i_product_name#10, i_brand#7, i_class#8, i_category#9, 0], [inv_quantity_on_hand#2, i_product_name#10, i_brand#7, i_class#8, null, 1], [inv_quantity_on_hand#2, i_product_name#10, i_brand#7, null, null, 3], [inv_quantity_on_hand#2, i_product_name#10, null, null, null, 7], [inv_quantity_on_hand#2, null, null, null, null, 15]], [inv_quantity_on_hand#2, i_product_name#13, i_brand#14, i_class#15, i_category#16, spark_grouping_id#17]
 
-(18) HashAggregate [codegen id : 4]
+(19) HashAggregate [codegen id : 4]
 Input [6]: [inv_quantity_on_hand#2, i_product_name#13, i_brand#14, i_class#15, i_category#16, spark_grouping_id#17]
 Keys [5]: [i_product_name#13, i_brand#14, i_class#15, i_category#16, spark_grouping_id#17]
 Functions [1]: [partial_avg(inv_quantity_on_hand#2)]
 Aggregate Attributes [2]: [sum#18, count#19]
 Results [7]: [i_product_name#13, i_brand#14, i_class#15, i_category#16, spark_grouping_id#17, sum#20, count#21]
 
-(19) Exchange
+(20) Exchange
 Input [7]: [i_product_name#13, i_brand#14, i_class#15, i_category#16, spark_grouping_id#17, sum#20, count#21]
 Arguments: hashpartitioning(i_product_name#13, i_brand#14, i_class#15, i_category#16, spark_grouping_id#17, 5), ENSURE_REQUIREMENTS, [id=#22]
 
-(20) HashAggregate [codegen id : 5]
+(21) HashAggregate [codegen id : 5]
 Input [7]: [i_product_name#13, i_brand#14, i_class#15, i_category#16, spark_grouping_id#17, sum#20, count#21]
 Keys [5]: [i_product_name#13, i_brand#14, i_class#15, i_category#16, spark_grouping_id#17]
 Functions [1]: [avg(inv_quantity_on_hand#2)]
 Aggregate Attributes [1]: [avg(inv_quantity_on_hand#2)#23]
 Results [5]: [i_product_name#13, i_brand#14, i_class#15, i_category#16, avg(inv_quantity_on_hand#2)#23 AS qoh#24]
 
-(21) TakeOrderedAndProject
+(22) TakeOrderedAndProject
 Input [5]: [i_product_name#13, i_brand#14, i_class#15, i_category#16, qoh#24]
 Arguments: 100, [qoh#24 ASC NULLS FIRST, i_product_name#13 ASC NULLS FIRST, i_brand#14 ASC NULLS FIRST, i_class#15 ASC NULLS FIRST, i_category#16 ASC NULLS FIRST], [i_product_name#13, i_brand#14, i_class#15, i_category#16, qoh#24]
 
 ===== Subqueries =====
 
 Subquery:1 Hosting operator id = 1 Hosting Expression = inv_date_sk#3 IN dynamicpruning#4
-BroadcastExchange (26)
-+- * Project (25)
-   +- * Filter (24)
-      +- * ColumnarToRow (23)
-         +- Scan parquet default.date_dim (22)
+BroadcastExchange (27)
++- * Project (26)
+   +- * Filter (25)
+      +- * ColumnarToRow (24)
+         +- Scan parquet default.date_dim (23)
 
-(22) Scan parquet default.date_dim
+(23) Scan parquet default.date_dim
 Output [2]: [d_date_sk#5, d_month_seq#25]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/date_dim]
 PushedFilters: [IsNotNull(d_month_seq), GreaterThanOrEqual(d_month_seq,1200), LessThanOrEqual(d_month_seq,1211), IsNotNull(d_date_sk)]
 ReadSchema: struct<d_date_sk:int,d_month_seq:int>
 
-(23) ColumnarToRow [codegen id : 1]
+(24) ColumnarToRow [codegen id : 1]
 Input [2]: [d_date_sk#5, d_month_seq#25]
 
-(24) Filter [codegen id : 1]
+(25) Filter [codegen id : 1]
 Input [2]: [d_date_sk#5, d_month_seq#25]
 Condition : (((isnotnull(d_month_seq#25) AND (d_month_seq#25 >= 1200)) AND (d_month_seq#25 <= 1211)) AND isnotnull(d_date_sk#5))
 
-(25) Project [codegen id : 1]
+(26) Project [codegen id : 1]
 Output [1]: [d_date_sk#5]
 Input [2]: [d_date_sk#5, d_month_seq#25]
 
-(26) BroadcastExchange
+(27) BroadcastExchange
 Input [1]: [d_date_sk#5]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#26]

@@ -6,35 +6,36 @@ TakeOrderedAndProject [qoh,i_product_name,i_brand,i_class,i_category]
           WholeStageCodegen (4)
             HashAggregate [i_product_name,i_brand,i_class,i_category,spark_grouping_id,inv_quantity_on_hand] [sum,count,sum,count]
               Expand [inv_quantity_on_hand,i_product_name,i_brand,i_class,i_category]
-                BroadcastNestedLoopJoin
-                  Project [inv_quantity_on_hand,i_brand,i_class,i_category,i_product_name]
-                    BroadcastHashJoin [inv_item_sk,i_item_sk]
-                      Project [inv_item_sk,inv_quantity_on_hand]
-                        BroadcastHashJoin [inv_date_sk,d_date_sk]
-                          Filter [inv_item_sk]
-                            ColumnarToRow
-                              InputAdapter
-                                Scan parquet default.inventory [inv_item_sk,inv_quantity_on_hand,inv_date_sk]
-                                  SubqueryBroadcast [d_date_sk] #1
-                                    BroadcastExchange #2
-                                      WholeStageCodegen (1)
-                                        Project [d_date_sk]
-                                          Filter [d_month_seq,d_date_sk]
-                                            ColumnarToRow
-                                              InputAdapter
-                                                Scan parquet default.date_dim [d_date_sk,d_month_seq]
-                          InputAdapter
-                            ReusedExchange [d_date_sk] #2
-                      InputAdapter
-                        BroadcastExchange #3
-                          WholeStageCodegen (2)
-                            Filter [i_item_sk]
-                              ColumnarToRow
-                                InputAdapter
-                                  Scan parquet default.item [i_item_sk,i_brand,i_class,i_category,i_product_name]
-                  InputAdapter
-                    BroadcastExchange #4
-                      WholeStageCodegen (3)
-                        ColumnarToRow
-                          InputAdapter
-                            Scan parquet default.warehouse
+                Project [inv_quantity_on_hand,i_product_name,i_brand,i_class,i_category]
+                  BroadcastNestedLoopJoin
+                    Project [inv_quantity_on_hand,i_brand,i_class,i_category,i_product_name]
+                      BroadcastHashJoin [inv_item_sk,i_item_sk]
+                        Project [inv_item_sk,inv_quantity_on_hand]
+                          BroadcastHashJoin [inv_date_sk,d_date_sk]
+                            Filter [inv_item_sk]
+                              ColumnarToRow
+                                InputAdapter
+                                  Scan parquet default.inventory [inv_item_sk,inv_quantity_on_hand,inv_date_sk]
+                                    SubqueryBroadcast [d_date_sk] #1
+                                      BroadcastExchange #2
+                                        WholeStageCodegen (1)
+                                          Project [d_date_sk]
+                                            Filter [d_month_seq,d_date_sk]
+                                              ColumnarToRow
+                                                InputAdapter
+                                                  Scan parquet default.date_dim [d_date_sk,d_month_seq]
+                            InputAdapter
+                              ReusedExchange [d_date_sk] #2
+                        InputAdapter
+                          BroadcastExchange #3
+                            WholeStageCodegen (2)
+                              Filter [i_item_sk]
+                                ColumnarToRow
+                                  InputAdapter
+                                    Scan parquet default.item [i_item_sk,i_brand,i_class,i_category,i_product_name]
+                    InputAdapter
+                      BroadcastExchange #4
+                        WholeStageCodegen (3)
+                          ColumnarToRow
+                            InputAdapter
+                              Scan parquet default.warehouse

@@ -131,7 +131,7 @@ class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite with DisableAdaptiv
   }
 
   private def getLogicalPlan(node: SparkPlan): LogicalPlan = {
-    node.getTagValue(SparkPlan.LOGICAL_PLAN_TAG).getOrElse {
+    node.logicalLink.getOrElse {
       fail(node.getClass.getSimpleName + " does not have a logical plan link")
     }
   }
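`logicalLink` reads the same tag the old code fetched by hand; an approximate sketch of the accessor inside `SparkPlan`, from memory rather than a verbatim copy (the inherited-tag fallback is an assumption about this version):

```scala
// Approximate shape of SparkPlan.logicalLink (not a verbatim copy):
// read the logical-plan tag, falling back to the inherited tag.
def logicalLink: Option[LogicalPlan] =
  getTagValue(SparkPlan.LOGICAL_PLAN_TAG)
    .orElse(getTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG))
```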

@@ -216,8 +216,23 @@ abstract class RemoveRedundantProjectsSuiteBase
         |ORDER BY t1.key, t2.key, s1, s2
         |LIMIT 10
         |""".stripMargin
-    assertProjectExec(query, 0, 3)
+    // The Project above the Expand is not removed due to SPARK-36020.
+    assertProjectExec(query, 1, 3)
+  }
+
+  test("SPARK-36020: Project should not be removed when child's logical link is different") {
+    val query =
+      """
+        |WITH t AS (
+        | SELECT key, a, b, c, explode(d) AS d FROM testView
+        |)
+        |SELECT t1.key, t1.d, t2.key
+        |FROM (SELECT d, key FROM t) t1
+        |JOIN testView t2 ON t1.key = t2.key
+        |""".stripMargin
+    // The ProjectExec above the GenerateExec should not be removed because
+    // they have different logical links.
+    assertProjectExec(query, enabled = 2, disabled = 3)
   }
 
   Seq("true", "false").foreach { codegenEnabled =>

@@ -1929,6 +1929,29 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-36020: Check logical link in remove redundant projects") {
+    withTempView("t") {
+      spark.range(10).selectExpr("id % 10 as key", "cast(id * 2 as int) as a",
+        "cast(id * 3 as int) as b", "array(id, id + 1, id + 3) as c").createOrReplaceTempView("t")
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+        SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "800") {
+        val query =
+          """
+            |WITH tt AS (
+            | SELECT key, a, b, explode(c) AS c FROM t
+            |)
+            |SELECT t1.key, t1.c, t2.key, t2.c
+            |FROM (SELECT a, b, c, key FROM tt WHERE a > 1) t1
+            |JOIN (SELECT a, b, c, key FROM tt) t2
+            | ON t1.key = t2.key
+            |""".stripMargin
+        val (origin, adaptive) = runAdaptiveAndVerifyResult(query)
+        assert(findTopLevelSortMergeJoin(origin).size == 1)
+        assert(findTopLevelBroadcastHashJoin(adaptive).size == 1)
+      }
+    }
+  }
 
   /**
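The two confs pinned by the test map directly to user-facing settings; an illustrative snippet (values chosen only to force the behavior, exactly as in the test):

```scala
// Illustrative: turn off the static broadcast threshold entirely, but let
// AQE broadcast relations whose accurate runtime size is at most 800 bytes.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "800")
```

Before the fix, the stale `Project` stats would defeat the second setting and the join would stay sort-merge; with the logical link check, the runtime threshold applies and the join is planned as a broadcast hash join, which is what the assertions verify.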