[SPARK-33262][K8S][FOLLOWUP] Verify pod allocation does not stall

### What changes were proposed in this pull request?

Add a test that pending executor does not stall pod allocation.

### Why are the changes needed?

Better test coverage

### Does this PR introduce _any_ user-facing change?

Test only change.

### How was this patch tested?

New test passes.

Closes #30205 from holdenk/verify-pod-allocation-does-not-stall.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Holden Karau 2020-10-30 11:26:30 -07:00 committed by Dongjoon Hyun
parent 7c897c1216
commit 491a0fb08b

View file

@ -255,6 +255,40 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
verify(podOperations).delete()
}
test("SPARK-33262: pod allocator does not stall with pending pods") {
when(podOperations
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
.thenReturn(podOperations)
when(podOperations
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(podOperations)
when(podOperations
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1"))
.thenReturn(labeledPods)
when(podOperations
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "2", "3", "4", "5", "6"))
.thenReturn(podOperations)
podsAllocatorUnderTest.setTotalExpectedExecutors(6)
// Initial request of pods
verify(podOperations).create(podWithAttachedContainerForId(1))
verify(podOperations).create(podWithAttachedContainerForId(2))
verify(podOperations).create(podWithAttachedContainerForId(3))
verify(podOperations).create(podWithAttachedContainerForId(4))
verify(podOperations).create(podWithAttachedContainerForId(5))
// 4 come up, 1 pending
snapshotsStore.updatePod(pendingExecutor(1))
snapshotsStore.updatePod(runningExecutor(2))
snapshotsStore.updatePod(runningExecutor(3))
snapshotsStore.updatePod(runningExecutor(4))
snapshotsStore.updatePod(runningExecutor(5))
// We move forward one allocation cycle
waitForExecutorPodsClock.setTime(podAllocationDelay + 1)
snapshotsStore.notifySubscribers()
// We request pod 6
verify(podOperations).create(podWithAttachedContainerForId(6))
}
private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
(invocation: InvocationOnMock) => {
val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)