[SPARK-28937][SPARK-28936][KUBERNETES] Reduce test flakyness
### What changes were proposed in this pull request? Switch from using a Thread sleep for waiting for commands to finish to just waiting for the command to finish with a watcher & improve the error messages in the SecretsTestsSuite. ### Why are the changes needed? Currently some of the Spark Kubernetes tests have race conditions with command execution, and the frequent use of eventually makes debugging test failures difficult. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Existing tests pass after removal of thread.sleep Closes #25765 from holdenk/SPARK-28937SPARK-28936-improve-kubernetes-integration-tests. Authored-by: Holden Karau <hkarau@apple.com> Signed-off-by: Holden Karau <hkarau@apple.com>
This commit is contained in:
parent
42050c3f4f
commit
4080c4beeb
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.spark.deploy.k8s.integrationtest
|
||||
|
||||
import java.util.Locale
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import io.fabric8.kubernetes.api.model.{Pod, SecretBuilder}
|
||||
|
@ -57,11 +59,17 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
|
|||
createTestSecret()
|
||||
sparkAppConf
|
||||
.set(s"spark.kubernetes.driver.secrets.$ENV_SECRET_NAME", SECRET_MOUNT_PATH)
|
||||
.set(s"spark.kubernetes.driver.secretKeyRef.USERNAME", s"$ENV_SECRET_NAME:username")
|
||||
.set(s"spark.kubernetes.driver.secretKeyRef.PASSWORD", s"$ENV_SECRET_NAME:password")
|
||||
.set(
|
||||
s"spark.kubernetes.driver.secretKeyRef.${ENV_SECRET_KEY_1_CAP}",
|
||||
s"$ENV_SECRET_NAME:${ENV_SECRET_KEY_1}")
|
||||
.set(
|
||||
s"spark.kubernetes.driver.secretKeyRef.${ENV_SECRET_KEY_2_CAP}",
|
||||
s"$ENV_SECRET_NAME:${ENV_SECRET_KEY_2}")
|
||||
.set(s"spark.kubernetes.executor.secrets.$ENV_SECRET_NAME", SECRET_MOUNT_PATH)
|
||||
.set(s"spark.kubernetes.executor.secretKeyRef.USERNAME", s"$ENV_SECRET_NAME:username")
|
||||
.set(s"spark.kubernetes.executor.secretKeyRef.PASSWORD", s"$ENV_SECRET_NAME:password")
|
||||
.set(s"spark.kubernetes.executor.secretKeyRef.${ENV_SECRET_KEY_1_CAP}",
|
||||
s"${ENV_SECRET_NAME}:$ENV_SECRET_KEY_1")
|
||||
.set(s"spark.kubernetes.executor.secretKeyRef.${ENV_SECRET_KEY_2_CAP}",
|
||||
s"${ENV_SECRET_NAME}:$ENV_SECRET_KEY_2")
|
||||
try {
|
||||
runSparkPiAndVerifyCompletion(
|
||||
driverPodChecker = (driverPod: Pod) => {
|
||||
|
@ -81,19 +89,30 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
|
|||
}
|
||||
|
||||
private def checkSecrets(pod: Pod): Unit = {
|
||||
Eventually.eventually(TIMEOUT, INTERVAL) {
|
||||
implicit val podName: String = pod.getMetadata.getName
|
||||
implicit val components: KubernetesTestComponents = kubernetesTestComponents
|
||||
logDebug(s"Checking secrets for ${pod}")
|
||||
// Wait for the pod to become ready & have secrets provisioned
|
||||
implicit val podName: String = pod.getMetadata.getName
|
||||
implicit val components: KubernetesTestComponents = kubernetesTestComponents
|
||||
val env = Eventually.eventually(TIMEOUT, INTERVAL) {
|
||||
logDebug(s"Checking env of ${pod.getMetadata().getName()} ....")
|
||||
val env = Utils.executeCommand("env")
|
||||
assert(env.toString.contains(ENV_SECRET_VALUE_1))
|
||||
assert(env.toString.contains(ENV_SECRET_VALUE_2))
|
||||
val fileUsernameContents = Utils
|
||||
.executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
|
||||
val filePasswordContents = Utils
|
||||
.executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
|
||||
assert(fileUsernameContents.toString.trim.equals(ENV_SECRET_VALUE_1))
|
||||
assert(filePasswordContents.toString.trim.equals(ENV_SECRET_VALUE_2))
|
||||
assert(!env.isEmpty)
|
||||
env
|
||||
}
|
||||
env.toString should include (s"${ENV_SECRET_KEY_1_CAP}=$ENV_SECRET_VALUE_1")
|
||||
env.toString should include (s"${ENV_SECRET_KEY_2_CAP}=$ENV_SECRET_VALUE_2")
|
||||
|
||||
// Make sure our secret files are mounted correctly
|
||||
val files = Utils.executeCommand("ls", s"$SECRET_MOUNT_PATH")
|
||||
files should include (ENV_SECRET_KEY_1)
|
||||
files should include (ENV_SECRET_KEY_2)
|
||||
// Validate the contents
|
||||
val fileUsernameContents = Utils
|
||||
.executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
|
||||
fileUsernameContents.toString.trim should equal(ENV_SECRET_VALUE_1)
|
||||
val filePasswordContents = Utils
|
||||
.executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
|
||||
filePasswordContents.toString.trim should equal(ENV_SECRET_VALUE_2)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -102,6 +121,8 @@ private[spark] object SecretsTestsSuite {
|
|||
val SECRET_MOUNT_PATH = "/etc/secret"
|
||||
val ENV_SECRET_KEY_1 = "username"
|
||||
val ENV_SECRET_KEY_2 = "password"
|
||||
val ENV_SECRET_KEY_1_CAP = ENV_SECRET_KEY_1.toUpperCase(Locale.ROOT)
|
||||
val ENV_SECRET_KEY_2_CAP = ENV_SECRET_KEY_2.toUpperCase(Locale.ROOT)
|
||||
val ENV_SECRET_VALUE_1 = "secretusername"
|
||||
val ENV_SECRET_VALUE_2 = "secretpassword"
|
||||
}
|
||||
|
|
|
@ -18,9 +18,12 @@ package org.apache.spark.deploy.k8s.integrationtest
|
|||
|
||||
import java.io.{Closeable, File, PrintWriter}
|
||||
import java.nio.file.{Files, Path}
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import io.fabric8.kubernetes.client.dsl.ExecListener
|
||||
import okhttp3.Response
|
||||
import org.apache.commons.io.output.ByteArrayOutputStream
|
||||
|
||||
import org.apache.spark.{SPARK_VERSION, SparkException}
|
||||
|
@ -45,20 +48,49 @@ object Utils extends Logging {
|
|||
implicit podName: String,
|
||||
kubernetesTestComponents: KubernetesTestComponents): String = {
|
||||
val out = new ByteArrayOutputStream()
|
||||
val watch = kubernetesTestComponents
|
||||
val pod = kubernetesTestComponents
|
||||
.kubernetesClient
|
||||
.pods()
|
||||
.withName(podName)
|
||||
// Avoid timing issues by looking for open/close
|
||||
class ReadyListener extends ExecListener {
|
||||
val openLatch: CountDownLatch = new CountDownLatch(1)
|
||||
val closeLatch: CountDownLatch = new CountDownLatch(1)
|
||||
|
||||
override def onOpen(response: Response) {
|
||||
openLatch.countDown()
|
||||
}
|
||||
|
||||
override def onClose(a: Int, b: String) {
|
||||
closeLatch.countDown()
|
||||
}
|
||||
|
||||
override def onFailure(e: Throwable, r: Response) {
|
||||
}
|
||||
|
||||
def waitForInputStreamToConnect(): Unit = {
|
||||
openLatch.await()
|
||||
}
|
||||
|
||||
def waitForClose(): Unit = {
|
||||
closeLatch.await()
|
||||
}
|
||||
}
|
||||
val listener = new ReadyListener()
|
||||
val watch = pod
|
||||
.readingInput(System.in)
|
||||
.writingOutput(out)
|
||||
.writingError(System.err)
|
||||
.withTTY()
|
||||
.usingListener(listener)
|
||||
.exec(cmd.toArray: _*)
|
||||
// wait to get some result back
|
||||
Thread.sleep(1000)
|
||||
// under load sometimes the stdout isn't connected by the time we try to read from it.
|
||||
listener.waitForInputStreamToConnect()
|
||||
listener.waitForClose()
|
||||
watch.close()
|
||||
out.flush()
|
||||
out.toString()
|
||||
val result = out.toString()
|
||||
result
|
||||
}
|
||||
|
||||
def createTempFile(contents: String, hostPath: String): String = {
|
||||
|
|
Loading…
Reference in a new issue