From 3e75a9fa24f8629d068b5fbbc7356ce2603fa58d Mon Sep 17 00:00:00 2001
From: Stavros Kontopoulos
Date: Wed, 12 Sep 2018 22:02:59 -0700
Subject: [PATCH] [SPARK-25295][K8S] Fix executor names collision

## What changes were proposed in this pull request?

Fixes the collision issue with Spark executor names in client mode; see SPARK-25295 for the details. It follows the naming convention of cluster mode: the app name is used as the prefix, and if none is defined we fall back to "spark" as the default. E.g. `spark-pi-1536781360723-exec-1`, where `spark-pi` is the app name as passed in the configuration (or its transformed form, if it contained illegal characters).

Also fixes the issue of a Spark app name containing spaces in cluster mode. If you run the Spark Pi test in client mode it passes. The tricky part is that the user may set the app name:
https://github.com/apache/spark/blob/3030b82c89d3e45a2e361c469fbc667a1e43b854/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala#L30

If I do:

```
./bin/spark-submit ... --deploy-mode cluster --name "spark pi" ...
```

it will fail, because the app name is used as the prefix of the driver's pod name, and a pod name cannot contain spaces (per k8s naming conventions). The sketch just below illustrates the sanitization this patch applies.
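For illustration, a minimal sketch of that sanitization. The body mirrors the new `getResourceNamePrefix` helper in the diff below, except that the launch timestamp is a parameter here so the example output is deterministic (the real helper reads `System.currentTimeMillis()` itself):

```
// Sketch of the sanitization chain behind the new getResourceNamePrefix
// helper (see the diff below). launchTime is a parameter only to make the
// example deterministic; the real helper calls System.currentTimeMillis().
def resourceNamePrefix(appName: String, launchTime: Long): String =
  s"$appName-$launchTime"
    .trim
    .toLowerCase
    .replaceAll("\\s+", "-")        // spaces -> dashes ("spark pi" -> "spark-pi")
    .replaceAll("\\.", "-")         // dots -> dashes
    .replaceAll("[^a-z0-9\\-]", "") // drop anything else k8s names disallow
    .replaceAll("-+", "-")          // collapse runs of dashes

// resourceNamePrefix("spark pi", 1536781360723L) == "spark-pi-1536781360723"
// Executor pods are then named "<prefix>-exec-<id>", e.g. spark-pi-1536781360723-exec-1.
```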
## How was this patch tested?

Manually, by running a Spark job in client mode. To reproduce, do:

```
kubectl create -f service.yaml
kubectl create -f pod.yaml
```

service.yaml:

```
kind: Service
apiVersion: v1
metadata:
  name: spark-test-app-1-svc
spec:
  clusterIP: None
  selector:
    spark-app-selector: spark-test-app-1
  ports:
  - protocol: TCP
    name: driver-port
    port: 7077
    targetPort: 7077
  - protocol: TCP
    name: block-manager
    port: 10000
    targetPort: 10000
```

pod.yaml:

```
apiVersion: v1
kind: Pod
metadata:
  name: spark-test-app-1
  labels:
    spark-app-selector: spark-test-app-1
spec:
  containers:
  - name: spark-test
    image: skonto/spark:k8s-client-fix
    imagePullPolicy: Always
    command:
    - 'sh'
    - '-c'
    - "/opt/spark/bin/spark-submit --verbose --master k8s://https://kubernetes.default.svc --deploy-mode client --class org.apache.spark.examples.SparkPi --conf spark.app.name=spark --conf spark.executor.instances=1 --conf spark.kubernetes.container.image=skonto/spark:k8s-client-fix --conf spark.kubernetes.container.image.pullPolicy=Always --conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token --conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt --conf spark.executor.memory=500m --conf spark.executor.cores=1 --conf spark.executor.instances=1 --conf spark.driver.host=spark-test-app-1-svc.default.svc --conf spark.driver.port=7077 --conf spark.driver.blockManager.port=10000 local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar 1000000"
```

Closes #22405 from skonto/fix-k8s-client-mode-executor-names.

Authored-by: Stavros Kontopoulos
Signed-off-by: Yinan Li
---
 .../spark/deploy/k8s/KubernetesConf.scala        | 13 +++++++++++-
 .../submit/KubernetesClientApplication.scala     | 21 +++++++++++++++----
 .../k8s/ExecutorPodsAllocatorSuite.scala         | 16 +++++++++++---
 3 files changed, 42 insertions(+), 8 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 3aa35d4190..cae6e7d5ad 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -24,6 +24,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
+import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
 import org.apache.spark.internal.config.ConfigEntry
 
@@ -220,10 +221,20 @@
     val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
       sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)
 
+    // If no prefix is defined then we are in pure client mode
+    // (not the in-container client mode that cluster mode itself uses)
+    val appResourceNamePrefix = {
+      if (sparkConf.getOption(KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key).isEmpty) {
+        getResourceNamePrefix(getAppName(sparkConf))
+      } else {
+        sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+      }
+    }
+
     KubernetesConf(
       sparkConf.clone(),
       KubernetesExecutorSpecificConf(executorId, driverPod),
-      sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX),
+      appResourceNamePrefix,
       appId,
       executorLabels,
       executorAnnotations,
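As a reading aid (assumed names, not code from this patch), the resolution order the new block implements, with a plain `Map` standing in for `SparkConf`; the literal key is the one behind `KUBERNETES_EXECUTOR_POD_NAME_PREFIX`, and `resourceNamePrefix` is the sanitization helper sketched in the description above:

```
// Illustration of the prefix resolution order in createExecutorConf.
def resolvePrefix(conf: Map[String, String]): String =
  conf.getOrElse(
    "spark.kubernetes.executor.podNamePrefix", // already set when launched by cluster mode
    resourceNamePrefix(                        // pure client mode: derive from the app name
      conf.getOrElse("spark.app.name", "spark"),
      System.currentTimeMillis()))
```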
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 986c950ab3..edeaa38019 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -211,11 +211,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
     // a unique app ID (captured by spark.app.id) in the format below.
     val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
-    val launchTime = System.currentTimeMillis()
     val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
-    val kubernetesResourceNamePrefix = {
-      s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
-    }
+    val kubernetesResourceNamePrefix = KubernetesClientApplication.getResourceNamePrefix(appName)
     sparkConf.set(KUBERNETES_PYSPARK_PY_FILES, clientArguments.maybePyFiles.getOrElse(""))
     val kubernetesConf = KubernetesConf.createDriverConf(
       sparkConf,
@@ -254,3 +251,19 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     }
   }
 }
+
+private[spark] object KubernetesClientApplication {
+
+  def getAppName(conf: SparkConf): String = conf.getOption("spark.app.name").getOrElse("spark")
+
+  def getResourceNamePrefix(appName: String): String = {
+    val launchTime = System.currentTimeMillis()
+    s"$appName-$launchTime"
+      .trim
+      .toLowerCase
+      .replaceAll("\\s+", "-")
+      .replaceAll("\\.", "-")
+      .replaceAll("[^a-z0-9\\-]", "")
+      .replaceAll("-+", "-")
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index e847f8590d..0e617b0021 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -167,13 +167,23 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
         executorSpecificConf.executorId,
         TEST_SPARK_APP_ID,
         Some(driverPod))
-      k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap &&
+
+      // Set both prefixes to a common string: KUBERNETES_EXECUTOR_POD_NAME_PREFIX
+      // has not been set for the tests, so KubernetesConf generates a time-based
+      // prefix from the app name, and the comparison below would otherwise fail.
+      val k8sConfCopy = k8sConf
+        .copy(appResourceNamePrefix = "")
+        .copy(sparkConf = conf)
+      val expectedK8sConfCopy = expectedK8sConf
+        .copy(appResourceNamePrefix = "")
+        .copy(sparkConf = conf)
+
+      k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap &&
         // Since KubernetesConf.createExecutorConf clones the SparkConf object, force
         // deep equality comparison for the SparkConf object and use object equality
         // comparison on all other fields.
-      k8sConf.copy(sparkConf = conf) == expectedK8sConf.copy(sparkConf = conf)
+      k8sConfCopy == expectedK8sConfCopy
       }
     }
   })
-  }
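As a closing sanity check (illustrative, not part of the patch), feeding a fixed timestamp through the `resourceNamePrefix` sketch from the description reproduces the names this PR expects. It also shows why the test above blanks the prefix out instead of asserting its value: the real helper reads the clock internally, so its output is not deterministic.

```
// Reuses the resourceNamePrefix(appName, launchTime) sketch from the
// description above; fixing the timestamp makes the outputs deterministic.
assert(resourceNamePrefix("spark pi", 1536781360723L) == "spark-pi-1536781360723")
assert(resourceNamePrefix("Spark..Pi!", 1536781360723L) == "spark-pi-1536781360723")
// Executor pods then get "<prefix>-exec-<id>", e.g. spark-pi-1536781360723-exec-1.
```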