[SPARK-26708][SQL][FOLLOWUP] put the special handling of non-cascade uncache in the uncache method
## What changes were proposed in this pull request? This is a follow up of https://github.com/apache/spark/pull/23644/files , to make these methods less coupled with each other. ## How was this patch tested? existing tests Closes #23687 from cloud-fan/cache. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
ae5b2a6a92
commit
d8d2736fd1
|
@ -160,7 +160,22 @@ class CacheManager extends Logging {
|
|||
}
|
||||
// Re-compile dependent cached queries after removing the cached query.
|
||||
if (!cascade) {
|
||||
recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined, clearCache = false)
|
||||
recacheByCondition(spark, cd => {
|
||||
// If the cache buffer has already been loaded, we don't need to recompile the cached plan,
|
||||
// as it does not rely on the plan that has been uncached anymore, it will just produce
|
||||
// data from the cache buffer.
|
||||
// Note that the `CachedRDDBuilder.isCachedColumnBuffersLoaded` call is a non-locking
|
||||
// status test and may not return the most accurate cache buffer state. So the worse case
|
||||
// scenario can be:
|
||||
// 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
|
||||
// will clear the buffer and re-compiled the plan. It is inefficient but doesn't affect
|
||||
// correctness.
|
||||
// 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we
|
||||
// will keep it as it is. It means the physical plan has been re-compiled already in the
|
||||
// other thread.
|
||||
val cacheAlreadyLoaded = cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
|
||||
cd.plan.find(_.sameResult(plan)).isDefined && !cacheAlreadyLoaded
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -168,38 +183,21 @@ class CacheManager extends Logging {
|
|||
* Tries to re-cache all the cache entries that refer to the given plan.
|
||||
*/
|
||||
def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = {
|
||||
recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined)
|
||||
recacheByCondition(spark, _.plan.find(_.sameResult(plan)).isDefined)
|
||||
}
|
||||
|
||||
/**
|
||||
* Re-caches all the cache entries that satisfies the given `condition`.
|
||||
*/
|
||||
private def recacheByCondition(
|
||||
spark: SparkSession,
|
||||
condition: LogicalPlan => Boolean,
|
||||
clearCache: Boolean = true): Unit = {
|
||||
condition: CachedData => Boolean): Unit = {
|
||||
val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
|
||||
writeLock {
|
||||
val it = cachedData.iterator()
|
||||
while (it.hasNext) {
|
||||
val cd = it.next()
|
||||
// If `clearCache` is false (which means the recache request comes from a non-cascading
|
||||
// cache invalidation) and the cache buffer has already been loaded, we do not need to
|
||||
// re-compile a physical plan because the old plan will not be used any more by the
|
||||
// CacheManager although it still lives in compiled `Dataset`s and it could still work.
|
||||
// Otherwise, it means either `clearCache` is true, then we have to clear the cache buffer
|
||||
// and re-compile the physical plan; or it is a non-cascading cache invalidation and cache
|
||||
// buffer is still empty, then we could have a more efficient new plan by removing
|
||||
// dependency on the previously removed cache entries.
|
||||
// Note that the `CachedRDDBuilder`.`isCachedColumnBuffersLoaded` call is a non-locking
|
||||
// status test and may not return the most accurate cache buffer state. So the worse case
|
||||
// scenario can be:
|
||||
// 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
|
||||
// will clear the buffer and build a new plan. It is inefficient but doesn't affect
|
||||
// correctness.
|
||||
// 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we
|
||||
// will keep it as it is. It means the physical plan has been re-compiled already in the
|
||||
// other thread.
|
||||
val buildNewPlan =
|
||||
clearCache || !cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
|
||||
if (condition(cd.plan) && buildNewPlan) {
|
||||
if (condition(cd)) {
|
||||
needToRecache += cd
|
||||
// Remove the cache entry before we create a new one, so that we can have a different
|
||||
// physical plan.
|
||||
|
@ -267,7 +265,7 @@ class CacheManager extends Logging {
|
|||
(fs, fs.makeQualified(path))
|
||||
}
|
||||
|
||||
recacheByCondition(spark, _.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
|
||||
recacheByCondition(spark, _.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in a new issue