[SPARK-31062][K8S][TESTS] Improve spark decommissioning k8s test reliability

### What changes were proposed in this pull request?

Replace a hard-coded sleep with a wait for the first collect to complete, to make the K8s decommissioning test more reliable.
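In outline, the test now polls the driver pod's log for a marker line ("Waiting to give nodes time to finish.") before deleting an executor, rather than sleeping for a fixed two seconds. The sketch below illustrates that pattern with ScalaTest `Eventually` and the fabric8 Kubernetes client; it is not part of the patch — the helper name and the timeout/interval values are illustrative, and `kubernetesTestComponents.kubernetesClient` / `driverPodName` refer to fields that already exist in `KubernetesSuite`:

```scala
import io.fabric8.kubernetes.client.KubernetesClient
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Minutes, Seconds, Span}

// Hypothetical helper (not in the patch) showing the pattern: poll the driver
// pod's log for a marker line instead of sleeping for a fixed interval.
object DecomTestHelpers {
  def waitForDriverLogLine(
      client: KubernetesClient,
      driverPodName: String,
      marker: String): Unit = {
    Eventually.eventually(
        PatienceConfiguration.Timeout(Span(3, Minutes)),
        PatienceConfiguration.Interval(Span(1, Seconds))) {
      // getLog returns the pod's current log as a single String.
      val driverLog = client.pods().withName(driverPodName).getLog
      assert(driverLog.contains(marker),
        s"Driver log does not yet contain '$marker'")
    }
  }
}

// Usage in the test (names as in KubernetesSuite):
//   DecomTestHelpers.waitForDriverLogLine(
//     kubernetesTestComponents.kubernetesClient, driverPodName,
//     "Waiting to give nodes time to finish.")
```

Waiting for an observable driver-side event makes the pod deletion deterministic with respect to job progress, whereas a fixed sleep can fire before the first collect on a slow Jenkins worker.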

### Why are the changes needed?

Currently the Decommissioning test appears to be flaky in Jenkins.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Ran the K8s integration test suite in a loop on minikube on my desktop for 10 iterations; this test did not fail on any of the runs.

Closes #27858 from holdenk/SPARK-31062-Improve-Spark-Decommissioning-K8s-test-teliability.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
Holden Karau, 2020-03-11 14:42:31 -07:00
commit 2825237448 (parent a1a665bece)
2 changed files with 20 additions and 25 deletions

DecommissionSuite.scala

@@ -32,8 +32,9 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
     runSparkApplicationAndVerifyCompletion(
       appResource = PYSPARK_DECOMISSIONING,
       mainClass = "",
-      expectedLogOnCompletion = Seq("decommissioning executor",
-        "Finished waiting, stopping Spark"),
+      expectedLogOnCompletion = Seq(
+        "Finished waiting, stopping Spark",
+        "decommissioning executor"),
       appArgs = Array.empty[String],
       driverPodChecker = doBasicDriverPyPodCheck,
       executorPodChecker = doBasicExecutorPyPodCheck,

KubernetesSuite.scala

@@ -298,6 +298,7 @@ class KubernetesSuite extends SparkFunSuite
         .headOption.getOrElse(false)
       result
     }
+
     val execWatcher = kubernetesTestComponents.kubernetesClient
       .pods()
       .withLabel("spark-app-locator", appLocator)
@@ -316,17 +317,25 @@ class KubernetesSuite extends SparkFunSuite
             logDebug(s"Add event received for $name.")
             execPods(name) = resource
             // If testing decommissioning start a thread to simulate
-            // decommissioning.
+            // decommissioning on the first exec pod.
             if (decommissioningTest && execPods.size == 1) {
               // Wait for all the containers in the pod to be running
-              logDebug("Waiting for first pod to become OK prior to deletion")
+              logDebug("Waiting for pod to become OK prior to deletion")
               Eventually.eventually(patienceTimeout, patienceInterval) {
                 val result = checkPodReady(namespace, name)
                 result shouldBe (true)
               }
-              // Sleep a small interval to allow execution of job
-              logDebug("Sleeping before killing pod.")
-              Thread.sleep(2000)
+              // Look for the string that indicates we're good to clean up
+              // on the driver
+              logDebug("Waiting for first collect...")
+              Eventually.eventually(TIMEOUT, INTERVAL) {
+                assert(kubernetesTestComponents.kubernetesClient
+                  .pods()
+                  .withName(driverPodName)
+                  .getLog
+                  .contains("Waiting to give nodes time to finish."),
+                  "Decommission test did not complete first collect.")
+              }
               // Delete the pod to simulate cluster scale down/migration.
               val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name)
               pod.delete()
@@ -361,23 +370,6 @@ class KubernetesSuite extends SparkFunSuite
       Eventually.eventually(patienceTimeout, patienceInterval) {
         execPods.values.nonEmpty should be (true)
       }
-      // If decommissioning we need to wait and check the executors were removed
-      if (decommissioningTest) {
-        // Sleep a small interval to ensure everything is registered.
-        Thread.sleep(100)
-        // Wait for the executors to become ready
-        Eventually.eventually(patienceTimeout, patienceInterval) {
-          val anyReadyPods = ! execPods.map{
-            case (name, resource) =>
-              (name, resource.getMetadata().getNamespace())
-          }.filter{
-            case (name, namespace) => checkPodReady(namespace, name)
-          }.isEmpty
-          val podsEmpty = execPods.values.isEmpty
-          val podsReadyOrDead = anyReadyPods || podsEmpty
-          podsReadyOrDead shouldBe (true)
-        }
-      }
       execWatcher.close()
       execPods.values.foreach(executorPodChecker(_))
       Eventually.eventually(patienceTimeout, patienceInterval) {
@@ -386,10 +378,12 @@ class KubernetesSuite extends SparkFunSuite
             .pods()
             .withName(driverPod.getMetadata.getName)
             .getLog
-            .contains(e), "The application did not complete.")
+            .contains(e),
+            s"The application did not complete, did not find str ${e}")
         }
       }
   }
+
   protected def doBasicDriverPodCheck(driverPod: Pod): Unit = {
     assert(driverPod.getMetadata.getName === driverPodName)
     assert(driverPod.getSpec.getContainers.get(0).getImage === image)