[SPARK-34168][SQL][FOLLOWUP] Improve DynamicPartitionPruningSuiteBase

### What changes were proposed in this pull request?

A few minor improvements for `DynamicPartitionPruningSuiteBase`.

### Why are the changes needed?

code cleanup

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #31625 from cloud-fan/followup.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Wenchen Fan 2021-02-23 13:41:24 -08:00 committed by Dongjoon Hyun
parent 0d5d248bdc
commit 95e45c6257

View file

@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expr
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
import org.apache.spark.sql.catalyst.plans.ExistenceJoin
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, BroadcastQueryStageExec, DisableAdaptiveExecution, EliminateJoinToEmptyRelation}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
import org.apache.spark.sql.execution.adaptive._
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec}
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
import org.apache.spark.sql.functions._
@ -44,14 +44,9 @@ abstract class DynamicPartitionPruningSuiteBase
import testImplicits._
val adaptiveExecutionOn: Boolean
override def beforeAll(): Unit = {
super.beforeAll()
spark.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, adaptiveExecutionOn)
spark.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY, true)
val factData = Seq[(Int, Int, Int, Int)](
(1000, 1, 1, 10),
(1010, 2, 1, 10),
@ -195,8 +190,8 @@ abstract class DynamicPartitionPruningSuiteBase
subqueryBroadcast.foreach { s =>
s.child match {
case _: ReusedExchangeExec => // reuse check ok.
case BroadcastQueryStageExec(_, _: ReusedExchangeExec, _) =>
case b: BroadcastExchangeExec =>
case BroadcastQueryStageExec(_, _: ReusedExchangeExec, _) => // reuse check ok.
case b: BroadcastExchangeLike =>
val hasReuse = plan.find {
case ReusedExchangeExec(_, e) => e eq b
case _ => false
@ -337,7 +332,7 @@ abstract class DynamicPartitionPruningSuiteBase
def getFactScan(plan: SparkPlan): SparkPlan = {
val scanOption =
plan.find {
find(plan) {
case s: FileSourceScanExec =>
s.output.exists(_.find(_.argString(maxFields = 100).contains("fid")).isDefined)
case _ => false
@ -1261,7 +1256,7 @@ abstract class DynamicPartitionPruningSuiteBase
val countSubqueryBroadcasts =
collectWithSubqueries(plan)({ case _: SubqueryBroadcastExec => 1 }).sum
if (adaptiveExecutionOn) {
if (conf.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED)) {
val countReusedSubqueryBroadcasts =
collectWithSubqueries(plan)({ case ReusedSubqueryExec(_: SubqueryBroadcastExec) => 1}).sum
@ -1390,10 +1385,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase {
override val adaptiveExecutionOn: Boolean = false
}
class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase
with DisableAdaptiveExecutionSuite
class DynamicPartitionPruningSuiteAEOn extends DynamicPartitionPruningSuiteBase {
override val adaptiveExecutionOn: Boolean = true
}
class DynamicPartitionPruningSuiteAEOn extends DynamicPartitionPruningSuiteBase
with EnableAdaptiveExecutionSuite