diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index ce655afeb4..dd654cde19 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -95,7 +95,7 @@ license: |
 
   - In Spark 3.2, `FloatType` is mapped to `FLOAT` in MySQL. Prior to this, it used to be mapped to `REAL`, which is by default a synonym to `DOUBLE PRECISION` in MySQL.
 
-  - In Spark 3.2, the query executions triggered by `DataFrameWriter` are always named `command` when being sent to `QueryExecutionListener`. In Spark 3.1 and earlier, the name is one of `save`, `insertInto`, `saveAsTable`, `create`, `append`, `overwrite`, `overwritePartitions`, `replace`.
+  - In Spark 3.2, the query executions triggered by `DataFrameWriter` are always named `command` when being sent to `QueryExecutionListener`. In Spark 3.1 and earlier, the name is one of `save`, `insertInto`, `saveAsTable`.
 
 ## Upgrading from Spark SQL 3.0 to 3.1
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 5b68493ae1..47eb19969a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -857,8 +857,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    */
   private def runCommand(session: SparkSession)(command: LogicalPlan): Unit = {
     val qe = session.sessionState.executePlan(command)
-    // call `QueryExecution.commandExecuted` to trigger the execution of commands.
-    qe.commandExecuted
+    qe.assertCommandExecuted()
   }
 
   private def lookupV2Provider(): Option[TableProvider] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index 7b131058db..3931d1f1f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement}
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
-import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.types.IntegerType
 
 /**
@@ -191,8 +190,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
    */
   private def runCommand(command: LogicalPlan): Unit = {
     val qe = sparkSession.sessionState.executePlan(command)
-    // call `QueryExecution.toRDD` to trigger the execution of commands.
-    SQLExecution.withNewExecutionId(qe, Some("command"))(qe.toRdd)
+    qe.assertCommandExecuted()
   }
 
   private def internalReplace(orCreate: Boolean): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index a794a47ecb..aaa87bd6ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
@@ -81,11 +81,21 @@ class QueryExecution(
     case CommandExecutionMode.SKIP => analyzed
   }
 
+  private def commandExecutionName(command: Command): String = command match {
+    case _: CreateTableAsSelect => "create"
+    case _: ReplaceTableAsSelect => "replace"
+    case _: AppendData => "append"
+    case _: OverwriteByExpression => "overwrite"
+    case _: OverwritePartitionsDynamic => "overwritePartitions"
+    case _ => "command"
+  }
+
   private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {
     case c: Command =>
       val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)
-      val result =
-        SQLExecution.withNewExecutionId(qe, Some("command"))(qe.executedPlan.executeCollect())
+      val result = SQLExecution.withNewExecutionId(qe, Some(commandExecutionName(c))) {
+        qe.executedPlan.executeCollect()
+      }
       CommandResult(
         qe.analyzed.output,
         qe.commandExecuted,
@@ -102,7 +112,7 @@ class QueryExecution(
     sparkSession.sharedState.cacheManager.useCachedData(commandExecuted.clone())
   }
 
-  private def assertCommandExecuted(): Unit = commandExecuted
+  def assertCommandExecuted(): Unit = commandExecuted
 
   lazy val optimizedPlan: LogicalPlan = {
     // We need to materialize the commandExecuted here because optimizedPlan is also tracked under
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
index 945a35a968..db4a9c153c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog}
 import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
 import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NullOrdering, SortDirection, SortOrder}
 import org.apache.spark.sql.connector.expressions.LogicalExpressions._
-import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, SortExec, SparkPlan}
+import org.apache.spark.sql.execution.{QueryExecution, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
@@ -778,8 +778,7 @@ class WriteDistributionAndOrderingSuite
 
     sparkContext.listenerBus.waitUntilEmpty()
 
-    assert(executedPlan.isInstanceOf[CommandResultExec])
-    executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan match {
+    executedPlan match {
       case w: V2TableWriteExec =>
         stripAQEPlan(w.query)
       case _ =>
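
Not part of the patch above, but for context on the listener-visible names it touches: the sketch below registers a `QueryExecutionListener` and prints the name each write is reported under. It assumes a local `SparkSession` and writes Parquet to a temp directory purely for illustration; the object name, app name, and paths are placeholders.

```scala
import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Hypothetical demo object; not part of the patch.
object CommandNameListenerDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("command-name-listener-demo")
      .getOrCreate()
    import spark.implicits._

    // Print the name each query execution is reported under.
    spark.listenerManager.register(new QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
        println(s"onSuccess: $funcName")
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
        println(s"onFailure: $funcName")
    })

    // A DataFrameWriter (v1) write to a file source; per the migration note above,
    // this is reported as "command" in Spark 3.2.
    val path = Files.createTempDirectory("listener-demo").toString
    Seq(1, 2, 3).toDF("id").write.mode("overwrite").parquet(path)

    // Listener events are posted asynchronously; give the bus a moment before shutting down.
    Thread.sleep(1000)
    spark.stop()
  }
}
```

A `DataFrameWriterV2` write (for example `df.writeTo(...).append()` against a configured v2 catalog) would instead surface under the names produced by `commandExecutionName` above, such as `append` or `overwritePartitions`.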