[SPARK-17180][SPARK-17309][SPARK-17323][SQL] create AlterViewAsCommand to handle ALTER VIEW AS

## What changes were proposed in this pull request?

Currently we use `CreateViewCommand` to implement ALTER VIEW AS, which has 3 bugs:

1. SPARK-17180: ALTER VIEW AS should alter temp view if view name has no database part and temp view exists
2. SPARK-17309: ALTER VIEW AS should issue exception if view does not exist.
3. SPARK-17323: ALTER VIEW AS should keep the previous table properties, comment, create_time, etc.

The root cause is, ALTER VIEW AS is quite different from CREATE VIEW, we need different code path to handle them. However, in `CreateViewCommand`, there is no way to distinguish ALTER VIEW AS and CREATE VIEW, we have to introduce extra flag. But instead of doing this, I think a more natural way is to separate the ALTER VIEW AS logic into a new command.

## How was this patch tested?

new tests in SQLViewSuite

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14874 from cloud-fan/minor4.
This commit is contained in:
Wenchen Fan 2016-08-31 17:08:08 +08:00
parent fa6347938f
commit 12fd0cd615
3 changed files with 157 additions and 60 deletions

View file

@ -1254,60 +1254,33 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ic.identifier.getText -> Option(ic.STRING).map(string)
}
}
createView(
ctx,
ctx.tableIdentifier,
CreateViewCommand(
name = visitTableIdentifier(ctx.tableIdentifier),
userSpecifiedColumns = userSpecifiedColumns,
comment = Option(ctx.STRING).map(string),
userSpecifiedColumns,
ctx.query,
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty),
properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty),
originalText = Option(source(ctx.query)),
child = plan(ctx.query),
allowExisting = ctx.EXISTS != null,
replace = ctx.REPLACE != null,
isTemporary = ctx.TEMPORARY != null
)
isTemporary = ctx.TEMPORARY != null)
}
}
/**
* Alter the query of a view. This creates a [[CreateViewCommand]] command.
* Alter the query of a view. This creates a [[AlterViewAsCommand]] command.
*
* For example:
* {{{
* ALTER VIEW [db_name.]view_name AS SELECT ...;
* }}}
*/
override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) {
createView(
ctx,
name = ctx.tableIdentifier,
comment = None,
userSpecifiedColumns = Seq.empty,
query = ctx.query,
properties = Map.empty,
allowExisting = false,
replace = true,
isTemporary = false)
}
/**
* Create a [[CreateViewCommand]] command.
*/
private def createView(
ctx: ParserRuleContext,
name: TableIdentifierContext,
comment: Option[String],
userSpecifiedColumns: Seq[(String, Option[String])],
query: QueryContext,
properties: Map[String, String],
allowExisting: Boolean,
replace: Boolean,
isTemporary: Boolean): LogicalPlan = {
val originalText = source(query)
CreateViewCommand(
visitTableIdentifier(name),
userSpecifiedColumns,
comment,
properties,
Some(originalText),
plan(query),
allowExisting = allowExisting,
replace = replace,
isTemporary = isTemporary)
AlterViewAsCommand(
name = visitTableIdentifier(ctx.tableIdentifier),
originalText = source(ctx.query),
query = plan(ctx.query))
}
/**

View file

@ -22,15 +22,16 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.types.StructType
/**
* Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of
* depending on Hive meta-store.
* Create or replace a view with given query plan. This command will convert the query plan to
* canonicalized SQL string, and store it as view text in metastore, if we need to create a
* permanent view.
*
* @param name the name of this view.
* @param userSpecifiedColumns the output column names and optional comments specified by users,
@ -64,11 +65,6 @@ case class CreateViewCommand(
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
// TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is
// different from Hive and may not work for some cases like create view on self join.
override def output: Seq[Attribute] = Seq.empty[Attribute]
if (!isTemporary) {
require(originalText.isDefined,
"The table to created with CREATE VIEW must have 'originalText'.")
@ -119,9 +115,7 @@ case class CreateViewCommand(
// Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
// already exists.
} else if (tableMetadata.tableType != CatalogTableType.VIEW) {
throw new AnalysisException(
"Existing table is not a view. The following is an existing table, " +
s"not a view: $qualifiedName")
throw new AnalysisException(s"$qualifiedName is not a view")
} else if (replace) {
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
@ -179,7 +173,7 @@ case class CreateViewCommand(
sparkSession.sql(viewSQL).queryExecution.assertAnalyzed()
} catch {
case NonFatal(e) =>
throw new RuntimeException(s"Failed to analyze the canonicalized SQL: ${viewSQL}", e)
throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
}
val viewSchema = if (userSpecifiedColumns.isEmpty) {
@ -202,3 +196,62 @@ case class CreateViewCommand(
)
}
}
/**
* Alter a view with given query plan. If the view name contains database prefix, this command will
* alter a permanent view matching the given name, or throw an exception if view not exist. Else,
* this command will try to alter a temporary view first, if view not exist, try permanent view
* next, if still not exist, throw an exception.
*
* @param name the name of this view.
* @param originalText the original SQL text of this view. Note that we can only alter a view by
* SQL API, which means we always have originalText.
* @param query the logical plan that represents the view; this is used to generate a canonicalized
* version of the SQL that can be saved in the catalog.
*/
case class AlterViewAsCommand(
name: TableIdentifier,
originalText: String,
query: LogicalPlan) extends RunnableCommand {
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
override def run(session: SparkSession): Seq[Row] = {
// If the plan cannot be analyzed, throw an exception and don't proceed.
val qe = session.sessionState.executePlan(query)
qe.assertAnalyzed()
val analyzedPlan = qe.analyzed
if (session.sessionState.catalog.isTemporaryTable(name)) {
session.sessionState.catalog.createTempView(name.table, analyzedPlan, overrideIfExists = true)
} else {
alterPermanentView(session, analyzedPlan)
}
Seq.empty[Row]
}
private def alterPermanentView(session: SparkSession, analyzedPlan: LogicalPlan): Unit = {
val viewMeta = session.sessionState.catalog.getTableMetadata(name)
if (viewMeta.tableType != CatalogTableType.VIEW) {
throw new AnalysisException(s"${viewMeta.identifier} is not a view.")
}
val viewSQL: String = new SQLBuilder(analyzedPlan).toSQL
// Validate the view SQL - make sure we can parse it and analyze it.
// If we cannot analyze the generated query, there is probably a bug in SQL generation.
try {
session.sql(viewSQL).queryExecution.assertAnalyzed()
} catch {
case NonFatal(e) =>
throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
}
val updatedViewMeta = viewMeta.copy(
schema = analyzedPlan.schema,
viewOriginalText = Some(originalText),
viewText = Some(viewSQL))
session.sessionState.catalog.alterTable(updatedViewMeta)
}
}

View file

@ -18,6 +18,8 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@ -60,15 +62,15 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
var e = intercept[AnalysisException] {
sql("CREATE OR REPLACE VIEW tab1 AS SELECT * FROM jt")
}.getMessage
assert(e.contains("The following is an existing table, not a view: `default`.`tab1`"))
assert(e.contains("`default`.`tab1` is not a view"))
e = intercept[AnalysisException] {
sql("CREATE VIEW tab1 AS SELECT * FROM jt")
}.getMessage
assert(e.contains("The following is an existing table, not a view: `default`.`tab1`"))
assert(e.contains("`default`.`tab1` is not a view"))
e = intercept[AnalysisException] {
sql("ALTER VIEW tab1 AS SELECT * FROM jt")
}.getMessage
assert(e.contains("The following is an existing table, not a view: `default`.`tab1`"))
assert(e.contains("`default`.`tab1` is not a view"))
}
}
@ -274,6 +276,75 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
test("should not allow ALTER VIEW AS when the view does not exist") {
intercept[NoSuchTableException](
sql("ALTER VIEW testView AS SELECT 1, 2")
)
intercept[NoSuchTableException](
sql("ALTER VIEW default.testView AS SELECT 1, 2")
)
}
test("ALTER VIEW AS should try to alter temp view first if view name has no database part") {
withView("test_view") {
withTempView("test_view") {
sql("CREATE VIEW test_view AS SELECT 1 AS a, 2 AS b")
sql("CREATE TEMP VIEW test_view AS SELECT 1 AS a, 2 AS b")
sql("ALTER VIEW test_view AS SELECT 3 AS i, 4 AS j")
// The temporary view should be updated.
checkAnswer(spark.table("test_view"), Row(3, 4))
// The permanent view should stay same.
checkAnswer(spark.table("default.test_view"), Row(1, 2))
}
}
}
test("ALTER VIEW AS should alter permanent view if view name has database part") {
withView("test_view") {
withTempView("test_view") {
sql("CREATE VIEW test_view AS SELECT 1 AS a, 2 AS b")
sql("CREATE TEMP VIEW test_view AS SELECT 1 AS a, 2 AS b")
sql("ALTER VIEW default.test_view AS SELECT 3 AS i, 4 AS j")
// The temporary view should stay same.
checkAnswer(spark.table("test_view"), Row(1, 2))
// The permanent view should be updated.
checkAnswer(spark.table("default.test_view"), Row(3, 4))
}
}
}
test("ALTER VIEW AS should keep the previous table properties, comment, create_time, etc.") {
withView("test_view") {
sql(
"""
|CREATE VIEW test_view
|COMMENT 'test'
|TBLPROPERTIES ('key' = 'a')
|AS SELECT 1 AS a, 2 AS b
""".stripMargin)
val catalog = spark.sessionState.catalog
val viewMeta = catalog.getTableMetadata(TableIdentifier("test_view"))
assert(viewMeta.comment == Some("test"))
assert(viewMeta.properties("key") == "a")
sql("ALTER VIEW test_view AS SELECT 3 AS i, 4 AS j")
val updatedViewMeta = catalog.getTableMetadata(TableIdentifier("test_view"))
assert(updatedViewMeta.comment == Some("test"))
assert(updatedViewMeta.properties("key") == "a")
assert(updatedViewMeta.createTime == viewMeta.createTime)
// The view should be updated.
checkAnswer(spark.table("test_view"), Row(3, 4))
}
}
test("create hive view for json table") {
// json table is not hive-compatible, make sure the new flag fix it.
withView("testView") {