diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 00c446107d..398d7b4480 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -160,7 +160,22 @@ class CacheManager extends Logging {
     }
     // Re-compile dependent cached queries after removing the cached query.
     if (!cascade) {
-      recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined, clearCache = false)
+      recacheByCondition(spark, cd => {
+        // If the cache buffer has already been loaded, we don't need to re-compile the cached
+        // plan: it no longer relies on the plan that has been uncached and will simply produce
+        // data from the cache buffer.
+        // Note that the `CachedRDDBuilder.isCachedColumnBuffersLoaded` call is a non-locking
+        // status test and may not return the most accurate cache buffer state. So the worst-case
+        // scenarios are:
+        // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false; then we
+        //    will clear the buffer and re-compile the plan. This is inefficient but doesn't
+        //    affect correctness.
+        // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true; then we
+        //    keep the entry as is, which means the physical plan has already been re-compiled in
+        //    another thread.
+        val cacheAlreadyLoaded = cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
+        cd.plan.find(_.sameResult(plan)).isDefined && !cacheAlreadyLoaded
+      })
     }
   }
 
@@ -168,38 +183,21 @@ class CacheManager extends Logging {
    * Tries to re-cache all the cache entries that refer to the given plan.
    */
   def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = {
-    recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined)
+    recacheByCondition(spark, _.plan.find(_.sameResult(plan)).isDefined)
   }
 
+  /**
+   * Re-caches all the cache entries that satisfy the given `condition`.
+   */
   private def recacheByCondition(
       spark: SparkSession,
-      condition: LogicalPlan => Boolean,
-      clearCache: Boolean = true): Unit = {
+      condition: CachedData => Boolean): Unit = {
     val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
     writeLock {
       val it = cachedData.iterator()
       while (it.hasNext) {
         val cd = it.next()
-        // If `clearCache` is false (which means the recache request comes from a non-cascading
-        // cache invalidation) and the cache buffer has already been loaded, we do not need to
-        // re-compile a physical plan because the old plan will not be used any more by the
-        // CacheManager although it still lives in compiled `Dataset`s and it could still work.
-        // Otherwise, it means either `clearCache` is true, then we have to clear the cache buffer
-        // and re-compile the physical plan; or it is a non-cascading cache invalidation and cache
-        // buffer is still empty, then we could have a more efficient new plan by removing
-        // dependency on the previously removed cache entries.
-        // Note that the `CachedRDDBuilder`.`isCachedColumnBuffersLoaded` call is a non-locking
-        // status test and may not return the most accurate cache buffer state. So the worse case
-        // scenario can be:
-        // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
-        //    will clear the buffer and build a new plan. It is inefficient but doesn't affect
-        //    correctness.
-        // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we
-        //    will keep it as it is. It means the physical plan has been re-compiled already in the
-        //    other thread.
-        val buildNewPlan =
-          clearCache || !cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
-        if (condition(cd.plan) && buildNewPlan) {
+        if (condition(cd)) {
           needToRecache += cd
           // Remove the cache entry before we create a new one, so that we can have a different
           // physical plan.
@@ -267,7 +265,7 @@ class CacheManager extends Logging {
       (fs, fs.makeQualified(path))
     }
 
-    recacheByCondition(spark, _.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
+    recacheByCondition(spark, _.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
   }
 
   /**
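
For illustration only (not part of the patch): a minimal sketch of the two predicate shapes that the refactored recacheByCondition now accepts, written as they would appear inside CacheManager. The helper name illustratePredicates is hypothetical; `spark`, `plan`, and the private `recacheByCondition` are assumed to be in scope.

  // Hypothetical helper, mirroring the two call sites in the diff above.
  private def illustratePredicates(spark: SparkSession, plan: LogicalPlan): Unit = {
    // Non-cascading invalidation (the `if (!cascade)` branch): only re-compile a dependent
    // entry whose in-memory buffer has not been materialized yet.
    recacheByCondition(spark, cd => {
      val cacheAlreadyLoaded = cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
      cd.plan.find(_.sameResult(plan)).isDefined && !cacheAlreadyLoaded
    })

    // Cascading re-cache (as in recacheByPlan): re-compile every entry whose plan refers to
    // `plan`, regardless of whether its buffer is already loaded.
    recacheByCondition(spark, _.plan.find(_.sameResult(plan)).isDefined)
  }

The design point the sketch highlights: passing the whole CachedData to the predicate, rather than just its LogicalPlan, lets each caller decide whether to consult the cache buffer state, which is why the old clearCache flag is no longer needed.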