[SPARK-32971][K8S] Support dynamic PVC creation/deletion for K8s executors

### What changes were proposed in this pull request?

This PR aims to support dynamic PVC creation and deletion for K8s executors. The PVCs are created with executor pods and deleted when the executor pods are deleted.

**Configuration**
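This PR mostly reuses the existing PVC volume configs; the only new option is `storageClass`. Setting `claimName` to `OnDemand` together with `storageClass` and `sizeLimit` makes Spark create a PVC for each executor.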
```
spark.executor.instances=2
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=gp2
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=500Gi
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false
```

**Executors**
```
$ kubectl get pod -l spark-role=executor
NAME                               READY   STATUS    RESTARTS   AGE
spark-pi-f4d80574b9bb0941-exec-1   1/1     Running   0          2m6s
spark-pi-f4d80574b9bb0941-exec-2   1/1     Running   0          2m6s
```

**PVCs**
```
$ kubectl get pvc
NAME                                     STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
spark-pi-f4d80574b9bb0941-exec-1-pvc-0   Bound    pvc-7d20173f-278b-4c7b-b7e5-7f0ed414ee64   500Gi      RWO            gp2            48s
spark-pi-f4d80574b9bb0941-exec-2-pvc-0   Bound    pvc-1138f00d-87f1-47f4-9b58-ce5d13ea0c3a   500Gi      RWO            gp2            48s
```

**Executor Disk**
```
$ kubectl exec -it spark-pi-f4d80574b9bb0941-exec-1 -- df -h /data
Filesystem      Size  Used Avail Use% Mounted on
/dev/nvme3n1    493G   74M  492G   1% /data
```

```
$ kubectl exec -it spark-pi-f4d80574b9bb0941-exec-1 -- ls /data
blockmgr-81dcebaf-11a7-4d7b-91d6-3c580187d914
lost+found
spark-6be42db8-2c58-4389-b52c-8aeeafe76bd5
```

### Why are the changes needed?

While SPARK-32655 supports mounting a pre-created PVC, this PR lets Spark create the PVCs itself dynamically, which removes a lot of manual effort.

### Does this PR introduce _any_ user-facing change?

Yes. This is a new feature.

### How was this patch tested?

Pass the newly added test cases.

Closes #29846 from dongjoon-hyun/SPARK-32971.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Dongjoon Hyun 2020-09-23 16:47:10 -07:00
parent 27f6b5a103
commit 527cd3fc3a
13 changed files with 167 additions and 41 deletions

View file

@ -439,6 +439,7 @@ private[spark] object Config extends Logging {
val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly"
val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path"
val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName"
val KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY = "options.storageClass"
val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium"
val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit"
val KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY = "options.server"

View file

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s
import io.fabric8.kubernetes.api.model.HasMetadata
/**
 * Result of building an executor from the feature steps: the configured executor pod plus
 * the additional Kubernetes resources that the feature steps reported while building it.
 *
 * @param pod the executor pod (and its container) after every feature step has been applied
 * @param executorKubernetesResources extra resources returned by the feature steps'
 *                                    `getAdditionalKubernetesResources()`, e.g. the
 *                                    PersistentVolumeClaims generated for on-demand volumes
 */
private[spark] case class KubernetesExecutorSpec(
pod: SparkPod,
executorKubernetesResources: Seq[HasMetadata])

View file

@ -19,11 +19,11 @@ package org.apache.spark.deploy.k8s
import java.io.{File, IOException}
import java.net.URI
import java.security.SecureRandom
import java.util.UUID
import java.util.{Collections, UUID}
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder, Quantity}
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder, Quantity}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.commons.codec.binary.Hex
import org.apache.hadoop.fs.{FileSystem, Path}
@ -323,4 +323,22 @@ private[spark] object KubernetesUtils extends Logging {
.build()
}
}
/**
 * Adds an OwnerReference to each of the given resources, making `pod` their owner so that
 * the resources are garbage collected when the pod is deleted.
 *
 * Any owner references already set on a resource are replaced by the single reference
 * to `pod`. When `pod` is null, nothing is done.
 *
 * @param pod the pod that should own the resources; may be null
 * @param resources the resources whose metadata is updated in place
 */
def addOwnerReference(pod: Pod, resources: Seq[HasMetadata]): Unit = {
  Option(pod).foreach { owner =>
    val ownerMetadata = owner.getMetadata
    val ownerReference = new OwnerReferenceBuilder()
      .withName(ownerMetadata.getName)
      .withApiVersion(owner.getApiVersion)
      .withUid(ownerMetadata.getUid)
      .withKind(owner.getKind)
      .withController(true)
      .build()
    for (resource <- resources) {
      // Each resource gets its own single-element list holding the shared reference.
      resource.getMetadata.setOwnerReferences(Collections.singletonList(ownerReference))
    }
  }
}
}

View file

@ -21,7 +21,10 @@ private[spark] sealed trait KubernetesVolumeSpecificConf
private[spark] case class KubernetesHostPathVolumeConf(hostPath: String)
extends KubernetesVolumeSpecificConf
private[spark] case class KubernetesPVCVolumeConf(claimName: String)
/**
 * Volume configuration for a volume backed by a PersistentVolumeClaim.
 *
 * @param claimName value of the volume's `options.claimName` setting
 * @param storageClass value of the optional `options.storageClass` setting
 * @param size value of the optional `options.sizeLimit` setting
 *
 * For executors, a PersistentVolumeClaim resource is generated only when both
 * `storageClass` and `size` are defined.
 */
private[spark] case class KubernetesPVCVolumeConf(
claimName: String,
storageClass: Option[String] = None,
size: Option[String] = None)
extends KubernetesVolumeSpecificConf
private[spark] case class KubernetesEmptyDirVolumeConf(

View file

@ -71,7 +71,13 @@ private[spark] object KubernetesVolumeUtils {
case KUBERNETES_VOLUMES_PVC_TYPE =>
val claimNameKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY"
KubernetesPVCVolumeConf(options(claimNameKey))
val storageClassKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY"
val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
KubernetesPVCVolumeConf(
options(claimNameKey),
options.get(storageClassKey),
options.get(sizeLimitKey))
case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"

View file

@ -16,6 +16,9 @@
*/
package org.apache.spark.deploy.k8s.features
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import io.fabric8.kubernetes.api.model._
import org.apache.spark.deploy.k8s._
@ -23,6 +26,9 @@ import org.apache.spark.deploy.k8s.Constants.ENV_EXECUTOR_ID
private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
extends KubernetesFeatureConfigStep {
import MountVolumesFeatureStep._
val additionalResources = ArrayBuffer.empty[HasMetadata]
override def configurePod(pod: SparkPod): SparkPod = {
val (volumeMounts, volumes) = constructVolumes(conf.volumes).unzip
@ -43,7 +49,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
private def constructVolumes(
volumeSpecs: Iterable[KubernetesVolumeSpec]
): Iterable[(VolumeMount, Volume)] = {
volumeSpecs.map { spec =>
volumeSpecs.zipWithIndex.map { case (spec, i) =>
val volumeMount = new VolumeMountBuilder()
.withMountPath(spec.mountPath)
.withReadOnly(spec.mountReadOnly)
@ -57,10 +63,32 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
new VolumeBuilder()
.withHostPath(new HostPathVolumeSource(hostPath, ""))
case KubernetesPVCVolumeConf(claimNameTemplate) =>
case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) =>
val claimName = conf match {
case c: KubernetesExecutorConf =>
claimNameTemplate.replaceAll(ENV_EXECUTOR_ID, c.executorId)
val claimName = claimNameTemplate
.replaceAll(PVC_ON_DEMAND,
s"${conf.resourceNamePrefix}-exec-${c.executorId}$PVC_POSTFIX-$i")
.replaceAll(ENV_EXECUTOR_ID, c.executorId)
if (storageClass.isDefined && size.isDefined) {
additionalResources.append(new PersistentVolumeClaimBuilder()
.withKind(PVC)
.withApiVersion("v1")
.withNewMetadata()
.withName(claimName)
.endMetadata()
.withNewSpec()
.withStorageClassName(storageClass.get)
.withAccessModes(PVC_ACCESS_MODE)
.withResources(new ResourceRequirementsBuilder()
.withRequests(Map("storage" -> new Quantity(size.get)).asJava).build())
.endSpec()
.build())
}
claimName
case _ =>
claimNameTemplate
}
@ -84,4 +112,15 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
(volumeMount, volume)
}
}
/**
 * Returns the additional Kubernetes resources (the generated PersistentVolumeClaims)
 * that were collected in `additionalResources` while the volumes were constructed.
 *
 * The buffer is converted with `toSeq` instead of being returned directly: under
 * Scala 2.13 `scala.Seq` is the immutable Seq, so a mutable `ArrayBuffer` does not
 * conform to the declared return type. On Scala 2.12 the conversion returns the
 * buffer itself, so existing behavior is unchanged there.
 *
 * @return the resources to create alongside the pod; empty if none were generated
 */
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
  additionalResources.toSeq
}
}
/** Constants used when generating PersistentVolumeClaims for executor volumes. */
private[spark] object MountVolumesFeatureStep {

  /** Placeholder in a configured claim name that is replaced by a generated claim name. */
  val PVC_ON_DEMAND: String = "OnDemand"

  /** Kubernetes `kind` set on the generated claim resources. */
  val PVC: String = "PersistentVolumeClaim"

  /** Suffix placed after the executor id (and before the volume index) in generated names. */
  val PVC_POSTFIX: String = "-pvc"

  /** Access mode requested for the generated claims. */
  val PVC_ACCESS_MODE: String = "ReadWriteOnce"
}

View file

@ -20,18 +20,20 @@ import java.io.StringWriter
import java.util.{Collections, UUID}
import java.util.Properties
import scala.collection.mutable
import scala.util.control.Breaks._
import scala.util.control.NonFatal
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
import io.fabric8.kubernetes.client.Watcher.Action
import scala.collection.mutable
import scala.util.control.NonFatal
import util.control.Breaks._
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkApplication
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
@ -134,7 +136,7 @@ private[spark] class Client(
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
addOwnerReference(createdDriverPod, otherKubernetesResources)
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
@ -163,22 +165,6 @@ private[spark] class Client(
}
}
// Add a OwnerReference to the given resources making the driver pod an owner of them so when
// the driver pod is deleted, the resources are garbage collected.
private def addDriverOwnerReference(driverPod: Pod, resources: Seq[HasMetadata]): Unit = {
val driverPodOwnerReference = new OwnerReferenceBuilder()
.withName(driverPod.getMetadata.getName)
.withApiVersion(driverPod.getApiVersion)
.withUid(driverPod.getMetadata.getUid)
.withKind(driverPod.getKind)
.withController(true)
.build()
resources.foreach { resource =>
val originalMetadata = resource.getMetadata
originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
}
}
// Build a Config Map that will house spark conf properties in a single file for spark-submit
private def buildConfigMap(configMapName: String, conf: Map[String, String]): ConfigMap = {
val properties = new Properties()

View file

@ -18,14 +18,17 @@ package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}
import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.client.KubernetesClient
import scala.collection.mutable
import scala.util.control.NonFatal
import io.fabric8.kubernetes.api.model.{HasMetadata, PersistentVolumeClaim, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesConf
import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
import org.apache.spark.internal.Logging
import org.apache.spark.util.{Clock, Utils}
@ -212,16 +215,33 @@ private[spark] class ExecutorPodsAllocator(
newExecutorId.toString,
applicationId,
driverPod)
val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr,
val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr,
kubernetesClient)
val executorPod = resolvedExecutorSpec.pod
val podWithAttachedContainer = new PodBuilder(executorPod.pod)
.editOrNewSpec()
.addToContainers(executorPod.container)
.endSpec()
.build()
kubernetesClient.pods().create(podWithAttachedContainer)
newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
try {
val resources = resolvedExecutorSpec.executorKubernetesResources
addOwnerReference(createdExecutorPod, resources)
resources
.filter(_.getKind == "PersistentVolumeClaim")
.foreach { resource =>
val pvc = resource.asInstanceOf[PersistentVolumeClaim]
logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
s"StorageClass ${pvc.getSpec.getStorageClassName}")
kubernetesClient.persistentVolumeClaims().create(pvc)
}
newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdExecutorPod)
throw e
}
}
}

View file

@ -29,7 +29,7 @@ private[spark] class KubernetesExecutorBuilder {
def buildFromFeatures(
conf: KubernetesExecutorConf,
secMgr: SecurityManager,
client: KubernetesClient): SparkPod = {
client: KubernetesClient): KubernetesExecutorSpec = {
val initialPod = conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
.map { file =>
KubernetesUtils.loadPodFromTemplate(
@ -47,7 +47,17 @@ private[spark] class KubernetesExecutorBuilder {
new MountVolumesFeatureStep(conf),
new LocalDirsFeatureStep(conf))
features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) }
val spec = KubernetesExecutorSpec(
initialPod,
executorKubernetesResources = Seq.empty)
features.foldLeft(spec) { case (spec, feature) =>
val configuredPod = feature.configurePod(spec.pod)
val addedResources = feature.getAdditionalKubernetesResources()
KubernetesExecutorSpec(
configuredPod,
spec.executorKubernetesResources ++ addedResources)
}
}
}

View file

@ -114,9 +114,12 @@ object KubernetesTestConf {
(KUBERNETES_VOLUMES_HOSTPATH_TYPE,
Map(KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path))
case KubernetesPVCVolumeConf(claimName) =>
case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit) =>
val sconf = storageClass
.map { s => (KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY, s) }.toMap
val lconf = sizeLimit.map { l => (KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY, l) }.toMap
(KUBERNETES_VOLUMES_PVC_TYPE,
Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName))
Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName) ++ sconf ++ lconf)
case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
val mconf = medium.map { m => (KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY, m) }.toMap

View file

@ -89,6 +89,23 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(executorPVC.getClaimName === s"pvc-spark-${KubernetesTestConf.EXECUTOR_ID}")
}
test("Create and mount persistentVolumeClaims in executors") {
  // Using the on-demand placeholder as the claim name asks the step to generate
  // a per-executor claim name instead of using a fixed one.
  val onDemandClaim = KubernetesPVCVolumeConf(MountVolumesFeatureStep.PVC_ON_DEMAND)
  val volumeSpec = KubernetesVolumeSpec("testVolume", "/tmp", "", true, onDemandClaim)
  val conf = KubernetesTestConf.createExecutorConf(volumes = Seq(volumeSpec))

  val step = new MountVolumesFeatureStep(conf)
  val configuredPod = step.configurePod(SparkPod.initialPod())

  // Exactly one volume is mounted, and its claim name carries the generated
  // "-exec-<executor id>-pvc-<volume index>" suffix.
  assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
  val claimSource = configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
  assert(claimSource.getClaimName.endsWith("-exec-1-pvc-0"))
}
test("Mounts emptyDir") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",

View file

@ -27,7 +27,7 @@ import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSpec, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
@ -202,9 +202,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
assert(!podsAllocatorUnderTest.isDeleted("4"))
}
private def executorPodAnswer(): Answer[SparkPod] =
private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
(invocation: InvocationOnMock) => {
val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
executorPodWithId(k8sConf.executorId.toInt)
KubernetesExecutorSpec(executorPodWithId(k8sConf.executorId.toInt), Seq.empty)
}
}

View file

@ -32,7 +32,7 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
sparkConf.set("spark.driver.host", "https://driver.host.com")
val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
val secMgr = new SecurityManager(sparkConf)
new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client)
new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client).pod
}
}