[SPARK-29366][SQL] Subqueries created for DPP are not printed in EXPLAIN FORMATTED
### What changes were proposed in this pull request?

The subquery expressions introduced by DPP (Dynamic Partition Pruning) are not printed by the newer `EXPLAIN FORMATTED` command. This PR fixes the code that computes the list of subqueries in the plan.

**SQL** (`df1` and `df2` are partitioned on `k`):
```
SELECT df1.id, df2.k
FROM df1 JOIN df2 ON df1.k = df2.k AND df2.id < 2
```

**Before**
```
== Physical Plan ==
* Project (9)
+- * BroadcastHashJoin Inner BuildRight (8)
   :- * ColumnarToRow (2)
   :  +- Scan parquet default.df1 (1)
   +- BroadcastExchange (7)
      +- * Project (6)
         +- * Filter (5)
            +- * ColumnarToRow (4)
               +- Scan parquet default.df2 (3)

(1) Scan parquet default.df1
Output: [id#19L, k#20L]

(2) ColumnarToRow [codegen id : 2]
Input: [id#19L, k#20L]

(3) Scan parquet default.df2
Output: [id#21L, k#22L]

(4) ColumnarToRow [codegen id : 1]
Input: [id#21L, k#22L]

(5) Filter [codegen id : 1]
Input : [id#21L, k#22L]
Condition : (isnotnull(id#21L) AND (id#21L < 2))

(6) Project [codegen id : 1]
Output : [k#22L]
Input : [id#21L, k#22L]

(7) BroadcastExchange
Input: [k#22L]

(8) BroadcastHashJoin [codegen id : 2]
Left keys: List(k#20L)
Right keys: List(k#22L)
Join condition: None

(9) Project [codegen id : 2]
Output : [id#19L, k#22L]
Input : [id#19L, k#20L, k#22L]
```

**After**
```
== Physical Plan ==
* Project (9)
+- * BroadcastHashJoin Inner BuildRight (8)
   :- * ColumnarToRow (2)
   :  +- Scan parquet default.df1 (1)
   +- BroadcastExchange (7)
      +- * Project (6)
         +- * Filter (5)
            +- * ColumnarToRow (4)
               +- Scan parquet default.df2 (3)

(1) Scan parquet default.df1
Output: [id#19L, k#20L]

(2) ColumnarToRow [codegen id : 2]
Input: [id#19L, k#20L]

(3) Scan parquet default.df2
Output: [id#21L, k#22L]

(4) ColumnarToRow [codegen id : 1]
Input: [id#21L, k#22L]

(5) Filter [codegen id : 1]
Input : [id#21L, k#22L]
Condition : (isnotnull(id#21L) AND (id#21L < 2))

(6) Project [codegen id : 1]
Output : [k#22L]
Input : [id#21L, k#22L]

(7) BroadcastExchange
Input: [k#22L]

(8) BroadcastHashJoin [codegen id : 2]
Left keys: List(k#20L)
Right keys: List(k#22L)
Join condition: None

(9) Project [codegen id : 2]
Output : [id#19L, k#22L]
Input : [id#19L, k#20L, k#22L]

===== Subqueries =====

Subquery:1 Hosting operator id = 1 Hosting Expression = k#20L IN subquery25
* HashAggregate (16)
+- Exchange (15)
   +- * HashAggregate (14)
      +- * Project (13)
         +- * Filter (12)
            +- * ColumnarToRow (11)
               +- Scan parquet default.df2 (10)

(10) Scan parquet default.df2
Output: [id#21L, k#22L]

(11) ColumnarToRow [codegen id : 1]
Input: [id#21L, k#22L]

(12) Filter [codegen id : 1]
Input : [id#21L, k#22L]
Condition : (isnotnull(id#21L) AND (id#21L < 2))

(13) Project [codegen id : 1]
Output : [k#22L]
Input : [id#21L, k#22L]

(14) HashAggregate [codegen id : 1]
Input: [k#22L]

(15) Exchange
Input: [k#22L]

(16) HashAggregate [codegen id : 2]
Input: [k#22L]
```

### Why are the changes needed?

Without the fix, the subqueries are not printed in the explain plan.

### Does this PR introduce any user-facing change?

Yes, the explain output will be different.

### How was this patch tested?

Added a test case in `ExplainSuite`.

Closes #26039 from dilipbiswal/explain_subquery_issue.

Authored-by: Dilip Biswal <dkbiswal@gmail.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
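The `Hosting Expression = k#20L IN subquery25` line above is the heart of DPP: the subquery scans the filtered dimension side (`df2` with `id < 2`), collects the distinct join keys, and those keys prune which partitions of the fact side (`df1`) are scanned at all. A minimal sketch of that idea in plain Python (illustrative only, not Spark code; the row layouts are made up):

```python
# Model DPP on the query above: df2 rows are (id, k); df1 is partitioned by k.
df2 = [(0, 0), (1, 1), (2, 2), (3, 3)]

# The DPP subquery: distinct k values from df2 after applying id < 2.
pruning_keys = {k for (id_, k) in df2 if id_ < 2}

# The fact-side scan only touches df1 partitions whose k is in the pruned set.
df1 = [(10, 0), (11, 1), (12, 2), (13, 3)]
scanned = [row for row in df1 if row[1] in pruning_keys]
```

With the filter `id < 2`, only the `k = 0` and `k = 1` partitions of `df1` are read, which is exactly the work the subquery in the `===== Subqueries =====` section performs.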
parent 948a6e80fe
commit ef1e8495ba
In `object ExplainUtils`:

```diff
@@ -193,14 +193,14 @@ object ExplainUtils {
       subqueries: ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)]): Unit = {
     plan.foreach {
       case p: SparkPlan =>
-        p.expressions.flatMap(_.collect {
+        p.expressions.foreach (_.collect {
           case e: PlanExpression[_] =>
             e.plan match {
               case s: BaseSubqueryExec =>
                 subqueries += ((p, e, s))
                 getSubqueries(s, subqueries)
               case _ =>
             }
           case other =>
         })
     }
   }
```
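The traversal above walks every operator in the physical plan, inspects each expression for an embedded subquery plan, records the (host operator, expression, subquery) triple, and recurses into the subquery so nested subqueries are found too. A simplified model of that shape in Python (class and function names here are illustrative, not Spark API):

```python
# Toy model of a physical plan: a node has children and expressions,
# and an expression may embed its own subplan (a subquery).
class PlanNode:
    def __init__(self, name, children=(), expressions=()):
        self.name = name
        self.children = list(children)
        self.expressions = list(expressions)

    def foreach(self, f):
        # Pre-order walk over the plan tree, like SparkPlan.foreach.
        f(self)
        for child in self.children:
            child.foreach(f)

class SubqueryExpr:
    def __init__(self, plan):
        self.plan = plan  # the subquery's own plan

def get_subqueries(plan, out):
    def visit(p):
        for e in p.expressions:
            if isinstance(e, SubqueryExpr):
                out.append((p.name, e.plan.name))
                # Recurse: a subquery's plan may itself host subqueries.
                get_subqueries(e.plan, out)
    plan.foreach(visit)

# A scan hosting a subquery whose plan hosts another subquery.
inner = PlanNode("Subquery2")
sub = PlanNode("Subquery1", expressions=[SubqueryExpr(inner)])
root = PlanNode("Scan", expressions=[SubqueryExpr(sub)])

found = []
get_subqueries(root, found)
```

The recursion is what lets `EXPLAIN FORMATTED` number and print subqueries at any nesting depth, not just those attached directly to the root plan.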
In `ExplainSuite`:

```diff
@@ -17,6 +17,7 @@

 package org.apache.spark.sql

 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
```
```diff
@@ -36,6 +37,19 @@ class ExplainSuite extends QueryTest with SharedSparkSession {
     f(normalizedOutput)
   }

+  /**
+   * Get the explain by running the sql. The explain mode should be part of the
+   * sql text itself.
+   */
+  private def withNormalizedExplain(queryText: String)(f: String => Unit) = {
+    val output = new java.io.ByteArrayOutputStream()
+    Console.withOut(output) {
+      sql(queryText).show(false)
+    }
+    val normalizedOutput = output.toString.replaceAll("#\\d+", "#x")
+    f(normalizedOutput)
+  }
+
   /**
    * Runs the plan and makes sure the plans contains all of the keywords.
    */
```
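The new `withNormalizedExplain(queryText)` overload captures the console output of the query and rewrites expression IDs (`#19L`, `#21`, ...) to a stable `#x`, since the numeric IDs change from run to run and would make exact-string assertions flaky. A Python equivalent of that `replaceAll("#\\d+", "#x")` substitution:

```python
import re

def normalize_expr_ids(explain_output: str) -> str:
    # Counterpart of Scala's replaceAll("#\\d+", "#x"): replace every
    # "#<digits>" expression ID with the stable placeholder "#x".
    return re.sub(r"#\d+", "#x", explain_output)

line = "Subquery:1 Hosting operator id = 1 Hosting Expression = k#20L IN subquery25"
normalized = normalize_expr_ids(line)
```

Note that `k#20L` becomes `k#xL` (only the digits after `#` are replaced), which is why the test below can match on the run-independent prefix `k#x`.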
```diff
@@ -200,6 +214,41 @@ class ExplainSuite extends QueryTest with SharedSparkSession {
       }
     }
   }

+  test("explain formatted - check presence of subquery in case of DPP") {
+    withTable("df1", "df2") {
+      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+        SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
+        withTable("df1", "df2") {
+          spark.range(1000).select(col("id"), col("id").as("k"))
+            .write
+            .partitionBy("k")
+            .format("parquet")
+            .mode("overwrite")
+            .saveAsTable("df1")
+
+          spark.range(100)
+            .select(col("id"), col("id").as("k"))
+            .write
+            .partitionBy("k")
+            .format("parquet")
+            .mode("overwrite")
+            .saveAsTable("df2")
+
+          val sqlText =
+            """
+              |EXPLAIN FORMATTED SELECT df1.id, df2.k
+              |FROM df1 JOIN df2 ON df1.k = df2.k AND df2.id < 2
+              |""".stripMargin
+
+          val expected_pattern = "Subquery:1 Hosting operator id = 1 Hosting Expression = k#x"
+          withNormalizedExplain(sqlText) { normalizedOutput =>
+            assert(expected_pattern.r.findAllMatchIn(normalizedOutput).length == 1)
+          }
+        }
+      }
+    }
+  }
+
 }

 case class ExplainSingleData(id: Int)
```
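The test's assertion `expected_pattern.r.findAllMatchIn(normalizedOutput).length == 1` checks that the subquery header appears exactly once in the normalized explain output. The same match-counting logic in Python (the sample output here is an illustrative fragment, not captured Spark output):

```python
import re

def count_matches(pattern: str, text: str) -> int:
    # Counterpart of Scala's pattern.r.findAllMatchIn(text).length:
    # count the non-overlapping regex matches in the text.
    return sum(1 for _ in re.finditer(pattern, text))

normalized_output = (
    "===== Subqueries =====\n"
    "Subquery:1 Hosting operator id = 1 Hosting Expression = k#xL IN subquery#x\n"
)
hits = count_matches("Subquery:1 Hosting operator id = 1 Hosting Expression = k#x",
                     normalized_output)
```

Requiring exactly one match (rather than just `contains`) guards against the subquery being printed zero times (the original bug) as well as being duplicated.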