[SPARK-36795][SQL] Explain Formatted has Duplicate Node IDs
### What changes were proposed in this pull request?

Fixed explain formatted mode so it doesn't have duplicate node IDs when InMemoryRelation is present in the query plan.

### Why are the changes needed?

Having duplicate node IDs in the plan makes it confusing.

### Does this PR introduce _any_ user-facing change?

Yes, the explain formatted string will change. Notice how `ColumnarToRow` and `InMemoryRelation` both have node ID 2 before the changes.

Before changes =>
```
== Physical Plan ==
AdaptiveSparkPlan (14)
+- == Final Plan ==
   * BroadcastHashJoin Inner BuildLeft (9)
   :- BroadcastQueryStage (5)
   :  +- BroadcastExchange (4)
   :     +- * Filter (3)
   :        +- * ColumnarToRow (2)
   :           +- InMemoryTableScan (1)
   :                 +- InMemoryRelation (2)
   :                       +- * ColumnarToRow (4)
   :                          +- Scan parquet default.t1 (3)
   +- * Filter (8)
      +- * ColumnarToRow (7)
         +- Scan parquet default.t2 (6)
+- == Initial Plan ==
   BroadcastHashJoin Inner BuildLeft (13)
   :- BroadcastExchange (11)
   :  +- Filter (10)
   :     +- InMemoryTableScan (1)
   :           +- InMemoryRelation (2)
   :                 +- * ColumnarToRow (4)
   :                    +- Scan parquet default.t1 (3)
   +- Filter (12)
      +- Scan parquet default.t2 (6)


(1) InMemoryTableScan
Output [1]: [k#x]
Arguments: [k#x], [isnotnull(k#x)]

(2) InMemoryRelation
Arguments: [k#x], CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer401788d5,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) ColumnarToRow
+- FileScan parquet default.t1[k#x] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apach..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<k:int>
,None)

(3) Scan parquet default.t1
Output [1]: [k#x]
Batched: true
Location: InMemoryFileIndex [file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apache.spark.sql.ExplainSuiteAE/t1]
ReadSchema: struct<k:int>

(4) ColumnarToRow [codegen id : 1]
Input [1]: [k#x]

(5) BroadcastQueryStage
Output [1]: [k#x]
Arguments: 0

(6) Scan parquet default.t2
Output [1]: [key#x]
Batched: true
Location: InMemoryFileIndex [file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apache.spark.sql.ExplainSuiteAE/t2]
PushedFilters: [IsNotNull(key)]
ReadSchema: struct<key:int>

(7) ColumnarToRow
Input [1]: [key#x]

(8) Filter
Input [1]: [key#x]
Condition : isnotnull(key#x)

(9) BroadcastHashJoin [codegen id : 2]
Left keys [1]: [k#x]
Right keys [1]: [key#x]
Join condition: None

(10) Filter
Input [1]: [k#x]
Condition : isnotnull(k#x)

(11) BroadcastExchange
Input [1]: [k#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x]

(12) Filter
Input [1]: [key#x]
Condition : isnotnull(key#x)

(13) BroadcastHashJoin
Left keys [1]: [k#x]
Right keys [1]: [key#x]
Join condition: None

(14) AdaptiveSparkPlan
Output [2]: [k#x, key#x]
Arguments: isFinalPlan=true
```

After Changes =>
```
== Physical Plan ==
AdaptiveSparkPlan (17)
+- == Final Plan ==
   * BroadcastHashJoin Inner BuildLeft (12)
   :- BroadcastQueryStage (8)
   :  +- BroadcastExchange (7)
   :     +- * Filter (6)
   :        +- * ColumnarToRow (5)
   :           +- InMemoryTableScan (1)
   :                 +- InMemoryRelation (2)
   :                       +- * ColumnarToRow (4)
   :                          +- Scan parquet default.t1 (3)
   +- * Filter (11)
      +- * ColumnarToRow (10)
         +- Scan parquet default.t2 (9)
+- == Initial Plan ==
   BroadcastHashJoin Inner BuildLeft (16)
   :- BroadcastExchange (14)
   :  +- Filter (13)
   :     +- InMemoryTableScan (1)
   :           +- InMemoryRelation (2)
   :                 +- * ColumnarToRow (4)
   :                    +- Scan parquet default.t1 (3)
   +- Filter (15)
      +- Scan parquet default.t2 (9)


(1) InMemoryTableScan
Output [1]: [k#x]
Arguments: [k#x], [isnotnull(k#x)]

(2) InMemoryRelation
Arguments: [k#x], CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer3ccb12d,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) ColumnarToRow
+- FileScan parquet default.t1[k#x] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apach..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<k:int>
,None)

(3) Scan parquet default.t1
Output [1]: [k#x]
Batched: true
Location: InMemoryFileIndex [file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apache.spark.sql.ExplainSuiteAE/t1]
ReadSchema: struct<k:int>

(4) ColumnarToRow [codegen id : 1]
Input [1]: [k#x]

(5) ColumnarToRow [codegen id : 1]
Input [1]: [k#x]

(6) Filter [codegen id : 1]
Input [1]: [k#x]
Condition : isnotnull(k#x)

(7) BroadcastExchange
Input [1]: [k#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x]

(8) BroadcastQueryStage
Output [1]: [k#x]
Arguments: 0

(9) Scan parquet default.t2
Output [1]: [key#x]
Batched: true
Location: InMemoryFileIndex [file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apache.spark.sql.ExplainSuiteAE/t2]
PushedFilters: [IsNotNull(key)]
ReadSchema: struct<key:int>

(10) ColumnarToRow
Input [1]: [key#x]

(11) Filter
Input [1]: [key#x]
Condition : isnotnull(key#x)

(12) BroadcastHashJoin [codegen id : 2]
Left keys [1]: [k#x]
Right keys [1]: [key#x]
Join condition: None

(13) Filter
Input [1]: [k#x]
Condition : isnotnull(k#x)

(14) BroadcastExchange
Input [1]: [k#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x]

(15) Filter
Input [1]: [key#x]
Condition : isnotnull(key#x)

(16) BroadcastHashJoin
Left keys [1]: [k#x]
Right keys [1]: [key#x]
Join condition: None

(17) AdaptiveSparkPlan
Output [2]: [k#x, key#x]
Arguments: isFinalPlan=true
```

### How was this patch tested?

Added a test.

Closes #34036 from ChenMichael/SPARK-36795-Duplicate-node-id-with-inMemoryRelation.

Authored-by: Michael Chen <mike.chen@workday.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
parent 0076eba8d0
commit 6d7ab7b52b
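Before the diff itself, here is a minimal Scala sketch (for spark-shell, with the session implicits in scope) of the kind of query whose formatted explain output was affected. It is illustrative rather than part of the patch; the table and column names simply mirror the test added below.

```scala
// Two small tables; reading t1 through a cached query puts an
// InMemoryRelation (with its own inner plan) into the physical plan.
Seq(1).toDF("k").write.saveAsTable("t1")
Seq(1).toDF("key").write.saveAsTable("t2")
spark.sql("SELECT * FROM t1").persist()

// explain("formatted") prints the plan in formatted mode; before this fix,
// node IDs assigned inside the InMemoryRelation block could collide with IDs
// of operators in the outer plan (e.g. ColumnarToRow (2) vs InMemoryRelation (2)).
sql("SELECT * FROM (SELECT * FROM t1) JOIN t2 ON k = t2.key").explain("formatted")
```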
```
@@ -148,7 +148,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
         setOpId(p)
       case other: QueryPlan[_] =>
         setOpId(other)
-        other.innerChildren.foldLeft(currentOperationID) {
+        currentOperationID = other.innerChildren.foldLeft(currentOperationID) {
           (curId, plan) => generateOperatorIDs(plan, curId)
         }
     }
```
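The point of the hunk above is that the ID generator returns the next free ID rather than mutating shared state, so the value produced by the `foldLeft` over `innerChildren` (the cached plan nested inside `InMemoryRelation`) has to be assigned back to `currentOperationID`; otherwise the IDs consumed while numbering the inner plan are handed out again to the operators that follow. A stripped-down sketch of that pattern, using a hypothetical `Node`/`assignIds` in place of Spark's `QueryPlan` and `generateOperatorIDs`:

```scala
// Hypothetical stand-ins for QueryPlan / generateOperatorIDs, only to show why
// the result of the fold must be written back into the running counter.
final case class Node(name: String, children: Seq[Node] = Nil, var id: Int = -1)

def assignIds(node: Node, startId: Int): Int = {
  var currentId = startId
  currentId += 1
  node.id = currentId
  // Mirrors the fixed line: keep the counter value returned by the fold.
  currentId = node.children.foldLeft(currentId) { (curId, child) =>
    assignIds(child, curId)
  }
  currentId
}

// Without the `currentId =` assignment the fold's result is discarded, and the
// caller keeps handing out already-used IDs to sibling operators.
val root = Node("outerScan", Seq(Node("cachedPlan", Seq(Node("innerScan")))))
assignIds(root, 0) // outerScan=1, cachedPlan=2, innerScan=3; returns 3
```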
```
@@ -704,6 +704,25 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuite
       "Bucketed: false (bucket column(s) not read)")
     }
   }
+
+  test("SPARK-36795: Node IDs should not be duplicated when InMemoryRelation present") {
+    withTempView("t1", "t2") {
+      Seq(1).toDF("k").write.saveAsTable("t1")
+      Seq(1).toDF("key").write.saveAsTable("t2")
+      spark.sql("SELECT * FROM t1").persist()
+      val query = "SELECT * FROM (SELECT * FROM t1) join t2 " +
+        "ON k = t2.key"
+      val df = sql(query).toDF()
+
+      val inMemoryRelationRegex = """InMemoryRelation \(([0-9]+)\)""".r
+      val columnarToRowRegex = """ColumnarToRow \(([0-9]+)\)""".r
+      val explainString = getNormalizedExplain(df, FormattedMode)
+      val inMemoryRelationNodeId = inMemoryRelationRegex.findAllIn(explainString).group(1)
+      val columnarToRowNodeId = columnarToRowRegex.findAllIn(explainString).group(1)
+
+      assert(inMemoryRelationNodeId != columnarToRowNodeId)
+    }
+  }
 }
 
 case class ExplainSingleData(id: Int)
```