[SPARK-28042][K8S] Support using volume mount as local storage

## What changes were proposed in this pull request?

This PR adds support for using hostPath/PV volume mounts as local storage. In KubernetesExecutorBuilder.scala, the LocalDirsFeatureStep is built before MountVolumesFeatureStep, which means we cannot use any volume mounted later. This PR adjusts the order of the feature-building steps, moving LocalDirsFeatureStep to the end so that we can check whether directories in SPARK_LOCAL_DIRS are set to mounted volumes such as hostPath, PV, or others.

## How was this patch tested?
Unit tests

Closes #24879 from chenjunjiedada/SPARK-28042.

Lead-authored-by: Junjie Chen <jimmyjchen@tencent.com>
Co-authored-by: Junjie Chen <cjjnjust@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
Junjie Chen 2019-07-29 10:44:17 -07:00 committed by Marcelo Vanzin
parent d98aa2a184
commit 780d176136
5 changed files with 82 additions and 36 deletions

View file

@ -308,7 +308,15 @@ The configuration properties for mounting volumes into the executor pods use pre
## Local Storage ## Local Storage
Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `SPARK_LOCAL_DIRS`. If no directories are explicitly specified then a default directory is created and configured appropriately. Spark supports using volumes to spill data during shuffles and other operations. To use a volume as local storage, the volume's name should start with `spark-local-dir-`, for example:
```
--conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.path=<mount path>
--conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.readOnly=false
```
If no volume is set as local storage, Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `spark.local.dir` or the environment variable `SPARK_LOCAL_DIRS`. If no directories are explicitly specified then a default directory is created and configured appropriately.
`emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not persist beyond the life of the pod. `emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not persist beyond the life of the pod.

View file

@ -18,7 +18,9 @@ package org.apache.spark.deploy.k8s.features
import java.util.UUID import java.util.UUID
import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder} import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model._
import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Config._
@ -28,21 +30,29 @@ private[spark] class LocalDirsFeatureStep(
defaultLocalDir: String = s"/var/data/spark-${UUID.randomUUID}") defaultLocalDir: String = s"/var/data/spark-${UUID.randomUUID}")
extends KubernetesFeatureConfigStep { extends KubernetesFeatureConfigStep {
private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)
override def configurePod(pod: SparkPod): SparkPod = {
var localDirs = pod.container.getVolumeMounts.asScala
.filter(_.getName.startsWith("spark-local-dir-"))
.map(_.getMountPath)
var localDirVolumes : Seq[Volume] = Seq()
var localDirVolumeMounts : Seq[VolumeMount] = Seq()
if (localDirs.isEmpty) {
// Cannot use Utils.getConfiguredLocalDirs because that will default to the Java system // Cannot use Utils.getConfiguredLocalDirs because that will default to the Java system
// property - we want to instead default to mounting an emptydir volume that doesn't already // property - we want to instead default to mounting an emptydir volume that doesn't already
// exist in the image. // exist in the image.
// We could make utils.getConfiguredLocalDirs opinionated about Kubernetes, as it is already // We could make utils.getConfiguredLocalDirs opinionated about Kubernetes, as it is already
// a bit opinionated about YARN and Mesos. // a bit opinionated about YARN and Mesos.
private val resolvedLocalDirs = Option(conf.sparkConf.getenv("SPARK_LOCAL_DIRS")) val resolvedLocalDirs = Option(conf.sparkConf.getenv("SPARK_LOCAL_DIRS"))
.orElse(conf.getOption("spark.local.dir")) .orElse(conf.getOption("spark.local.dir"))
.getOrElse(defaultLocalDir) .getOrElse(defaultLocalDir)
.split(",") .split(",")
private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS) localDirs = resolvedLocalDirs.toBuffer
localDirVolumes = resolvedLocalDirs
override def configurePod(pod: SparkPod): SparkPod = {
val localDirVolumes = resolvedLocalDirs
.zipWithIndex .zipWithIndex
.map { case (localDir, index) => .map { case (_, index) =>
new VolumeBuilder() new VolumeBuilder()
.withName(s"spark-local-dir-${index + 1}") .withName(s"spark-local-dir-${index + 1}")
.withNewEmptyDir() .withNewEmptyDir()
@ -50,7 +60,8 @@ private[spark] class LocalDirsFeatureStep(
.endEmptyDir() .endEmptyDir()
.build() .build()
} }
val localDirVolumeMounts = localDirVolumes
localDirVolumeMounts = localDirVolumes
.zip(resolvedLocalDirs) .zip(resolvedLocalDirs)
.map { case (localDirVolume, localDirPath) => .map { case (localDirVolume, localDirPath) =>
new VolumeMountBuilder() new VolumeMountBuilder()
@ -58,6 +69,8 @@ private[spark] class LocalDirsFeatureStep(
.withMountPath(localDirPath) .withMountPath(localDirPath)
.build() .build()
} }
}
val podWithLocalDirVolumes = new PodBuilder(pod.pod) val podWithLocalDirVolumes = new PodBuilder(pod.pod)
.editSpec() .editSpec()
.addToVolumes(localDirVolumes: _*) .addToVolumes(localDirVolumes: _*)
@ -66,7 +79,7 @@ private[spark] class LocalDirsFeatureStep(
val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container) val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container)
.addNewEnv() .addNewEnv()
.withName("SPARK_LOCAL_DIRS") .withName("SPARK_LOCAL_DIRS")
.withValue(resolvedLocalDirs.mkString(",")) .withValue(localDirs.mkString(","))
.endEnv() .endEnv()
.addToVolumeMounts(localDirVolumeMounts: _*) .addToVolumeMounts(localDirVolumeMounts: _*)
.build() .build()

View file

@ -43,12 +43,12 @@ private[spark] class KubernetesDriverBuilder {
new DriverServiceFeatureStep(conf), new DriverServiceFeatureStep(conf),
new MountSecretsFeatureStep(conf), new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf), new EnvSecretsFeatureStep(conf),
new LocalDirsFeatureStep(conf),
new MountVolumesFeatureStep(conf), new MountVolumesFeatureStep(conf),
new DriverCommandFeatureStep(conf), new DriverCommandFeatureStep(conf),
new HadoopConfDriverFeatureStep(conf), new HadoopConfDriverFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf), new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf)) new PodTemplateConfigMapStep(conf),
new LocalDirsFeatureStep(conf))
val spec = KubernetesDriverSpec( val spec = KubernetesDriverSpec(
initialPod, initialPod,

View file

@ -44,8 +44,8 @@ private[spark] class KubernetesExecutorBuilder {
new ExecutorKubernetesCredentialsFeatureStep(conf), new ExecutorKubernetesCredentialsFeatureStep(conf),
new MountSecretsFeatureStep(conf), new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf), new EnvSecretsFeatureStep(conf),
new LocalDirsFeatureStep(conf), new MountVolumesFeatureStep(conf),
new MountVolumesFeatureStep(conf)) new LocalDirsFeatureStep(conf))
features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) } features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) }
} }

View file

@ -19,9 +19,8 @@ package org.apache.spark.deploy.k8s.features
import io.fabric8.kubernetes.api.model.{EnvVarBuilder, VolumeBuilder, VolumeMountBuilder} import io.fabric8.kubernetes.api.model.{EnvVarBuilder, VolumeBuilder, VolumeMountBuilder}
import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod} import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
import org.apache.spark.util.SparkConfWithEnv import org.apache.spark.util.SparkConfWithEnv
class LocalDirsFeatureStepSuite extends SparkFunSuite { class LocalDirsFeatureStepSuite extends SparkFunSuite {
@ -116,4 +115,30 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite {
.withValue(defaultLocalDir) .withValue(defaultLocalDir)
.build()) .build())
} }
test("local dir on mounted volume") {
val volumeConf = KubernetesVolumeSpec(
"spark-local-dir-test",
"/tmp",
"",
false,
KubernetesHostPathVolumeConf("/hostPath/tmp")
)
val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
val mountVolumeStep = new MountVolumesFeatureStep(kubernetesConf)
val configuredPod = mountVolumeStep.configurePod(SparkPod.initialPod())
val localDirStep = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
val newConfiguredPod = localDirStep.configurePod(configuredPod)
assert(newConfiguredPod.pod.getSpec.getVolumes.size() === 1)
assert(newConfiguredPod.pod.getSpec.getVolumes.get(0).getHostPath.getPath === "/hostPath/tmp")
assert(newConfiguredPod.container.getVolumeMounts.size() === 1)
assert(newConfiguredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp")
assert(newConfiguredPod.container.getVolumeMounts.get(0).getName === "spark-local-dir-test")
assert(newConfiguredPod.container.getEnv.get(0) ===
new EnvVarBuilder()
.withName("SPARK_LOCAL_DIRS")
.withValue("/tmp")
.build())
}
} }