[SPARK-25295][K8S] Fix executor names collision

## What changes were proposed in this pull request?
Fixes the collision issue with Spark executor names in client mode; see SPARK-25295 for the details.
It follows the cluster-mode naming convention: the app name is used as the prefix, and if it is not defined we use "spark" as the default prefix. E.g. `spark-pi-1536781360723-exec-1`, where `spark-pi` is the name of the app passed on the config side, transformed if it contains illegal characters.
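
For illustration, a minimal sketch of how an executor pod name is assembled from that prefix; the helper name is hypothetical, but the `-exec-` suffix matches the example above:
```
// Hypothetical helper illustrating the naming scheme; the prefix is the
// sanitized app name plus the launch timestamp, e.g. "spark-pi-1536781360723".
def executorPodName(prefix: String, executorId: Long): String =
  s"$prefix-exec-$executorId"

// executorPodName("spark-pi-1536781360723", 1) == "spark-pi-1536781360723-exec-1"
```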

Also fixes the issue with the Spark app name having spaces in cluster mode.
If you run the Spark Pi test in client mode it passes.
The tricky part is that the user may set the app name:
3030b82c89/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala (L30)
If I do:

```
./bin/spark-submit
...
 --deploy-mode cluster --name "spark pi"
...
```
it will fail because the app name is used as the prefix of the driver's pod name, and pod names cannot contain spaces (per Kubernetes naming conventions).
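
Roughly, Kubernetes requires resource names to be valid DNS-1123 labels (lowercase alphanumerics and `-`, at most 63 characters); the check below is a sketch based on that convention, not code from this patch:
```
// Rough DNS-1123 label check; an approximation of what the API server
// enforces for pod names, not part of this patch.
val dns1123Label = "[a-z0-9]([-a-z0-9]*[a-z0-9])?".r

def isValidK8sName(name: String): Boolean =
  name.length <= 63 && dns1123Label.pattern.matcher(name).matches()

// isValidK8sName("spark pi") == false (the space is illegal)
// isValidK8sName("spark-pi") == true
```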

## How was this patch tested?

Manually, by running a Spark job in client mode.
To reproduce:
```
kubectl create -f service.yaml
kubectl create -f pod.yaml
```
`service.yaml`:
```
kind: Service
apiVersion: v1
metadata:
  name: spark-test-app-1-svc
spec:
  clusterIP: None
  selector:
    spark-app-selector: spark-test-app-1
  ports:
  - protocol: TCP
    name: driver-port
    port: 7077
    targetPort: 7077
  - protocol: TCP
    name: block-manager
    port: 10000
    targetPort: 10000
```
`pod.yaml`:

```
apiVersion: v1
kind: Pod
metadata:
  name: spark-test-app-1
  labels:
    spark-app-selector: spark-test-app-1
spec:
  containers:
  - name: spark-test
    image: skonto/spark:k8s-client-fix
    imagePullPolicy: Always
    command:
      - 'sh'
      - '-c'
      -  "/opt/spark/bin/spark-submit
              --verbose
              --master k8s://https://kubernetes.default.svc
              --deploy-mode client
              --class org.apache.spark.examples.SparkPi
              --conf spark.app.name=spark
              --conf spark.executor.instances=1
              --conf spark.kubernetes.container.image=skonto/spark:k8s-client-fix
              --conf spark.kubernetes.container.image.pullPolicy=Always
              --conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token
              --conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt
              --conf spark.executor.memory=500m
              --conf spark.executor.cores=1
              --conf spark.driver.host=spark-test-app-1-svc.default.svc
              --conf spark.driver.port=7077
              --conf spark.driver.blockManager.port=10000
              local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar 1000000"
```
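
The essential constraint in the manifests is that the driver host and ports passed to spark-submit match the headless service. As a sketch, the equivalent `SparkConf` settings (values copied from the manifests above) would be:
```
import org.apache.spark.SparkConf

// Client-mode settings mirroring the manifest; the service name and ports
// must line up with service.yaml so executors can reach the driver and
// its block manager.
val conf = new SparkConf()
  .setAppName("spark")
  .setMaster("k8s://https://kubernetes.default.svc")
  .set("spark.driver.host", "spark-test-app-1-svc.default.svc")
  .set("spark.driver.port", "7077")
  .set("spark.driver.blockManager.port", "10000")
```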

Closes #22405 from skonto/fix-k8s-client-mode-executor-names.

Authored-by: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>
Signed-off-by: Yinan Li <ynli@google.com>

`KubernetesConf.scala`:
```
@@ -24,6 +24,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
+import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
 import org.apache.spark.internal.config.ConfigEntry
@@ -220,10 +221,20 @@ private[spark] object KubernetesConf {
     val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
       sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)
+    // If no prefix is defined then we are in pure client mode
+    // (not the one used by cluster mode inside the container)
+    val appResourceNamePrefix = {
+      if (sparkConf.getOption(KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key).isEmpty) {
+        getResourceNamePrefix(getAppName(sparkConf))
+      } else {
+        sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+      }
+    }
     KubernetesConf(
       sparkConf.clone(),
       KubernetesExecutorSpecificConf(executorId, driverPod),
-      sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX),
+      appResourceNamePrefix,
       appId,
       executorLabels,
       executorAnnotations,
```

`KubernetesClientApplication.scala`:
```
@@ -211,11 +211,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
     // a unique app ID (captured by spark.app.id) in the format below.
     val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
-    val launchTime = System.currentTimeMillis()
     val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
-    val kubernetesResourceNamePrefix = {
-      s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
-    }
+    val kubernetesResourceNamePrefix = KubernetesClientApplication.getResourceNamePrefix(appName)
     sparkConf.set(KUBERNETES_PYSPARK_PY_FILES, clientArguments.maybePyFiles.getOrElse(""))
     val kubernetesConf = KubernetesConf.createDriverConf(
       sparkConf,
@@ -254,3 +251,19 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     }
   }
 }
+
+private[spark] object KubernetesClientApplication {
+
+  def getAppName(conf: SparkConf): String = conf.getOption("spark.app.name").getOrElse("spark")
+
+  def getResourceNamePrefix(appName: String): String = {
+    val launchTime = System.currentTimeMillis()
+    s"$appName-$launchTime"
+      .trim
+      .toLowerCase
+      .replaceAll("\\s+", "-")
+      .replaceAll("\\.", "-")
+      .replaceAll("[^a-z0-9\\-]", "")
+      .replaceAll("-+", "-")
+  }
+}
```
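
To make the sanitization concrete, a usage sketch of the new helper (the numeric suffix comes from `System.currentTimeMillis()` at launch; fixed here for readability):
```
KubernetesClientApplication.getResourceNamePrefix("spark pi")
// => "spark-pi-1536781360723"  (whitespace collapsed to '-')
KubernetesClientApplication.getResourceNamePrefix("Spark.Pi")
// => "spark-pi-1536781360723"  (lowercased, dots replaced with '-')
KubernetesClientApplication.getResourceNamePrefix("my_app!")
// => "myapp-1536781360723"     (illegal characters stripped)
```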

`ExecutorPodsAllocatorSuite.scala`:
```
@@ -167,13 +167,23 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
         executorSpecificConf.executorId,
         TEST_SPARK_APP_ID,
         Some(driverPod))
-      k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap &&
+      // Set prefixes to a common string since KUBERNETES_EXECUTOR_POD_NAME_PREFIX
+      // has not been set for the tests and thus KubernetesConf will use a random
+      // string for the prefix, based on the app name, and this comparison would fail.
+      val k8sConfCopy = k8sConf
+        .copy(appResourceNamePrefix = "")
+        .copy(sparkConf = conf)
+      val expectedK8sConfCopy = expectedK8sConf
+        .copy(appResourceNamePrefix = "")
+        .copy(sparkConf = conf)
+      k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap &&
         // Since KubernetesConf.createExecutorConf clones the SparkConf object, force
         // deep equality comparison for the SparkConf object and use object equality
         // comparison on all other fields.
-        k8sConf.copy(sparkConf = conf) == expectedK8sConf.copy(sparkConf = conf)
+        k8sConfCopy == expectedK8sConfCopy
       }
     }
   })
 }
```