[SPARK-30200][SQL][FOLLOW-UP] Expose only explain(mode: String) in Scala side, and clean up related codes
### What changes were proposed in this pull request?

This PR mainly targets:

1. Exposing only `explain(mode: String)` on the Scala side
2. Cleaning up related code:
   - Hide `ExplainMode` under the private `execution` package, for no particular reason other than that `ExplainUtils` already lives there.
   - Use the `case object` + `trait` pattern in `ExplainMode`, mirroring `ParseMode`.
   - Move `Dataset.toExplainString` to `QueryExecution.explainString`, mirroring `QueryExecution.simpleString`, and deduplicate the code in `ExplainCommand`.
   - Use `ExplainMode` in `ExplainCommand` too.
   - Add `explainString` to `PythonSQLUtils` to avoid unexpected PySpark test failures while the Scala side is refactored.

### Why are the changes needed?

To minimise the exposed APIs, deduplicate code, and clean up.

### Does this PR introduce any user-facing change?

`Dataset.explain(mode: ExplainMode)` will be removed (it only exists in master).

### How was this patch tested?

Manually tested; the existing tests should cover the change.

Closes #26898 from HyukjinKwon/SPARK-30200-followup.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
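For context, the resulting user-facing API can be exercised as follows (a minimal sketch; the session setup and query are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("explain-demo").getOrCreate()
val df = spark.range(10).selectExpr("id", "id % 3 AS bucket")

df.explain()            // equivalent to explain("simple"): physical plan only
df.explain(true)        // equivalent to explain("extended"): logical and physical plans
df.explain("formatted") // the String-based overload; also accepts
                        // "simple", "extended", "codegen", and "cost"
```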
parent 26b658f6fb
commit 0a2afcec7d
python/pyspark/sql/dataframe.py

@@ -306,21 +306,21 @@ class DataFrame(object):
         if not is_no_argument and not (is_extended_case or is_mode_case):
             if extended is not None:
-                errMsg = "extended (optional) should be provided as bool" \
+                err_msg = "extended (optional) should be provided as bool" \
                     ", got {0}".format(type(extended))
             else:  # For mode case
-                errMsg = "mode (optional) should be provided as str, got {0}".format(type(mode))
-            raise TypeError(errMsg)
+                err_msg = "mode (optional) should be provided as str, got {0}".format(type(mode))
+            raise TypeError(err_msg)

         # Sets an explain mode depending on a given argument
         if is_no_argument:
-            explainMode = "simple"
+            explain_mode = "simple"
         elif is_extended_case:
-            explainMode = "extended" if extended else "simple"
+            explain_mode = "extended" if extended else "simple"
         elif is_mode_case:
-            explainMode = mode
+            explain_mode = mode

-        print(self._jdf.toExplainString(explainMode))
+        print(self._sc._jvm.PythonSQLUtils.explainString(self._jdf.queryExecution(), explain_mode))

     @since(2.4)
     def exceptAll(self, other):
sql/core/src/main/java/org/apache/spark/sql/ExplainMode.java (deleted)

@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql;
-
-import org.apache.spark.annotation.Unstable;
-
-/**
- * ExplainMode is used to specify the expected output format of plans (logical and physical)
- * for debugging purpose.
- *
- * @since 3.0.0
- */
-@Unstable
-public enum ExplainMode {
-  /**
-   * Simple mode means that when printing explain for a DataFrame, only a physical plan is
-   * expected to be printed to the console.
-   *
-   * @since 3.0.0
-   */
-  Simple,
-  /**
-   * Extended mode means that when printing explain for a DataFrame, both logical and physical
-   * plans are expected to be printed to the console.
-   *
-   * @since 3.0.0
-   */
-  Extended,
-  /**
-   * Codegen mode means that when printing explain for a DataFrame, if generated codes are
-   * available, a physical plan and the generated codes are expected to be printed to the console.
-   *
-   * @since 3.0.0
-   */
-  Codegen,
-  /**
-   * Cost mode means that when printing explain for a DataFrame, if plan node statistics are
-   * available, a logical plan and the statistics are expected to be printed to the console.
-   *
-   * @since 3.0.0
-   */
-  Cost,
-  /**
-   * Formatted mode means that when printing explain for a DataFrame, explain output is
-   * expected to be split into two sections: a physical plan outline and node details.
-   *
-   * @since 3.0.0
-   */
-  Formatted
-}
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

@@ -18,7 +18,6 @@
 package org.apache.spark.sql

 import java.io.{ByteArrayOutputStream, CharArrayWriter, DataOutputStream}
-import java.util.Locale

 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer

@@ -522,53 +521,20 @@ class Dataset[T] private[sql](
   def printSchema(level: Int): Unit = println(schema.treeString(level))
   // scalastyle:on println

-  private def toExplainString(mode: ExplainMode): String = {
-    // Because temporary views are resolved during analysis when we create a Dataset, and
-    // `ExplainCommand` analyzes input query plan and resolves temporary views again. Using
-    // `ExplainCommand` here will probably output different query plans, compared to the results
-    // of evaluation of the Dataset. So just output QueryExecution's query plans here.
-    val qe = ExplainCommandUtil.explainedQueryExecution(sparkSession, logicalPlan, queryExecution)
-
-    mode match {
-      case ExplainMode.Simple =>
-        qe.simpleString
-      case ExplainMode.Extended =>
-        qe.toString
-      case ExplainMode.Codegen =>
-        try {
-          org.apache.spark.sql.execution.debug.codegenString(queryExecution.executedPlan)
-        } catch {
-          case e: AnalysisException => e.toString
-        }
-      case ExplainMode.Cost =>
-        qe.stringWithStats
-      case ExplainMode.Formatted =>
-        qe.simpleString(formatted = true)
-    }
-  }
-
-  // This method intends to be called from PySpark DataFrame
-  private[sql] def toExplainString(mode: String): String = {
-    mode.toLowerCase(Locale.ROOT) match {
-      case "simple" => toExplainString(ExplainMode.Simple)
-      case "extended" => toExplainString(ExplainMode.Extended)
-      case "codegen" => toExplainString(ExplainMode.Codegen)
-      case "cost" => toExplainString(ExplainMode.Cost)
-      case "formatted" => toExplainString(ExplainMode.Formatted)
-      case _ => throw new IllegalArgumentException(s"Unknown explain mode: $mode. Accepted " +
-        "explain modes are 'simple', 'extended', 'codegen', 'cost', 'formatted'.")
-    }
-  }
-
   /**
    * Prints the plans (logical and physical) with a format specified by a given explain mode.
    *
    * @group basic
    * @since 3.0.0
    */
-  def explain(mode: ExplainMode): Unit = {
+  def explain(mode: String): Unit = {
+    // Because temporary views are resolved during analysis when we create a Dataset, and
+    // `ExplainCommand` analyzes input query plan and resolves temporary views again. Using
+    // `ExplainCommand` here will probably output different query plans, compared to the results
+    // of evaluation of the Dataset. So just output QueryExecution's query plans here.
+
     // scalastyle:off println
-    println(toExplainString(mode))
+    println(queryExecution.explainString(ExplainMode.fromString(mode)))
     // scalastyle:on println
   }

@@ -579,9 +545,9 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def explain(extended: Boolean): Unit = if (extended) {
-    explain(ExplainMode.Extended)
+    explain(ExtendedMode.name)
   } else {
-    explain(ExplainMode.Simple)
+    explain(SimpleMode.name)
   }

   /**

@@ -590,7 +556,7 @@ class Dataset[T] private[sql](
    * @group basic
    * @since 1.6.0
    */
-  def explain(): Unit = explain(ExplainMode.Simple)
+  def explain(): Unit = explain(SimpleMode.name)

   /**
    * Returns all column names and their data types as an array.
sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala

@@ -23,10 +23,11 @@ import java.nio.channels.Channels
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.python.PythonRDDServer
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.{DataFrame, Dataset, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.execution.{ExplainMode, QueryExecution}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.types.DataType

@@ -56,6 +57,10 @@ private[sql] object PythonSQLUtils {
       sqlContext: SQLContext): DataFrame = {
     ArrowConverters.toDataFrame(arrowBatchRDD, schemaString, sqlContext)
   }
+
+  def explainString(queryExecution: QueryExecution, mode: String): String = {
+    queryExecution.explainString(ExplainMode.fromString(mode))
+  }
 }

 /**
sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainMode.scala (new file)

@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.Locale
+
+sealed trait ExplainMode {
+  /**
+   * String name of the explain mode.
+   */
+  def name: String
+}
+
+/**
+ * Simple mode means that when printing explain for a DataFrame, only a physical plan is
+ * expected to be printed to the console.
+ */
+case object SimpleMode extends ExplainMode { val name = "simple" }
+
+/**
+ * Extended mode means that when printing explain for a DataFrame, both logical and physical
+ * plans are expected to be printed to the console.
+ */
+case object ExtendedMode extends ExplainMode { val name = "extended" }
+
+/**
+ * Codegen mode means that when printing explain for a DataFrame, if generated codes are
+ * available, a physical plan and the generated codes are expected to be printed to the console.
+ */
+case object CodegenMode extends ExplainMode { val name = "codegen" }
+
+/**
+ * Cost mode means that when printing explain for a DataFrame, if plan node statistics are
+ * available, a logical plan and the statistics are expected to be printed to the console.
+ */
+case object CostMode extends ExplainMode { val name = "cost" }
+
+/**
+ * Formatted mode means that when printing explain for a DataFrame, explain output is
+ * expected to be split into two sections: a physical plan outline and node details.
+ */
+case object FormattedMode extends ExplainMode { val name = "formatted" }
+
+object ExplainMode {
+  /**
+   * Returns the explain mode from the given string.
+   */
+  def fromString(mode: String): ExplainMode = mode.toLowerCase(Locale.ROOT) match {
+    case SimpleMode.name => SimpleMode
+    case ExtendedMode.name => ExtendedMode
+    case CodegenMode.name => CodegenMode
+    case CostMode.name => CostMode
+    case FormattedMode.name => FormattedMode
+    case _ => throw new IllegalArgumentException(s"Unknown explain mode: $mode. Accepted " +
+      "explain modes are 'simple', 'extended', 'codegen', 'cost', 'formatted'.")
+  }
+}
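Following the `case object` + `trait` pattern above (which mirrors how `ParseMode` is modeled), callers can pattern-match exhaustively on the sealed trait and round-trip through the string names. A small sketch; note that `org.apache.spark.sql.execution` is an internal package, and `describe` is a hypothetical helper, not part of this change:

```scala
import org.apache.spark.sql.execution._

// Hypothetical helper: exhaustive match over the sealed trait, using `name`.
def describe(mode: ExplainMode): String = mode match {
  case SimpleMode    => s"${mode.name}: physical plan only"
  case ExtendedMode  => s"${mode.name}: logical and physical plans"
  case CodegenMode   => s"${mode.name}: physical plan plus generated code"
  case CostMode      => s"${mode.name}: logical plan plus statistics"
  case FormattedMode => s"${mode.name}: plan outline plus node details"
}

ExplainMode.fromString("COST")    // case-insensitive, returns CostMode
ExplainMode.fromString("unknown") // throws IllegalArgumentException
```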
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution

 import java.io.{BufferedWriter, OutputStreamWriter}
+import java.util.UUID

 import org.apache.hadoop.fs.Path

@@ -34,7 +35,9 @@ import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.dynamicpruning.PlanDynamicPruningFilters
 import org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
+import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.util.Utils

 /**

@@ -132,6 +135,35 @@ class QueryExecution(
     concat.toString
   }

+  def explainString(mode: ExplainMode): String = {
+    val queryExecution = if (logical.isStreaming) {
+      // This is used only by explaining `Dataset/DataFrame` created by `spark.readStream`, so the
+      // output mode does not matter since there is no `Sink`.
+      new IncrementalExecution(
+        sparkSession, logical, OutputMode.Append(), "<unknown>",
+        UUID.randomUUID, UUID.randomUUID, 0, OffsetSeqMetadata(0, 0))
+    } else {
+      this
+    }
+
+    mode match {
+      case SimpleMode =>
+        queryExecution.simpleString
+      case ExtendedMode =>
+        queryExecution.toString
+      case CodegenMode =>
+        try {
+          org.apache.spark.sql.execution.debug.codegenString(queryExecution.executedPlan)
+        } catch {
+          case e: AnalysisException => e.toString
+        }
+      case CostMode =>
+        queryExecution.stringWithStats
+      case FormattedMode =>
+        queryExecution.simpleString(formatted = true)
+    }
+  }
+
   private def writePlans(append: String => Unit, maxFields: Int): Unit = {
     val (verbose, addSuffix) = (true, false)
     append("== Parsed Logical Plan ==\n")
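With `explainString` on `QueryExecution`, the explain output can be captured as a `String` rather than printed, which is the single code path that `Dataset.explain` and the PySpark bridge now share. A minimal sketch (internal API, subject to change; `spark` is assumed to be an existing `SparkSession`, e.g. from the first sketch above):

```scala
import org.apache.spark.sql.execution.{CostMode, ExplainMode}

val df = spark.range(100).selectExpr("id % 10 AS k").groupBy("k").count()

val withStats: String = df.queryExecution.explainString(CostMode)
val simple: String = df.queryExecution.explainString(ExplainMode.fromString("simple"))
```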
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

@@ -136,10 +136,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     } else {
       ExplainCommand(
         logicalPlan = statement,
-        extended = ctx.EXTENDED != null,
-        codegen = ctx.CODEGEN != null,
-        cost = ctx.COST != null,
-        formatted = ctx.FORMATTED != null)
+        mode = {
+          if (ctx.EXTENDED != null) ExtendedMode
+          else if (ctx.CODEGEN != null) CodegenMode
+          else if (ctx.COST != null) CostMode
+          else if (ctx.FORMATTED != null) FormattedMode
+          else SimpleMode
+        })
     }
   }
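The parser change above maps the SQL `EXPLAIN` keywords onto the same `ExplainMode` values, so SQL and the Dataset API share one explain path. For example (illustrative query, `spark` assumed in scope):

```scala
// EXPLAIN EXTENDED / CODEGEN / COST map to ExtendedMode / CodegenMode / CostMode;
// a bare EXPLAIN falls back to SimpleMode.
spark.sql("EXPLAIN FORMATTED SELECT * FROM range(10) WHERE id % 3 = 0")
  .show(truncate = false)
```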
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala

@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
-import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.{ExplainMode, LeafExecNode, QueryExecution, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.debug._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}

@@ -135,18 +135,11 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
  * }}}
  *
  * @param logicalPlan plan to explain
- * @param extended whether to do extended explain or not
- * @param codegen whether to output generated code from whole-stage codegen or not
- * @param cost whether to show cost information for operators.
- * @param formatted whether to split explain output into two sections: a physical plan outline
- *                  and node details.
+ * @param mode explain mode
  */
 case class ExplainCommand(
     logicalPlan: LogicalPlan,
-    extended: Boolean = false,
-    codegen: Boolean = false,
-    cost: Boolean = false,
-    formatted: Boolean = false)
+    mode: ExplainMode)
   extends RunnableCommand {

   override val output: Seq[Attribute] =

@@ -154,48 +147,13 @@ case class ExplainCommand(

   // Run through the optimizer to generate the physical plan.
   override def run(sparkSession: SparkSession): Seq[Row] = try {
-    val queryExecution = ExplainCommandUtil.explainedQueryExecution(sparkSession, logicalPlan,
-      sparkSession.sessionState.executePlan(logicalPlan))
-    val outputString =
-      if (codegen) {
-        try {
-          codegenString(queryExecution.executedPlan)
-        } catch {
-          case e: AnalysisException => e.toString
-        }
-      } else if (extended) {
-        queryExecution.toString
-      } else if (cost) {
-        queryExecution.stringWithStats
-      } else if (formatted) {
-        queryExecution.simpleString(formatted = true)
-      } else {
-        queryExecution.simpleString
-      }
+    val outputString = sparkSession.sessionState.executePlan(logicalPlan).explainString(mode)
     Seq(Row(outputString))
   } catch { case cause: TreeNodeException[_] =>
     ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
   }
 }

-object ExplainCommandUtil {
-  // Returns `QueryExecution` which is used to explain a logical plan.
-  def explainedQueryExecution(
-      sparkSession: SparkSession,
-      logicalPlan: LogicalPlan,
-      queryExecution: => QueryExecution): QueryExecution = {
-    if (logicalPlan.isStreaming) {
-      // This is used only by explaining `Dataset/DataFrame` created by `spark.readStream`, so the
-      // output mode does not matter since there is no `Sink`.
-      new IncrementalExecution(
-        sparkSession, logicalPlan, OutputMode.Append(), "<unknown>",
-        UUID.randomUUID, UUID.randomUUID, 0, OffsetSeqMetadata(0, 0))
-    } else {
-      queryExecution
-    }
-  }
-}
-
 /** An explain command for users to see how a streaming batch is executed. */
 case class StreamingExplainCommand(
     queryExecution: IncrementalExecution,
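Programmatic callers now construct `ExplainCommand` with a mode object rather than boolean flags, which is exactly what the updated test suites below do. A short sketch (assuming a `SparkSession` named `spark` and a DataFrame `df` are in scope):

```scala
import org.apache.spark.sql.execution.ExtendedMode
import org.apache.spark.sql.execution.command.ExplainCommand

val explain = ExplainCommand(df.queryExecution.logical, ExtendedMode)
spark.sessionState.executePlan(explain).executedPlan.executeCollect()
  .foreach(row => println(row.getString(0)))
```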
sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala

@@ -17,6 +17,7 @@

 package org.apache.spark.sql

+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession

@@ -28,7 +29,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession {
   private def getNormalizedExplain(df: DataFrame, mode: ExplainMode): String = {
     val output = new java.io.ByteArrayOutputStream()
     Console.withOut(output) {
-      df.explain(mode)
+      df.explain(mode.name)
     }
     output.toString.replaceAll("#\\d+", "#x")
   }

@@ -66,7 +67,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession {
   }

   private def checkKeywordsExistsInExplain(df: DataFrame, keywords: String*): Unit = {
-    checkKeywordsExistsInExplain(df, ExplainMode.Extended, keywords: _*)
+    checkKeywordsExistsInExplain(df, ExtendedMode, keywords: _*)
   }

   test("SPARK-23034 show rdd names in RDD scan nodes (Dataset)") {

@@ -217,7 +218,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession {
   test("SPARK-26659: explain of DataWritingCommandExec should not contain duplicate cmd.nodeName") {
     withTable("temptable") {
       val df = sql("create table temptable using parquet as select * from range(2)")
-      withNormalizedExplain(df, ExplainMode.Simple) { normalizedOutput =>
+      withNormalizedExplain(df, SimpleMode) { normalizedOutput =>
         assert("Create\\w*?TableAsSelectCommand".r.findAllMatchIn(normalizedOutput).length == 1)
       }
     }

@@ -276,7 +277,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession {
     val df2 = Seq((2, 3), (1, 1)).toDF("k", "v2")
     val testDf = df1.join(df2, "k").groupBy("k").agg(count("v1"), sum("v1"), avg("v2"))

-    val simpleExplainOutput = getNormalizedExplain(testDf, ExplainMode.Simple)
+    val simpleExplainOutput = getNormalizedExplain(testDf, SimpleMode)
     assert(simpleExplainOutput.startsWith("== Physical Plan =="))
     Seq("== Parsed Logical Plan ==",
       "== Analyzed Logical Plan ==",

@@ -285,7 +286,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession {
     }
     checkKeywordsExistsInExplain(
       testDf,
-      ExplainMode.Extended,
+      ExtendedMode,
       "== Parsed Logical Plan ==" ::
       "== Analyzed Logical Plan ==" ::
       "== Optimized Logical Plan ==" ::

@@ -293,18 +294,18 @@ class ExplainSuite extends QueryTest with SharedSparkSession {
       Nil: _*)
     checkKeywordsExistsInExplain(
       testDf,
-      ExplainMode.Cost,
+      CostMode,
       "Statistics(sizeInBytes=" ::
       Nil: _*)
     checkKeywordsExistsInExplain(
       testDf,
-      ExplainMode.Codegen,
+      CodegenMode,
       "WholeStageCodegen subtrees" ::
       "Generated code:" ::
       Nil: _*)
     checkKeywordsExistsInExplain(
       testDf,
-      ExplainMode.Formatted,
+      FormattedMode,
       "* LocalTableScan (1)" ::
       "(1) LocalTableScan [codegen id :" ::
       Nil: _*)

@@ -313,17 +314,17 @@ class ExplainSuite extends QueryTest with SharedSparkSession {
   test("Dataset.toExplainString has mode as string") {
     val df = spark.range(10).toDF
     def assertExplainOutput(mode: ExplainMode): Unit = {
-      assert(df.toExplainString(mode.toString).replaceAll("#\\d+", "#x").trim ===
+      assert(df.queryExecution.explainString(mode).replaceAll("#\\d+", "#x").trim ===
         getNormalizedExplain(df, mode).trim)
     }
-    assertExplainOutput(ExplainMode.Simple)
-    assertExplainOutput(ExplainMode.Extended)
-    assertExplainOutput(ExplainMode.Codegen)
-    assertExplainOutput(ExplainMode.Cost)
-    assertExplainOutput(ExplainMode.Formatted)
+    assertExplainOutput(SimpleMode)
+    assertExplainOutput(ExtendedMode)
+    assertExplainOutput(CodegenMode)
+    assertExplainOutput(CostMode)
+    assertExplainOutput(FormattedMode)

     val errMsg = intercept[IllegalArgumentException] {
-      df.toExplainString("unknown")
+      ExplainMode.fromString("unknown")
     }.getMessage
     assert(errMsg.contains("Unknown explain mode: unknown"))
   }
sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala

@@ -22,7 +22,7 @@ import java.math.BigDecimal
 import org.apache.spark.sql.api.java._
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.Project
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.{QueryExecution, SimpleMode}
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, ExplainCommand}
 import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand

@@ -309,7 +309,7 @@ class UDFSuite extends QueryTest with SharedSparkSession {

   test("SPARK-19338 Provide identical names for UDFs in the EXPLAIN output") {
     def explainStr(df: DataFrame): String = {
-      val explain = ExplainCommand(df.queryExecution.logical, extended = false)
+      val explain = ExplainCommand(df.queryExecution.logical, SimpleMode)
       val sparkPlan = spark.sessionState.executePlan(explain).executedPlan
       sparkPlan.executeCollect().map(_.getString(0).trim).headOption.getOrElse("")
     }
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

@@ -29,7 +29,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils}
-import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.{DataSourceScanExec, ExtendedMode}
 import org.apache.spark.sql.execution.command.{ExplainCommand, ShowCreateTableCommand}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRDD, JDBCRelation, JdbcUtils}

@@ -974,7 +974,7 @@ class JDBCSuite extends QueryTest

   test("test credentials in the properties are not in plan output") {
     val df = sql("SELECT * FROM parts")
-    val explain = ExplainCommand(df.queryExecution.logical, extended = true)
+    val explain = ExplainCommand(df.queryExecution.logical, ExtendedMode)
     spark.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
       r => assert(!List("testPass", "testUser").exists(r.toString.contains))
     }

@@ -987,7 +987,7 @@ class JDBCSuite extends QueryTest

   test("test credentials in the connection url are not in the plan output") {
     val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
-    val explain = ExplainCommand(df.queryExecution.logical, extended = true)
+    val explain = ExplainCommand(df.queryExecution.logical, ExtendedMode)
     spark.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
       r => assert(!List("testPass", "testUser").exists(r.toString.contains))
     }

@@ -1009,7 +1009,7 @@ class JDBCSuite extends QueryTest
         | password '$password')
       """.stripMargin)

-    val explain = ExplainCommand(df.queryExecution.logical, extended = true)
+    val explain = ExplainCommand(df.queryExecution.logical, ExtendedMode)
     spark.sessionState.executePlan(explain).executedPlan.executeCollect().foreach { r =>
       assert(!r.toString.contains(password))
     }
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

@@ -36,6 +36,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.SimpleMode
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.{ContinuousMemoryStream, MemorySink}

@@ -472,7 +473,7 @@ class StreamSuite extends StreamTest {
     val df = inputData.toDS().map(_ + "foo").groupBy("value").agg(count("*"))

     // Test `df.explain`
-    val explain = ExplainCommand(df.queryExecution.logical, extended = false)
+    val explain = ExplainCommand(df.queryExecution.logical, SimpleMode)
     val explainString =
       spark.sessionState
         .executePlan(explain)

@@ -524,7 +525,7 @@ class StreamSuite extends StreamTest {
     val df = inputData.toDS().map(_ * 2).filter(_ > 5)

     // Test `df.explain`
-    val explain = ExplainCommand(df.queryExecution.logical, extended = false)
+    val explain = ExplainCommand(df.queryExecution.logical, SimpleMode)
     val explainString =
       spark.sessionState
         .executePlan(explain)