From b78b776c9ebc97bb2d384c823c313df8b81a0235 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Tue, 25 Aug 2020 21:03:44 +0900
Subject: [PATCH] [SPARK-32466][SQL][FOLLOW-UP] Normalize Location info in
 explain plan

### What changes were proposed in this pull request?

1. Extract `SQLQueryTestSuite.replaceNotIncludedMsg` to `PlanTest`.
2. Reuse `replaceNotIncludedMsg` to normalize the explain plans generated in `PlanStabilitySuite`.

### Why are the changes needed?

This is a follow-up of https://github.com/apache/spark/pull/29270. It removes user-specific information (e.g., local directories) from the explain plan.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Updated test.

Closes #29537 from Ngone51/follow-up-plan-stablity.

Authored-by: yi.wu
Signed-off-by: HyukjinKwon
---
 .../approved-plans-v1_4/q11.sf100/explain.txt | 14 ++++-----
 .../approved-plans-v1_4/q11/explain.txt       | 16 +++++-----
 .../apache/spark/sql/PlanStabilitySuite.scala | 31 ++++++++++++-------
 3 files changed, 34 insertions(+), 27 deletions(-)

diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q11.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q11.sf100/explain.txt
index e9e79ad7e4..d8dbb6d131 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q11.sf100/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q11.sf100/explain.txt
@@ -91,7 +91,7 @@ TakeOrderedAndProject (87)
 (1) Scan parquet default.store_sales
 Output [4]: [ss_sold_date_sk#1, ss_customer_sk#2, ss_ext_discount_amt#3, ss_ext_list_price#4]
 Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/store_sales]
+Location [not included in comparison]/{warehouse_dir}/store_sales]
 PushedFilters: [IsNotNull(ss_customer_sk), IsNotNull(ss_sold_date_sk)]
 ReadSchema: struct
 
@@ -105,7 +105,7 @@ Condition : (isnotnull(ss_customer_sk#2) AND isnotnull(ss_sold_date_sk#1))
 (4) Scan parquet default.date_dim
 Output [2]: [d_date_sk#5, d_year#6]
 Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/date_dim]
+Location [not included in comparison]/{warehouse_dir}/date_dim]
 PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2001), IsNotNull(d_date_sk)]
 ReadSchema: struct
 
@@ -140,7 +140,7 @@ Arguments: [ss_customer_sk#2 ASC NULLS FIRST], false, 0
 (12) Scan parquet default.customer
 Output [8]: [c_customer_sk#9, c_customer_id#10, c_first_name#11, c_last_name#12, c_preferred_cust_flag#13, c_birth_country#14, c_login#15, c_email_address#16]
 Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/customer]
+Location [not included in comparison]/{warehouse_dir}/customer]
 PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_customer_id)]
 ReadSchema: struct
 
@@ -201,7 +201,7 @@ Arguments: [customer_id#22 ASC NULLS FIRST], false, 0
 (25) Scan parquet default.store_sales
 Output [4]: [ss_sold_date_sk#1, ss_customer_sk#2, ss_ext_discount_amt#3, ss_ext_list_price#4]
 Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/store_sales]
+Location [not included in comparison]/{warehouse_dir}/store_sales]
 PushedFilters: [IsNotNull(ss_customer_sk), IsNotNull(ss_sold_date_sk)]
 ReadSchema: struct
 
@@ -215,7 +215,7 @@ Condition : (isnotnull(ss_customer_sk#2) AND isnotnull(ss_sold_date_sk#1))
 (28) Scan parquet default.date_dim
 Output [2]: [d_date_sk#5, d_year#6]
 Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/date_dim]
+Location [not included in comparison]/{warehouse_dir}/date_dim]
 PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2002), IsNotNull(d_date_sk)]
 ReadSchema: struct
 
@@ -301,7 +301,7 @@ Input [5]: [customer_id#22, year_total#23, customer_id#31, customer_preferred_cu
 (47) Scan parquet default.web_sales
 Output [4]: [ws_sold_date_sk#35, ws_bill_customer_sk#36, ws_ext_discount_amt#37, ws_ext_list_price#38]
 Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/web_sales]
+Location [not included in comparison]/{warehouse_dir}/web_sales]
 PushedFilters: [IsNotNull(ws_bill_customer_sk), IsNotNull(ws_sold_date_sk)]
 ReadSchema: struct
 
@@ -394,7 +394,7 @@ Input [6]: [customer_id#22, year_total#23, customer_preferred_cust_flag#32, year
 (68) Scan parquet default.web_sales
 Output [4]: [ws_sold_date_sk#35, ws_bill_customer_sk#36, ws_ext_discount_amt#37, ws_ext_list_price#38]
 Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/web_sales]
+Location [not included in comparison]/{warehouse_dir}/web_sales]
 PushedFilters: [IsNotNull(ws_bill_customer_sk), IsNotNull(ws_sold_date_sk)]
 ReadSchema: struct
 
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q11/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q11/explain.txt
index c840401768..246a5d5b0b 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q11/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q11/explain.txt
@@ -77,7 +77,7 @@ TakeOrderedAndProject (73)
 (1) Scan parquet default.customer
 Output [8]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_preferred_cust_flag#5, c_birth_country#6, c_login#7, c_email_address#8]
 Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/customer]
+Location [not included in comparison]/{warehouse_dir}/customer]
 PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_customer_id)]
 ReadSchema: struct
 
@@ -91,7 +91,7 @@ Condition : (isnotnull(c_customer_sk#1) AND isnotnull(c_customer_id#2))
 (4) Scan parquet default.store_sales
 Output [4]: [ss_sold_date_sk#9, ss_customer_sk#10, ss_ext_discount_amt#11, ss_ext_list_price#12]
 Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/store_sales]
+Location [not included in comparison]/{warehouse_dir}/store_sales]
 PushedFilters: [IsNotNull(ss_customer_sk), IsNotNull(ss_sold_date_sk)]
 ReadSchema: struct
 
@@ -118,7 +118,7 @@ Input [12]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_
 (10) Scan parquet default.date_dim
 Output [2]: [d_date_sk#14, d_year#15]
 Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/date_dim]
+Location [not included in comparison]/{warehouse_dir}/date_dim]
 PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2001), IsNotNull(d_date_sk)]
 ReadSchema: struct
 
@@ -167,7 +167,7 @@ Condition : (isnotnull(year_total#22) AND (year_total#22 > 0.00))
 (20) Scan parquet default.customer
 Output [8]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_preferred_cust_flag#5, c_birth_country#6, c_login#7, c_email_address#8]
 Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/customer]
+Location [not included in comparison]/{warehouse_dir}/customer]
 PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_customer_id)]
 ReadSchema: struct
 
@@ -193,7 +193,7 @@ Input [12]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_
 (26) Scan parquet default.date_dim
 Output [2]: [d_date_sk#14, d_year#15]
 Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/date_dim]
+Location [not included in comparison]/{warehouse_dir}/date_dim]
 PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2002), IsNotNull(d_date_sk)]
 ReadSchema: struct
 
@@ -251,7 +251,7 @@ Input [5]: [customer_id#21, year_total#22, customer_id#28, customer_preferred_cu
 (38) Scan parquet default.customer
 Output [8]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_preferred_cust_flag#5, c_birth_country#6, c_login#7, c_email_address#8]
 Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/customer]
+Location [not included in comparison]/{warehouse_dir}/customer]
 PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_customer_id)]
 ReadSchema: struct
 
@@ -265,7 +265,7 @@ Condition : (isnotnull(c_customer_sk#1) AND isnotnull(c_customer_id#2))
 (41) Scan parquet default.web_sales
 Output [4]: [ws_sold_date_sk#32, ws_bill_customer_sk#33, ws_ext_discount_amt#34, ws_ext_list_price#35]
 Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/web_sales]
+Location [not included in comparison]/{warehouse_dir}/web_sales]
 PushedFilters: [IsNotNull(ws_bill_customer_sk), IsNotNull(ws_sold_date_sk)]
 ReadSchema: struct
 
@@ -343,7 +343,7 @@ Input [6]: [customer_id#21, year_total#22, customer_preferred_cust_flag#29, year
 (58) Scan parquet default.customer
 Output [8]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_preferred_cust_flag#5, c_birth_country#6, c_login#7, c_email_address#8]
 Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/customer]
+Location [not included in comparison]/{warehouse_dir}/customer]
 PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_customer_id)]
 ReadSchema: struct
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
index 8368875039..34fd1d2439 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
@@ -94,6 +94,8 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
   private val referenceRegex = "#\\d+".r
   private val normalizeRegex = "#\\d+L?".r
 
+  private val clsName = this.getClass.getCanonicalName
+
   def goldenFilePath: String
 
   private def getDirForTest(name: String): File = {
@@ -102,8 +104,8 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
 
   private def isApproved(dir: File, actualSimplifiedPlan: String): Boolean = {
     val file = new File(dir, "simplified.txt")
-    val approved = FileUtils.readFileToString(file, StandardCharsets.UTF_8)
-    approved == actualSimplifiedPlan
+    val expected = FileUtils.readFileToString(file, StandardCharsets.UTF_8)
+    expected == actualSimplifiedPlan
   }
 
   /**
@@ -115,7 +117,7 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
    * @param explain the full explain output; this is saved to help debug later as the simplified
    *                plan is not too useful for debugging
    */
-  private def generateApprovedPlanFile(plan: SparkPlan, name: String, explain: String): Unit = {
+  private def generateGoldenFile(plan: SparkPlan, name: String, explain: String): Unit = {
     val dir = getDirForTest(name)
     val simplified = getSimplifiedPlan(plan)
     val foundMatch = dir.exists() && isApproved(dir, simplified)
@@ -207,7 +209,7 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
     *   WholeStageCodegen
     *     Project [c_customer_id]
     */
-    def getSimplifiedPlan(node: SparkPlan, depth: Int): String = {
+    def simplifyNode(node: SparkPlan, depth: Int): String = {
      val padding = "  " * depth
      var thisNode = node.nodeName
      if (node.references.nonEmpty) {
@@ -220,19 +222,24 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
      if (id > 0) {
        thisNode += s" #$id"
      }
-     val childrenSimplified = node.children.map(getSimplifiedPlan(_, depth + 1))
-     val subqueriesSimplified = node.subqueries.map(getSimplifiedPlan(_, depth + 1))
+     val childrenSimplified = node.children.map(simplifyNode(_, depth + 1))
+     val subqueriesSimplified = node.subqueries.map(simplifyNode(_, depth + 1))
      s"$padding$thisNode\n${subqueriesSimplified.mkString("")}${childrenSimplified.mkString("")}"
    }
-    getSimplifiedPlan(plan, 0)
+    simplifyNode(plan, 0)
  }
 
-  private def normalizeIds(query: String): String = {
+  private def normalizeIds(plan: String): String = {
     val map = new mutable.HashMap[String, String]()
-    normalizeRegex.findAllMatchIn(query).map(_.toString)
+    normalizeRegex.findAllMatchIn(plan).map(_.toString)
       .foreach(map.getOrElseUpdate(_, (map.size + 1).toString))
-    normalizeRegex.replaceAllIn(query, regexMatch => s"#${map(regexMatch.toString)}")
+    normalizeRegex.replaceAllIn(plan, regexMatch => s"#${map(regexMatch.toString)}")
+  }
+
+  private def normalizeLocation(plan: String): String = {
+    plan.replaceAll(s"Location.*$clsName/",
+      "Location [not included in comparison]/{warehouse_dir}/")
   }
 
   /**
@@ -244,10 +251,10 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
       classLoader = Thread.currentThread().getContextClassLoader)
     val qe = sql(queryString).queryExecution
     val plan = qe.executedPlan
-    val explain = normalizeIds(qe.explainString(FormattedMode))
+    val explain = normalizeLocation(normalizeIds(qe.explainString(FormattedMode)))
 
     if (regenerateGoldenFiles) {
-      generateApprovedPlanFile(plan, query + suffix, explain)
+      generateGoldenFile(plan, query + suffix, explain)
     } else {
       checkWithApproved(plan, query + suffix, explain)
     }
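---

For readers skimming the diff, here is a minimal, self-contained sketch of the two normalizations at work. The helper bodies mirror `normalizeIds`/`normalizeLocation` from the patch, but the `NormalizePlanSketch` object, the hard-coded `clsName` literal, and the sample plan line are hypothetical, for illustration only (in the real suite, `clsName` is derived from `this.getClass.getCanonicalName`):

```scala
import scala.collection.mutable

// Sketch of the plan-text normalizations used by PlanStabilitySuite.
object NormalizePlanSketch {
  private val normalizeRegex = "#\\d+L?".r
  // Hypothetical stand-in for this.getClass.getCanonicalName.
  private val clsName = "org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite"

  // Renumber expression IDs (#123, #45L, ...) by order of first appearance,
  // so semantically identical plans compare equal across runs.
  def normalizeIds(plan: String): String = {
    val map = new mutable.HashMap[String, String]()
    normalizeRegex.findAllMatchIn(plan).map(_.toString)
      .foreach(map.getOrElseUpdate(_, (map.size + 1).toString))
    normalizeRegex.replaceAllIn(plan, m => s"#${map(m.toString)}")
  }

  // Collapse everything from "Location" up to the suite-specific warehouse
  // directory into a fixed placeholder, removing machine-local paths.
  def normalizeLocation(plan: String): String = {
    plan.replaceAll(s"Location.*$clsName/",
      "Location [not included in comparison]/{warehouse_dir}/")
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical scan-node line, as it would appear in a formatted plan.
    val line = "Location: InMemoryFileIndex " +
      s"[file:/home/someone/spark-warehouse/$clsName/customer]"
    println(normalizeLocation(normalizeIds(line)))
    // Prints: Location [not included in comparison]/{warehouse_dir}/customer]
  }
}
```

Composing the two helpers, as the updated `explain` assignment in the patch does, yields exactly the placeholder form stored in the regenerated golden files, which is why the checked-in plans no longer depend on a developer's username or checkout path.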