[SPARK-22977][SQL] fix web UI SQL tab for CTAS

## What changes were proposed in this pull request?

This PR fixes a regression introduced in Spark 2.3.

In Spark 2.2, we had fragile UI support for SQL data writing commands: we only tracked the input query plan of `FileFormatWriter` and displayed its metrics. This is not ideal because we don't know what triggered the write (it could be a table insertion, CTAS, etc.), but it is still useful to see the metrics of the input query.

In Spark 2.3, we introduced a new mechanism, `DataWritingCommand`, to fix the UI issue entirely. These writing commands now have real children, so we no longer need to hack into `FileFormatWriter` for the UI. This also helps with `explain`: it can now show the physical plan of the input query, whereas in 2.2 the physical writing plan was simply `ExecutedCommandExec` with no child.
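
For context, here is a simplified sketch of the `DataWritingCommand` contract, paraphrased from how the commands in this patch use it (not the exact trait definition in the Spark source): `run(sparkSession, child)` and `outputColumns` match the signatures visible in the diffs below, while the `children` override and the comments are paraphrased.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.execution.SparkPlan

// Simplified sketch, not the exact Spark source: a data writing command keeps its
// input query as a real child and runs the already-planned physical version of that
// child, so the child's plan and metrics appear under the command in the SQL tab.
trait DataWritingCommand extends Command {
  // The analyzed input query that produces the data to be written.
  def query: LogicalPlan

  // The input query is a real child, visible to EXPLAIN and the web UI.
  override final def children: Seq[LogicalPlan] = query :: Nil

  // Original output columns of the input query; the optimizer may not preserve the
  // case of the column names, so they are carried along explicitly.
  def outputColumns: Seq[Attribute]

  // Perform the write using the physical plan the planner already built for `query`.
  def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
}
```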

However, there is a regression for CTAS: CTAS commands don't extend `DataWritingCommand`, and the UI hack in `FileFormatWriter` is gone, so the UI node for CTAS is just an empty node. See https://issues.apache.org/jira/browse/SPARK-22977 for more information about this UI issue.
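
A minimal way to observe the issue (illustrative only; the table and view names are made up, and the JIRA ticket has the reported details):

```scala
// Run any CTAS in a Spark 2.3 shell before this patch, e.g.:
spark.range(10).createOrReplaceTempView("src")
spark.sql("CREATE TABLE ctas_demo USING parquet AS SELECT id FROM src")
// Then open the web UI (http://<driver-host>:4040/SQL/) and click the query:
// the plan graph for the CTAS is just an empty node with no metrics.
```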

To fix it, this PR applies the `DataWritingCommand` mechanism to CTAS commands.

TODO: In the future, we should refactor this part and create reusable physical-layer building blocks for data writing that the different writing commands can share. We should keep separate logical nodes for the different operators (e.g. CTAS, CREATE TABLE, INSERT TABLE) even when some of them share logic; internally they can reuse the same physical writing code.

## How was this patch tested?

Manually tested.
For a data source table:
<img width="644" alt="1" src="https://user-images.githubusercontent.com/3182036/35874155-bdffab28-0ba6-11e8-94a8-e32e106ba069.png">
For a Hive table:
<img width="666" alt="2" src="https://user-images.githubusercontent.com/3182036/35874161-c437e2a8-0ba6-11e8-98ed-7930f01432c5.png">

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20521 from cloud-fan/UI.
Committed by Wenchen Fan on 2018-02-12 22:07:59 +08:00
commit 0e2c266de7 (parent caeb108e25)
6 changed files with 80 additions and 70 deletions

Changed file 1 of 6: `CreateDataSourceTableAsSelectCommand`

```diff
@@ -21,7 +21,9 @@ import java.net.URI
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
@@ -136,12 +138,11 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
 case class CreateDataSourceTableAsSelectCommand(
     table: CatalogTable,
     mode: SaveMode,
-    query: LogicalPlan)
-  extends RunnableCommand {
+    query: LogicalPlan,
+    outputColumns: Seq[Attribute])
+  extends DataWritingCommand {
-  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     assert(table.tableType != CatalogTableType.VIEW)
     assert(table.provider.isDefined)
@@ -163,7 +164,7 @@
       }
       saveDataIntoTable(
-        sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
+        sparkSession, table, table.storage.locationUri, child, SaveMode.Append, tableExists = true)
     } else {
       assert(table.schema.isEmpty)
@@ -173,7 +174,7 @@
         table.storage.locationUri
       }
       val result = saveDataIntoTable(
-        sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
+        sparkSession, table, tableLocation, child, SaveMode.Overwrite, tableExists = false)
       val newTable = table.copy(
         storage = table.storage.copy(locationUri = tableLocation),
         // We will use the schema of resolved.relation as the schema of the table (instead of
@@ -198,10 +199,10 @@
      session: SparkSession,
      table: CatalogTable,
      tableLocation: Option[URI],
-      data: LogicalPlan,
+      physicalPlan: SparkPlan,
      mode: SaveMode,
      tableExists: Boolean): BaseRelation = {
-    // Create the relation based on the input logical plan: `data`.
+    // Create the relation based on the input logical plan: `query`.
     val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
     val dataSource = DataSource(
       session,
@@ -212,7 +213,7 @@
       catalogTable = if (tableExists) Some(table) else None)
     try {
-      dataSource.writeAndRead(mode, query)
+      dataSource.writeAndRead(mode, query, outputColumns, physicalPlan)
     } catch {
       case ex: AnalysisException =>
         logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
```

Changed file 2 of 6: `DataSource`

```diff
@@ -31,8 +31,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -435,10 +437,11 @@
   }
   /**
-   * Writes the given [[LogicalPlan]] out in this [[FileFormat]].
+   * Creates a command node to write the given [[LogicalPlan]] out to the given [[FileFormat]].
+   * The returned command is unresolved and need to be analyzed.
    */
   private def planForWritingFileFormat(
-      format: FileFormat, mode: SaveMode, data: LogicalPlan): LogicalPlan = {
+      format: FileFormat, mode: SaveMode, data: LogicalPlan): InsertIntoHadoopFsRelationCommand = {
     // Don't glob path for the write path. The contracts here are:
     // 1. Only one output path can be specified on the write path;
     // 2. Output path must be a legal HDFS style file system path;
@@ -482,9 +485,24 @@
   /**
    * Writes the given [[LogicalPlan]] out to this [[DataSource]] and returns a [[BaseRelation]] for
    * the following reading.
+   *
+   * @param mode The save mode for this writing.
+   * @param data The input query plan that produces the data to be written. Note that this plan
+   *             is analyzed and optimized.
+   * @param outputColumns The original output columns of the input query plan. The optimizer may not
+   *                      preserve the output column's names' case, so we need this parameter
+   *                      instead of `data.output`.
+   * @param physicalPlan The physical plan of the input query plan. We should run the writing
+   *                     command with this physical plan instead of creating a new physical plan,
+   *                     so that the metrics can be correctly linked to the given physical plan and
+   *                     shown in the web UI.
    */
-  def writeAndRead(mode: SaveMode, data: LogicalPlan): BaseRelation = {
-    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
+  def writeAndRead(
+      mode: SaveMode,
+      data: LogicalPlan,
+      outputColumns: Seq[Attribute],
+      physicalPlan: SparkPlan): BaseRelation = {
+    if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
     }
@@ -493,9 +511,23 @@
         dataSource.createRelation(
           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
       case format: FileFormat =>
-        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
+        val cmd = planForWritingFileFormat(format, mode, data)
+        val resolvedPartCols = cmd.partitionColumns.map { col =>
+          // The partition columns created in `planForWritingFileFormat` should always be
+          // `UnresolvedAttribute` with a single name part.
+          assert(col.isInstanceOf[UnresolvedAttribute])
+          val unresolved = col.asInstanceOf[UnresolvedAttribute]
+          assert(unresolved.nameParts.length == 1)
+          val name = unresolved.nameParts.head
+          outputColumns.find(a => equality(a.name, name)).getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
+          }
+        }
+        val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
+        resolved.run(sparkSession, physicalPlan)
         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
-        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+        copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation()
       case _ =>
         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
```

Changed file 3 of 6: `DataSourceAnalysis` rule

```diff
@@ -139,7 +139,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
     case CreateTable(tableDesc, mode, Some(query))
         if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
       DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
-      CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)
+      CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output)
     case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
         parts, query, overwrite, false) if parts.isEmpty =>
```

Changed file 4 of 6: `HiveAnalysis` rule

```diff
@@ -157,7 +157,7 @@ object HiveAnalysis extends Rule[LogicalPlan] {
     case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
       DDLUtils.checkDataColNames(tableDesc)
-      CreateHiveTableAsSelectCommand(tableDesc, query, mode)
+      CreateHiveTableAsSelectCommand(tableDesc, query, query.output, mode)
     case InsertIntoDir(isLocal, storage, provider, child, overwrite)
         if DDLUtils.isHiveTable(provider) =>
```

Changed file 5 of 6: `CreateHiveTableAsSelectCommand`

```diff
@@ -20,10 +20,11 @@ package org.apache.spark.sql.hive.execution
 import scala.util.control.NonFatal
 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
 /**
@@ -36,15 +37,15 @@ import org.apache.spark.sql.execution.command.RunnableCommand
 case class CreateHiveTableAsSelectCommand(
     tableDesc: CatalogTable,
     query: LogicalPlan,
+    outputColumns: Seq[Attribute],
     mode: SaveMode)
-  extends RunnableCommand {
+  extends DataWritingCommand {
   private val tableIdentifier = tableDesc.identifier
-  override def innerChildren: Seq[LogicalPlan] = Seq(query)
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
+  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    if (catalog.tableExists(tableIdentifier)) {
       assert(mode != SaveMode.Overwrite,
         s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")
@@ -56,34 +57,36 @@ case class CreateHiveTableAsSelectCommand(
         return Seq.empty
       }
-      sparkSession.sessionState.executePlan(
-        InsertIntoTable(
-          UnresolvedRelation(tableIdentifier),
-          Map(),
-          query,
-          overwrite = false,
-          ifPartitionNotExists = false)).toRdd
+      InsertIntoHiveTable(
+        tableDesc,
+        Map.empty,
+        query,
+        overwrite = false,
+        ifPartitionNotExists = false,
+        outputColumns = outputColumns).run(sparkSession, child)
     } else {
       // TODO ideally, we should get the output data ready first and then
       // add the relation into catalog, just in case of failure occurs while data
       // processing.
       assert(tableDesc.schema.isEmpty)
-      sparkSession.sessionState.catalog.createTable(
-        tableDesc.copy(schema = query.schema), ignoreIfExists = false)
+      catalog.createTable(tableDesc.copy(schema = query.schema), ignoreIfExists = false)
       try {
-        sparkSession.sessionState.executePlan(
-          InsertIntoTable(
-            UnresolvedRelation(tableIdentifier),
-            Map(),
-            query,
-            overwrite = true,
-            ifPartitionNotExists = false)).toRdd
+        // Read back the metadata of the table which was created just now.
+        val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
+        // For CTAS, there is no static partition values to insert.
+        val partition = createdTableMeta.partitionColumnNames.map(_ -> None).toMap
+        InsertIntoHiveTable(
+          createdTableMeta,
+          partition,
+          query,
+          overwrite = true,
+          ifPartitionNotExists = false,
+          outputColumns = outputColumns).run(sparkSession, child)
       } catch {
         case NonFatal(e) =>
           // drop the created table.
-          sparkSession.sessionState.catalog.dropTable(tableIdentifier, ignoreIfNotExists = true,
-            purge = false)
+          catalog.dropTable(tableIdentifier, ignoreIfNotExists = true, purge = false)
           throw e
       }
     }
```

Changed file 6 of 6: `HiveExplainSuite`

```diff
@@ -128,32 +128,6 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       "src")
   }
-  test("SPARK-17409: The EXPLAIN output of CTAS only shows the analyzed plan") {
-    withTempView("jt") {
-      val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""").toDS()
-      spark.read.json(ds).createOrReplaceTempView("jt")
-      val outputs = sql(
-        s"""
-           |EXPLAIN EXTENDED
-           |CREATE TABLE t1
-           |AS
-           |SELECT * FROM jt
-         """.stripMargin).collect().map(_.mkString).mkString
-      val shouldContain =
-        "== Parsed Logical Plan ==" :: "== Analyzed Logical Plan ==" :: "Subquery" ::
-        "== Optimized Logical Plan ==" :: "== Physical Plan ==" ::
-        "CreateHiveTableAsSelect" :: "InsertIntoHiveTable" :: "jt" :: Nil
-      for (key <- shouldContain) {
-        assert(outputs.contains(key), s"$key doesn't exist in result")
-      }
-      val physicalIndex = outputs.indexOf("== Physical Plan ==")
-      assert(outputs.substring(physicalIndex).contains("Subquery"),
-        "Physical Plan should contain SubqueryAlias since the query should not be optimized")
-    }
-  }
   test("explain output of physical plan should contain proper codegen stage ID") {
     checkKeywordsExist(sql(
       """
```