From 89894a4b1dcebf411bd5bd046165faa74fffdee5 Mon Sep 17 00:00:00 2001
From: Michael Chen
Date: Thu, 23 Sep 2021 15:54:33 +0900
Subject: [PATCH] [SPARK-36795][SQL] Explain Formatted has Duplicate Node IDs

Fixed explain formatted mode so it no longer assigns duplicate node IDs when an
InMemoryRelation is present in the query plan. Having duplicate node IDs in the
plan makes it confusing to read.

Yes, the explain formatted string will change. Notice how, before the change,
`ColumnarToRow` and `InMemoryRelation` both have node ID 2.

Before changes =>
```
== Physical Plan ==
AdaptiveSparkPlan (14)
+- == Final Plan ==
   * BroadcastHashJoin Inner BuildLeft (9)
   :- BroadcastQueryStage (5)
   :  +- BroadcastExchange (4)
   :     +- * Filter (3)
   :        +- * ColumnarToRow (2)
   :           +- InMemoryTableScan (1)
   :                 +- InMemoryRelation (2)
   :                       +- * ColumnarToRow (4)
   :                          +- Scan parquet default.t1 (3)
   +- * Filter (8)
      +- * ColumnarToRow (7)
         +- Scan parquet default.t2 (6)
+- == Initial Plan ==
   BroadcastHashJoin Inner BuildLeft (13)
   :- BroadcastExchange (11)
   :  +- Filter (10)
   :     +- InMemoryTableScan (1)
   :           +- InMemoryRelation (2)
   :                 +- * ColumnarToRow (4)
   :                    +- Scan parquet default.t1 (3)
   +- Filter (12)
      +- Scan parquet default.t2 (6)

(1) InMemoryTableScan
Output [1]: [k#x]
Arguments: [k#x], [isnotnull(k#x)]

(2) InMemoryRelation
Arguments: [k#x], CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer401788d5,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) ColumnarToRow
+- FileScan parquet default.t1[k#x] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apach..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<k:int>
,None)

(3) Scan parquet default.t1
Output [1]: [k#x]
Batched: true
Location: InMemoryFileIndex [file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apache.spark.sql.ExplainSuiteAE/t1]
ReadSchema: struct<k:int>

(4) ColumnarToRow [codegen id : 1]
Input [1]: [k#x]

(5) BroadcastQueryStage
Output [1]: [k#x]
Arguments: 0

(6) Scan parquet default.t2
Output [1]: [key#x]
Batched: true
Location: InMemoryFileIndex [file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apache.spark.sql.ExplainSuiteAE/t2]
PushedFilters: [IsNotNull(key)]
ReadSchema: struct<key:int>

(7) ColumnarToRow
Input [1]: [key#x]

(8) Filter
Input [1]: [key#x]
Condition : isnotnull(key#x)

(9) BroadcastHashJoin [codegen id : 2]
Left keys [1]: [k#x]
Right keys [1]: [key#x]
Join condition: None

(10) Filter
Input [1]: [k#x]
Condition : isnotnull(k#x)

(11) BroadcastExchange
Input [1]: [k#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x]

(12) Filter
Input [1]: [key#x]
Condition : isnotnull(key#x)

(13) BroadcastHashJoin
Left keys [1]: [k#x]
Right keys [1]: [key#x]
Join condition: None

(14) AdaptiveSparkPlan
Output [2]: [k#x, key#x]
Arguments: isFinalPlan=true
```

After Changes =>
```
== Physical Plan ==
AdaptiveSparkPlan (17)
+- == Final Plan ==
   * BroadcastHashJoin Inner BuildLeft (12)
   :- BroadcastQueryStage (8)
   :  +- BroadcastExchange (7)
   :     +- * Filter (6)
   :        +- * ColumnarToRow (5)
   :           +- InMemoryTableScan (1)
   :                 +- InMemoryRelation (2)
   :                       +- * ColumnarToRow (4)
   :                          +- Scan parquet default.t1 (3)
   +- * Filter (11)
      +- * ColumnarToRow (10)
         +- Scan parquet default.t2 (9)
+- == Initial Plan ==
   BroadcastHashJoin Inner BuildLeft (16)
   :- BroadcastExchange (14)
   :  +- Filter (13)
   :     +- InMemoryTableScan (1)
   :           +- InMemoryRelation (2)
   :                 +- * ColumnarToRow (4)
   :                    +- Scan parquet default.t1 (3)
   +- Filter (15)
      +- Scan parquet default.t2 (9)

(1) InMemoryTableScan
Output [1]: [k#x]
Arguments: [k#x], [isnotnull(k#x)]

(2) InMemoryRelation
Arguments: [k#x], CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer3ccb12d,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) ColumnarToRow
+- FileScan parquet default.t1[k#x] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apach..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<k:int>
,None)

(3) Scan parquet default.t1
Output [1]: [k#x]
Batched: true
Location: InMemoryFileIndex [file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apache.spark.sql.ExplainSuiteAE/t1]
ReadSchema: struct<k:int>

(4) ColumnarToRow [codegen id : 1]
Input [1]: [k#x]

(5) ColumnarToRow [codegen id : 1]
Input [1]: [k#x]

(6) Filter [codegen id : 1]
Input [1]: [k#x]
Condition : isnotnull(k#x)

(7) BroadcastExchange
Input [1]: [k#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x]

(8) BroadcastQueryStage
Output [1]: [k#x]
Arguments: 0

(9) Scan parquet default.t2
Output [1]: [key#x]
Batched: true
Location: InMemoryFileIndex [file:/Users/mike.chen/code/apacheSpark/spark/spark-warehouse/org.apache.spark.sql.ExplainSuiteAE/t2]
PushedFilters: [IsNotNull(key)]
ReadSchema: struct<key:int>

(10) ColumnarToRow
Input [1]: [key#x]

(11) Filter
Input [1]: [key#x]
Condition : isnotnull(key#x)

(12) BroadcastHashJoin [codegen id : 2]
Left keys [1]: [k#x]
Right keys [1]: [key#x]
Join condition: None

(13) Filter
Input [1]: [k#x]
Condition : isnotnull(k#x)

(14) BroadcastExchange
Input [1]: [k#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x]

(15) Filter
Input [1]: [key#x]
Condition : isnotnull(key#x)

(16) BroadcastHashJoin
Left keys [1]: [k#x]
Right keys [1]: [key#x]
Join condition: None

(17) AdaptiveSparkPlan
Output [2]: [k#x, key#x]
Arguments: isFinalPlan=true
```
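To reproduce the before/after output locally, a minimal sketch along the lines of the new test can be used (this assumes a running `SparkSession` named `spark`; the table names are only examples):

```scala
import spark.implicits._

// Two small tables to join.
Seq(1).toDF("k").write.saveAsTable("t1")
Seq(1).toDF("key").write.saveAsTable("t2")

// Cache the subquery so an InMemoryRelation shows up in the physical plan.
spark.sql("SELECT * FROM t1").persist()

// FORMATTED mode prints the numbered operators shown above.
spark.sql("SELECT * FROM (SELECT * FROM t1) JOIN t2 ON k = t2.key").explain("formatted")
```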
add test

Closes #34036 from ChenMichael/SPARK-36795-Duplicate-node-id-with-inMemoryRelation.

Authored-by: Michael Chen
Signed-off-by: Hyukjin Kwon
(cherry picked from commit 6d7ab7b52bb37aa03029a0c0761988733665633f)
Signed-off-by: Hyukjin Kwon
---
 .../spark/sql/execution/ExplainUtils.scala    |  2 +-
 .../org/apache/spark/sql/ExplainSuite.scala   | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
index ea6e1f933f..1eea0cd777 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
@@ -148,7 +148,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
         setOpId(p)
       case other: QueryPlan[_] =>
         setOpId(other)
-        other.innerChildren.foldLeft(currentOperationID) {
+        currentOperationID = other.innerChildren.foldLeft(currentOperationID) {
           (curId, plan) => generateOperatorIDs(plan, curId)
         }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index 52daa3d21b..aef39a24e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -677,6 +677,25 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
       }
     }
   }
+
+  test("SPARK-36795: Node IDs should not be duplicated when InMemoryRelation present") {
+    withTempView("t1", "t2") {
+      Seq(1).toDF("k").write.saveAsTable("t1")
+      Seq(1).toDF("key").write.saveAsTable("t2")
+      spark.sql("SELECT * FROM t1").persist()
+      val query = "SELECT * FROM (SELECT * FROM t1) join t2 " +
+        "ON k = t2.key"
+      val df = sql(query).toDF()
+
+      val inMemoryRelationRegex = """InMemoryRelation \(([0-9]+)\)""".r
+      val columnarToRowRegex = """ColumnarToRow \(([0-9]+)\)""".r
+      val explainString = getNormalizedExplain(df, FormattedMode)
+      val inMemoryRelationNodeId = inMemoryRelationRegex.findAllIn(explainString).group(1)
+      val columnarToRowNodeId = columnarToRowRegex.findAllIn(explainString).group(1)
+
+      assert(inMemoryRelationNodeId != columnarToRowNodeId)
+    }
+  }
 }
 
 case class ExplainSingleData(id: Int)
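For readers skimming the ExplainUtils.scala hunk above: the fix assigns the result of the `foldLeft` over `innerChildren` back to `currentOperationID`, so the operator-ID counter keeps advancing past the IDs handed out inside a cached plan. Below is a minimal, self-contained sketch of that accumulator-threading pattern; the `Node` class and `assignIds` helper are illustrative stand-ins, not Spark's actual API:

```scala
import scala.collection.mutable

// Toy plan node: `innerChildren` plays the role of InMemoryRelation's cached plan,
// which is numbered separately from the regular children.
case class Node(name: String, children: Seq[Node] = Nil, innerChildren: Seq[Node] = Nil)

// Assign consecutive IDs bottom-up and return the last ID used, so callers can
// keep counting from there.
def assignIds(node: Node, startId: Int, ids: mutable.Map[Node, Int]): Int = {
  // Number the regular children first, threading the counter through them.
  var currentId = node.children.foldLeft(startId) { (curId, child) =>
    assignIds(child, curId, ids)
  }
  currentId += 1
  ids(node) = currentId
  // Keep the counter returned by the fold over inner children. If this result
  // were discarded, later operators would reuse the IDs handed out in here.
  node.innerChildren.foldLeft(currentId) { (curId, inner) =>
    assignIds(inner, curId, ids)
  }
}

// Example: a Filter above an InMemoryTableScan whose inner child is the cached plan.
val cachedPlan = Node("ColumnarToRow", children = Seq(Node("Scan parquet t1")))
val scan = Node("InMemoryTableScan", innerChildren = Seq(cachedPlan))
val filter = Node("Filter", children = Seq(scan))
val ids = mutable.Map.empty[Node, Int]
assignIds(filter, 0, ids) // Filter now gets an ID beyond the cached plan's IDs.
```

If the result of the inner fold were dropped, the Filter in this sketch would receive an ID already used inside the cached plan, which is exactly the duplication visible in the "Before changes" output.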