[SPARK-33685][SQL] Migrate DROP VIEW command to use UnresolvedView to resolve the identifier

### What changes were proposed in this pull request?

This PR introduces `UnresolvedView` in the resolution framework to resolve the identifier.

This PR then migrates `DROP VIEW` to use `UnresolvedView` to resolve the table/view identifier. This allows consistent resolution rules (temp view first, etc.) to be applied for both v1/v2 commands. More info about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing).

### Why are the changes needed?

To use `UnresolvedView` for view resolution. Note that there is no resolution behavior change with this PR.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Updated existing tests.

Closes #30636 from imback82/drop_view_v2.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Terry Kim 2020-12-08 14:07:58 +00:00 committed by Wenchen Fan
parent 2b30dde249
commit c05ee06f5b
15 changed files with 118 additions and 52 deletions

View file

@ -150,7 +150,7 @@ object AnalysisContext {
* [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]].
*/
class Analyzer(override val catalogManager: CatalogManager)
extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog with SQLConfHelper {
extends RuleExecutor[LogicalPlan] with CheckAnalysis with SQLConfHelper {
private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog
@ -277,7 +277,7 @@ class Analyzer(override val catalogManager: CatalogManager)
TypeCoercion.typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Post-Hoc Resolution", Once,
Seq(ResolveNoopDropTable) ++
Seq(ResolveCommandsWithIfExists) ++
postHocResolutionRules: _*),
Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
Batch("Remove Unresolved Hints", Once,
@ -889,6 +889,11 @@ class Analyzer(override val catalogManager: CatalogManager)
u.failAnalysis(s"${ident.quoted} is a temp view. '$cmd' expects a table")
}
u
case u @ UnresolvedView(ident, _, _) =>
lookupTempView(ident).map { _ =>
ResolvedView(ident.asIdentifier, isTemp = true)
}
.getOrElse(u)
case u @ UnresolvedTableOrView(ident, cmd, allowTempView) =>
lookupTempView(ident)
.map { _ =>
@ -1113,6 +1118,14 @@ class Analyzer(override val catalogManager: CatalogManager)
case table => table
}.getOrElse(u)
case u @ UnresolvedView(identifier, cmd, relationTypeMismatchHint) =>
lookupTableOrView(identifier).map {
case v: ResolvedView => v
case _ =>
u.failAnalysis(s"${identifier.quoted} is a table. '$cmd' expects a view." +
relationTypeMismatchHint.map(" " + _).getOrElse(""))
}.getOrElse(u)
case u @ UnresolvedTableOrView(identifier, _, _) =>
lookupTableOrView(identifier).getOrElse(u)
}

View file

@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils}
import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsPartitionManagement, Table}
import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsAtomicPartitionManagement, SupportsPartitionManagement, Table}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@ -34,7 +34,7 @@ import org.apache.spark.sql.types._
/**
* Throws user facing errors when passed invalid queries that fail to analyze.
*/
trait CheckAnalysis extends PredicateHelper {
trait CheckAnalysis extends PredicateHelper with LookupCatalog {
protected def isView(nameParts: Seq[String]): Boolean
@ -104,6 +104,15 @@ trait CheckAnalysis extends PredicateHelper {
case u: UnresolvedTable =>
u.failAnalysis(s"Table not found for '${u.commandName}': ${u.multipartIdentifier.quoted}")
case u @ UnresolvedView(NonSessionCatalogAndIdentifier(catalog, ident), cmd, _) =>
u.failAnalysis(
s"Cannot specify catalog `${catalog.name}` for view ${ident.quoted} " +
"because view support in v2 catalog has not been implemented yet. " +
s"$cmd expects a view.")
case u: UnresolvedView =>
u.failAnalysis(s"View not found for '${u.commandName}': ${u.multipartIdentifier.quoted}")
case u: UnresolvedTableOrView =>
val viewStr = if (u.allowTempView) "view" else "permanent view"
u.failAnalysis(

View file

@ -187,11 +187,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
writeOptions = c.writeOptions,
orCreate = c.orCreate)
case DropViewStatement(NonSessionCatalogAndTable(catalog, viewName), _) =>
throw new AnalysisException(
s"Can not specify catalog `${catalog.name}` for view ${viewName.quoted} " +
s"because view support in catalog has not been implemented yet")
case c @ CreateNamespaceStatement(CatalogAndNamespace(catalog, ns), _, _)
if !isSessionCatalog(catalog) =>
CreateNamespace(catalog.asNamespaceCatalog, ns, c.ifNotExists, c.properties)

View file

@ -17,17 +17,19 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan, NoopDropTable}
import org.apache.spark.sql.catalyst.plans.logical.{DropTable, DropView, LogicalPlan, NoopCommand}
import org.apache.spark.sql.catalyst.rules.Rule
/**
* A rule for handling [[DropTable]] logical plan when the table or temp view is not resolved.
* If "ifExists" flag is set to true, the plan is resolved to [[NoopDropTable]],
* which is a no-op command.
* A rule for handling commands when the table or temp view is not resolved.
* These commands support a flag, "ifExists", so that they do not fail when a relation is not
* resolved. If the "ifExists" flag is set to true. the plan is resolved to [[NoopCommand]],
*/
object ResolveNoopDropTable extends Rule[LogicalPlan] {
object ResolveCommandsWithIfExists extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case DropTable(u: UnresolvedTableOrView, ifExists, _) if ifExists =>
NoopDropTable(u.multipartIdentifier)
NoopCommand("DROP TABLE", u.multipartIdentifier)
case DropView(u: UnresolvedView, ifExists) if ifExists =>
NoopCommand("DROP VIEW", u.multipartIdentifier)
}
}

View file

@ -45,6 +45,19 @@ case class UnresolvedTable(
override def output: Seq[Attribute] = Nil
}
/**
* Holds the name of a view that has yet to be looked up in a catalog. It will be resolved to
* [[ResolvedView]] during analysis.
*/
case class UnresolvedView(
multipartIdentifier: Seq[String],
commandName: String,
relationTypeMismatchHint: Option[String] = None) extends LeafNode {
override lazy val resolved: Boolean = false
override def output: Seq[Attribute] = Nil
}
/**
* Holds the name of a table or view that has yet to be looked up in a catalog. It will
* be resolved to [[ResolvedTable]] or [[ResolvedView]] during analysis.

View file

@ -3155,11 +3155,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}
/**
* Create a [[DropViewStatement]] command.
* Create a [[DropView]] command.
*/
override def visitDropView(ctx: DropViewContext): AnyRef = withOrigin(ctx) {
DropViewStatement(
visitMultipartIdentifier(ctx.multipartIdentifier()),
DropView(
UnresolvedView(
visitMultipartIdentifier(ctx.multipartIdentifier()),
"DROP VIEW",
Some("Please use DROP TABLE instead.")),
ctx.EXISTS != null)
}

View file

@ -338,13 +338,6 @@ case class AlterViewAsStatement(
originalText: String,
query: LogicalPlan) extends ParsedStatement
/**
* A DROP VIEW statement, as parsed from SQL.
*/
case class DropViewStatement(
viewName: Seq[String],
ifExists: Boolean) extends ParsedStatement
/**
* An INSERT INTO statement, as parsed from SQL.
*

View file

@ -419,9 +419,11 @@ case class DropTable(
}
/**
* The logical plan for handling non-existing table for DROP TABLE command.
* The logical plan for no-op command handling non-existing table.
*/
case class NoopDropTable(multipartIdentifier: Seq[String]) extends Command
case class NoopCommand(
commandName: String,
multipartIdentifier: Seq[String]) extends Command
/**
* The logical plan of the ALTER TABLE command.
@ -724,3 +726,12 @@ case class ShowPartitions(
override val output: Seq[Attribute] = Seq(
AttributeReference("partition", StringType, nullable = false)())
}
/**
* The logical plan of the DROP VIEW command.
*/
case class DropView(
child: LogicalPlan,
ifExists: Boolean) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}

View file

@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
import java.util.Locale
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedPartitionSpec, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedPartitionSpec, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, JarResource}
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
@ -721,13 +721,18 @@ class DDLParserSuite extends AnalysisTest {
}
test("drop view") {
val cmd = "DROP VIEW"
val hint = Some("Please use DROP TABLE instead.")
parseCompare(s"DROP VIEW testcat.db.view",
DropViewStatement(Seq("testcat", "db", "view"), ifExists = false))
parseCompare(s"DROP VIEW db.view", DropViewStatement(Seq("db", "view"), ifExists = false))
DropView(UnresolvedView(Seq("testcat", "db", "view"), cmd, hint), ifExists = false))
parseCompare(s"DROP VIEW db.view",
DropView(UnresolvedView(Seq("db", "view"), cmd, hint), ifExists = false))
parseCompare(s"DROP VIEW IF EXISTS db.view",
DropViewStatement(Seq("db", "view"), ifExists = true))
parseCompare(s"DROP VIEW view", DropViewStatement(Seq("view"), ifExists = false))
parseCompare(s"DROP VIEW IF EXISTS view", DropViewStatement(Seq("view"), ifExists = true))
DropView(UnresolvedView(Seq("db", "view"), cmd, hint), ifExists = true))
parseCompare(s"DROP VIEW view",
DropView(UnresolvedView(Seq("view"), cmd, hint), ifExists = false))
parseCompare(s"DROP VIEW IF EXISTS view",
DropView(UnresolvedView(Seq("view"), cmd, hint), ifExists = true))
}
private def testCreateOrReplaceDdl(

View file

@ -352,9 +352,8 @@ class ResolveSessionCatalog(
}
DropTableCommand(r.identifier.asTableIdentifier, ifExists, isView = false, purge = purge)
// v1 DROP TABLE supports temp view.
case DropViewStatement(TempViewOrV1Table(name), ifExists) =>
DropTableCommand(name.asTableIdentifier, ifExists, isView = true, purge = false)
case DropView(r: ResolvedView, ifExists) =>
DropTableCommand(r.identifier.asTableIdentifier, ifExists, isView = true, purge = false)
case c @ CreateNamespaceStatement(CatalogAndNamespace(catalog, ns), _, _)
if isSessionCatalog(catalog) =>

View file

@ -251,7 +251,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case DropTable(r: ResolvedTable, ifExists, purge) =>
DropTableExec(r.catalog, r.identifier, ifExists, purge, invalidateCache(r)) :: Nil
case _: NoopDropTable =>
case _: NoopCommand =>
LocalTableScanExec(Nil, Nil) :: Nil
case AlterTable(catalog, ident, _, changes) =>

View file

@ -2594,6 +2594,13 @@ class DataSourceV2SQLSuite
}
}
test("DROP VIEW is not supported for v2 catalogs") {
assertAnalysisError(
"DROP VIEW testcat.v",
"Cannot specify catalog `testcat` for view v because view support in v2 catalog " +
"has not been implemented yet. DROP VIEW expects a view.")
}
private def testNotSupportedV2Command(
sqlCommand: String,
sqlParams: String,
@ -2612,13 +2619,6 @@ class DataSourceV2SQLSuite
assert(e.message.contains(s"$sqlCommand is only supported with v1 tables"))
}
private def testV1CommandSupportingTempView(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")
}
assert(e.message.contains(s"$sqlCommand is only supported with temp views or v1 tables"))
}
private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = {
val errMsg = intercept[AnalysisException] {
sql(sqlStatement)

View file

@ -1363,12 +1363,11 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
assert(catalog.listTables("dbx") == Seq(tableIdent))
val e = intercept[AnalysisException] {
sql("DROP VIEW dbx.tab1")
}
assert(
e.getMessage.contains("Cannot drop a table with DROP VIEW. Please use DROP TABLE instead"))
assert(e.getMessage.contains(
"dbx.tab1 is a table. 'DROP VIEW' expects a view. Please use DROP TABLE instead."))
}
protected def testSetProperties(isDatasourceTable: Boolean): Unit = {

View file

@ -78,6 +78,14 @@ class PlanResolutionSuite extends AnalysisTest {
V1Table(t)
}
private val view: V1Table = {
val t = mock(classOf[CatalogTable])
when(t.schema).thenReturn(new StructType().add("i", "int").add("s", "string"))
when(t.tableType).thenReturn(CatalogTableType.VIEW)
when(t.provider).thenReturn(Some(v1Format))
V1Table(t)
}
private val testCat: TableCatalog = {
val newCatalog = mock(classOf[TableCatalog])
when(newCatalog.loadTable(any())).thenAnswer((invocation: InvocationOnMock) => {
@ -101,6 +109,7 @@ class PlanResolutionSuite extends AnalysisTest {
case "v2Table" => table
case "v2Table1" => table
case "v2TableWithAcceptAnySchemaCapability" => tableWithAcceptAnySchemaCapability
case "view" => view
case name => throw new NoSuchTableException(name)
}
})
@ -148,7 +157,10 @@ class PlanResolutionSuite extends AnalysisTest {
manager
}
def parseAndResolve(query: String, withDefault: Boolean = false): LogicalPlan = {
def parseAndResolve(
query: String,
withDefault: Boolean = false,
checkAnalysis: Boolean = false): LogicalPlan = {
val catalogManager = if (withDefault) {
catalogManagerWithDefault
} else {
@ -158,8 +170,13 @@ class PlanResolutionSuite extends AnalysisTest {
override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Seq(
new ResolveSessionCatalog(catalogManager, _ == Seq("v"), _ => false))
}
// We don't check analysis here, as we expect the plan to be unresolved such as `CreateTable`.
analyzer.execute(CatalystSqlParser.parsePlan(query))
// We don't check analysis here by default, as we expect the plan to be unresolved
// such as `CreateTable`.
val analyzed = analyzer.execute(CatalystSqlParser.parsePlan(query))
if (checkAnalysis) {
analyzer.checkAnalysis(analyzed)
}
analyzed
}
private def parseResolveCompare(query: String, expected: LogicalPlan): Unit =
@ -677,6 +694,8 @@ class PlanResolutionSuite extends AnalysisTest {
val viewIdent1 = TableIdentifier("view", Option("db"))
val viewName2 = "view"
val viewIdent2 = TableIdentifier("view", Option("default"))
val tempViewName = "v"
val tempViewIdent = TableIdentifier("v")
parseResolveCompare(s"DROP VIEW $viewName1",
DropTableCommand(viewIdent1, ifExists = false, isView = true, purge = false))
@ -686,11 +705,15 @@ class PlanResolutionSuite extends AnalysisTest {
DropTableCommand(viewIdent2, ifExists = false, isView = true, purge = false))
parseResolveCompare(s"DROP VIEW IF EXISTS $viewName2",
DropTableCommand(viewIdent2, ifExists = true, isView = true, purge = false))
parseResolveCompare(s"DROP VIEW $tempViewName",
DropTableCommand(tempViewIdent, ifExists = false, isView = true, purge = false))
parseResolveCompare(s"DROP VIEW IF EXISTS $tempViewName",
DropTableCommand(tempViewIdent, ifExists = true, isView = true, purge = false))
}
test("drop view in v2 catalog") {
intercept[AnalysisException] {
parseAndResolve("DROP VIEW testcat.db.view")
parseAndResolve("DROP VIEW testcat.db.view", checkAnalysis = true)
}.getMessage.toLowerCase(Locale.ROOT).contains(
"view support in catalog has not been implemented")
}

View file

@ -1048,7 +1048,8 @@ class HiveDDLSuite
val message = intercept[AnalysisException] {
sql("DROP VIEW tab1")
}.getMessage
assert(message.contains("Cannot drop a table with DROP VIEW. Please use DROP TABLE instead"))
assert(message.contains(
"tab1 is a table. 'DROP VIEW' expects a view. Please use DROP TABLE instead."))
}
}