## [SPARK-35416][K8S] Support PersistentVolumeClaim Reuse

### What changes were proposed in this pull request?

This PR aims to add a new configuration, `spark.kubernetes.driver.reusePersistentVolumeClaim`, to reuse driver-owned `PersistentVolumeClaims` of the **deleted** executor pods.

Note also that driver-owned `PersistentVolumeClaims` are controlled by the recently added `spark.kubernetes.driver.ownPersistentVolumeClaim` configuration.
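
Both settings must be enabled together, since reuse builds on driver-owned PVCs. A minimal illustrative sketch of the expected configuration (the keys come from this PR; the surrounding `SparkConf` setup is assumed):
```scala
import org.apache.spark.SparkConf

// Illustrative sketch: PVC reuse requires driver-owned on-demand PVCs,
// per the new config's doc ("This config requires ...ownPersistentVolumeClaim=true").
val sparkConf = new SparkConf()
  .set("spark.kubernetes.driver.ownPersistentVolumeClaim", "true")   // prerequisite
  .set("spark.kubernetes.driver.reusePersistentVolumeClaim", "true") // added by this PR
```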

### Why are the changes needed?

PVC creation takes some time. This feature reduces executor startup delay by reusing existing PVCs instead of creating new ones.

For example, we can start the `Pi` app with two executors, each using a PVC.
```
$ k logs -f pi | grep ExecutorPodsAllocator
21/05/16 23:36:32 INFO ExecutorPodsAllocator: Going to request 2 executors from Kubernetes for ResourceProfile Id: 0, target: 2 running: 0.
21/05/16 23:36:32 INFO ExecutorPodsAllocator: Found 0 reusable PVCs from 0 PVCs
21/05/16 23:36:32 INFO ExecutorPodsAllocator: Trying to create PersistentVolumeClaim pi-exec-1-pvc-0 with StorageClass scaleio
21/05/16 23:36:33 INFO ExecutorPodsAllocator: Trying to create PersistentVolumeClaim pi-exec-2-pvc-0 with StorageClass scaleio
```

After killing one executor, Spark looks up reusable PVCs, but the dead executor's PVC may not be returned yet because K8s works asynchronously. In that case, Spark falls back to creating a new PVC, as in normal operation.
```
21/05/16 23:38:51 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 2 running: 1.
21/05/16 23:38:51 INFO ExecutorPodsAllocator: Found 0 reusable PVCs from 2 PVCs
21/05/16 23:38:51 INFO ExecutorPodsAllocator: Trying to create PersistentVolumeClaim pi-exec-3-pvc-0 with StorageClass scaleio
```

After killing another executor, Spark finds one reusable PVC, `pi-exec-1-pvc-0`, and reuses it.
```
21/05/16 23:39:18 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 2 running: 1.
21/05/16 23:39:18 INFO ExecutorPodsAllocator: Found 1 reusable PVCs from 3 PVCs
21/05/16 23:39:18 INFO ExecutorPodsAllocator: Reuse PersistentVolumeClaim pi-exec-1-pvc-0
```
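
A PVC only counts as reusable when its storage class and requested size match the claim that would otherwise be created. A condensed sketch of that check, mirroring `replacePVCsIfNeeded` in the diff below (`isReusable` is a hypothetical helper name for illustration):
```scala
import io.fabric8.kubernetes.api.model.PersistentVolumeClaim

// Condensed sketch of the matching rule used when replacing claims: a candidate
// PVC is reusable only if its storage class and requested "storage" quantity
// equal those of the claim Spark was about to create.
def isReusable(candidate: PersistentVolumeClaim, requested: PersistentVolumeClaim): Boolean =
  candidate.getSpec.getStorageClassName == requested.getSpec.getStorageClassName &&
    candidate.getSpec.getResources.getRequests.get("storage") ==
      requested.getSpec.getResources.getRequests.get("storage")
```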

In this case, we can easily spot the remounted PVC because its `ClaimName`, `pi-exec-1-pvc-0`, doesn't carry the new pod's name prefix, `pi-exec-4`.
```
$ k describe pod pi-exec-4 | grep pi-exec-1-pvc-0
    ClaimName:  pi-exec-1-pvc-0
```

### Does this PR introduce _any_ user-facing change?

Yes, but this is a new opt-in feature that is disabled by default via the new configuration.

### How was this patch tested?

Pass the CIs with the newly added test case.

The K8s integration tests also passed:
```
KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Use SparkLauncher.NO_RESOURCE
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- All pods have the same service account by default
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j.properties
- Run SparkPi with env and mount secrets.
- Run PySpark on simple pi.py example
- Run PySpark to test a pyfiles example
- Run PySpark with memory customization
- Run in client mode.
- Start pod creation from template
- Launcher client dependencies
- SPARK-33615: Launcher client archives
- SPARK-33748: Launcher python client respecting PYSPARK_PYTHON
- SPARK-33748: Launcher python client respecting spark.pyspark.python and spark.pyspark.driver.python
- Launcher python client dependencies using a zip file
- Test basic decommissioning
- Test basic decommissioning with shuffle cleanup
- Test decommissioning with dynamic allocation & shuffle cleanups
- Test decommissioning timeouts
- Run SparkR on simple dataframe.R example
Run completed in 17 minutes, 7 seconds.
Total number of tests run: 26
Suites: completed 2, aborted 0
Tests: succeeded 26, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  24:14 min
[INFO] Finished at: 2021-05-16T17:24:40-07:00
[INFO] ------------------------------------------------------------------------
```

Closes #32564 from dongjoon-hyun/SPARK-35416.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Commit 4c015555da (parent fb9316388a), committed by Dongjoon Hyun on 2021-05-17 00:20:48 -07:00.
5 changed files with 204 additions and 7 deletions.

**Config.scala**

```diff
@@ -70,6 +70,20 @@ private[spark] object Config extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val KUBERNETES_DRIVER_REUSE_PVC =
+    ConfigBuilder("spark.kubernetes.driver.reusePersistentVolumeClaim")
+      .doc("If true, driver pod tries to reuse driver-owned on-demand persistent volume claims " +
+        "of the deleted executor pods if exists. This can be useful to reduce executor pod " +
+        "creation delay by skipping persistent volume creations. Note that a pod in " +
+        "`Terminating` pod status is not a deleted pod by definition and its resources " +
+        "including persistent volume claims are not reusable yet. Spark will create new " +
+        "persistent volume claims when there exists no reusable one. In other words, the total " +
+        "number of persistent volume claims can be larger than the number of running executors " +
+        s"sometimes. This config requires ${KUBERNETES_DRIVER_OWN_PVC.key}=true.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val KUBERNETES_NAMESPACE =
     ConfigBuilder("spark.kubernetes.namespace")
       .doc("The namespace that will be used for running the driver and executor pods.")
```

**ExecutorPodsAllocator.scala**

```diff
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
-import io.fabric8.kubernetes.api.model.{PersistentVolumeClaim, PodBuilder}
+import io.fabric8.kubernetes.api.model.{HasMetadata, PersistentVolumeClaim, Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
@@ -125,6 +125,14 @@ private[spark] class ExecutorPodsAllocator(
     newlyCreatedExecutors --= k8sKnownExecIds
     schedulerKnownNewlyCreatedExecs --= k8sKnownExecIds
 
+    // Although we are going to delete some executors due to timeout in this function,
+    // it takes undefined time before the actual deletion. Hence, we should collect all PVCs
+    // in use at the beginning. False positive is okay in this context in order to be safe.
+    val k8sKnownPVCNames = snapshots.flatMap(_.executorPods.values.map(_.pod)).flatMap { pod =>
+      pod.getSpec.getVolumes.asScala
+        .flatMap { v => Option(v.getPersistentVolumeClaim).map(_.getClaimName) }
+    }
+
     // transfer the scheduler backend known executor requests from the newlyCreatedExecutors
     // to the schedulerKnownNewlyCreatedExecs
     val schedulerKnownExecs = schedulerBackend.getExecutorIds().map(_.toLong).toSet
@@ -280,7 +288,7 @@ private[spark] class ExecutorPodsAllocator(
       if (newlyCreatedExecutorsForRpId.isEmpty
         && knownPodCount < targetNum) {
-        requestNewExecutors(targetNum, knownPodCount, applicationId, rpId)
+        requestNewExecutors(targetNum, knownPodCount, applicationId, rpId, k8sKnownPVCNames)
       }
 
       totalPendingCount += knownPendingCount
@@ -308,14 +316,35 @@ private[spark] class ExecutorPodsAllocator(
     numOutstandingPods.set(totalPendingCount + newlyCreatedExecutors.size)
   }
 
+  private def getReusablePVCs(applicationId: String, pvcsInUse: Seq[String]) = {
+    if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && conf.get(KUBERNETES_DRIVER_REUSE_PVC) &&
+        driverPod.nonEmpty) {
+      val createdPVCs = kubernetesClient
+        .persistentVolumeClaims
+        .withLabel("spark-app-selector", applicationId)
+        .list()
+        .getItems
+        .asScala
+
+      val reusablePVCs = createdPVCs.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
+      logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs")
+      reusablePVCs
+    } else {
+      mutable.Buffer.empty[PersistentVolumeClaim]
+    }
+  }
+
   private def requestNewExecutors(
       expected: Int,
       running: Int,
       applicationId: String,
-      resourceProfileId: Int): Unit = {
+      resourceProfileId: Int,
+      pvcsInUse: Seq[String]): Unit = {
     val numExecutorsToAllocate = math.min(expected - running, podAllocationSize)
     logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
       s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.")
+    // Check reusable PVCs for this executor allocation batch
+    val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
     for ( _ <- 0 until numExecutorsToAllocate) {
       val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
       val executorConf = KubernetesConf.createExecutorConf(
@@ -332,9 +361,10 @@ private[spark] class ExecutorPodsAllocator(
           .addToContainers(executorPod.container)
           .endSpec()
           .build()
+        val resources = replacePVCsIfNeeded(
+          podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
         val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
         try {
-          val resources = resolvedExecutorSpec.executorKubernetesResources
           addOwnerReference(createdExecutorPod, resources)
           resources
             .filter(_.getKind == "PersistentVolumeClaim")
@@ -357,6 +387,36 @@ private[spark] class ExecutorPodsAllocator(
     }
   }
 
+  private def replacePVCsIfNeeded(
+      pod: Pod,
+      resources: Seq[HasMetadata],
+      reusablePVCs: mutable.Buffer[PersistentVolumeClaim]) = {
+    val replacedResources = mutable.ArrayBuffer[HasMetadata]()
+    resources.foreach {
+      case pvc: PersistentVolumeClaim =>
+        // Find one with the same storage class and size.
+        val index = reusablePVCs.indexWhere { p =>
+          p.getSpec.getStorageClassName == pvc.getSpec.getStorageClassName &&
+            p.getSpec.getResources.getRequests.get("storage") ==
+              pvc.getSpec.getResources.getRequests.get("storage")
+        }
+        if (index >= 0) {
+          val volume = pod.getSpec.getVolumes.asScala.find { v =>
+            v.getPersistentVolumeClaim != null &&
+              v.getPersistentVolumeClaim.getClaimName == pvc.getMetadata.getName
+          }
+          if (volume.nonEmpty) {
+            val matchedPVC = reusablePVCs.remove(index)
+            replacedResources.append(pvc)
+            logInfo(s"Reuse PersistentVolumeClaim ${matchedPVC.getMetadata.getName}")
+            volume.get.getPersistentVolumeClaim.setClaimName(matchedPVC.getMetadata.getName)
+          }
+        }
+      case _ => // no-op
+    }
+    resources.filterNot(replacedResources.contains)
+  }
+
   private def isExecutorIdleTimedOut(state: ExecutorPodState, currentTime: Long): Boolean = {
     try {
       val startTime = Instant.parse(state.pod.getStatus.getStartTime).toEpochMilli()
```

**Fabric8Aliases.scala**

```diff
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.deploy.k8s
 
-import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapList, HasMetadata, Pod, PodList}
+import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapList, HasMetadata, PersistentVolumeClaim, PersistentVolumeClaimList, Pod, PodList}
 import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource, Resource}
 
 object Fabric8Aliases {
@@ -28,4 +28,8 @@ object Fabric8Aliases {
   type SINGLE_POD = PodResource[Pod]
   type RESOURCE_LIST = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
     HasMetadata]
+  type PERSISTENT_VOLUME_CLAIMS = MixedOperation[PersistentVolumeClaim, PersistentVolumeClaimList,
+    Resource[PersistentVolumeClaim]]
+  type LABELED_PERSISTENT_VOLUME_CLAIMS =
+    FilterWatchListDeletable[PersistentVolumeClaim, PersistentVolumeClaimList]
 }
```

**ExecutorLifecycleTestUtils.scala**

```diff
@@ -18,7 +18,9 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import java.time.Instant
 
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, Pod, PodBuilder}
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.SparkPod
@@ -170,6 +172,18 @@ object ExecutorLifecycleTestUtils {
     podWithAttachedContainer
   }
 
+  def podWithAttachedContainerForIdAndVolume(
+      executorId: Long,
+      rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
+    val sparkPod = executorPodWithIdAndVolume(executorId, rpId)
+    val podWithAttachedContainer = new PodBuilder(sparkPod.pod)
+      .editOrNewSpec()
+      .addToContainers(sparkPod.container)
+      .endSpec()
+      .build()
+    podWithAttachedContainer
+  }
+
   def executorPodWithId(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): SparkPod = {
     val pod = new PodBuilder()
       .withNewMetadata()
@@ -189,4 +203,32 @@ object ExecutorLifecycleTestUtils {
       .build()
     SparkPod(pod, container)
   }
+
+  def executorPodWithIdAndVolume(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID)
+    : SparkPod = {
+    val sparkPod = executorPodWithId(executorId, rpId)
+    sparkPod.pod.getSpec.getVolumes.add(new VolumeBuilder()
+      .withName("spark-volume")
+      .withPersistentVolumeClaim(new PersistentVolumeClaimVolumeSource("pvc-0", false))
+      .build())
+    sparkPod
+  }
+
+  def persistentVolumeClaim(claimName: String, storageClass: String, size: String)
+    : PersistentVolumeClaim = {
+    new PersistentVolumeClaimBuilder()
+      .withKind("PersistentVolumeClaim")
+      .withApiVersion("v1")
+      .withNewMetadata()
+      .withName(claimName)
+      .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)
+      .endMetadata()
+      .withNewSpec()
+      .withStorageClassName(storageClass)
+      .withAccessModes("ReadWriteOnce")
+      .withResources(new ResourceRequirementsBuilder()
+        .withRequests(Map("storage" -> new Quantity(size)).asJava).build())
+      .endSpec()
+      .build()
+  }
 }
```

**ExecutorPodsAllocatorSuite.scala**

```diff
@@ -18,7 +18,9 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import java.time.Instant
 
-import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
 import org.mockito.{Mock, MockitoAnnotations}
@@ -72,6 +74,15 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   @Mock
   private var podOperations: PODS = _
 
+  @Mock
+  private var persistentVolumeClaims: PERSISTENT_VOLUME_CLAIMS = _
+
+  @Mock
+  private var labeledPersistentVolumeClaims: LABELED_PERSISTENT_VOLUME_CLAIMS = _
+
+  @Mock
+  private var persistentVolumeClaimList: PersistentVolumeClaimList = _
+
   @Mock
   private var labeledPods: LABELED_PODS = _
 
@@ -569,6 +580,72 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     verify(podOperations).create(podWithAttachedContainerForId(6))
   }
 
+  test("SPARK-35416: Support PersistentVolumeClaim Reuse") {
+    val prefix = "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1"
+    val confWithPVC = conf.clone
+      .set(KUBERNETES_DRIVER_OWN_PVC.key, "true")
+      .set(KUBERNETES_DRIVER_REUSE_PVC.key, "true")
+      .set(s"$prefix.mount.path", "/spark-local-dir")
+      .set(s"$prefix.mount.readOnly", "false")
+      .set(s"$prefix.option.claimName", "OnDemand")
+      .set(s"$prefix.option.sizeLimit", "200Gi")
+      .set(s"$prefix.option.storageClass", "gp2")
+
+    when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims)
+    when(persistentVolumeClaims.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims)
+    when(labeledPersistentVolumeClaims.list()).thenReturn(persistentVolumeClaimList)
+    when(persistentVolumeClaimList.getItems)
+      .thenReturn(Seq(persistentVolumeClaim("pvc-0", "gp2", "200Gi")).asJava)
+    when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
+      meq(kubernetesClient), any(classOf[ResourceProfile])))
+      .thenAnswer((invocation: InvocationOnMock) => {
+        val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
+        KubernetesExecutorSpec(
+          executorPodWithIdAndVolume(k8sConf.executorId.toInt, k8sConf.resourceProfileId),
+          Seq(persistentVolumeClaim("pvc-0", "gp2", "200Gi")))
+      })
+    podsAllocatorUnderTest = new ExecutorPodsAllocator(
+      confWithPVC, secMgr, executorBuilder,
+      kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
+
+    when(podOperations
+      .withField("status.phase", "Pending"))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
+      .thenReturn(podOperations)
+
+    val startTime = Instant.now.toEpochMilli
+    waitForExecutorPodsClock.setTime(startTime)
+
+    // Target 1 executor, make sure it's requested, even with an empty initial snapshot.
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
+    verify(podOperations).create(podWithAttachedContainerForIdAndVolume(1))
+
+    // Mark executor as running, verify that subsequent allocation cycle is a no-op.
+    snapshotsStore.updatePod(runningExecutor(1))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    verify(podOperations, times(1)).create(any())
+    verify(podOperations, never()).delete()
+
+    // Request a new executor, make sure it's using reused PVC
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
+    verify(podOperations).create(podWithAttachedContainerForIdAndVolume(2))
+    verify(persistentVolumeClaims, never()).create(any())
+  }
+
   private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
     (invocation: InvocationOnMock) => {
       val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
```