[SPARK-34546][SQL] AlterViewAs.query should be analyzed during the analysis phase, and AlterViewAs should invalidate the cache

### What changes were proposed in this pull request?

This PR proposes the following:
   * `AlterViewAs.query` is currently analyzed in the physical operator `AlterViewAsCommand`, but it should be analyzed during the analysis phase.
   *  When `spark.sql.legacy.storeAnalyzedPlanForView` is set to true, store `TemporaryViewRelation` which wraps the analyzed plan, similar to #31273.
   *  Try to uncache the view you are altering.

### Why are the changes needed?

Analyzing a plan should be done in the analysis phase if possible.

Not uncaching the view (the existing behavior) seems like a bug, since the cache built for the old view definition may never be used again.

### Does this PR introduce _any_ user-facing change?

Yes. `ALTER VIEW ... AS` now uncaches the view being altered if it is already cached.

### How was this patch tested?

Added new tests around uncaching.

The existing tests such as `SQLViewSuite` should cover the analysis changes.

Closes #31652 from imback82/alter_view_child.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Terry Kim 2021-03-11 05:31:40 +00:00 committed by Wenchen Fan
parent 5e120e4651
commit 2a6e68e1f7
10 changed files with 183 additions and 88 deletions

View file

@ -879,7 +879,7 @@ class Analyzer(override val catalogManager: CatalogManager)
private def unwrapRelationPlan(plan: LogicalPlan): LogicalPlan = {
EliminateSubqueryAliases(plan) match {
case v: View if v.isDataFrameTempView => v.child
case v: View if v.isTempViewStoringAnalyzedPlan => v.child
case other => other
}
}

View file

@ -393,7 +393,7 @@ object UnsupportedOperationChecker extends Logging {
case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
_: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
_: TypedFilter) =>
case v: View if v.isDataFrameTempView =>
case v: View if v.isTempViewStoringAnalyzedPlan =>
case node if node.nodeName == "StreamingRelationV2" =>
case node =>
throwError(s"Continuous processing does not support ${node.nodeName} operations.")

View file

@ -31,7 +31,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_CREATED_FROM_DATAFRAME
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
@ -468,7 +468,7 @@ object CatalogTable {
val VIEW_REFERRED_TEMP_VIEW_NAMES = VIEW_PREFIX + "referredTempViewNames"
val VIEW_REFERRED_TEMP_FUNCTION_NAMES = VIEW_PREFIX + "referredTempFunctionsNames"
val VIEW_CREATED_FROM_DATAFRAME = VIEW_PREFIX + "createdFromDataFrame"
val VIEW_STORING_ANALYZED_PLAN = VIEW_PREFIX + "storingAnalyzedPlan"
def splitLargeTableProp(
key: String,
@ -782,14 +782,14 @@ case class UnresolvedCatalogRelation(
/**
* A wrapper to store the temporary view info, will be kept in `SessionCatalog`
* and will be transformed to `View` during analysis. If the temporary view was
* created from a dataframe, `plan` is set to the analyzed plan for the view.
* and will be transformed to `View` during analysis. If the temporary view is
* storing an analyzed plan, `plan` is set to the analyzed plan for the view.
*/
case class TemporaryViewRelation(
tableMeta: CatalogTable,
plan: Option[LogicalPlan] = None) extends LeafNode {
require(plan.isEmpty ||
(plan.get.resolved && tableMeta.properties.contains(VIEW_CREATED_FROM_DATAFRAME)))
(plan.get.resolved && tableMeta.properties.contains(VIEW_STORING_ANALYZED_PLAN)))
override lazy val resolved: Boolean = false
override def output: Seq[Attribute] = Nil

View file

@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.AliasIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_CREATED_FROM_DATAFRAME
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
@ -462,7 +462,7 @@ case class View(
desc: CatalogTable,
isTempView: Boolean,
child: LogicalPlan) extends UnaryNode {
require(!isDataFrameTempView || child.resolved)
require(!isTempViewStoringAnalyzedPlan || child.resolved)
override def output: Seq[Attribute] = child.output
@ -475,8 +475,8 @@ case class View(
case _ => child.canonicalized
}
def isDataFrameTempView: Boolean =
isTempView && desc.properties.contains(VIEW_CREATED_FROM_DATAFRAME)
def isTempViewStoringAnalyzedPlan: Boolean =
isTempView && desc.properties.contains(VIEW_STORING_ANALYZED_PLAN)
// When resolving a SQL view, we use an extra Project to add cast and alias to make sure the view
// output schema doesn't change even if the table referenced by the view is changed after view

View file

@ -855,7 +855,7 @@ case class AlterViewAs(
child: LogicalPlan,
originalText: String,
query: LogicalPlan) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
override def children: Seq[LogicalPlan] = child :: query :: Nil
}
/**

View file

@ -66,7 +66,7 @@ trait AnalysisTest extends PlanTest {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
val actualPlan = getAnalyzer.executeAndCheck(inputPlan, new QueryPlanningTracker)
val transformed = actualPlan transformUp {
case v: View if v.isDataFrameTempView => v.child
case v: View if v.isTempViewStoringAnalyzedPlan => v.child
}
comparePlans(transformed, expectedPlan)
}

View file

@ -86,7 +86,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
}
private def getTempViewRawPlan(plan: Option[LogicalPlan]): Option[LogicalPlan] = plan match {
case Some(v: View) if v.isDataFrameTempView => Some(v.child)
case Some(v: View) if v.isTempViewStoringAnalyzedPlan => Some(v.child)
case other => other
}

View file

@ -474,7 +474,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case SetTableLocation(ResolvedV1TableIdentifier(ident), partitionSpec, location) =>
AlterTableSetLocationCommand(ident.asTableIdentifier, partitionSpec, location)
case AlterViewAs(ResolvedView(ident, _), originalText, query) =>
case AlterViewAs(ResolvedView(ident, _), originalText, query) if query.resolved =>
AlterViewAsCommand(
ident.asTableIdentifier,
originalText,

View file

@ -22,8 +22,9 @@ import scala.collection.mutable
import org.json4s.JsonAST.{JArray, JString}
import org.json4s.jackson.JsonMethods._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, ViewType}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, UserDefinedExpression}
@ -115,48 +116,27 @@ case class CreateViewCommand(
if (viewType == LocalTempView) {
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
if (replace && needsToUncache(catalog.getRawTempView(name.table), aliasedPlan)) {
logInfo(s"Try to uncache ${name.quotedString} before replacing.")
checkCyclicViewReference(analyzedPlan, Seq(name), name)
CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
}
// If there is no sql text (e.g. from Dataset API), we will always store the analyzed plan
val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
TemporaryViewRelation(
prepareTemporaryView(
name,
sparkSession,
analyzedPlan,
aliasedPlan.schema,
originalText))
} else {
TemporaryViewRelation(
prepareTemporaryViewFromDataFrame(name, aliasedPlan),
Some(aliasedPlan))
}
val tableDefinition = createTemporaryViewRelation(
name,
sparkSession,
replace,
catalog.getRawTempView,
originalText,
analyzedPlan,
aliasedPlan)
catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace)
} else if (viewType == GlobalTempView) {
val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
val viewIdent = TableIdentifier(name.table, Option(db))
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
if (replace && needsToUncache(catalog.getRawGlobalTempView(name.table), aliasedPlan)) {
logInfo(s"Try to uncache ${viewIdent.quotedString} before replacing.")
checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString)
}
val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
TemporaryViewRelation(
prepareTemporaryView(
viewIdent,
sparkSession,
analyzedPlan,
aliasedPlan.schema,
originalText))
} else {
TemporaryViewRelation(
prepareTemporaryViewFromDataFrame(viewIdent, aliasedPlan),
Some(aliasedPlan))
}
val tableDefinition = createTemporaryViewRelation(
viewIdent,
sparkSession,
replace,
catalog.getRawGlobalTempView,
originalText,
analyzedPlan,
aliasedPlan)
catalog.createGlobalTempView(name.table, tableDefinition, overrideIfExists = replace)
} else if (catalog.tableExists(name)) {
val tableMetadata = catalog.getTableMetadata(name)
@ -192,20 +172,6 @@ case class CreateViewCommand(
Seq.empty[Row]
}
/**
* Checks if need to uncache the temp view being replaced.
*/
private def needsToUncache(
rawTempView: Option[LogicalPlan],
aliasedPlan: LogicalPlan): Boolean = rawTempView match {
// The temp view doesn't exist, no need to uncache.
case None => false
// Do not need to uncache if the to-be-replaced temp view plan and the new plan are the
// same-result plans.
case Some(TemporaryViewRelation(_, Some(p))) => !p.sameResult(aliasedPlan)
case Some(p) => !p.sameResult(aliasedPlan)
}
/**
* If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
* else return the analyzed plan directly.
@ -274,28 +240,29 @@ case class AlterViewAsCommand(
override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
override def run(session: SparkSession): Seq[Row] = {
// If the plan cannot be analyzed, throw an exception and don't proceed.
val qe = session.sessionState.executePlan(query)
qe.assertAnalyzed()
val analyzedPlan = qe.analyzed
if (session.sessionState.catalog.isTempView(name)) {
alterTemporaryView(session, analyzedPlan)
alterTemporaryView(session, query)
} else {
alterPermanentView(session, analyzedPlan)
alterPermanentView(session, query)
}
Seq.empty[Row]
}
private def alterTemporaryView(session: SparkSession, analyzedPlan: LogicalPlan): Unit = {
val tableDefinition = if (conf.storeAnalyzedPlanForView) {
analyzedPlan
val catalog = session.sessionState.catalog
val getRawTempView: String => Option[LogicalPlan] = if (name.database.isEmpty) {
catalog.getRawTempView
} else {
checkCyclicViewReference(analyzedPlan, Seq(name), name)
TemporaryViewRelation(
prepareTemporaryView(
name, session, analyzedPlan, analyzedPlan.schema, Some(originalText)))
catalog.getRawGlobalTempView
}
val tableDefinition = createTemporaryViewRelation(
name,
session,
replace = true,
getRawTempView,
Some(originalText),
analyzedPlan,
aliasedPlan = analyzedPlan)
session.sessionState.catalog.alterTempViewDefinition(name, tableDefinition)
}
@ -306,6 +273,9 @@ case class AlterViewAsCommand(
val viewIdent = viewMeta.identifier
checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
logDebug(s"Try to uncache ${viewIdent.quotedString} before replacing.")
CommandUtils.uncacheTableOrView(session, viewIdent.quotedString)
val newProperties = generateViewProperties(
viewMeta.properties, session, analyzedPlan, analyzedPlan.schema.fieldNames)
@ -349,7 +319,7 @@ case class ShowViewsCommand(
}
}
object ViewHelper {
object ViewHelper extends SQLConfHelper with Logging {
private val configPrefixDenyList = Seq(
SQLConf.MAX_NESTED_VIEW_DEPTH.key,
@ -596,6 +566,67 @@ object ViewHelper {
(collectTempViews(child), collectTempFunctions(child))
}
/**
 * Returns a [[TemporaryViewRelation]] that describes the temporary view to create or replace,
 * given an analyzed plan of the view.
 *
 * When `replace` is true and `getRawTempView` finds an existing view under `name.table` for
 * which `needsToUncache` returns true, the cyclic-reference check is run and an uncache of
 * `name` is requested before the new relation is built.
 *
 * @param name the name of the temporary view to create/replace.
 * @param session the spark session.
 * @param replace if true, an existing view with the same name is considered for uncaching.
 * @param getRawTempView the function that returns an optional raw plan of the local or
 *                       global temporary view. It is only invoked when `replace` is true.
 * @param originalText the original SQL text of this view; None when there is no SQL text
 *                     (e.g. the view is created via the Dataset API). It is not used when
 *                     `conf.storeAnalyzedPlanForView` is true.
 * @param analyzedPlan the logical plan that represents the view; this is used to generate the
 *                     logical plan for temporary view and the view schema.
 * @param aliasedPlan the aliased logical plan based on the user specified columns. If there are
 *                    no user specified columns, this should be same as `analyzedPlan`.
 * @return a relation backed by the SQL text when text is available and analyzed plans are not
 *         being stored; otherwise a relation that wraps `aliasedPlan` itself.
 */
def createTemporaryViewRelation(
    name: TableIdentifier,
    session: SparkSession,
    replace: Boolean,
    getRawTempView: String => Option[LogicalPlan],
    originalText: Option[String],
    analyzedPlan: LogicalPlan,
    aliasedPlan: LogicalPlan): TemporaryViewRelation = {
  // Short-circuit on `replace` so the catalog lookup and the plan comparison are skipped
  // entirely when nothing is being replaced.
  val uncache = replace && getRawTempView(name.table).exists(needsToUncache(_, aliasedPlan))
  if (uncache) {
    logDebug(s"Try to uncache ${name.quotedString} before replacing.")
    checkCyclicViewReference(analyzedPlan, Seq(name), name)
    CommandUtils.uncacheTableOrView(session, name.quotedString)
  }
  originalText match {
    // SQL text is available and we are not asked to keep the analyzed plan: store the text.
    case Some(text) if !conf.storeAnalyzedPlanForView =>
      TemporaryViewRelation(
        prepareTemporaryView(
          name,
          session,
          analyzedPlan,
          aliasedPlan.schema,
          text))
    // No SQL text, or the legacy flag is on: keep the (aliased) analyzed plan.
    case _ =>
      TemporaryViewRelation(
        prepareTemporaryViewStoringAnalyzedPlan(name, aliasedPlan),
        Some(aliasedPlan))
  }
}
/**
 * Decides whether the temp view that is about to be replaced has to be uncached.
 *
 * @param rawTempView the raw plan currently registered for the temp view.
 * @param aliasedPlan the plan that will replace it.
 * @return true unless the currently stored plan and `aliasedPlan` are same-result plans.
 */
private def needsToUncache(
    rawTempView: LogicalPlan,
    aliasedPlan: LogicalPlan): Boolean = {
  // Extract the plan we can compare against, if any. A TemporaryViewRelation may or may not
  // carry an analyzed plan; any other node is itself the plan to compare.
  val comparablePlan: Option[LogicalPlan] = rawTempView match {
    case TemporaryViewRelation(_, storedPlan) => storedPlan
    case existingPlan => Some(existingPlan)
  }
  // `forall` yields true for None: with no stored analyzed plan we always uncache.
  // Otherwise uncache only when the old and new plans are not same-result plans.
  comparablePlan.forall(existing => !existing.sameResult(aliasedPlan))
}
/**
* Returns a [[CatalogTable]] that contains information for temporary view.
@ -603,12 +634,12 @@ object ViewHelper {
* column names) and store them as properties in the CatalogTable, and also creates
* the proper schema for the view.
*/
def prepareTemporaryView(
private def prepareTemporaryView(
viewName: TableIdentifier,
session: SparkSession,
analyzedPlan: LogicalPlan,
viewSchema: StructType,
originalText: Option[String]): CatalogTable = {
originalText: String): CatalogTable = {
val catalog = session.sessionState.catalog
val (tempViews, tempFunctions) = collectTemporaryObjects(catalog, analyzedPlan)
@ -622,15 +653,15 @@ object ViewHelper {
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = viewSchema,
viewText = originalText,
viewText = Some(originalText),
properties = newProperties)
}
/**
* Returns a [[CatalogTable]] that contains information for the temporary view created
* from a dataframe.
* Returns a [[CatalogTable]] that contains information for the temporary view storing
* an analyzed plan.
*/
def prepareTemporaryViewFromDataFrame(
private def prepareTemporaryViewStoringAnalyzedPlan(
viewName: TableIdentifier,
analyzedPlan: LogicalPlan): CatalogTable = {
CatalogTable(
@ -638,6 +669,6 @@ object ViewHelper {
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = analyzedPlan.schema,
properties = Map((VIEW_CREATED_FROM_DATAFRAME, "true")))
properties = Map((VIEW_STORING_ANALYZED_PLAN, "true")))
}
}

View file

@ -1445,4 +1445,68 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
}
}
}
test("SPARK-34546: ALTER VIEW AS should uncache if a temp view is cached") {
  // Exercise both ways a temp view can be stored: as an analyzed plan or as SQL text.
  for (storeAnalyzed <- Seq(true, false)) {
    val storeAnalyzedConf = SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> storeAnalyzed.toString
    withSQLConf(storeAnalyzedConf) {
      withTempView("tv") {
        val localViewIdent = TableIdentifier("tv")
        testAlterTemporaryViewAsWithCache(localViewIdent, storeAnalyzed)
      }
    }
  }
}
test("SPARK-34546: ALTER VIEW AS should uncache if a global temp view is cached") {
  // Exercise both ways a temp view can be stored: as an analyzed plan or as SQL text.
  for (storeAnalyzed <- Seq(true, false)) {
    val storeAnalyzedConf = SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> storeAnalyzed.toString
    withSQLConf(storeAnalyzedConf) {
      withGlobalTempView("global_tv") {
        // Global temp views are addressed through the global temp view database.
        val globalTempDb = spark.sharedState.globalTempViewManager.database
        val globalViewIdent = TableIdentifier("global_tv", Some(globalTempDb))
        testAlterTemporaryViewAsWithCache(globalViewIdent, storeAnalyzed)
      }
    }
  }
}
/**
 * Creates and caches a local or global temporary view defined as `SELECT 1`, then checks how
 * `ALTER VIEW ... AS` affects the cache.
 *
 * @param ident the view identifier; a non-empty database marks a global temporary view.
 * @param storeAnalyzed whether temp views are configured to store the analyzed plan, in which
 *                      case altering to a same-result query is expected to keep the cache.
 */
private def testAlterTemporaryViewAsWithCache(
    ident: TableIdentifier,
    storeAnalyzed: Boolean): Unit = {
  val (tempViewStr, viewName) = ident.database match {
    case Some(db) => ("GLOBAL TEMPORARY", s"$db.${ident.table}")
    case None => ("TEMPORARY", ident.table)
  }

  // True when the cache manager currently holds data for the `SELECT 1` plan.
  def selectOneIsCached: Boolean =
    spark.sharedState.cacheManager.lookupCachedData(sql("SELECT 1")).nonEmpty

  sql(s"CREATE $tempViewStr VIEW ${ident.table} AS SELECT 1")
  sql(s"CACHE TABLE $viewName")
  assert(spark.catalog.isCached(viewName))
  assert(selectOneIsCached)

  if (storeAnalyzed) {
    // Altered temporary view will have the same plan, thus it will not be uncached.
    // Note that this check is done only if a temporary view stores an analyzed view.
    sql(s"ALTER VIEW $viewName as SELECT 1")
    assert(spark.catalog.isCached(viewName))
    assert(selectOneIsCached)
  }

  // Altering to a different query must drop the cached data.
  sql(s"ALTER VIEW $viewName as SELECT 2")
  assert(!spark.catalog.isCached(viewName))
  assert(!selectOneIsCached)
}
test("SPARK-34546: ALTER VIEW AS should uncache if a permanent view is cached") {
  withView("view") {
    // True when the cache manager currently holds data for the `SELECT 1` plan.
    def selectOneIsCached: Boolean =
      spark.sharedState.cacheManager.lookupCachedData(sql("SELECT 1")).nonEmpty

    sql("CREATE VIEW view AS SELECT 1")
    sql("CACHE TABLE view")
    assert(spark.catalog.isCached("view"))
    assert(selectOneIsCached)

    // ALTER VIEW AS on a permanent view should uncache even if the replacing view produces
    // the same result.
    sql("ALTER VIEW view as SELECT 1")
    assert(!spark.catalog.isCached("view"))
    assert(!selectOneIsCached)
  }
}
}