diff --git a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh index b72a4f7491..c87437e48f 100755 --- a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh +++ b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh @@ -35,6 +35,7 @@ CONTEXT= INCLUDE_TAGS="k8s" EXCLUDE_TAGS= JAVA_VERSION="8" +BUILD_DEPENDENCIES_MVN_FLAG="-am" HADOOP_PROFILE="hadoop-3.2" MVN="$TEST_ROOT_DIR/build/mvn" @@ -117,6 +118,9 @@ while (( "$#" )); do HADOOP_PROFILE="$2" shift ;; + --skip-building-dependencies) + BUILD_DEPENDENCIES_MVN_FLAG="" + ;; *) echo "Unexpected command line flag $2 $1." exit 1 @@ -176,4 +180,4 @@ properties+=( -Dlog4j.logger.org.apache.spark=DEBUG ) -$TEST_ROOT_DIR/build/mvn integration-test -f $TEST_ROOT_DIR/pom.xml -pl resource-managers/kubernetes/integration-tests -am -Pscala-$SCALA_VERSION -P$HADOOP_PROFILE -Pkubernetes -Pkubernetes-integration-tests ${properties[@]} +$TEST_ROOT_DIR/build/mvn integration-test -f $TEST_ROOT_DIR/pom.xml -pl resource-managers/kubernetes/integration-tests $BUILD_DEPENDENCIES_MVN_FLAG -Pscala-$SCALA_VERSION -P$HADOOP_PROFILE -Pkubernetes -Pkubernetes-integration-tests ${properties[@]} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala index c33875243c..5cb068545e 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala @@ -16,9 +16,9 @@ */ package org.apache.spark.deploy.k8s.integrationtest.backend.minikube -import java.nio.file.{Files, Paths} +import java.nio.file.Paths -import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} +import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient} import org.apache.spark.deploy.k8s.integrationtest.ProcessUtils import org.apache.spark.internal.Logging @@ -26,18 +26,26 @@ import org.apache.spark.internal.Logging // TODO support windows private[spark] object Minikube extends Logging { private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60 - private val HOST_PREFIX = "host:" - private val KUBELET_PREFIX = "kubelet:" - private val APISERVER_PREFIX = "apiserver:" - private val KUBECTL_PREFIX = "kubectl:" - private val KUBECONFIG_PREFIX = "kubeconfig:" + private val VERSION_PREFIX = "minikube version: " + private val HOST_PREFIX = "host: " + private val KUBELET_PREFIX = "kubelet: " + private val APISERVER_PREFIX = "apiserver: " + private val KUBECTL_PREFIX = "kubectl: " + private val KUBECONFIG_PREFIX = "kubeconfig: " private val MINIKUBE_VM_PREFIX = "minikubeVM: " private val MINIKUBE_PREFIX = "minikube: " private val MINIKUBE_PATH = ".minikube" + private val APIVERSION_PREFIX = "apiVersion: " + private val SERVER_PREFIX = "server: " + private val CA_PREFIX = "certificate-authority: " + private val CLIENTCERT_PREFIX = "client-certificate: " + private val CLIENTKEY_PREFIX = "client-key: " - def logVersion(): Unit = { - logInfo(executeMinikube("version").mkString("\n")) - } + lazy val minikubeVersionString = + executeMinikube("version").find(_.contains(VERSION_PREFIX)).get + + def logVersion(): Unit = + logInfo(minikubeVersionString) def getMinikubeIp: String = { val outputs = executeMinikube("ip") @@ -56,60 +64,106 @@ private[spark] object Minikube extends Logging { if (oldMinikube.isEmpty) { getIfNewMinikubeStatus(statusString) } else { - val finalStatusString = oldMinikube - .head - .replaceFirst(MINIKUBE_VM_PREFIX, "") - .replaceFirst(MINIKUBE_PREFIX, "") + val statusLine = oldMinikube.head + val finalStatusString = if (statusLine.contains(MINIKUBE_VM_PREFIX)) { + statusLine.split(MINIKUBE_VM_PREFIX)(1) + } else { + statusLine.split(MINIKUBE_PREFIX)(1) + } MinikubeStatus.unapply(finalStatusString) .getOrElse(throw new IllegalStateException(s"Unknown status $statusString")) } } def getKubernetesClient: DefaultKubernetesClient = { + // only the three-part version number is matched (the optional suffix like "-beta.0" is dropped) + val versionArrayOpt = "\\d+\\.\\d+\\.\\d+".r + .findFirstIn(minikubeVersionString.split(VERSION_PREFIX)(1)) + .map(_.split('.').map(_.toInt)) + + assert(versionArrayOpt.isDefined && versionArrayOpt.get.size == 3, + s"Unexpected version format detected in `$minikubeVersionString`." + + "For minikube version a three-part version number is expected (the optional non-numeric " + + "suffix is intentionally dropped)") + + val kubernetesConf = versionArrayOpt.get match { + case Array(x, y, z) => + // comparing the versions as the kubectl command is only introduced in version v1.1.0: + // https://github.com/kubernetes/minikube/blob/v1.1.0/CHANGELOG.md + if (Ordering.Tuple3[Int, Int, Int].gteq((x, y, z), (1, 1, 0))) { + kubectlBasedKubernetesClientConf + } else { + legacyKubernetesClientConf + } + } + new DefaultKubernetesClient(kubernetesConf) + } + + private def legacyKubernetesClientConf: Config = { val kubernetesMaster = s"https://${getMinikubeIp}:8443" val userHome = System.getProperty("user.home") - val minikubeBasePath = Paths.get(userHome, MINIKUBE_PATH).toString - val profileDir = if (Files.exists(Paths.get(minikubeBasePath, "apiserver.crt"))) { - // For Minikube <1.9 - "" - } else { - // For Minikube >=1.9 - Paths.get("profiles", executeMinikube("profile")(0)).toString - } - val apiServerCertPath = Paths.get(minikubeBasePath, profileDir, "apiserver.crt") - val apiServerKeyPath = Paths.get(minikubeBasePath, profileDir, "apiserver.key") - val kubernetesConf = new ConfigBuilder() - .withApiVersion("v1") - .withMasterUrl(kubernetesMaster) - .withCaCertFile( - Paths.get(userHome, MINIKUBE_PATH, "ca.crt").toFile.getAbsolutePath) - .withClientCertFile(apiServerCertPath.toFile.getAbsolutePath) - .withClientKeyFile(apiServerKeyPath.toFile.getAbsolutePath) + buildKubernetesClientConf( + "v1", + kubernetesMaster, + Paths.get(userHome, MINIKUBE_PATH, "ca.crt").toFile.getAbsolutePath, + Paths.get(userHome, MINIKUBE_PATH, "apiserver.crt").toFile.getAbsolutePath, + Paths.get(userHome, MINIKUBE_PATH, "apiserver.key").toFile.getAbsolutePath) + } + + private def kubectlBasedKubernetesClientConf: Config = { + val outputs = executeMinikube("kubectl config view") + val apiVersionString = outputs.find(_.contains(APIVERSION_PREFIX)) + val serverString = outputs.find(_.contains(SERVER_PREFIX)) + val caString = outputs.find(_.contains(CA_PREFIX)) + val clientCertString = outputs.find(_.contains(CLIENTCERT_PREFIX)) + val clientKeyString = outputs.find(_.contains(CLIENTKEY_PREFIX)) + + assert(!apiVersionString.isEmpty && !serverString.isEmpty && !caString.isEmpty && + !clientKeyString.isEmpty && !clientKeyString.isEmpty, + "The output of 'minikube kubectl config view' does not contain all the neccesary attributes") + + buildKubernetesClientConf( + apiVersionString.get.split(APIVERSION_PREFIX)(1), + serverString.get.split(SERVER_PREFIX)(1), + caString.get.split(CA_PREFIX)(1), + clientCertString.get.split(CLIENTCERT_PREFIX)(1), + clientKeyString.get.split(CLIENTKEY_PREFIX)(1)) + } + + private def buildKubernetesClientConf(apiVersion: String, masterUrl: String, caCertFile: String, + clientCertFile: String, clientKeyFile: String): Config = { + logInfo(s"building kubernetes config with apiVersion: $apiVersion, masterUrl: $masterUrl, " + + s"caCertFile: $caCertFile, clientCertFile: $clientCertFile, clientKeyFile: $clientKeyFile") + new ConfigBuilder() + .withApiVersion(apiVersion) + .withMasterUrl(masterUrl) + .withCaCertFile(caCertFile) + .withClientCertFile(clientCertFile) + .withClientKeyFile(clientKeyFile) .build() - new DefaultKubernetesClient(kubernetesConf) } // Covers minikube status output after Minikube V0.30. private def getIfNewMinikubeStatus(statusString: Seq[String]): MinikubeStatus.Value = { - val hostString = statusString.find(_.contains(s"$HOST_PREFIX ")) - val kubeletString = statusString.find(_.contains(s"$KUBELET_PREFIX ")) - val apiserverString = statusString.find(_.contains(s"$APISERVER_PREFIX ")) - val kubectlString = statusString.find(_.contains(s"$KUBECTL_PREFIX ")) - val kubeconfigString = statusString.find(_.contains(s"$KUBECONFIG_PREFIX ")) + val hostString = statusString.find(_.contains(HOST_PREFIX)) + val kubeletString = statusString.find(_.contains(KUBELET_PREFIX)) + val apiserverString = statusString.find(_.contains(APISERVER_PREFIX)) + val kubectlString = statusString.find(_.contains(KUBECTL_PREFIX)) + val kubeconfigString = statusString.find(_.contains(KUBECONFIG_PREFIX)) val hasConfigStatus = kubectlString.isDefined || kubeconfigString.isDefined if (hostString.isEmpty || kubeletString.isEmpty || apiserverString.isEmpty || !hasConfigStatus) { MinikubeStatus.NONE } else { - val status1 = hostString.get.replaceFirst(s"$HOST_PREFIX ", "") - val status2 = kubeletString.get.replaceFirst(s"$KUBELET_PREFIX ", "") - val status3 = apiserverString.get.replaceFirst(s"$APISERVER_PREFIX ", "") + val status1 = hostString.get.split(HOST_PREFIX)(1) + val status2 = kubeletString.get.split(KUBELET_PREFIX)(1) + val status3 = apiserverString.get.split(APISERVER_PREFIX)(1) val isConfigured = if (kubectlString.isDefined) { - val cfgStatus = kubectlString.get.replaceFirst(s"$KUBECTL_PREFIX ", "") + val cfgStatus = kubectlString.get.split(KUBECTL_PREFIX)(1) cfgStatus.contains("Correctly Configured:") } else { - kubeconfigString.get.replaceFirst(s"$KUBECONFIG_PREFIX ", "") == "Configured" + kubeconfigString.get.split(KUBECONFIG_PREFIX)(1) == "Configured" } if (isConfigured) { val stats = List(status1, status2, status3)