[SPARK-33290][SQL] REFRESH TABLE should invalidate cache even though the table itself may not be cached

### What changes were proposed in this pull request?

In `CatalogImpl.refreshTable`, this moves the `uncacheQuery` call out of the condition `if (cache.nonEmpty)` so that it will be called whether the table itself is cached or not.

### Why are the changes needed?

In the case like the following:
```sql
CREATE TABLE t ...;
CREATE VIEW t1 AS SELECT * FROM t;
REFRESH TABLE t;
```

If the table `t` is refreshed, the view `t1` which depends on `t` will not be invalidated. This could lead to incorrect results and is similar to [SPARK-19765](https://issues.apache.org/jira/browse/SPARK-19765).

On the other hand, if we have:

```sql
CREATE TABLE t ...;
CACHE TABLE t;
CREATE VIEW t1 AS SELECT * FROM t;
REFRESH TABLE t;
```

Then the view `t1` will be refreshed. The behavior is somewhat inconsistent.

### Does this PR introduce _any_ user-facing change?

Yes. With this change, any caches that depend on the refreshed table will be invalidated. Previously this only happened if the table itself was cached.

### How was this patch tested?

Added a new UT for the case.

Closes #30187 from sunchao/SPARK-33290.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Chao Sun 2020-10-31 09:49:18 -07:00 committed by Dongjoon Hyun
parent 72ad9dcd5d
commit 32b78d3795
2 changed files with 51 additions and 3 deletions

View file

@ -504,6 +504,9 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
 * If this table is cached as an InMemoryRelation, drop the original cached version and make the
 * new version cached lazily.
 *
 * In addition, refreshing a table also invalidate all caches that have reference to the table
 * in a cascading manner. This is to prevent incorrect result from the otherwise staled caches.
 *
 * @group cachemgmt
 * @since 2.0.0
 */
@ -524,14 +527,17 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
// If this table is cached as an InMemoryRelation, drop the original // If this table is cached as an InMemoryRelation, drop the original
// cached version and make the new version cached lazily. // cached version and make the new version cached lazily.
val cache = sparkSession.sharedState.cacheManager.lookupCachedData(table) val cache = sparkSession.sharedState.cacheManager.lookupCachedData(table)
// uncache the logical plan.
// note this is a no-op for the table itself if it's not cached, but will invalidate all
// caches referencing this table.
sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true)
if (cache.nonEmpty) { if (cache.nonEmpty) {
// save the cache name and cache level for recreation // save the cache name and cache level for recreation
val cacheName = cache.get.cachedRepresentation.cacheBuilder.tableName val cacheName = cache.get.cachedRepresentation.cacheBuilder.tableName
val cacheLevel = cache.get.cachedRepresentation.cacheBuilder.storageLevel val cacheLevel = cache.get.cachedRepresentation.cacheBuilder.storageLevel
// uncache the logical plan.
sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true)
// recache with the same name and cache level. // recache with the same name and cache level.
sparkSession.sharedState.cacheManager.cacheQuery(table, cacheName, cacheLevel) sparkSession.sharedState.cacheManager.cacheQuery(table, cacheName, cacheLevel)
} }

View file

@ -1208,4 +1208,46 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined) assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
} }
} }
test("SPARK-33290: REFRESH TABLE should invalidate all caches referencing the table") {
  withTable("t") {
    withTempPath { dir =>
      withTempView("tempView1", "tempView2") {
        // Back the table by a parquet file so that deleting the file later
        // makes the underlying data disappear while caches still hold stale rows.
        Seq((1, "a")).toDF("i", "j").write.parquet(dir.getCanonicalPath)
        sql(s"CREATE TABLE t USING parquet LOCATION '${dir.toURI}'")
        sql("CREATE TEMPORARY VIEW tempView1 AS SELECT * FROM t")
        sql("CACHE TABLE tempView2 AS SELECT i FROM tempView1")

        // Sanity check: both views see the initial row before the data is removed.
        checkAnswer(sql("SELECT * FROM tempView1"), Seq(Row(1, "a")))
        checkAnswer(sql("SELECT * FROM tempView2"), Seq(Row(1)))

        // Remove the backing files, then refresh. The refresh must cascade and
        // invalidate tempView2's cache even though tempView1 itself is not cached.
        Utils.deleteRecursively(dir)
        sql("REFRESH TABLE tempView1")
        checkAnswer(sql("SELECT * FROM tempView1"), Seq.empty)
        checkAnswer(sql("SELECT * FROM tempView2"), Seq.empty)
      }
    }
  }
}
test("SPARK-33290: querying temporary view after REFRESH TABLE fails with FNFE") {
  withTable("t") {
    withTempPath { dir =>
      withTempView("tempView1") {
        // Create a parquet-backed table and an uncached temp view over it.
        Seq((1, "a")).toDF("i", "j").write.parquet(dir.getCanonicalPath)
        sql(s"CREATE TABLE t USING parquet LOCATION '${dir.toURI}'")
        sql("CREATE TEMPORARY VIEW tempView1 AS SELECT * FROM t")
        checkAnswer(sql("SELECT * FROM tempView1"), Seq(Row(1, "a")))

        // Delete the data files and refresh only the base table: the table
        // itself recovers, but the view still references stale file listings.
        Utils.deleteRecursively(dir)
        sql("REFRESH TABLE t")
        checkAnswer(sql("SELECT * FROM t"), Seq.empty)

        // Querying the stale view should surface a FileNotFoundException with
        // a hint suggesting REFRESH TABLE.
        val exception = intercept[Exception] {
          checkAnswer(sql("SELECT * FROM tempView1"), Seq.empty)
        }
        assert(exception.getMessage.contains("FileNotFoundException"))
        assert(exception.getMessage.contains("REFRESH TABLE"))
      }
    }
  }
}
} }