[SPARK-36246][CORE][TEST] GHA WorkerDecommissionExtended flake

### What changes were proposed in this pull request?

GHA probably doesn't have the same resources as Jenkins, so move down from 5 executors to 3 (and from 512m to 384m each) and give them a bit more time to come up (60s to 80s).
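For context, a `local-cluster[numWorkers, coresPerWorker, memoryPerWorkerMB]` master URL launches that many in-process workers with the given cores and memory each, so going from `[5,1,512]` to `[3,1,384]` substantially shrinks the suite's memory footprint. Below is a minimal sketch of the trimmed-down setup using plain string config keys in place of the suite's internal config constants; the app name is hypothetical, and `TestUtils.waitUntilExecutorsUp` is only available in Spark's test scope:

```scala
import org.apache.spark.{SparkConf, SparkContext, TestUtils}

// Sketch only: mirrors the new test configuration with string config keys
// instead of the suite's constants (SPARK_MASTER, EXECUTOR_MEMORY, ...).
val conf = new SparkConf()
  .setAppName("WorkerDecommissionSketch")  // hypothetical app name
  // local-cluster[numWorkers, coresPerWorker, memoryPerWorkerMB]:
  // 3 workers x 1 core x 384 MB instead of 5 x 1 x 512 MB.
  .setMaster("local-cluster[3,1,384]")
  .set("spark.executor.memory", "384m")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .set("spark.dynamicAllocation.initialExecutors", "3")
  .set("spark.decommission.enabled", "true")

val sc = new SparkContext(conf)
// Wait for the 3 executors to register, allowing 80s (up from 60s)
// since GHA runners bring executors up more slowly than Jenkins did.
TestUtils.waitUntilExecutorsUp(sc, 3, 80000)
```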

### Why are the changes needed?

The test is timing out in GHA.

### Does this PR introduce _any_ user-facing change?
No, test-only change.

### How was this patch tested?

Run through GHA and verify there is no OOM during WorkerDecommissionExtendedSuite.

Closes #33467 from holdenk/SPARK-36246-WorkerDecommissionExtendedSuite-flakes-in-GHA.

Lead-authored-by: Holden Karau <holden@pigscanfly.ca>
Co-authored-by: Holden Karau <hkarau@netflix.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>

```diff
@@ -31,17 +31,17 @@ import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkContext {
   private val conf = new org.apache.spark.SparkConf()
     .setAppName(getClass.getName)
-    .set(SPARK_MASTER, "local-cluster[5,1,512]")
-    .set(EXECUTOR_MEMORY, "512m")
+    .set(SPARK_MASTER, "local-cluster[3,1,384]")
+    .set(EXECUTOR_MEMORY, "384m")
     .set(DYN_ALLOCATION_ENABLED, true)
     .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
-    .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 5)
+    .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 3)
     .set(DECOMMISSION_ENABLED, true)
 
   test("Worker decommission and executor idle timeout") {
     sc = new SparkContext(conf.set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "10s"))
     withSpark(sc) { sc =>
-      TestUtils.waitUntilExecutorsUp(sc, 5, 60000)
+      TestUtils.waitUntilExecutorsUp(sc, 3, 80000)
       val rdd1 = sc.parallelize(1 to 10, 2)
       val rdd2 = rdd1.map(x => (1, x))
       val rdd3 = rdd2.reduceByKey(_ + _)
@@ -53,10 +53,10 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  test("Decommission 4 executors from 5 executors in total") {
+  test("Decommission 2 executors from 3 executors in total") {
     sc = new SparkContext(conf)
     withSpark(sc) { sc =>
-      TestUtils.waitUntilExecutorsUp(sc, 5, 60000)
+      TestUtils.waitUntilExecutorsUp(sc, 3, 80000)
       val rdd1 = sc.parallelize(1 to 100000, 200)
       val rdd2 = rdd1.map(x => (x % 100, x))
       val rdd3 = rdd2.reduceByKey(_ + _)
```