[SPARK-33197][SQL] Make changes to spark.sql.analyzer.maxIterations take effect at runtime
### What changes were proposed in this pull request? Make changes to `spark.sql.analyzer.maxIterations` take effect at runtime. ### Why are the changes needed? `spark.sql.analyzer.maxIterations` is not a static conf. However, before this patch, changing `spark.sql.analyzer.maxIterations` at runtime does not take effect. ### Does this PR introduce _any_ user-facing change? Yes. Before this patch, changing `spark.sql.analyzer.maxIterations` at runtime does not take effect. ### How was this patch tested? modified unit test Closes #30108 from yuningzh-db/dynamic-analyzer-max-iterations. Authored-by: Yuning Zhang <yuning.zhang@databricks.com> Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
This commit is contained in:
parent
d87a0bb2ca
commit
a21945ce6c
|
@ -132,8 +132,7 @@ object AnalysisContext {
|
|||
*/
|
||||
class Analyzer(
|
||||
override val catalogManager: CatalogManager,
|
||||
conf: SQLConf,
|
||||
maxIterations: Int)
|
||||
conf: SQLConf)
|
||||
extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog {
|
||||
|
||||
private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog
|
||||
|
@ -148,12 +147,7 @@ class Analyzer(
|
|||
def this(catalog: SessionCatalog, conf: SQLConf) = {
|
||||
this(
|
||||
new CatalogManager(conf, FakeV2SessionCatalog, catalog),
|
||||
conf,
|
||||
conf.analyzerMaxIterations)
|
||||
}
|
||||
|
||||
def this(catalogManager: CatalogManager, conf: SQLConf) = {
|
||||
this(catalogManager, conf, conf.analyzerMaxIterations)
|
||||
conf)
|
||||
}
|
||||
|
||||
def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
|
||||
|
@ -188,9 +182,9 @@ class Analyzer(
|
|||
* If the plan cannot be resolved within maxIterations, analyzer will throw exception to inform
|
||||
* user to increase the value of SQLConf.ANALYZER_MAX_ITERATIONS.
|
||||
*/
|
||||
protected val fixedPoint =
|
||||
protected def fixedPoint =
|
||||
FixedPoint(
|
||||
maxIterations,
|
||||
conf.analyzerMaxIterations,
|
||||
errorOnExceed = true,
|
||||
maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key)
|
||||
|
||||
|
@ -206,7 +200,7 @@ class Analyzer(
|
|||
*/
|
||||
val postHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil
|
||||
|
||||
lazy val batches: Seq[Batch] = Seq(
|
||||
override def batches: Seq[Batch] = Seq(
|
||||
Batch("Substitution", fixedPoint,
|
||||
// This rule optimizes `UpdateFields` expression chains so looks more like optimization rule.
|
||||
// However, when manipulating deeply nested schema, `UpdateFields` expression tree could be
|
||||
|
|
|
@ -926,4 +926,45 @@ class AnalysisSuite extends AnalysisTest with Matchers {
|
|||
)
|
||||
assertAnalysisSuccess(plan)
|
||||
}
|
||||
|
||||
test("SPARK-33197: Make sure changes to ANALYZER_MAX_ITERATIONS take effect at runtime") {
|
||||
// RuleExecutor only throw exception or log warning when the rule is supposed to run
|
||||
// more than once.
|
||||
val maxIterations = 2
|
||||
val maxIterationsEnough = 5
|
||||
withSQLConf(SQLConf.ANALYZER_MAX_ITERATIONS.key -> maxIterations.toString) {
|
||||
val conf = SQLConf.get
|
||||
val testAnalyzer = new Analyzer(
|
||||
new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, conf), conf)
|
||||
|
||||
val plan = testRelation2.select(
|
||||
$"a" / Literal(2) as "div1",
|
||||
$"a" / $"b" as "div2",
|
||||
$"a" / $"c" as "div3",
|
||||
$"a" / $"d" as "div4",
|
||||
$"e" / $"e" as "div5")
|
||||
|
||||
val message1 = intercept[TreeNodeException[LogicalPlan]] {
|
||||
testAnalyzer.execute(plan)
|
||||
}.getMessage
|
||||
assert(message1.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " +
|
||||
s"please set '${SQLConf.ANALYZER_MAX_ITERATIONS.key}' to a larger value."))
|
||||
|
||||
withSQLConf(SQLConf.ANALYZER_MAX_ITERATIONS.key -> maxIterationsEnough.toString) {
|
||||
try {
|
||||
testAnalyzer.execute(plan)
|
||||
} catch {
|
||||
case ex: TreeNodeException[_]
|
||||
if ex.getMessage.contains(SQLConf.ANALYZER_MAX_ITERATIONS.key) =>
|
||||
fail("analyzer.execute should not reach max iterations.")
|
||||
}
|
||||
}
|
||||
|
||||
val message2 = intercept[TreeNodeException[LogicalPlan]] {
|
||||
testAnalyzer.execute(plan)
|
||||
}.getMessage
|
||||
assert(message2.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " +
|
||||
s"please set '${SQLConf.ANALYZER_MAX_ITERATIONS.key}' to a larger value."))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue