diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 769eed1e6f..2d4e5cd65f 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -308,7 +308,15 @@ The configuration properties for mounting volumes into the executor pods use pre
 
 ## Local Storage
 
-Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `SPARK_LOCAL_DIRS`. If no directories are explicitly specified then a default directory is created and configured appropriately.
+Spark supports using volumes to spill data during shuffles and other operations. To use a volume as local storage, the volume's name should start with `spark-local-dir-`, for example:
+
+```
+--conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.path=<mount path>
+--conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.readOnly=false
+```
+
+
+If no volume is set as local storage, Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager, the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `spark.local.dir` or the environment variable `SPARK_LOCAL_DIRS`. If no directories are explicitly specified then a default directory is created and configured appropriately.
 
 `emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not persist beyond the life of the pod.
 
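As a concrete illustration of the documented pattern, the sketch below builds a `SparkConf` with a `hostPath` volume used as scratch space. This is hypothetical: the volume name `spark-local-dir-1`, the host path `/mnt/disks/ssd0`, and the mount path `/tmp/spark-local` are made-up values, and `hostPath` is only one of the volume types the `spark.kubernetes.*.volumes.*` properties accept.

```scala
import org.apache.spark.SparkConf

// Sketch: mount a hostPath volume named "spark-local-dir-1" so Spark spills
// shuffle data to it instead of an emptyDir. All paths are hypothetical.
val conf = new SparkConf()
  .set("spark.kubernetes.driver.volumes.hostPath.spark-local-dir-1.options.path",
    "/mnt/disks/ssd0")    // backing directory on the host node (hypothetical)
  .set("spark.kubernetes.driver.volumes.hostPath.spark-local-dir-1.mount.path",
    "/tmp/spark-local")   // where the volume appears inside the container
  .set("spark.kubernetes.driver.volumes.hostPath.spark-local-dir-1.mount.readOnly",
    "false")              // scratch space must be writable
```

The same keys exist under `spark.kubernetes.executor.volumes.*` for executor pods.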
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
index 19ed2df555..91edee72fc 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
@@ -18,7 +18,9 @@ package org.apache.spark.deploy.k8s.features
 
 import java.util.UUID
 
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder}
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
@@ -28,36 +30,47 @@ private[spark] class LocalDirsFeatureStep(
     defaultLocalDir: String = s"/var/data/spark-${UUID.randomUUID}")
   extends KubernetesFeatureConfigStep {
 
-  // Cannot use Utils.getConfiguredLocalDirs because that will default to the Java system
-  // property - we want to instead default to mounting an emptydir volume that doesn't already
-  // exist in the image.
-  // We could make utils.getConfiguredLocalDirs opinionated about Kubernetes, as it is already
-  // a bit opinionated about YARN and Mesos.
-  private val resolvedLocalDirs = Option(conf.sparkConf.getenv("SPARK_LOCAL_DIRS"))
-    .orElse(conf.getOption("spark.local.dir"))
-    .getOrElse(defaultLocalDir)
-    .split(",")
   private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)
 
   override def configurePod(pod: SparkPod): SparkPod = {
-    val localDirVolumes = resolvedLocalDirs
-      .zipWithIndex
-      .map { case (localDir, index) =>
-        new VolumeBuilder()
-          .withName(s"spark-local-dir-${index + 1}")
-          .withNewEmptyDir()
-            .withMedium(if (useLocalDirTmpFs) "Memory" else null)
-          .endEmptyDir()
-          .build()
-      }
-    val localDirVolumeMounts = localDirVolumes
-      .zip(resolvedLocalDirs)
-      .map { case (localDirVolume, localDirPath) =>
-        new VolumeMountBuilder()
-          .withName(localDirVolume.getName)
-          .withMountPath(localDirPath)
-          .build()
-      }
+    var localDirs = pod.container.getVolumeMounts.asScala
+      .filter(_.getName.startsWith("spark-local-dir-"))
+      .map(_.getMountPath)
+    var localDirVolumes: Seq[Volume] = Seq()
+    var localDirVolumeMounts: Seq[VolumeMount] = Seq()
+
+    if (localDirs.isEmpty) {
+      // Cannot use Utils.getConfiguredLocalDirs because that will default to the Java system
+      // property - we want to instead default to mounting an emptydir volume that doesn't already
+      // exist in the image.
+      // We could make utils.getConfiguredLocalDirs opinionated about Kubernetes, as it is already
+      // a bit opinionated about YARN and Mesos.
+      val resolvedLocalDirs = Option(conf.sparkConf.getenv("SPARK_LOCAL_DIRS"))
+        .orElse(conf.getOption("spark.local.dir"))
+        .getOrElse(defaultLocalDir)
+        .split(",")
+      localDirs = resolvedLocalDirs.toBuffer
+      localDirVolumes = resolvedLocalDirs
+        .zipWithIndex
+        .map { case (_, index) =>
+          new VolumeBuilder()
+            .withName(s"spark-local-dir-${index + 1}")
+            .withNewEmptyDir()
+              .withMedium(if (useLocalDirTmpFs) "Memory" else null)
+            .endEmptyDir()
+            .build()
+        }
+
+      localDirVolumeMounts = localDirVolumes
+        .zip(resolvedLocalDirs)
+        .map { case (localDirVolume, localDirPath) =>
+          new VolumeMountBuilder()
+            .withName(localDirVolume.getName)
+            .withMountPath(localDirPath)
+            .build()
+        }
+    }
+
     val podWithLocalDirVolumes = new PodBuilder(pod.pod)
       .editSpec()
         .addToVolumes(localDirVolumes: _*)
@@ -66,7 +79,7 @@ private[spark] class LocalDirsFeatureStep(
     val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container)
       .addNewEnv()
         .withName("SPARK_LOCAL_DIRS")
-        .withValue(resolvedLocalDirs.mkString(","))
+        .withValue(localDirs.mkString(","))
         .endEnv()
       .addToVolumeMounts(localDirVolumeMounts: _*)
       .build()
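A minimal, self-contained sketch of the detection logic this step now uses: any volume mount whose name starts with `spark-local-dir-` is treated as local storage, and the `emptyDir` fallback in the `if (localDirs.isEmpty)` branch only runs when that filter finds nothing. The container and mount names below are fabricated for illustration.

```scala
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder}

// Fabricated container with one scratch mount and one unrelated mount.
val container: Container = new ContainerBuilder()
  .addNewVolumeMount()
    .withName("spark-local-dir-test")   // prefix marks it as local storage
    .withMountPath("/tmp")
    .endVolumeMount()
  .addNewVolumeMount()
    .withName("config-vol")             // ignored by the filter
    .withMountPath("/etc/conf")
    .endVolumeMount()
  .build()

// The same filter the step applies before falling back to emptyDir volumes.
val localDirs = container.getVolumeMounts.asScala
  .filter(_.getName.startsWith("spark-local-dir-"))
  .map(_.getMountPath)

assert(localDirs == Seq("/tmp"))  // becomes the value of SPARK_LOCAL_DIRS
```

Because the step inspects `getVolumeMounts` on the incoming pod, any step that adds qualifying mounts must run before it, which is what the builder reordering below provides.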
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index 57e4060bc8..43639a3b7d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -43,12 +43,12 @@ private[spark] class KubernetesDriverBuilder {
       new DriverServiceFeatureStep(conf),
       new MountSecretsFeatureStep(conf),
       new EnvSecretsFeatureStep(conf),
-      new LocalDirsFeatureStep(conf),
       new MountVolumesFeatureStep(conf),
       new DriverCommandFeatureStep(conf),
       new HadoopConfDriverFeatureStep(conf),
       new KerberosConfDriverFeatureStep(conf),
-      new PodTemplateConfigMapStep(conf))
+      new PodTemplateConfigMapStep(conf),
+      new LocalDirsFeatureStep(conf))
 
     val spec = KubernetesDriverSpec(
       initialPod,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index c55488bafa..22bff2c807 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -44,8 +44,8 @@ private[spark] class KubernetesExecutorBuilder {
       new ExecutorKubernetesCredentialsFeatureStep(conf),
       new MountSecretsFeatureStep(conf),
       new EnvSecretsFeatureStep(conf),
-      new LocalDirsFeatureStep(conf),
-      new MountVolumesFeatureStep(conf))
+      new MountVolumesFeatureStep(conf),
+      new LocalDirsFeatureStep(conf))
 
     features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) }
   }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
index 8f34ce5c6b..13bac43600 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
@@ -19,9 +19,8 @@ package org.apache.spark.deploy.k8s.features
 import io.fabric8.kubernetes.api.model.{EnvVarBuilder, VolumeBuilder, VolumeMountBuilder}
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
+import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
 import org.apache.spark.util.SparkConfWithEnv
 
 class LocalDirsFeatureStepSuite extends SparkFunSuite {
@@ -116,4 +115,30 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite {
         .withValue(defaultLocalDir)
         .build())
   }
+
+  test("local dir on mounted volume") {
+    val volumeConf = KubernetesVolumeSpec(
+      "spark-local-dir-test",
+      "/tmp",
+      "",
+      false,
+      KubernetesHostPathVolumeConf("/hostPath/tmp")
+    )
+    val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
+    val mountVolumeStep = new MountVolumesFeatureStep(kubernetesConf)
+    val configuredPod = mountVolumeStep.configurePod(SparkPod.initialPod())
+    val localDirStep = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
+    val newConfiguredPod = localDirStep.configurePod(configuredPod)
+
+    assert(newConfiguredPod.pod.getSpec.getVolumes.size() === 1)
+    assert(newConfiguredPod.pod.getSpec.getVolumes.get(0).getHostPath.getPath === "/hostPath/tmp")
+    assert(newConfiguredPod.container.getVolumeMounts.size() === 1)
+    assert(newConfiguredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp")
+    assert(newConfiguredPod.container.getVolumeMounts.get(0).getName === "spark-local-dir-test")
+    assert(newConfiguredPod.container.getEnv.get(0) ===
+      new EnvVarBuilder()
+        .withName("SPARK_LOCAL_DIRS")
+        .withValue("/tmp")
+        .build())
+  }
 }
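To make the effect of the builder reordering explicit, here is a condensed sketch of the builders' fold, assuming the test helpers used in the suite above (`KubernetesTestConf.createDriverConf()` with its default arguments) are available: `MountVolumesFeatureStep` must configure the pod first so that `LocalDirsFeatureStep` can observe any `spark-local-dir-` mounts it created.

```scala
import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
import org.apache.spark.deploy.k8s.features.{LocalDirsFeatureStep, MountVolumesFeatureStep}

// Order matters: LocalDirsFeatureStep inspects the mounts that
// MountVolumesFeatureStep added, mirroring the builders' foldLeft.
val conf = KubernetesTestConf.createDriverConf()
val steps = Seq(new MountVolumesFeatureStep(conf), new LocalDirsFeatureStep(conf))
val configured = steps.foldLeft(SparkPod.initialPod()) {
  case (pod, step) => step.configurePod(pod)
}
```

Swapping the two steps back would reintroduce the old behavior: `LocalDirsFeatureStep` would see no qualifying mounts and always fall back to creating `emptyDir` volumes.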