[SPARK-32466][SQL][FOLLOW-UP] Normalize Location info in explain plan

### What changes were proposed in this pull request?

1. Extract `SQLQueryTestSuite.replaceNotIncludedMsg` to `PlanTest`.

2. Reuse `replaceNotIncludedMsg` to normalize the explain plans generated in `PlanStabilitySuite` (see the illustrative sketch below).
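
For context, a minimal, self-contained sketch of the normalization idea, mirroring the `normalizeLocation` helper added in the diff below; the object name, sample path, and suite class name here are illustrative, not taken from an actual test run:

```scala
// Sketch only: swap the machine-specific warehouse path in an explain-plan line
// for a stable placeholder, so golden plan files compare equal across machines.
// `clsName` stands in for the canonical class name of the running suite.
object NormalizeLocationSketch {
  def normalizeLocation(plan: String, clsName: String): String =
    plan.replaceAll(
      s"Location.*$clsName/",
      "Location [not included in comparison]/{warehouse_dir}/")

  def main(args: Array[String]): Unit = {
    // Hypothetical explain output containing a local, user-specific path.
    val line = "Location: InMemoryFileIndex [file:/Users/someone/spark-warehouse/" +
      "org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/store_sales]"
    println(normalizeLocation(line, "org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite"))
    // Prints: Location [not included in comparison]/{warehouse_dir}/store_sales]
  }
}
```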

### Why are the changes needed?

This is a follow-up of https://github.com/apache/spark/pull/29270.
It removes personal information (e.g., local directory paths) from the explain plans.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Updated test.

Closes #29537 from Ngone51/follow-up-plan-stablity.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
parent c26a97637f
commit b78b776c9e
3 changed files with 34 additions and 27 deletions


@@ -91,7 +91,7 @@ TakeOrderedAndProject (87)
(1) Scan parquet default.store_sales
Output [4]: [ss_sold_date_sk#1, ss_customer_sk#2, ss_ext_discount_amt#3, ss_ext_list_price#4]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/store_sales]
+Location [not included in comparison]/{warehouse_dir}/store_sales]
PushedFilters: [IsNotNull(ss_customer_sk), IsNotNull(ss_sold_date_sk)]
ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int,ss_ext_discount_amt:decimal(7,2),ss_ext_list_price:decimal(7,2)>
@@ -105,7 +105,7 @@ Condition : (isnotnull(ss_customer_sk#2) AND isnotnull(ss_sold_date_sk#1))
(4) Scan parquet default.date_dim
Output [2]: [d_date_sk#5, d_year#6]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/date_dim]
+Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2001), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_year:int>
@@ -140,7 +140,7 @@ Arguments: [ss_customer_sk#2 ASC NULLS FIRST], false, 0
(12) Scan parquet default.customer
Output [8]: [c_customer_sk#9, c_customer_id#10, c_first_name#11, c_last_name#12, c_preferred_cust_flag#13, c_birth_country#14, c_login#15, c_email_address#16]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/customer]
+Location [not included in comparison]/{warehouse_dir}/customer]
PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_customer_id)]
ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_first_name:string,c_last_name:string,c_preferred_cust_flag:string,c_birth_country:string,c_login:string,c_email_address:string>
@@ -201,7 +201,7 @@ Arguments: [customer_id#22 ASC NULLS FIRST], false, 0
(25) Scan parquet default.store_sales
Output [4]: [ss_sold_date_sk#1, ss_customer_sk#2, ss_ext_discount_amt#3, ss_ext_list_price#4]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/store_sales]
+Location [not included in comparison]/{warehouse_dir}/store_sales]
PushedFilters: [IsNotNull(ss_customer_sk), IsNotNull(ss_sold_date_sk)]
ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int,ss_ext_discount_amt:decimal(7,2),ss_ext_list_price:decimal(7,2)>
@@ -215,7 +215,7 @@ Condition : (isnotnull(ss_customer_sk#2) AND isnotnull(ss_sold_date_sk#1))
(28) Scan parquet default.date_dim
Output [2]: [d_date_sk#5, d_year#6]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/date_dim]
+Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2002), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_year:int>
@@ -301,7 +301,7 @@ Input [5]: [customer_id#22, year_total#23, customer_id#31, customer_preferred_cu
(47) Scan parquet default.web_sales
Output [4]: [ws_sold_date_sk#35, ws_bill_customer_sk#36, ws_ext_discount_amt#37, ws_ext_list_price#38]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/web_sales]
+Location [not included in comparison]/{warehouse_dir}/web_sales]
PushedFilters: [IsNotNull(ws_bill_customer_sk), IsNotNull(ws_sold_date_sk)]
ReadSchema: struct<ws_sold_date_sk:int,ws_bill_customer_sk:int,ws_ext_discount_amt:decimal(7,2),ws_ext_list_price:decimal(7,2)>
@@ -394,7 +394,7 @@ Input [6]: [customer_id#22, year_total#23, customer_preferred_cust_flag#32, year
(68) Scan parquet default.web_sales
Output [4]: [ws_sold_date_sk#35, ws_bill_customer_sk#36, ws_ext_discount_amt#37, ws_ext_list_price#38]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/web_sales]
+Location [not included in comparison]/{warehouse_dir}/web_sales]
PushedFilters: [IsNotNull(ws_bill_customer_sk), IsNotNull(ws_sold_date_sk)]
ReadSchema: struct<ws_sold_date_sk:int,ws_bill_customer_sk:int,ws_ext_discount_amt:decimal(7,2),ws_ext_list_price:decimal(7,2)>


@@ -77,7 +77,7 @@ TakeOrderedAndProject (73)
(1) Scan parquet default.customer
Output [8]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_preferred_cust_flag#5, c_birth_country#6, c_login#7, c_email_address#8]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/customer]
+Location [not included in comparison]/{warehouse_dir}/customer]
PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_customer_id)]
ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_first_name:string,c_last_name:string,c_preferred_cust_flag:string,c_birth_country:string,c_login:string,c_email_address:string>
@@ -91,7 +91,7 @@ Condition : (isnotnull(c_customer_sk#1) AND isnotnull(c_customer_id#2))
(4) Scan parquet default.store_sales
Output [4]: [ss_sold_date_sk#9, ss_customer_sk#10, ss_ext_discount_amt#11, ss_ext_list_price#12]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/store_sales]
+Location [not included in comparison]/{warehouse_dir}/store_sales]
PushedFilters: [IsNotNull(ss_customer_sk), IsNotNull(ss_sold_date_sk)]
ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int,ss_ext_discount_amt:decimal(7,2),ss_ext_list_price:decimal(7,2)>
@@ -118,7 +118,7 @@ Input [12]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_
(10) Scan parquet default.date_dim
Output [2]: [d_date_sk#14, d_year#15]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/date_dim]
+Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2001), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_year:int>
@@ -167,7 +167,7 @@ Condition : (isnotnull(year_total#22) AND (year_total#22 > 0.00))
(20) Scan parquet default.customer
Output [8]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_preferred_cust_flag#5, c_birth_country#6, c_login#7, c_email_address#8]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/customer]
+Location [not included in comparison]/{warehouse_dir}/customer]
PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_customer_id)]
ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_first_name:string,c_last_name:string,c_preferred_cust_flag:string,c_birth_country:string,c_login:string,c_email_address:string>
@@ -193,7 +193,7 @@ Input [12]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_
(26) Scan parquet default.date_dim
Output [2]: [d_date_sk#14, d_year#15]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/date_dim]
+Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2002), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_year:int>
@@ -251,7 +251,7 @@ Input [5]: [customer_id#21, year_total#22, customer_id#28, customer_preferred_cu
(38) Scan parquet default.customer
Output [8]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_preferred_cust_flag#5, c_birth_country#6, c_login#7, c_email_address#8]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/customer]
+Location [not included in comparison]/{warehouse_dir}/customer]
PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_customer_id)]
ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_first_name:string,c_last_name:string,c_preferred_cust_flag:string,c_birth_country:string,c_login:string,c_email_address:string>
@@ -265,7 +265,7 @@ Condition : (isnotnull(c_customer_sk#1) AND isnotnull(c_customer_id#2))
(41) Scan parquet default.web_sales
Output [4]: [ws_sold_date_sk#32, ws_bill_customer_sk#33, ws_ext_discount_amt#34, ws_ext_list_price#35]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/web_sales]
+Location [not included in comparison]/{warehouse_dir}/web_sales]
PushedFilters: [IsNotNull(ws_bill_customer_sk), IsNotNull(ws_sold_date_sk)]
ReadSchema: struct<ws_sold_date_sk:int,ws_bill_customer_sk:int,ws_ext_discount_amt:decimal(7,2),ws_ext_list_price:decimal(7,2)>
@@ -343,7 +343,7 @@ Input [6]: [customer_id#21, year_total#22, customer_preferred_cust_flag#29, year
(58) Scan parquet default.customer
Output [8]: [c_customer_sk#1, c_customer_id#2, c_first_name#3, c_last_name#4, c_preferred_cust_flag#5, c_birth_country#6, c_login#7, c_email_address#8]
Batched: true
-Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/customer]
+Location [not included in comparison]/{warehouse_dir}/customer]
PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_customer_id)]
ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_first_name:string,c_last_name:string,c_preferred_cust_flag:string,c_birth_country:string,c_login:string,c_email_address:string>


@@ -94,6 +94,8 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
private val referenceRegex = "#\\d+".r
private val normalizeRegex = "#\\d+L?".r
+private val clsName = this.getClass.getCanonicalName
def goldenFilePath: String
private def getDirForTest(name: String): File = {
@@ -102,8 +104,8 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
private def isApproved(dir: File, actualSimplifiedPlan: String): Boolean = {
val file = new File(dir, "simplified.txt")
-val approved = FileUtils.readFileToString(file, StandardCharsets.UTF_8)
-approved == actualSimplifiedPlan
+val expected = FileUtils.readFileToString(file, StandardCharsets.UTF_8)
+expected == actualSimplifiedPlan
}
/**
@@ -115,7 +117,7 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
* @param explain the full explain output; this is saved to help debug later as the simplified
* plan is not too useful for debugging
*/
-private def generateApprovedPlanFile(plan: SparkPlan, name: String, explain: String): Unit = {
+private def generateGoldenFile(plan: SparkPlan, name: String, explain: String): Unit = {
val dir = getDirForTest(name)
val simplified = getSimplifiedPlan(plan)
val foundMatch = dir.exists() && isApproved(dir, simplified)
@@ -207,7 +209,7 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
* WholeStageCodegen
* Project [c_customer_id]
*/
-def getSimplifiedPlan(node: SparkPlan, depth: Int): String = {
+def simplifyNode(node: SparkPlan, depth: Int): String = {
val padding = " " * depth
var thisNode = node.nodeName
if (node.references.nonEmpty) {
@@ -220,19 +222,24 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
if (id > 0) {
thisNode += s" #$id"
}
-val childrenSimplified = node.children.map(getSimplifiedPlan(_, depth + 1))
-val subqueriesSimplified = node.subqueries.map(getSimplifiedPlan(_, depth + 1))
+val childrenSimplified = node.children.map(simplifyNode(_, depth + 1))
+val subqueriesSimplified = node.subqueries.map(simplifyNode(_, depth + 1))
s"$padding$thisNode\n${subqueriesSimplified.mkString("")}${childrenSimplified.mkString("")}"
}
-getSimplifiedPlan(plan, 0)
+simplifyNode(plan, 0)
}
-private def normalizeIds(query: String): String = {
+private def normalizeIds(plan: String): String = {
val map = new mutable.HashMap[String, String]()
-normalizeRegex.findAllMatchIn(query).map(_.toString)
+normalizeRegex.findAllMatchIn(plan).map(_.toString)
.foreach(map.getOrElseUpdate(_, (map.size + 1).toString))
-normalizeRegex.replaceAllIn(query, regexMatch => s"#${map(regexMatch.toString)}")
+normalizeRegex.replaceAllIn(plan, regexMatch => s"#${map(regexMatch.toString)}")
}
+private def normalizeLocation(plan: String): String = {
+  plan.replaceAll(s"Location.*$clsName/",
+    "Location [not included in comparison]/{warehouse_dir}/")
+}
/**
@@ -244,10 +251,10 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
classLoader = Thread.currentThread().getContextClassLoader)
val qe = sql(queryString).queryExecution
val plan = qe.executedPlan
-val explain = normalizeIds(qe.explainString(FormattedMode))
+val explain = normalizeLocation(normalizeIds(qe.explainString(FormattedMode)))
if (regenerateGoldenFiles) {
-generateApprovedPlanFile(plan, query + suffix, explain)
+generateGoldenFile(plan, query + suffix, explain)
} else {
checkWithApproved(plan, query + suffix, explain)
}
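
For reference, a standalone sketch of how the two normalization steps wired together in `testQuery` above (`normalizeLocation(normalizeIds(...))`) behave on a small explain fragment; the object name, sample input, and hard-coded suite class name are illustrative (the real suite derives `clsName` via `getClass.getCanonicalName`):

```scala
import scala.collection.mutable

// Sketch only: renumber expression IDs deterministically, then strip the
// machine-specific Location, mirroring the helpers shown in the diff above.
object PlanNormalizationSketch {
  private val normalizeRegex = "#\\d+L?".r
  // Hypothetical suite class name, used only for this example.
  private val clsName = "org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite"

  private def normalizeIds(plan: String): String = {
    val map = new mutable.HashMap[String, String]()
    normalizeRegex.findAllMatchIn(plan).map(_.toString)
      .foreach(map.getOrElseUpdate(_, (map.size + 1).toString))
    normalizeRegex.replaceAllIn(plan, m => s"#${map(m.toString)}")
  }

  private def normalizeLocation(plan: String): String =
    plan.replaceAll(
      s"Location.*$clsName/",
      "Location [not included in comparison]/{warehouse_dir}/")

  def main(args: Array[String]): Unit = {
    // Illustrative explain fragment with raw expression IDs and a local path.
    val explain =
      """Output [2]: [d_date_sk#105, d_year#106]
        |Location: InMemoryFileIndex [file:/tmp/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/date_dim]""".stripMargin
    println(normalizeLocation(normalizeIds(explain)))
    // Output [2]: [d_date_sk#1, d_year#2]
    // Location [not included in comparison]/{warehouse_dir}/date_dim]
  }
}
```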