[SPARK-35378][SQL][FOLLOWUP] Restore the command execution name for DataFrameWriterV2

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/32513

It's hard to keep the command execution name for `DataFrameWriter`, as the command logical plans are a bit messy (DS v1, file source, and Hive each use different command logical plans) and sometimes it's hard to distinguish "insert" from "save".

However, `DataFrameWriterV2` only produces v2 commands, which are pretty clean. It's easy to keep the command execution name for them.
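
For illustration (not part of this patch), a minimal sketch of how the restored names surface through `QueryExecutionListener`; the listener class and the `testcat.t` table are hypothetical:

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Records the execution name Spark passes to the listener. With this change,
// a DataFrameWriterV2 call such as df.writeTo("testcat.t").append() is
// reported as "append" rather than the generic "command".
class NameRecordingListener extends QueryExecutionListener {
  @volatile var lastName: Option[String] = None
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    lastName = Some(funcName)
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    lastName = Some(funcName)
}

// Hypothetical usage, assuming an active SparkSession `spark` and a v2 table:
//   val listener = new NameRecordingListener
//   spark.listenerManager.register(listener)
//   spark.range(10).writeTo("testcat.t").append()
//   // after the listener bus drains:
//   assert(listener.lastName.contains("append"))
```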

### Why are the changes needed?

Fewer breaking changes.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A

Closes #32919 from cloud-fan/follow.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Wenchen Fan committed 2021-06-17 08:55:42 +00:00
parent 939ae91e00
commit 0c5a01a78c
5 changed files with 19 additions and 13 deletions

@@ -95,7 +95,7 @@ license: |
 - In Spark 3.2, `FloatType` is mapped to `FLOAT` in MySQL. Prior to this, it used to be mapped to `REAL`, which is by default a synonym to `DOUBLE PRECISION` in MySQL.
-- In Spark 3.2, the query executions triggered by `DataFrameWriter` are always named `command` when being sent to `QueryExecutionListener`. In Spark 3.1 and earlier, the name is one of `save`, `insertInto`, `saveAsTable`, `create`, `append`, `overwrite`, `overwritePartitions`, `replace`.
+- In Spark 3.2, the query executions triggered by `DataFrameWriter` are always named `command` when being sent to `QueryExecutionListener`. In Spark 3.1 and earlier, the name is one of `save`, `insertInto`, `saveAsTable`.
 
 ## Upgrading from Spark SQL 3.0 to 3.1

@@ -857,8 +857,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    */
   private def runCommand(session: SparkSession)(command: LogicalPlan): Unit = {
     val qe = session.sessionState.executePlan(command)
-    // call `QueryExecution.commandExecuted` to trigger the execution of commands.
-    qe.commandExecuted
+    qe.assertCommandExecuted()
   }
 
   private def lookupV2Provider(): Option[TableProvider] = {
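
Aside (not part of the diff): `assertCommandExecuted()` works because `QueryExecution.commandExecuted` is a `lazy val`, so referencing it forces the eager command execution at most once. A standalone sketch of the idiom, with made-up names:

```scala
// Illustrative only: `PlanLike` and `ExecSketch` are invented stand-ins for
// Spark's LogicalPlan and QueryExecution.
case class PlanLike(name: String)

class ExecSketch(plan: PlanLike) {
  // Forced the first time it is referenced, like QueryExecution.commandExecuted.
  lazy val commandExecuted: PlanLike = {
    println(s"eagerly executing ${plan.name}")
    plan
  }

  // Triggers the execution for its side effects and discards the result,
  // which is all the writer paths above need.
  def assertCommandExecuted(): Unit = commandExecuted
}

// val e = new ExecSketch(PlanLike("AppendData"))
// e.assertCommandExecuted() // prints once
// e.assertCommandExecuted() // no-op: the lazy val is already forced
```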

@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement}
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
-import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.types.IntegerType
 
 /**
@@ -191,8 +190,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
    */
   private def runCommand(command: LogicalPlan): Unit = {
     val qe = sparkSession.sessionState.executePlan(command)
-    // call `QueryExecution.toRDD` to trigger the execution of commands.
-    SQLExecution.withNewExecutionId(qe, Some("command"))(qe.toRdd)
+    qe.assertCommandExecuted()
   }
 
   private def internalReplace(orCreate: Boolean): Unit = {

@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
@@ -81,11 +81,21 @@ class QueryExecution(
     case CommandExecutionMode.SKIP => analyzed
   }
 
+  private def commandExecutionName(command: Command): String = command match {
+    case _: CreateTableAsSelect => "create"
+    case _: ReplaceTableAsSelect => "replace"
+    case _: AppendData => "append"
+    case _: OverwriteByExpression => "overwrite"
+    case _: OverwritePartitionsDynamic => "overwritePartitions"
+    case _ => "command"
+  }
+
   private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {
     case c: Command =>
       val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)
-      val result =
-        SQLExecution.withNewExecutionId(qe, Some("command"))(qe.executedPlan.executeCollect())
+      val result = SQLExecution.withNewExecutionId(qe, Some(commandExecutionName(c))) {
+        qe.executedPlan.executeCollect()
+      }
       CommandResult(
         qe.analyzed.output,
         qe.commandExecuted,
@@ -102,7 +112,7 @@ class QueryExecution(
     sparkSession.sharedState.cacheManager.useCachedData(commandExecuted.clone())
   }
 
-  private def assertCommandExecuted(): Unit = commandExecuted
+  def assertCommandExecuted(): Unit = commandExecuted
 
   lazy val optimizedPlan: LogicalPlan = {
     // We need to materialize the commandExecuted here because optimizedPlan is also tracked under
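
Summarizing the mapping that `commandExecutionName` restores (a sketch, not part of the diff; `testcat.t` is a hypothetical v2 catalog table and `df` any DataFrame):

```scala
// Execution names QueryExecutionListener receives per DataFrameWriterV2 call:
//   df.writeTo("testcat.t").create()               // "create"   (CreateTableAsSelect)
//   df.writeTo("testcat.t").replace()              // "replace"  (ReplaceTableAsSelect)
//   df.writeTo("testcat.t").append()               // "append"   (AppendData)
//   df.writeTo("testcat.t").overwrite(lit(true))   // "overwrite" (OverwriteByExpression)
//   df.writeTo("testcat.t").overwritePartitions()  // "overwritePartitions" (OverwritePartitionsDynamic)
// DataFrameWriter (v1) calls still report the generic name:
//   df.write.saveAsTable("t")                      // "command"
```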

@@ -29,7 +29,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog}
 import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
 import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NullOrdering, SortDirection, SortOrder}
 import org.apache.spark.sql.connector.expressions.LogicalExpressions._
-import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, SortExec, SparkPlan}
+import org.apache.spark.sql.execution.{QueryExecution, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
@@ -778,8 +778,7 @@ class WriteDistributionAndOrderingSuite
     sparkContext.listenerBus.waitUntilEmpty()
 
-    assert(executedPlan.isInstanceOf[CommandResultExec])
-    executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan match {
+    executedPlan match {
       case w: V2TableWriteExec =>
         stripAQEPlan(w.query)
       case _ =>