diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index aa41b7ec2d..8df8e66912 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -70,6 +70,20 @@ private[spark] object Config extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val KUBERNETES_DRIVER_REUSE_PVC =
+    ConfigBuilder("spark.kubernetes.driver.reusePersistentVolumeClaim")
+      .doc("If true, the driver pod tries to reuse driver-owned on-demand persistent volume " +
+        "claims of deleted executor pods, if any exist. This can be useful to reduce executor " +
+        "pod creation delay by skipping persistent volume creations. Note that a pod in " +
+        "`Terminating` status is not a deleted pod by definition, so its resources, " +
+        "including persistent volume claims, are not reusable yet. Spark creates new " +
+        "persistent volume claims when no reusable ones exist. In other words, the total " +
+        "number of persistent volume claims can sometimes be larger than the number of " +
+        s"running executors. This config requires ${KUBERNETES_DRIVER_OWN_PVC.key}=true.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val KUBERNETES_NAMESPACE =
     ConfigBuilder("spark.kubernetes.namespace")
       .doc("The namespace that will be used for running the driver and executor pods.")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index d54f665a38..3349e0c147 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
-import io.fabric8.kubernetes.api.model.{PersistentVolumeClaim, PodBuilder}
+import io.fabric8.kubernetes.api.model.{HasMetadata, PersistentVolumeClaim, Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
@@ -125,6 +125,14 @@ private[spark] class ExecutorPodsAllocator(
     newlyCreatedExecutors --= k8sKnownExecIds
     schedulerKnownNewlyCreatedExecs --= k8sKnownExecIds
 
+    // Although we are going to delete some executors due to timeout in this function, the
+    // actual deletion may take an arbitrary amount of time. Hence, we should collect all PVCs
+    // in use at the beginning. False positives are okay in this context in order to be safe.
+    val k8sKnownPVCNames = snapshots.flatMap(_.executorPods.values.map(_.pod)).flatMap { pod =>
+      pod.getSpec.getVolumes.asScala
+        .flatMap { v => Option(v.getPersistentVolumeClaim).map(_.getClaimName) }
+    }
+
     // transfer the scheduler backend known executor requests from the newlyCreatedExecutors
     // to the schedulerKnownNewlyCreatedExecs
     val schedulerKnownExecs = schedulerBackend.getExecutorIds().map(_.toLong).toSet
@@ -280,7 +288,7 @@ private[spark] class ExecutorPodsAllocator(
 
       if (newlyCreatedExecutorsForRpId.isEmpty && knownPodCount < targetNum) {
-        requestNewExecutors(targetNum, knownPodCount, applicationId, rpId)
+        requestNewExecutors(targetNum, knownPodCount, applicationId, rpId, k8sKnownPVCNames)
       }
 
       totalPendingCount += knownPendingCount
@@ -308,14 +316,35 @@ private[spark] class ExecutorPodsAllocator(
     numOutstandingPods.set(totalPendingCount + newlyCreatedExecutors.size)
   }
 
+  private def getReusablePVCs(applicationId: String, pvcsInUse: Seq[String]) = {
+    if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && conf.get(KUBERNETES_DRIVER_REUSE_PVC) &&
+        driverPod.nonEmpty) {
+      val createdPVCs = kubernetesClient
+        .persistentVolumeClaims
+        .withLabel("spark-app-selector", applicationId)
+        .list()
+        .getItems
+        .asScala
+
+      val reusablePVCs = createdPVCs.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
+      logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs")
+      reusablePVCs
+    } else {
+      mutable.Buffer.empty[PersistentVolumeClaim]
+    }
+  }
+
   private def requestNewExecutors(
       expected: Int,
       running: Int,
       applicationId: String,
-      resourceProfileId: Int): Unit = {
+      resourceProfileId: Int,
+      pvcsInUse: Seq[String]): Unit = {
     val numExecutorsToAllocate = math.min(expected - running, podAllocationSize)
     logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
       s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.")
+    // Check reusable PVCs for this executor allocation batch
+    val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
     for ( _ <- 0 until numExecutorsToAllocate) {
       val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
       val executorConf = KubernetesConf.createExecutorConf(
@@ -332,9 +361,10 @@ private[spark] class ExecutorPodsAllocator(
         .addToContainers(executorPod.container)
         .endSpec()
         .build()
+      val resources = replacePVCsIfNeeded(
+        podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
       val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
       try {
-        val resources = resolvedExecutorSpec.executorKubernetesResources
         addOwnerReference(createdExecutorPod, resources)
         resources
           .filter(_.getKind == "PersistentVolumeClaim")
@@ -357,6 +387,36 @@ private[spark] class ExecutorPodsAllocator(
     }
   }
 
+  private def replacePVCsIfNeeded(
+      pod: Pod,
+      resources: Seq[HasMetadata],
+      reusablePVCs: mutable.Buffer[PersistentVolumeClaim]) = {
+    val replacedResources = mutable.ArrayBuffer[HasMetadata]()
+    resources.foreach {
+      case pvc: PersistentVolumeClaim =>
+        // Find one with the same storage class and size.
+        val index = reusablePVCs.indexWhere { p =>
+          p.getSpec.getStorageClassName == pvc.getSpec.getStorageClassName &&
+            p.getSpec.getResources.getRequests.get("storage") ==
+              pvc.getSpec.getResources.getRequests.get("storage")
+        }
+        if (index >= 0) {
+          val volume = pod.getSpec.getVolumes.asScala.find { v =>
+            v.getPersistentVolumeClaim != null &&
+              v.getPersistentVolumeClaim.getClaimName == pvc.getMetadata.getName
+          }
+          if (volume.nonEmpty) {
+            val matchedPVC = reusablePVCs.remove(index)
+            replacedResources.append(pvc)
+            logInfo(s"Reuse PersistentVolumeClaim ${matchedPVC.getMetadata.getName}")
+            volume.get.getPersistentVolumeClaim.setClaimName(matchedPVC.getMetadata.getName)
+          }
+        }
+      case _ => // no-op
+    }
+    resources.filterNot(replacedResources.contains)
+  }
+
   private def isExecutorIdleTimedOut(state: ExecutorPodState, currentTime: Long): Boolean = {
     try {
       val startTime = Instant.parse(state.pod.getStatus.getStartTime).toEpochMilli()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
index 792f3096ee..14405da728 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.deploy.k8s
 
-import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapList, HasMetadata, Pod, PodList}
+import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapList, HasMetadata, PersistentVolumeClaim, PersistentVolumeClaimList, Pod, PodList}
 import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource, Resource}
 
 object Fabric8Aliases {
@@ -28,4 +28,8 @@ object Fabric8Aliases {
   type SINGLE_POD = PodResource[Pod]
   type RESOURCE_LIST = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
     HasMetadata]
+  type PERSISTENT_VOLUME_CLAIMS = MixedOperation[PersistentVolumeClaim, PersistentVolumeClaimList,
+    Resource[PersistentVolumeClaim]]
+  type LABELED_PERSISTENT_VOLUME_CLAIMS =
+    FilterWatchListDeletable[PersistentVolumeClaim, PersistentVolumeClaimList]
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
index 41cba573d8..78f11f9aa3 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
@@ -18,7 +18,9 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import java.time.Instant
 
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, Pod, PodBuilder}
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.SparkPod
@@ -170,6 +172,18 @@ object ExecutorLifecycleTestUtils {
     podWithAttachedContainer
   }
 
+  def podWithAttachedContainerForIdAndVolume(
+      executorId: Long,
+      rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
+    val sparkPod = executorPodWithIdAndVolume(executorId, rpId)
+    val podWithAttachedContainer = new PodBuilder(sparkPod.pod)
+      .editOrNewSpec()
+      .addToContainers(sparkPod.container)
+      .endSpec()
+      .build()
+    podWithAttachedContainer
+  }
+
   def executorPodWithId(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): SparkPod = {
     val pod = new PodBuilder()
       .withNewMetadata()
@@ -189,4 +203,32 @@ object ExecutorLifecycleTestUtils {
       .build()
     SparkPod(pod, container)
   }
+
+  def executorPodWithIdAndVolume(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID)
+    : SparkPod = {
+    val sparkPod = executorPodWithId(executorId, rpId)
+    sparkPod.pod.getSpec.getVolumes.add(new VolumeBuilder()
+      .withName("spark-volume")
+      .withPersistentVolumeClaim(new PersistentVolumeClaimVolumeSource("pvc-0", false))
+      .build())
+    sparkPod
+  }
+
+  def persistentVolumeClaim(claimName: String, storageClass: String, size: String)
+    : PersistentVolumeClaim = {
+    new PersistentVolumeClaimBuilder()
+      .withKind("PersistentVolumeClaim")
+      .withApiVersion("v1")
+      .withNewMetadata()
+        .withName(claimName)
+        .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)
+        .endMetadata()
+      .withNewSpec()
+        .withStorageClassName(storageClass)
+        .withAccessModes("ReadWriteOnce")
+        .withResources(new ResourceRequirementsBuilder()
+          .withRequests(Map("storage" -> new Quantity(size)).asJava).build())
+        .endSpec()
+      .build()
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index c5291ab973..17bdb1d2bc 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -18,7 +18,9 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import java.time.Instant
 
-import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
 import org.mockito.{Mock, MockitoAnnotations}
@@ -72,6 +74,15 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   @Mock
   private var podOperations: PODS = _
 
+  @Mock
+  private var persistentVolumeClaims: PERSISTENT_VOLUME_CLAIMS = _
+
+  @Mock
+  private var labeledPersistentVolumeClaims: LABELED_PERSISTENT_VOLUME_CLAIMS = _
+
+  @Mock
+  private var persistentVolumeClaimList: PersistentVolumeClaimList = _
+
   @Mock
   private var labeledPods: LABELED_PODS = _
 
@@ -569,6 +580,72 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     verify(podOperations).create(podWithAttachedContainerForId(6))
   }
 
+  test("SPARK-35416: Support PersistentVolumeClaim Reuse") {
+    val prefix = "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1"
+    val confWithPVC = conf.clone
+      .set(KUBERNETES_DRIVER_OWN_PVC.key, "true")
+      .set(KUBERNETES_DRIVER_REUSE_PVC.key, "true")
+      .set(s"$prefix.mount.path", "/spark-local-dir")
+      .set(s"$prefix.mount.readOnly", "false")
+      .set(s"$prefix.option.claimName", "OnDemand")
+      .set(s"$prefix.option.sizeLimit", "200Gi")
+      .set(s"$prefix.option.storageClass", "gp2")
+
+    when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims)
+    when(persistentVolumeClaims.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims)
+    when(labeledPersistentVolumeClaims.list()).thenReturn(persistentVolumeClaimList)
+    when(persistentVolumeClaimList.getItems)
+      .thenReturn(Seq(persistentVolumeClaim("pvc-0", "gp2", "200Gi")).asJava)
+    when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
+      meq(kubernetesClient), any(classOf[ResourceProfile])))
+      .thenAnswer((invocation: InvocationOnMock) => {
+        val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
+        KubernetesExecutorSpec(
+          executorPodWithIdAndVolume(k8sConf.executorId.toInt, k8sConf.resourceProfileId),
+          Seq(persistentVolumeClaim("pvc-0", "gp2", "200Gi")))
+      })
+
+    podsAllocatorUnderTest = new ExecutorPodsAllocator(
+      confWithPVC, secMgr, executorBuilder,
+      kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
+
+    when(podOperations
+      .withField("status.phase", "Pending"))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
+      .thenReturn(podOperations)
+
+    val startTime = Instant.now.toEpochMilli
+    waitForExecutorPodsClock.setTime(startTime)
+
+    // Target 1 executor, make sure it's requested, even with an empty initial snapshot.
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
+    verify(podOperations).create(podWithAttachedContainerForIdAndVolume(1))
+
+    // Mark executor as running, verify that the subsequent allocation cycle is a no-op.
+    snapshotsStore.updatePod(runningExecutor(1))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    verify(podOperations, times(1)).create(any())
+    verify(podOperations, never()).delete()
+
+    // Request a new executor, make sure it uses a reused PVC
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
+    verify(podOperations).create(podWithAttachedContainerForIdAndVolume(2))
+    verify(persistentVolumeClaims, never()).create(any())
+  }
+
   private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
     (invocation: InvocationOnMock) => {
      val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
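
For reference, the sketch below shows how an application might opt in to PVC reuse. It is modeled directly on the confWithPVC settings in the new test above, not part of the patch itself. The volume name spark-local-dir-1, the OnDemand claim name, the gp2 storage class, and the 200Gi size are assumptions borrowed from that test rather than requirements of the feature; outside Spark-internal code (Config is private[spark]) the two constants would be passed as their string keys via --conf.

// Illustrative sketch only, mirroring the test's confWithPVC.
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config.{KUBERNETES_DRIVER_OWN_PVC, KUBERNETES_DRIVER_REUSE_PVC}

val prefix = "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1"
val conf = new SparkConf()
  .set(KUBERNETES_DRIVER_OWN_PVC.key, "true")    // prerequisite, per the new config's doc string
  .set(KUBERNETES_DRIVER_REUSE_PVC.key, "true")  // the config added by this patch
  .set(s"$prefix.mount.path", "/spark-local-dir")
  .set(s"$prefix.mount.readOnly", "false")
  .set(s"$prefix.option.claimName", "OnDemand")  // an on-demand PVC is created per executor
  .set(s"$prefix.option.sizeLimit", "200Gi")
  .set(s"$prefix.option.storageClass", "gp2")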