[SPARK-34316][K8S] Support spark.kubernetes.executor.disableConfigMap

### What changes were proposed in this pull request?

This PR aims to add a new configuration `spark.kubernetes.executor.disableConfigMap`.
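
As a usage sketch (not part of the patch; the app name, master URL, and container image below are hypothetical placeholders):

```scala
import org.apache.spark.SparkConf

// Opt out of executor ConfigMap creation. Only the last setting comes from
// this PR; everything else is an illustrative placeholder.
val conf = new SparkConf()
  .setAppName("disable-configmap-demo")
  .setMaster("k8s://https://k8s-apiserver.example.com:6443")
  .set("spark.kubernetes.container.image", "spark:3.2.0")
  .set("spark.kubernetes.executor.disableConfigMap", "true")
```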

### Why are the changes needed?

This can be used to disable ConfigMap creation for executor pods, a behavior introduced by https://github.com/apache/spark/pull/27735 .

### Does this PR introduce _any_ user-facing change?

No. By default, this doesn't change the existing behavior.
This is a new feature that adds the ability to opt out of the executor ConfigMap distribution introduced by SPARK-30985.

### How was this patch tested?

Pass the newly added UT.
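
The new test flips the flag and asserts that neither the executor container nor the pod spec carries the conf volume. For reference, the `SecretVolumeUtils.podHasVolume` and `SecretVolumeUtils.containerHasVolume` helpers used in the suite amount to checks along these lines (a simplified sketch, not the exact utility source):

```scala
import io.fabric8.kubernetes.api.model.{Container, Pod}
import scala.jdk.CollectionConverters._

// Does the pod spec declare a volume with this name?
def podHasVolume(pod: Pod, volumeName: String): Boolean =
  pod.getSpec.getVolumes.asScala.exists(_.getName == volumeName)

// Does the container mount that volume at the expected path?
def containerHasVolume(container: Container, volumeName: String, mountPath: String): Boolean =
  container.getVolumeMounts.asScala.exists { m =>
    m.getName == volumeName && m.getMountPath == mountPath
  }
```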

Closes #31428 from dongjoon-hyun/SPARK-34316.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Committed: 2021-02-01 22:26:07 -08:00
Parent: d99d0d27be
Commit: f66e38c963
4 changed files with 51 additions and 18 deletions

#### Config.scala

@@ -227,6 +227,13 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP =
+    ConfigBuilder("spark.kubernetes.executor.disableConfigMap")
+      .doc("If true, disable ConfigMap creation for executors.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val KUBERNETES_DRIVER_POD_FEATURE_STEPS =
     ConfigBuilder("spark.kubernetes.driver.pod.featureSteps")
       .doc("Class names of an extra driver pod feature step implementing " +

#### BasicExecutorFeatureStep.scala

@@ -55,6 +55,7 @@ private[spark] class BasicExecutorFeatureStep(
   private val isDefaultProfile = resourceProfile.id == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
   private val isPythonApp = kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)
+  private val disableConfigMap = kubernetesConf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP)
 
   val execResources = ResourceProfile.getResourcesForClusterManager(
     resourceProfile.id,
@@ -197,10 +198,6 @@ private[spark] class BasicExecutorFeatureStep(
         .addToRequests("cpu", executorCpuQuantity)
         .addToLimits(executorResourceQuantities.asJava)
         .endResources()
-      .addNewVolumeMount()
-        .withName(SPARK_CONF_VOLUME_EXEC)
-        .withMountPath(SPARK_CONF_DIR_INTERNAL)
-        .endVolumeMount()
       .addNewEnv()
         .withName(ENV_SPARK_USER)
         .withValue(Utils.getCurrentUserName())
@@ -209,15 +206,25 @@ private[spark] class BasicExecutorFeatureStep(
       .withPorts(requiredPorts.asJava)
       .addToArgs("executor")
       .build()
+    val executorContainerWithConfVolume = if (disableConfigMap) {
+      executorContainer
+    } else {
+      new ContainerBuilder(executorContainer)
+        .addNewVolumeMount()
+          .withName(SPARK_CONF_VOLUME_EXEC)
+          .withMountPath(SPARK_CONF_DIR_INTERNAL)
+          .endVolumeMount()
+        .build()
+    }
     val containerWithLimitCores = if (isDefaultProfile) {
       executorLimitCores.map { limitCores =>
         val executorCpuLimitQuantity = new Quantity(limitCores)
-        new ContainerBuilder(executorContainer)
+        new ContainerBuilder(executorContainerWithConfVolume)
           .editResources()
             .addToLimits("cpu", executorCpuLimitQuantity)
             .endResources()
           .build()
-      }.getOrElse(executorContainer)
+      }.getOrElse(executorContainerWithConfVolume)
     } else {
       executorContainer
     }
@@ -245,7 +252,7 @@ private[spark] class BasicExecutorFeatureStep(
         .withUid(pod.getMetadata.getUid)
         .build()
     }
-    val executorPod = new PodBuilder(pod.pod)
+    val executorPodBuilder = new PodBuilder(pod.pod)
       .editOrNewMetadata()
         .withName(name)
         .addToLabels(kubernetesConf.labels.asJava)
@@ -257,6 +264,10 @@ private[spark] class BasicExecutorFeatureStep(
         .withRestartPolicy("Never")
         .addToNodeSelector(kubernetesConf.nodeSelector.asJava)
         .addToImagePullSecrets(kubernetesConf.imagePullSecrets: _*)
+    val executorPod = if (disableConfigMap) {
+      executorPodBuilder.endSpec().build()
+    } else {
+      executorPodBuilder
         .addNewVolume()
           .withName(SPARK_CONF_VOLUME_EXEC)
           .withNewConfigMap()
@@ -266,7 +277,7 @@ private[spark] class BasicExecutorFeatureStep(
           .endVolume()
         .endSpec()
         .build()
-
+    }
     kubernetesConf.get(KUBERNETES_EXECUTOR_SCHEDULER_NAME)
       .foreach(executorPod.getSpec.setSchedulerName)

#### KubernetesClusterSchedulerBackend.scala

@@ -95,7 +95,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
     podAllocator.start(applicationId())
     watchEvents.start(applicationId())
     pollEvents.start(applicationId())
-    setUpExecutorConfigMap()
+    if (!conf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP)) {
+      setUpExecutorConfigMap()
+    }
   }
 
   override def stop(): Unit = {
@@ -121,12 +123,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
         .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
         .delete()
     }
-    Utils.tryLogNonFatalError {
-      kubernetesClient
-        .configMaps()
-        .withLabel(SPARK_APP_ID_LABEL, applicationId())
-        .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
-        .delete()
+    if (!conf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP)) {
+      Utils.tryLogNonFatalError {
+        kubernetesClient
+          .configMaps()
+          .withLabel(SPARK_APP_ID_LABEL, applicationId())
+          .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+          .delete()
+      }
     }
   }

#### BasicExecutorFeatureStepSuite.scala

@@ -325,9 +325,20 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
       defaultProfile)
     val podConfigured = step.configurePod(baseDriverPod)
-    SecretVolumeUtils.containerHasVolume(podConfigured.container,
-      SPARK_CONF_VOLUME_EXEC, SPARK_CONF_DIR_INTERNAL)
-    SecretVolumeUtils.podHasVolume(podConfigured.pod, SPARK_CONF_VOLUME_EXEC)
+    assert(SecretVolumeUtils.containerHasVolume(podConfigured.container,
+      SPARK_CONF_VOLUME_EXEC, SPARK_CONF_DIR_INTERNAL))
+    assert(SecretVolumeUtils.podHasVolume(podConfigured.pod, SPARK_CONF_VOLUME_EXEC))
   }
+
+  test("SPARK-34316 Disable configmap volume on executor pod's container") {
+    baseConf.set(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP, true)
+    val baseDriverPod = SparkPod.initialPod()
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
+      defaultProfile)
+    val podConfigured = step.configurePod(baseDriverPod)
+    assert(!SecretVolumeUtils.containerHasVolume(podConfigured.container,
+      SPARK_CONF_VOLUME_EXEC, SPARK_CONF_DIR_INTERNAL))
+    assert(!SecretVolumeUtils.podHasVolume(podConfigured.pod, SPARK_CONF_VOLUME_EXEC))
+  }
 
   // There is always exactly one controller reference, and it points to the driver pod.