[SPARK-32100][CORE][TESTS][FOLLOWUP] Reduce the required test resources

### What changes were proposed in this pull request?

This PR aims to reduce the required test resources in WorkerDecommissionExtendedSuite.
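
For context, the suite runs against Spark's `local-cluster[numWorkers,coresPerWorker,memoryPerWorkerMB]` test master, so shrinking the worker count from 20 to 5 cuts the suite's memory footprint proportionally. A minimal sketch of an equivalent setup (the app name here is illustrative, not from the PR):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: local-cluster[N,C,M] launches N in-process standalone
// workers with C cores and M MB of memory each. 5 x 512 MB needs ~2.5 GB,
// versus ~10 GB for the previous 20-worker configuration.
val conf = new SparkConf()
  .setAppName("local-cluster-sketch")   // illustrative name
  .setMaster("local-cluster[5,1,512]")  // 5 workers, 1 core, 512 MB each
  .set("spark.executor.memory", "512m")
val sc = new SparkContext(conf)
```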

### Why are the changes needed?

When the Jenkins farm is crowded, the following failure currently happens ([example run](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-3.2-hive-2.3/890/testReport/junit/org.apache.spark.scheduler/WorkerDecommissionExtendedSuite/Worker_decommission_and_executor_idle_timeout/)):
```
java.util.concurrent.TimeoutException: Can't find 20 executors before 60000 milliseconds elapsed
	at org.apache.spark.TestUtils$.waitUntilExecutorsUp(TestUtils.scala:326)
	at org.apache.spark.scheduler.WorkerDecommissionExtendedSuite.$anonfun$new$2(WorkerDecommissionExtendedSuite.scala:45)
```
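
`TestUtils.waitUntilExecutorsUp(sc, numExecutors, timeoutMs)` polls until the requested number of executors has registered, and throws the `TimeoutException` above otherwise. A rough sketch of that polling pattern, using the public `getExecutorMemoryStatus` API rather than Spark's internal status store (an approximation, not the actual implementation):

```scala
import java.util.concurrent.TimeoutException
import org.apache.spark.SparkContext

// Approximate sketch of the wait-for-executors pattern; the real
// TestUtils.waitUntilExecutorsUp reads Spark's internal status store.
def waitForExecutors(sc: SparkContext, expected: Int, timeoutMs: Long): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (System.currentTimeMillis() < deadline) {
    // getExecutorMemoryStatus includes the driver, hence the "- 1".
    if (sc.getExecutorMemoryStatus.size - 1 >= expected) return
    Thread.sleep(100)
  }
  throw new TimeoutException(
    s"Can't find $expected executors before $timeoutMs milliseconds elapsed")
}
```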

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the Jenkins.

Closes #29001 from dongjoon-hyun/SPARK-32100-2.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
```diff
@@ -32,17 +32,17 @@ import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkContext {
   private val conf = new org.apache.spark.SparkConf()
     .setAppName(getClass.getName)
-    .set(SPARK_MASTER, "local-cluster[20,1,512]")
+    .set(SPARK_MASTER, "local-cluster[5,1,512]")
     .set(EXECUTOR_MEMORY, "512m")
     .set(DYN_ALLOCATION_ENABLED, true)
     .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
-    .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 20)
+    .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 5)
     .set(WORKER_DECOMMISSION_ENABLED, true)

   test("Worker decommission and executor idle timeout") {
     sc = new SparkContext(conf.set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "10s"))
     withSpark(sc) { sc =>
-      TestUtils.waitUntilExecutorsUp(sc, 20, 60000)
+      TestUtils.waitUntilExecutorsUp(sc, 5, 60000)
       val rdd1 = sc.parallelize(1 to 10, 2)
       val rdd2 = rdd1.map(x => (1, x))
       val rdd3 = rdd2.reduceByKey(_ + _)
@@ -54,10 +54,10 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte
     }
   }

-  test("Decommission 19 executors from 20 executors in total") {
+  test("Decommission 4 executors from 5 executors in total") {
     sc = new SparkContext(conf)
     withSpark(sc) { sc =>
-      TestUtils.waitUntilExecutorsUp(sc, 20, 60000)
+      TestUtils.waitUntilExecutorsUp(sc, 5, 60000)
       val rdd1 = sc.parallelize(1 to 100000, 200)
       val rdd2 = rdd1.map(x => (x % 100, x))
       val rdd3 = rdd2.reduceByKey(_ + _)
```
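
To reproduce locally, the suite can be run on its own, for example with `build/sbt "core/testOnly *WorkerDecommissionExtendedSuite"` (an assumed invocation following the standard sbt `testOnly` pattern for Spark's `core` module).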