[SPARK-33829][SQL] Renaming v2 tables should recreate the cache

### What changes were proposed in this pull request?

Currently, renaming a v2 table does not invalidate or recreate its cache, so the cached data is no longer used once the table is renamed. This PR fixes the behavior by recreating the cache for the renamed table while retaining the original storage level.
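For example, a minimal sketch mirroring the new test added in `DataSourceV2SQLSuite` (the `testcat` catalog, the `foo` data source, and the `source` table come from that suite's setup):

```scala
// Create and cache a v2 table in the test catalog.
spark.sql("CREATE TABLE testcat.ns.old USING foo AS SELECT id, data FROM source")
spark.sql("CACHE TABLE testcat.ns.old OPTIONS('storageLevel' 'MEMORY_ONLY')")

// Before this fix, the rename left the old cache entry behind and queries on the
// new name no longer used the cache. With this fix, the old entry is removed and
// the cache is recreated for the new identifier with the original storage level.
spark.sql("ALTER TABLE testcat.ns.old RENAME TO ns.new")
spark.sql("SELECT * FROM testcat.ns.new").show()  // now served from the recreated cache
```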

### Why are the changes needed?

This fixes a bug: the cache associated with a renamed table was neither cleaned up nor recreated.

### Does this PR introduce _any_ user-facing change?

Yes. When a v2 table is renamed, its cache is now correctly updated.

### How was this patch tested?

Added a new test.

Closes #30825 from imback82/rename_recreate_cache_v2.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
3 changed files with 57 additions and 10 deletions

DataSourceV2Strategy.scala

@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.StorageLevel
class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
@@ -56,17 +57,24 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
session.sharedState.cacheManager.recacheByPlan(session, r)
}
private def invalidateCache(r: ResolvedTable, recacheTable: Boolean = false)(): Unit = {
// Invalidates the cache associated with the given table. If the invalidated cache matches the
// given table, the cache's storage level is returned.
private def invalidateCache(
r: ResolvedTable,
recacheTable: Boolean = false)(): Option[StorageLevel] = {
val v2Relation = DataSourceV2Relation.create(r.table, Some(r.catalog), Some(r.identifier))
val cache = session.sharedState.cacheManager.lookupCachedData(v2Relation)
session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
if (recacheTable && cache.isDefined) {
// save the cache name and cache level for recreation
val cacheName = cache.get.cachedRepresentation.cacheBuilder.tableName
if (cache.isDefined) {
val cacheLevel = cache.get.cachedRepresentation.cacheBuilder.storageLevel
// recache with the same name and cache level.
session.sharedState.cacheManager.cacheQuery(session, v2Relation, cacheName, cacheLevel)
if (recacheTable) {
val cacheName = cache.get.cachedRepresentation.cacheBuilder.tableName
// recache with the same name and cache level.
session.sharedState.cacheManager.cacheQuery(session, v2Relation, cacheName, cacheLevel)
}
Some(cacheLevel)
} else {
None
}
}
@@ -266,12 +274,17 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case AlterTable(catalog, ident, _, changes) =>
AlterTableExec(catalog, ident, changes) :: Nil
case RenameTable(ResolvedTable(catalog, oldIdent, _), newIdent, isView) =>
case RenameTable(r @ ResolvedTable(catalog, oldIdent, _), newIdent, isView) =>
if (isView) {
throw new AnalysisException(
"Cannot rename a table with ALTER VIEW. Please use ALTER TABLE instead.")
}
RenameTableExec(catalog, oldIdent, newIdent.asIdentifier) :: Nil
RenameTableExec(
catalog,
oldIdent,
newIdent.asIdentifier,
invalidateCache(r),
session.sharedState.cacheManager.cacheQuery) :: Nil
case AlterNamespaceSetProperties(ResolvedNamespace(catalog, ns), properties) =>
AlterNamespaceSetPropertiesExec(catalog.asNamespaceCatalog, ns, properties) :: Nil

RenameTableExec.scala

@@ -17,9 +17,12 @@
package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.storage.StorageLevel
/**
* Physical plan node for renaming a table.
@@ -27,14 +30,26 @@ import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
case class RenameTableExec(
catalog: TableCatalog,
oldIdent: Identifier,
newIdent: Identifier) extends V2CommandExec {
newIdent: Identifier,
invalidateCache: () => Option[StorageLevel],
cacheTable: (SparkSession, LogicalPlan, Option[String], StorageLevel) => Unit)
extends V2CommandExec {
override def output: Seq[Attribute] = Seq.empty
override protected def run(): Seq[InternalRow] = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
val optOldStorageLevel = invalidateCache()
catalog.invalidateTable(oldIdent)
catalog.renameTable(oldIdent, newIdent)
optOldStorageLevel.foreach { oldStorageLevel =>
val tbl = catalog.loadTable(newIdent)
val newRelation = DataSourceV2Relation.create(tbl, Some(catalog), Some(newIdent))
cacheTable(sqlContext.sparkSession, newRelation, Some(newIdent.quoted), oldStorageLevel)
}
Seq.empty
}
}

DataSourceV2SQLSuite.scala

@@ -37,6 +37,7 @@ import org.apache.spark.sql.internal.connector.SimpleTableProvider
import org.apache.spark.sql.sources.SimpleScanSource
import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -863,6 +864,24 @@ class DataSourceV2SQLSuite
}
}
test("SPARK-33829: Renaming a table should recreate a cache while retaining the old cache info") {
withTable("testcat.ns.old", "testcat.ns.new") {
def getStorageLevel(tableName: String): StorageLevel = {
val table = spark.table(tableName)
val optCachedData = spark.sharedState.cacheManager.lookupCachedData(table)
assert(optCachedData.isDefined)
optCachedData.get.cachedRepresentation.cacheBuilder.storageLevel
}
sql("CREATE TABLE testcat.ns.old USING foo AS SELECT id, data FROM source")
sql("CACHE TABLE testcat.ns.old OPTIONS('storageLevel' 'MEMORY_ONLY')")
val oldStorageLevel = getStorageLevel("testcat.ns.old")
sql("ALTER TABLE testcat.ns.old RENAME TO ns.new")
val newStorageLevel = getStorageLevel("testcat.ns.new")
assert(oldStorageLevel === newStorageLevel)
}
}
test("Relation: basic") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {