[SPARK-27674][SQL] the hint should not be dropped after cache lookup
## What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/20365 . #20365 fixed this problem when the hint node is a root node. This PR fixes this problem for all the cases. ## How was this patch tested? a new test Closes #24580 from cloud-fan/bug. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: gatorsmile <gatorsmile@gmail.com>
This commit is contained in:
parent
02c33694c8
commit
3e30a98810
|
@ -56,7 +56,7 @@ object EliminateResolvedHint extends Rule[LogicalPlan] {
|
|||
* in this method will be cleaned up later by this rule, and may emit warnings depending on the
|
||||
* configurations.
|
||||
*/
|
||||
private def extractHintsFromPlan(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = {
|
||||
private[sql] def extractHintsFromPlan(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = {
|
||||
plan match {
|
||||
case h: ResolvedHint =>
|
||||
val (plan, hints) = extractHintsFromPlan(h.child)
|
||||
|
|
|
@ -23,7 +23,8 @@ import org.apache.hadoop.fs.{FileSystem, Path}
|
|||
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.{Dataset, SparkSession}
|
||||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, SubqueryExpression}
|
||||
import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
|
||||
import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint}
|
||||
import org.apache.spark.sql.execution.columnar.InMemoryRelation
|
||||
import org.apache.spark.sql.execution.command.CommandUtils
|
||||
|
@ -212,17 +213,18 @@ class CacheManager extends Logging {
|
|||
def useCachedData(plan: LogicalPlan): LogicalPlan = {
|
||||
val newPlan = plan transformDown {
|
||||
case command: IgnoreCachedData => command
|
||||
// Do not lookup the cache by hint node. Hint node is special, we should ignore it when
|
||||
// canonicalizing plans, so that plans which are same except hint can hit the same cache.
|
||||
// However, we also want to keep the hint info after cache lookup. Here we skip the hint
|
||||
// node, so that the returned caching plan won't replace the hint node and drop the hint info
|
||||
// from the original plan.
|
||||
case hint: ResolvedHint => hint
|
||||
|
||||
case currentFragment =>
|
||||
lookupCachedData(currentFragment)
|
||||
.map(_.cachedRepresentation.withOutput(currentFragment.output))
|
||||
.getOrElse(currentFragment)
|
||||
lookupCachedData(currentFragment).map { cached =>
|
||||
// After cache lookup, we should still keep the hints from the input plan.
|
||||
val hints = EliminateResolvedHint.extractHintsFromPlan(currentFragment)._2
|
||||
val cachedPlan = cached.cachedRepresentation.withOutput(currentFragment.output)
|
||||
// The returned hint list is in top-down order, we should create the hint nodes from
|
||||
// right to left.
|
||||
hints.foldRight[LogicalPlan](cachedPlan) { case (hint, p) =>
|
||||
ResolvedHint(p, hint)
|
||||
}
|
||||
}.getOrElse(currentFragment)
|
||||
}
|
||||
|
||||
newPlan transformAllExpressions {
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.apache.spark.executor.DataReadMethod.DataReadMethod
|
|||
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
|
||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join}
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
|
||||
import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
|
||||
import org.apache.spark.sql.execution.columnar._
|
||||
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
|
||||
|
@ -938,23 +938,49 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
|
|||
}
|
||||
}
|
||||
|
||||
test("Cache should respect the broadcast hint") {
|
||||
val df = broadcast(spark.range(1000)).cache()
|
||||
val df2 = spark.range(1000).cache()
|
||||
df.count()
|
||||
df2.count()
|
||||
test("Cache should respect the hint") {
|
||||
def testHint(df: Dataset[_], expectedHint: JoinStrategyHint): Unit = {
|
||||
val df2 = spark.range(2000).cache()
|
||||
df2.count()
|
||||
|
||||
// Test the broadcast hint.
|
||||
val joinPlan = df.join(df2, "id").queryExecution.optimizedPlan
|
||||
val hint = joinPlan.collect {
|
||||
case Join(_, _, _, _, hint) => hint
|
||||
def checkHintExists(): Unit = {
|
||||
// Test the broadcast hint.
|
||||
val joinPlan = df.join(df2, "id").queryExecution.optimizedPlan
|
||||
val joinHints = joinPlan.collect {
|
||||
case Join(_, _, _, _, hint) => hint
|
||||
}
|
||||
assert(joinHints.size == 1)
|
||||
assert(joinHints(0).leftHint.get.strategy.contains(expectedHint))
|
||||
assert(joinHints(0).rightHint.isEmpty)
|
||||
}
|
||||
|
||||
// Make sure the hint does exist when `df` is not cached.
|
||||
checkHintExists()
|
||||
|
||||
df.cache()
|
||||
try {
|
||||
df.count()
|
||||
// Make sure the hint still exists when `df` is cached.
|
||||
checkHintExists()
|
||||
} finally {
|
||||
// Clean-up
|
||||
df.unpersist()
|
||||
}
|
||||
}
|
||||
assert(hint.size == 1)
|
||||
assert(hint(0).leftHint.get.strategy.contains(BROADCAST))
|
||||
assert(hint(0).rightHint.isEmpty)
|
||||
|
||||
// Clean-up
|
||||
df.unpersist()
|
||||
// The hint is the root node
|
||||
testHint(broadcast(spark.range(1000)), BROADCAST)
|
||||
// The hint is under subquery alias
|
||||
testHint(broadcast(spark.range(1000)).as("df"), BROADCAST)
|
||||
// The hint is under filter
|
||||
testHint(broadcast(spark.range(1000)).filter($"id" > 100), BROADCAST)
|
||||
// If there are 2 adjacent hints, the top one takes effect.
|
||||
testHint(
|
||||
spark.range(1000)
|
||||
.hint("SHUFFLE_MERGE")
|
||||
.hint("SHUFFLE_HASH")
|
||||
.as("df"),
|
||||
SHUFFLE_HASH)
|
||||
}
|
||||
|
||||
test("analyzes column statistics in cached query") {
|
||||
|
|
Loading…
Reference in a new issue