[SPARK-35595][TESTS] Support multiple loggers in testing method withLogAppender
### What changes were proposed in this pull request? A test case in AdaptiveQueryExecSuite has become flaky because too many debug logs are emitted through the root logger: https://github.com/Yikun/spark/runs/2715222392?check_suite_focus=true https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139125/testReport/ To fix it, this PR adds support for attaching to multiple named loggers in the testing method withLogAppender, so that the LogAppender captures only the targeted log output instead of everything routed through the root logger. ### Why are the changes needed? Fixes a flaky test case and also reduces unnecessary memory cost in tests. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test Closes #32725 from gengliangwang/fixFlakyLogAppender. Authored-by: Gengliang Wang <gengliang@apache.org> Signed-off-by: Gengliang Wang <gengliang@apache.org>
This commit is contained in:
parent
0ad5ae54b2
commit
9d0d4edb43
|
@ -217,19 +217,27 @@ abstract class SparkFunSuite
|
||||||
*/
|
*/
|
||||||
protected def withLogAppender(
|
protected def withLogAppender(
|
||||||
appender: Appender,
|
appender: Appender,
|
||||||
loggerName: Option[String] = None,
|
loggerNames: Seq[String] = Seq.empty,
|
||||||
level: Option[Level] = None)(
|
level: Option[Level] = None)(
|
||||||
f: => Unit): Unit = {
|
f: => Unit): Unit = {
|
||||||
val logger = loggerName.map(Logger.getLogger).getOrElse(Logger.getRootLogger)
|
val loggers = if (loggerNames.nonEmpty) {
|
||||||
val restoreLevel = logger.getLevel
|
loggerNames.map(Logger.getLogger)
|
||||||
logger.addAppender(appender)
|
} else {
|
||||||
if (level.isDefined) {
|
Seq(Logger.getRootLogger)
|
||||||
logger.setLevel(level.get)
|
}
|
||||||
|
val restoreLevels = loggers.map(_.getLevel)
|
||||||
|
loggers.foreach { logger =>
|
||||||
|
logger.addAppender(appender)
|
||||||
|
if (level.isDefined) {
|
||||||
|
logger.setLevel(level.get)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
try f finally {
|
try f finally {
|
||||||
logger.removeAppender(appender)
|
loggers.foreach(_.removeAppender(appender))
|
||||||
if (level.isDefined) {
|
if (level.isDefined) {
|
||||||
logger.setLevel(restoreLevel)
|
loggers.zipWithIndex.foreach { case (logger, i) =>
|
||||||
|
logger.setLevel(restoreLevels(i))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -516,7 +516,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
|
||||||
|
|
||||||
test("SPARK-25113: should log when there exists generated methods above HugeMethodLimit") {
|
test("SPARK-25113: should log when there exists generated methods above HugeMethodLimit") {
|
||||||
val appender = new LogAppender("huge method limit")
|
val appender = new LogAppender("huge method limit")
|
||||||
withLogAppender(appender, loggerName = Some(classOf[CodeGenerator[_, _]].getName)) {
|
withLogAppender(appender, loggerNames = Seq(classOf[CodeGenerator[_, _]].getName)) {
|
||||||
val x = 42
|
val x = 42
|
||||||
val expr = HugeCodeIntExpression(x)
|
val expr = HugeCodeIntExpression(x)
|
||||||
val proj = GenerateUnsafeProjection.generate(Seq(expr))
|
val proj = GenerateUnsafeProjection.generate(Seq(expr))
|
||||||
|
|
|
@ -823,7 +823,7 @@ class AdaptiveQueryExecSuite
|
||||||
val logAppender = new LogAppender("adaptive execution")
|
val logAppender = new LogAppender("adaptive execution")
|
||||||
withLogAppender(
|
withLogAppender(
|
||||||
logAppender,
|
logAppender,
|
||||||
loggerName = Some(AdaptiveSparkPlanExec.getClass.getName.dropRight(1)),
|
loggerNames = Seq(AdaptiveSparkPlanExec.getClass.getName.dropRight(1)),
|
||||||
level = Some(Level.TRACE)) {
|
level = Some(Level.TRACE)) {
|
||||||
withSQLConf(
|
withSQLConf(
|
||||||
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
|
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
|
||||||
|
@ -1613,7 +1613,9 @@ class AdaptiveQueryExecSuite
|
||||||
val testDf = df.groupBy("index")
|
val testDf = df.groupBy("index")
|
||||||
.agg(sum($"pv").alias("pv"))
|
.agg(sum($"pv").alias("pv"))
|
||||||
.join(dim, Seq("index"))
|
.join(dim, Seq("index"))
|
||||||
withLogAppender(testAppender, level = Some(Level.DEBUG)) {
|
val loggerNames =
|
||||||
|
Seq(classOf[BroadcastQueryStageExec].getName, classOf[ShuffleQueryStageExec].getName)
|
||||||
|
withLogAppender(testAppender, loggerNames, level = Some(Level.DEBUG)) {
|
||||||
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
|
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
|
||||||
val result = testDf.collect()
|
val result = testDf.collect()
|
||||||
assert(result.length == 26)
|
assert(result.length == 26)
|
||||||
|
|
Loading…
Reference in a new issue