[SPARK-36032][SQL] Use inputPlan instead of currentPhysicalPlan to initialize logical link

### What changes were proposed in this pull request?

Change `currentPhysicalPlan.logicalLink.get` to `inputPlan.logicalLink.get` when initializing the logical link.

### Why are the changes needed?

During `initialPlan` we may remove some physical nodes with `queryStagePreparationRules`. If a removed node is the top-level node, we lose the linked logical node.

Since we support a separate AQE-side broadcast join threshold config, it is common for a join that the normal planner plans as a sort-merge join (SMJ) to be changed to a broadcast hash join (BHJ) when AQE re-optimizes. However, `RemoveRedundantSorts` is applied during `initialPlan`, before re-optimization, so a local sort might be removed incorrectly when a join starts as SMJ but is changed to BHJ during re-optimization.
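To illustrate the failure mode, here is a minimal self-contained sketch. `PhysNode` and `prepare` are hypothetical stand-ins for Spark's physical plan nodes and the `queryStagePreparationRules`, not the real classes: when the logical link lives only on the top-level physical node and a preparation rule strips that node, reading the link from the prepared plan fails, while reading it from the original input plan (the fix) still works.

```scala
// Hypothetical model of a physical plan node carrying an optional logical link.
case class PhysNode(name: String, child: Option[PhysNode], logicalLink: Option[String])

// A preparation rule that drops a redundant top-level node,
// analogous to RemoveRedundantSorts running during `initialPlan`.
def prepare(plan: PhysNode): PhysNode = plan.child.getOrElse(plan)

// Only the top-level Sort node carries the logical link.
val inputPlan = PhysNode(
  "Sort",
  child = Some(PhysNode("Join", None, None)),
  logicalLink = Some("LogicalSort"))

val currentPhysicalPlan = prepare(inputPlan)

// The prepared plan lost the link that only the removed top node carried:
assert(currentPhysicalPlan.logicalLink.isEmpty)

// Reading the link from inputPlan, as this PR does, still works:
assert(inputPlan.logicalLink.contains("LogicalSort"))
```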

### Does this PR introduce _any_ user-facing change?

Yes, this is a bug fix.

### How was this patch tested?

Added a unit test.

Closes #33244 from ulysses-you/SPARK-36032.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 484b50cadf)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
Authored by ulysses-you on 2021-07-08 22:39:53 -07:00, committed by Liang-Chi Hsieh
parent fd277dc036
commit 2e4929b142
2 changed files with 36 additions and 1 deletion


@@ -186,7 +186,9 @@ case class AdaptiveSparkPlanExec(
     // created in the middle of the execution.
     context.session.withActive {
       val executionId = getExecutionId
-      var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
+      // Use inputPlan logicalLink here in case some top level physical nodes may be removed
+      // during `initialPlan`
+      var currentLogicalPlan = inputPlan.logicalLink.get
       var result = createQueryStages(currentPhysicalPlan)
       val events = new LinkedBlockingQueue[StageMaterializationEvent]()
       val errors = new mutable.ArrayBuffer[Throwable]()


@@ -1959,6 +1959,39 @@ class AdaptiveQueryExecSuite
       runAdaptiveAndVerifyResult(query)
     }
   }
+
+  test("SPARK-36032: Use inputPlan instead of currentPhysicalPlan to initialize logical link") {
+    withTempView("v") {
+      spark.sparkContext.parallelize(
+        (1 to 10).map(i => TestData(i, i.toString)), 2)
+        .toDF("c1", "c2").createOrReplaceTempView("v")
+
+      Seq("-1", "10000").foreach { aqeBhj =>
+        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+          SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+          SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> aqeBhj,
+          SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+          val (origin, adaptive) = runAdaptiveAndVerifyResult(
+            """
+              |SELECT * FROM v t1 JOIN (
+              | SELECT c1 + 1 as c3 FROM v
+              |)t2 ON t1.c1 = t2.c3
+              |SORT BY c1
+            """.stripMargin)
+          if (aqeBhj.toInt < 0) {
+            // 1 sort since spark plan has no shuffle for SMJ
+            assert(findTopLevelSort(origin).size == 1)
+            // 2 sorts in SMJ
+            assert(findTopLevelSort(adaptive).size == 2)
+          } else {
+            assert(findTopLevelSort(origin).size == 1)
+            // 1 sort at top node and BHJ has no sort
+            assert(findTopLevelSort(adaptive).size == 1)
+          }
+        }
+      }
+    }
+  }
 }

 /**