[SPARK-32287][CORE] Fix flaky o.a.s.ExecutorAllocationManagerSuite on GithubActions

### What changes were proposed in this pull request?

To fix the flaky `ExecutorAllocationManagerSuite`: avoid the first `schedule()` invocation after the `ExecutorAllocationManager` has started.

### Why are the changes needed?

`ExecutorAllocationManagerSuite` is still flaky, see:

https://github.com/apache/spark/pull/29722/checks?check_run_id=1117979237

From the logs below, we can see that there's a race condition between the thread `pool-1-thread-1-ScalaTest-running` and the thread `spark-dynamic-executor-allocation`. The only way the `spark-dynamic-executor-allocation` thread can become active is through the first invocation of `schedule()`, since `TEST_SCHEDULE_INTERVAL` (30s) is so long that a second invocation can't happen within the test. Thus, I think we should avoid the first invocation too.

```
20/09/15 12:41:20.831 pool-1-thread-1-ScalaTest-running-ExecutorAllocationManagerSuite INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 2 for resource profile id: 0)
20/09/15 12:41:20.832 spark-dynamic-executor-allocation INFO ExecutorAllocationManager: Requesting 2 new executors because tasks are backlogged (new desired total will be 4 for resource profile id: 0)
```
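
The root cause is visible in how the polling loop is started: `scheduleWithFixedDelay` with an initial delay of `0` fires its first run immediately, regardless of how long the fixed delay after it is. A self-contained sketch of just that behavior (plain `java.util.concurrent`, nothing Spark-specific; the names are illustrative):

```scala
import java.util.concurrent.{Executors, TimeUnit}

object FirstRunDemo {
  def main(args: Array[String]): Unit = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      // Stands in for schedule(): runs once immediately because the initial delay is 0.
      def run(): Unit = println(s"schedule() ran on ${Thread.currentThread().getName}")
    }
    // A 30s fixed delay (mirroring the old TEST_SCHEDULE_INTERVAL) makes a second
    // invocation effectively impossible within a test, but the first one still fires.
    executor.scheduleWithFixedDelay(task, 0, 30000, TimeUnit.MILLISECONDS)
    Thread.sleep(100) // give the first run a chance to print
    executor.shutdownNow()
  }
}
```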

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The flaky failure can't be reproduced locally, so it's hard to say whether it has been completely fixed. We need time to observe the result.

Closes #29773 from Ngone51/fix-SPARK-32287.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit: a54a6a0113 (parent: e5e54a3614), committed by Wenchen Fan on 2020-09-17 11:20:50 +00:00
3 changed files with 16 additions and 16 deletions

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

```diff
@@ -28,7 +28,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.DECOMMISSION_ENABLED
-import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
+import org.apache.spark.internal.config.Tests.TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
 import org.apache.spark.resource.ResourceProfileManager
@@ -150,11 +150,7 @@ private[spark] class ExecutorAllocationManager(
   private var addTime: Long = NOT_SET
 
   // Polling loop interval (ms)
-  private val intervalMillis: Long = if (Utils.isTesting) {
-      conf.get(TEST_SCHEDULE_INTERVAL)
-    } else {
-      100
-    }
+  private val intervalMillis: Long = 100
 
   // Listener for Spark events that impact the allocation policy
   val listener = new ExecutorAllocationListener
@@ -247,7 +243,10 @@ private[spark] class ExecutorAllocationManager(
         }
       }
     }
-    executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
+
+    if (!testing || conf.get(TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED)) {
+      executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
+    }
 
     // copy the maps inside synchonize to ensure not being modified
     val (numExecutorsTarget, numLocalityAware) = synchronized {
```
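
In isolation, the shape of the fix is just a conditional start of the polling loop. A minimal sketch of that pattern (`maybeStartPollingLoop` and its parameters are hypothetical names for illustration, not Spark's API):

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

// Start the loop unconditionally in production; in testing mode, start it
// only when the test explicitly opts in via the new flag.
def maybeStartPollingLoop(
    executor: ScheduledExecutorService,
    task: Runnable,
    testing: Boolean,
    scheduleEnabledInTests: Boolean,
    intervalMillis: Long): Unit = {
  if (!testing || scheduleEnabledInTests) {
    executor.scheduleWithFixedDelay(task, 0, intervalMillis, TimeUnit.MILLISECONDS)
  }
}
```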

core/src/main/scala/org/apache/spark/internal/config/Tests.scala

```diff
@@ -26,11 +26,11 @@ private[spark] object Tests {
     .longConf
     .createWithDefault(Runtime.getRuntime.maxMemory)
 
-  val TEST_SCHEDULE_INTERVAL =
-    ConfigBuilder("spark.testing.dynamicAllocation.scheduleInterval")
-      .version("2.3.0")
-      .longConf
-      .createWithDefault(100)
+  val TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED =
+    ConfigBuilder("spark.testing.dynamicAllocation.schedule.enabled")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(true)
 
   val IS_TESTING = ConfigBuilder("spark.testing")
     .version("1.0.1")
```

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

```diff
@@ -28,7 +28,7 @@ import org.scalatest.PrivateMethodTester
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.DECOMMISSION_ENABLED
-import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
+import org.apache.spark.internal.config.Tests.TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.resource._
 import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
@@ -1665,9 +1665,10 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
       .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, s"${executorIdleTimeout.toString}s")
       .set(config.SHUFFLE_SERVICE_ENABLED, true)
      .set(config.DYN_ALLOCATION_TESTING, true)
-      // SPARK-22864: effectively disable the allocation schedule by setting the period to a
-      // really long value.
-      .set(TEST_SCHEDULE_INTERVAL, 30000L)
+      // SPARK-22864/SPARK-32287: effectively disable the allocation schedule for the tests so that
+      // we won't result in the race condition between thread "spark-dynamic-executor-allocation"
+      // and thread "pool-1-thread-1-ScalaTest-running".
+      .set(TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED, false)
       .set(DECOMMISSION_ENABLED, decommissioningEnabled)
     sparkConf
   }
```
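
With the background thread switched off, the suite drives the allocation logic deterministically by calling the private `schedule()` method itself; it already imports `org.scalatest.PrivateMethodTester` for exactly this. A sketch of that helper pattern, mirroring the suite's style rather than quoting it, and assuming it lives inside the `org.apache.spark` package like the suite does:

```scala
import org.scalatest.PrivateMethodTester._

import org.apache.spark.ExecutorAllocationManager

// Each "tick" of the allocation logic happens exactly when the test invokes
// it, so no background spark-dynamic-executor-allocation thread can race
// with the ScalaTest thread.
val scheduleMethod = PrivateMethod[Unit](Symbol("schedule"))

def schedule(manager: ExecutorAllocationManager): Unit = {
  manager invokePrivate scheduleMethod()
}
```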