[SPARK-32617][K8S][TESTS] Configure kubernetes client based on kubeconfig settings in kubernetes integration tests

### What changes were proposed in this pull request?

From [minikube version v1.1.0](https://github.com/kubernetes/minikube/blob/v1.1.0/CHANGELOG.md) kubectl is available as a command. So the kubeconfig settings can be accessed like:

```
$ minikube kubectl config view
apiVersion: v1
clusters:
- cluster:
    certificate-authority: /Users/attilazsoltpiros/.minikube/ca.crt
    server: https://127.0.0.1:32788
  name: minikube
contexts:
- context:
    cluster: minikube
    namespace: default
    user: minikube
  name: minikube
current-context: minikube
kind: Config
preferences: {}
users:
- name: minikube
  user:
    client-certificate: /Users/attilazsoltpiros/.minikube/profiles/minikube/client.crt
    client-key: /Users/attilazsoltpiros/.minikube/profiles/minikube/client.key
```

Here the vm-driver was docker and the server port (https://127.0.0.1:32788) is different from the hardcoded 8443.

So the main part of this PR is introducing kubernetes client configuration based on the kubeconfig (output of `minikube kubectl config view`) in case of minikube versions after v1.1.0 and the old legacy way of configuration is also kept as minikube version should be supported back to v0.34.1 .

Moreover as the old style of config parsing pattern wasn't sufficient in my case as when the `minikube kubectl config view` is called kubectl downloading message might be included before the first key I changed it even for the existent keys to be a consistent pattern in this file.

The old parsing in an example:
```
private val HOST_PREFIX = "host:"

val hostString = statusString.find(_.contains(s"$HOST_PREFIX "))

val status1 = hostString.get.split(HOST_PREFIX)(1)
```

The new parsing:
```
private val HOST_PREFIX = "host: "

val hostString = statusString.find(_.contains(HOST_PREFIX))

hostString.get.split(HOST_PREFIX)(1)
```

So the PREFIX is extended with the extra space at the declaration (this way the two separate string operation are more safe and consistent with each other) and the replace is changed to split and getting the 2nd string from the result (which is guaranteed to contain only the text after the PREFIX when the PREFIX is a contained substring).

Finally there is tiny change in `dev-run-integration-tests.sh` to introduce `--skip-building-dependencies` which switchs off building of maven dependencies of `kubernetes-integration-tests` from the Spark project.
This could be used when only the `kubernetes-integration-tests` should be rebuilded as only the tests are modified.

### Why are the changes needed?

Kubernetes client configuration based on kubeconfig settings is more reliable and provides a solution which is minikube version independent.

### Does this PR introduce _any_ user-facing change?

No. This is only test code.

### How was this patch tested?

tested manually on two minikube versions.

Minikube  v0.34.1:

```
$ minikube version
minikube version: v0.34.1

$ grep "version\|building" resource-managers/kubernetes/integration-tests/target/integration-tests.log
20/12/12 12:52:25.135 ScalaTest-main-running-DiscoverySuite INFO Minikube: minikube version: v0.34.1
20/12/12 12:52:25.761 ScalaTest-main-running-DiscoverySuite INFO Minikube: building kubernetes config with apiVersion: v1, masterUrl: https://192.168.99.103:8443, caCertFile: /Users/attilazsoltpiros/.minikube/ca.crt, clientCertFile: /Users/attilazsoltpiros/.minikube/apiserver.crt, clientKeyFile: /Users/attilazsoltpiros/.minikube/apiserver.key
```

Minikube v1.15.1
```
$ minikube version

minikube version: v1.15.1
commit: 23f40a012abb52eff365ff99a709501a61ac5876

$ grep "version\|building" resource-managers/kubernetes/integration-tests/target/integration-tests.log

20/12/13 06:25:55.086 ScalaTest-main-running-DiscoverySuite INFO Minikube: minikube version: v1.15.1
20/12/13 06:25:55.597 ScalaTest-main-running-DiscoverySuite INFO Minikube: building kubernetes config with apiVersion: v1, masterUrl: https://192.168.64.4:8443, caCertFile: /Users/attilazsoltpiros/.minikube/ca.crt, clientCertFile: /Users/attilazsoltpiros/.minikube/profiles/minikube/client.crt, clientKeyFile: /Users/attilazsoltpiros/.minikube/profiles/minikube/client.key

$ minikube kubectl config view
apiVersion: v1
clusters:
- cluster:
    certificate-authority: /Users/attilazsoltpiros/.minikube/ca.crt
    server: https://192.168.64.4:8443
  name: minikube
contexts:
- context:
    cluster: minikube
    namespace: default
    user: minikube
  name: minikube
current-context: minikube
kind: Config
preferences: {}
users:
- name: minikube
  user:
    client-certificate: /Users/attilazsoltpiros/.minikube/profiles/minikube/client.crt
    client-key: /Users/attilazsoltpiros/.minikube/profiles/minikube/client.key
```

Closes #30751 from attilapiros/SPARK-32617.

Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
This commit is contained in:
“attilapiros” 2021-02-24 11:46:27 -08:00 committed by Holden Karau
parent 999d3b89b6
commit b17754a8cb
2 changed files with 101 additions and 43 deletions

View file

@ -35,6 +35,7 @@ CONTEXT=
INCLUDE_TAGS="k8s"
EXCLUDE_TAGS=
JAVA_VERSION="8"
BUILD_DEPENDENCIES_MVN_FLAG="-am"
HADOOP_PROFILE="hadoop-3.2"
MVN="$TEST_ROOT_DIR/build/mvn"
@ -117,6 +118,9 @@ while (( "$#" )); do
HADOOP_PROFILE="$2"
shift
;;
--skip-building-dependencies)
BUILD_DEPENDENCIES_MVN_FLAG=""
;;
*)
echo "Unexpected command line flag $2 $1."
exit 1
@ -176,4 +180,4 @@ properties+=(
-Dlog4j.logger.org.apache.spark=DEBUG
)
$TEST_ROOT_DIR/build/mvn integration-test -f $TEST_ROOT_DIR/pom.xml -pl resource-managers/kubernetes/integration-tests -am -Pscala-$SCALA_VERSION -P$HADOOP_PROFILE -Pkubernetes -Pkubernetes-integration-tests ${properties[@]}
$TEST_ROOT_DIR/build/mvn integration-test -f $TEST_ROOT_DIR/pom.xml -pl resource-managers/kubernetes/integration-tests $BUILD_DEPENDENCIES_MVN_FLAG -Pscala-$SCALA_VERSION -P$HADOOP_PROFILE -Pkubernetes -Pkubernetes-integration-tests ${properties[@]}

View file

@ -16,9 +16,9 @@
*/
package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
import java.nio.file.{Files, Paths}
import java.nio.file.Paths
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient}
import org.apache.spark.deploy.k8s.integrationtest.ProcessUtils
import org.apache.spark.internal.Logging
@ -26,18 +26,26 @@ import org.apache.spark.internal.Logging
// TODO support windows
private[spark] object Minikube extends Logging {
private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60
private val HOST_PREFIX = "host:"
private val KUBELET_PREFIX = "kubelet:"
private val APISERVER_PREFIX = "apiserver:"
private val KUBECTL_PREFIX = "kubectl:"
private val KUBECONFIG_PREFIX = "kubeconfig:"
private val VERSION_PREFIX = "minikube version: "
private val HOST_PREFIX = "host: "
private val KUBELET_PREFIX = "kubelet: "
private val APISERVER_PREFIX = "apiserver: "
private val KUBECTL_PREFIX = "kubectl: "
private val KUBECONFIG_PREFIX = "kubeconfig: "
private val MINIKUBE_VM_PREFIX = "minikubeVM: "
private val MINIKUBE_PREFIX = "minikube: "
private val MINIKUBE_PATH = ".minikube"
private val APIVERSION_PREFIX = "apiVersion: "
private val SERVER_PREFIX = "server: "
private val CA_PREFIX = "certificate-authority: "
private val CLIENTCERT_PREFIX = "client-certificate: "
private val CLIENTKEY_PREFIX = "client-key: "
def logVersion(): Unit = {
logInfo(executeMinikube("version").mkString("\n"))
}
lazy val minikubeVersionString =
executeMinikube("version").find(_.contains(VERSION_PREFIX)).get
def logVersion(): Unit =
logInfo(minikubeVersionString)
def getMinikubeIp: String = {
val outputs = executeMinikube("ip")
@ -56,60 +64,106 @@ private[spark] object Minikube extends Logging {
if (oldMinikube.isEmpty) {
getIfNewMinikubeStatus(statusString)
} else {
val finalStatusString = oldMinikube
.head
.replaceFirst(MINIKUBE_VM_PREFIX, "")
.replaceFirst(MINIKUBE_PREFIX, "")
val statusLine = oldMinikube.head
val finalStatusString = if (statusLine.contains(MINIKUBE_VM_PREFIX)) {
statusLine.split(MINIKUBE_VM_PREFIX)(1)
} else {
statusLine.split(MINIKUBE_PREFIX)(1)
}
MinikubeStatus.unapply(finalStatusString)
.getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
}
}
def getKubernetesClient: DefaultKubernetesClient = {
// only the three-part version number is matched (the optional suffix like "-beta.0" is dropped)
val versionArrayOpt = "\\d+\\.\\d+\\.\\d+".r
.findFirstIn(minikubeVersionString.split(VERSION_PREFIX)(1))
.map(_.split('.').map(_.toInt))
assert(versionArrayOpt.isDefined && versionArrayOpt.get.size == 3,
s"Unexpected version format detected in `$minikubeVersionString`." +
"For minikube version a three-part version number is expected (the optional non-numeric " +
"suffix is intentionally dropped)")
val kubernetesConf = versionArrayOpt.get match {
case Array(x, y, z) =>
// comparing the versions as the kubectl command is only introduced in version v1.1.0:
// https://github.com/kubernetes/minikube/blob/v1.1.0/CHANGELOG.md
if (Ordering.Tuple3[Int, Int, Int].gteq((x, y, z), (1, 1, 0))) {
kubectlBasedKubernetesClientConf
} else {
legacyKubernetesClientConf
}
}
new DefaultKubernetesClient(kubernetesConf)
}
private def legacyKubernetesClientConf: Config = {
val kubernetesMaster = s"https://${getMinikubeIp}:8443"
val userHome = System.getProperty("user.home")
val minikubeBasePath = Paths.get(userHome, MINIKUBE_PATH).toString
val profileDir = if (Files.exists(Paths.get(minikubeBasePath, "apiserver.crt"))) {
// For Minikube <1.9
""
} else {
// For Minikube >=1.9
Paths.get("profiles", executeMinikube("profile")(0)).toString
}
val apiServerCertPath = Paths.get(minikubeBasePath, profileDir, "apiserver.crt")
val apiServerKeyPath = Paths.get(minikubeBasePath, profileDir, "apiserver.key")
val kubernetesConf = new ConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(kubernetesMaster)
.withCaCertFile(
Paths.get(userHome, MINIKUBE_PATH, "ca.crt").toFile.getAbsolutePath)
.withClientCertFile(apiServerCertPath.toFile.getAbsolutePath)
.withClientKeyFile(apiServerKeyPath.toFile.getAbsolutePath)
buildKubernetesClientConf(
"v1",
kubernetesMaster,
Paths.get(userHome, MINIKUBE_PATH, "ca.crt").toFile.getAbsolutePath,
Paths.get(userHome, MINIKUBE_PATH, "apiserver.crt").toFile.getAbsolutePath,
Paths.get(userHome, MINIKUBE_PATH, "apiserver.key").toFile.getAbsolutePath)
}
private def kubectlBasedKubernetesClientConf: Config = {
val outputs = executeMinikube("kubectl config view")
val apiVersionString = outputs.find(_.contains(APIVERSION_PREFIX))
val serverString = outputs.find(_.contains(SERVER_PREFIX))
val caString = outputs.find(_.contains(CA_PREFIX))
val clientCertString = outputs.find(_.contains(CLIENTCERT_PREFIX))
val clientKeyString = outputs.find(_.contains(CLIENTKEY_PREFIX))
assert(!apiVersionString.isEmpty && !serverString.isEmpty && !caString.isEmpty &&
!clientKeyString.isEmpty && !clientKeyString.isEmpty,
"The output of 'minikube kubectl config view' does not contain all the neccesary attributes")
buildKubernetesClientConf(
apiVersionString.get.split(APIVERSION_PREFIX)(1),
serverString.get.split(SERVER_PREFIX)(1),
caString.get.split(CA_PREFIX)(1),
clientCertString.get.split(CLIENTCERT_PREFIX)(1),
clientKeyString.get.split(CLIENTKEY_PREFIX)(1))
}
private def buildKubernetesClientConf(apiVersion: String, masterUrl: String, caCertFile: String,
clientCertFile: String, clientKeyFile: String): Config = {
logInfo(s"building kubernetes config with apiVersion: $apiVersion, masterUrl: $masterUrl, " +
s"caCertFile: $caCertFile, clientCertFile: $clientCertFile, clientKeyFile: $clientKeyFile")
new ConfigBuilder()
.withApiVersion(apiVersion)
.withMasterUrl(masterUrl)
.withCaCertFile(caCertFile)
.withClientCertFile(clientCertFile)
.withClientKeyFile(clientKeyFile)
.build()
new DefaultKubernetesClient(kubernetesConf)
}
// Covers minikube status output after Minikube V0.30.
private def getIfNewMinikubeStatus(statusString: Seq[String]): MinikubeStatus.Value = {
val hostString = statusString.find(_.contains(s"$HOST_PREFIX "))
val kubeletString = statusString.find(_.contains(s"$KUBELET_PREFIX "))
val apiserverString = statusString.find(_.contains(s"$APISERVER_PREFIX "))
val kubectlString = statusString.find(_.contains(s"$KUBECTL_PREFIX "))
val kubeconfigString = statusString.find(_.contains(s"$KUBECONFIG_PREFIX "))
val hostString = statusString.find(_.contains(HOST_PREFIX))
val kubeletString = statusString.find(_.contains(KUBELET_PREFIX))
val apiserverString = statusString.find(_.contains(APISERVER_PREFIX))
val kubectlString = statusString.find(_.contains(KUBECTL_PREFIX))
val kubeconfigString = statusString.find(_.contains(KUBECONFIG_PREFIX))
val hasConfigStatus = kubectlString.isDefined || kubeconfigString.isDefined
if (hostString.isEmpty || kubeletString.isEmpty || apiserverString.isEmpty ||
!hasConfigStatus) {
MinikubeStatus.NONE
} else {
val status1 = hostString.get.replaceFirst(s"$HOST_PREFIX ", "")
val status2 = kubeletString.get.replaceFirst(s"$KUBELET_PREFIX ", "")
val status3 = apiserverString.get.replaceFirst(s"$APISERVER_PREFIX ", "")
val status1 = hostString.get.split(HOST_PREFIX)(1)
val status2 = kubeletString.get.split(KUBELET_PREFIX)(1)
val status3 = apiserverString.get.split(APISERVER_PREFIX)(1)
val isConfigured = if (kubectlString.isDefined) {
val cfgStatus = kubectlString.get.replaceFirst(s"$KUBECTL_PREFIX ", "")
val cfgStatus = kubectlString.get.split(KUBECTL_PREFIX)(1)
cfgStatus.contains("Correctly Configured:")
} else {
kubeconfigString.get.replaceFirst(s"$KUBECONFIG_PREFIX ", "") == "Configured"
kubeconfigString.get.split(KUBECONFIG_PREFIX)(1) == "Configured"
}
if (isConfigured) {
val stats = List(status1, status2, status3)