From 6d7ab7b52bb37aa03029a0c0761988733665633f Mon Sep 17 00:00:00 2001 From: Michael Chen Date: Thu, 23 Sep 2021 15:54:33 +0900 Subject: [PATCH] [SPARK-36795][SQL] Explain Formatted has Duplicate Node IDs ### What changes were proposed in this pull request? Fixed explain formatted mode so it doesn't have duplicate node IDs when InMemoryRelation is present in query plan. ### Why are the changes needed? Having duplicated node IDs in the plan makes it confusing. ### Does this PR introduce _any_ user-facing change? Yes, explain formatted string will change. Notice how `ColumnarToRow` and `InMemoryRelation` have node id of 2. Before changes => ``` == Physical Plan == AdaptiveSparkPlan (14) +- == Final Plan == * BroadcastHashJoin Inner BuildLeft (9) :- BroadcastQueryStage (5) : +- BroadcastExchange (4) : +- * Filter (3) : +- * ColumnarToRow (2) : +- InMemoryTableScan (1) : +- InMemoryRelation (2) : +- * ColumnarToRow (4) : +- Scan parquet default.t1 (3) +- * Filter (8) +- * ColumnarToRow (7) +- Scan parquet default.t2 (6) +- == Initial Plan == BroadcastHashJoin Inner BuildLeft (13) :- BroadcastExchange (11) : +- Filter (10) : +- InMemoryTableScan (1) : +- InMemoryRelation (2) : +- * ColumnarToRow (4) : +- Scan parquet default.t1 (3) +- Filter (12) +- Scan parquet default.t2 (6) (1) InMemoryTableScan Output [1]: [k#x] Arguments: [k#x], [isnotnull(k#x)] (2) InMemoryRelation Arguments: [k#x], CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer401788d5,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) ColumnarToRow +- FileScan parquet default.t1[k#x] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apach..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct ,None) (3) Scan parquet default.t1 Output [1]: [k#x] Batched: true Location: InMemoryFileIndex [file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apache.spark.sql.ExplainSuiteAE/t1] ReadSchema: struct (4) ColumnarToRow [codegen id : 1] Input [1]: [k#x] (5) BroadcastQueryStage Output [1]: [k#x] Arguments: 0 (6) Scan parquet default.t2 Output [1]: [key#x] Batched: true Location: InMemoryFileIndex [file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apache.spark.sql.ExplainSuiteAE/t2] PushedFilters: [IsNotNull(key)] ReadSchema: struct (7) ColumnarToRow Input [1]: [key#x] (8) Filter Input [1]: [key#x] Condition : isnotnull(key#x) (9) BroadcastHashJoin [codegen id : 2] Left keys [1]: [k#x] Right keys [1]: [key#x] Join condition: None (10) Filter Input [1]: [k#x] Condition : isnotnull(k#x) (11) BroadcastExchange Input [1]: [k#x] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x] (12) Filter Input [1]: [key#x] Condition : isnotnull(key#x) (13) BroadcastHashJoin Left keys [1]: [k#x] Right keys [1]: [key#x] Join condition: None (14) AdaptiveSparkPlan Output [2]: [k#x, key#x] Arguments: isFinalPlan=true ``` After Changes => ``` == Physical Plan == AdaptiveSparkPlan (17) +- == Final Plan == * BroadcastHashJoin Inner BuildLeft (12) :- BroadcastQueryStage (8) : +- BroadcastExchange (7) : +- * Filter (6) : +- * ColumnarToRow (5) : +- InMemoryTableScan (1) : +- InMemoryRelation (2) : +- * ColumnarToRow (4) : +- Scan parquet default.t1 (3) +- * Filter (11) +- * ColumnarToRow (10) +- Scan parquet default.t2 (9) +- == Initial Plan == BroadcastHashJoin Inner BuildLeft (16) :- BroadcastExchange (14) : +- Filter (13) : +- InMemoryTableScan (1) : +- InMemoryRelation (2) : +- * ColumnarToRow (4) : +- Scan parquet default.t1 (3) +- Filter (15) +- Scan parquet default.t2 (9) (1) InMemoryTableScan Output [1]: [k#x] Arguments: [k#x], [isnotnull(k#x)] (2) InMemoryRelation Arguments: [k#x], CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer3ccb12d,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) ColumnarToRow +- FileScan parquet default.t1[k#x] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apach..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct ,None) (3) Scan parquet default.t1 Output [1]: [k#x] Batched: true Location: InMemoryFileIndex [file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apache.spark.sql.ExplainSuiteAE/t1] ReadSchema: struct (4) ColumnarToRow [codegen id : 1] Input [1]: [k#x] (5) ColumnarToRow [codegen id : 1] Input [1]: [k#x] (6) Filter [codegen id : 1] Input [1]: [k#x] Condition : isnotnull(k#x) (7) BroadcastExchange Input [1]: [k#x] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x] (8) BroadcastQueryStage Output [1]: [k#x] Arguments: 0 (9) Scan parquet default.t2 Output [1]: [key#x] Batched: true Location: InMemoryFileIndex [file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apache.spark.sql.ExplainSuiteAE/t2] PushedFilters: [IsNotNull(key)] ReadSchema: struct (10) ColumnarToRow Input [1]: [key#x] (11) Filter Input [1]: [key#x] Condition : isnotnull(key#x) (12) BroadcastHashJoin [codegen id : 2] Left keys [1]: [k#x] Right keys [1]: [key#x] Join condition: None (13) Filter Input [1]: [k#x] Condition : isnotnull(k#x) (14) BroadcastExchange Input [1]: [k#x] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x] (15) Filter Input [1]: [key#x] Condition : isnotnull(key#x) (16) BroadcastHashJoin Left keys [1]: [k#x] Right keys [1]: [key#x] Join condition: None (17) AdaptiveSparkPlan Output [2]: [k#x, key#x] Arguments: isFinalPlan=true ``` ### How was this patch tested? add test Closes #34036 from ChenMichael/SPARK-36795-Duplicate-node-id-with-inMemoryRelation. Authored-by: Michael Chen Signed-off-by: Hyukjin Kwon --- .../spark/sql/execution/ExplainUtils.scala | 2 +- .../org/apache/spark/sql/ExplainSuite.scala | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala index ea6e1f933f..1eea0cd777 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala @@ -148,7 +148,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper { setOpId(p) case other: QueryPlan[_] => setOpId(other) - other.innerChildren.foldLeft(currentOperationID) { + currentOperationID = other.innerChildren.foldLeft(currentOperationID) { (curId, plan) => generateOperatorIDs(plan, curId) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index 688ded0c3e..44d0445928 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -704,6 +704,25 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit "Bucketed: false (bucket column(s) not read)") } } + + test("SPARK-36795: Node IDs should not be duplicated when InMemoryRelation present") { + withTempView("t1", "t2") { + Seq(1).toDF("k").write.saveAsTable("t1") + Seq(1).toDF("key").write.saveAsTable("t2") + spark.sql("SELECT * FROM t1").persist() + val query = "SELECT * FROM (SELECT * FROM t1) join t2 " + + "ON k = t2.key" + val df = sql(query).toDF() + + val inMemoryRelationRegex = """InMemoryRelation \(([0-9]+)\)""".r + val columnarToRowRegex = """ColumnarToRow \(([0-9]+)\)""".r + val explainString = getNormalizedExplain(df, FormattedMode) + val inMemoryRelationNodeId = inMemoryRelationRegex.findAllIn(explainString).group(1) + val columnarToRowNodeId = columnarToRowRegex.findAllIn(explainString).group(1) + + assert(inMemoryRelationNodeId != columnarToRowNodeId) + } + } } case class ExplainSingleData(id: Int)