[SPARK-34948][K8S] Add ownerReference to executor configmap to fix leakages

### What changes were proposed in this pull request?

This PR aims to add `ownerReference` to the executor ConfigMap to fix leakage.

### Why are the changes needed?

SPARK-30985 made Spark manage the executor ConfigMap explicitly. However, this ConfigMap can be leaked when a Spark driver dies accidentally or is killed by K8s. We need to add `ownerReference` so that K8s garbage-collects these ConfigMaps automatically.

The number of ConfigMaps counts against the resource quota, so the leaked ConfigMaps currently cause Spark job submission failures.
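As an illustration of the mechanism (a sketch, not code taken from this patch), it is enough to attach an owner reference that points at the driver pod to the ConfigMap's metadata before the ConfigMap is created; `driverPod` and `configMap` below are assumed to be already built:

```scala
import java.util.Collections

import io.fabric8.kubernetes.api.model.{ConfigMap, OwnerReferenceBuilder, Pod}

// Sketch: mark the driver pod as the controller owner of the executor ConfigMap,
// so K8s garbage-collects the ConfigMap automatically when the driver pod is deleted.
def attachDriverOwnerReference(driverPod: Pod, configMap: ConfigMap): Unit = {
  val ref = new OwnerReferenceBuilder()
    .withApiVersion(driverPod.getApiVersion)
    .withKind(driverPod.getKind)
    .withName(driverPod.getMetadata.getName)
    .withUid(driverPod.getMetadata.getUid)
    .withController(true)
    .build()
  configMap.getMetadata.setOwnerReferences(Collections.singletonList(ref))
}
```

Once the driver pod is gone, the K8s garbage collector removes every dependent object that names it as owner, so stale executor ConfigMaps no longer pile up against the quota.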

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs and check manually.

The K8s integration tests were run manually:
```
KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Use SparkLauncher.NO_RESOURCE
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- All pods have the same service account by default
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j.properties
- Run SparkPi with env and mount secrets.
- Run PySpark on simple pi.py example
- Run PySpark to test a pyfiles example
- Run PySpark with memory customization
- Run in client mode.
- Start pod creation from template
- PVs with local storage
- Launcher client dependencies
- SPARK-33615: Launcher client archives
- SPARK-33748: Launcher python client respecting PYSPARK_PYTHON
- SPARK-33748: Launcher python client respecting spark.pyspark.python and spark.pyspark.driver.python
- Launcher python client dependencies using a zip file
- Test basic decommissioning
- Test basic decommissioning with shuffle cleanup
- Test decommissioning with dynamic allocation & shuffle cleanups
- Test decommissioning timeouts
- Run SparkR on simple dataframe.R example
Run completed in 19 minutes, 2 seconds.
Total number of tests run: 27
Suites: completed 2, aborted 0
Tests: succeeded 27, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

**BEFORE**
```
$ k get cm spark-exec-450b417895b3b2c7-conf-map -oyaml | grep ownerReferences
```

**AFTER**
```
$ k get cm spark-exec-bb37a27895b1c26c-conf-map -oyaml | grep ownerReferences
        f:ownerReferences:
```
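The same check can also be done programmatically; below is a hypothetical sketch using the fabric8 client (the namespace, the ConfigMap name, and the client construction are assumptions, not part of this patch):

```scala
import io.fabric8.kubernetes.client.DefaultKubernetesClient

// Hypothetical check: fetch the executor ConfigMap and print any owner references.
val client = new DefaultKubernetesClient()  // auto-configures from the local kubeconfig
try {
  val cm = client.configMaps()
    .inNamespace("default")
    .withName("spark-exec-bb37a27895b1c26c-conf-map")
    .get()
  cm.getMetadata.getOwnerReferences.forEach { ref =>
    println(s"${ref.getKind}/${ref.getName} (controller=${ref.getController})")
  }
} finally {
  client.close()
}
```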

Closes #32042 from dongjoon-hyun/SPARK-34948.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>

**ExecutorPodsAllocator.scala**
```diff
@@ -70,7 +70,7 @@ private[spark] class ExecutorPodsAllocator(
   private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)
 
-  private val driverPod = kubernetesDriverPodName
+  val driverPod = kubernetesDriverPodName
     .map(name => Option(kubernetesClient.pods()
       .withName(name)
       .get())
```

**KubernetesClusterSchedulerBackend.scala**
```diff
@@ -20,11 +20,13 @@ import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 import scala.concurrent.Future
 
+import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.KubernetesClient
 
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesUtils
 import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
@@ -67,13 +69,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
   }
 
-  private def setUpExecutorConfigMap(): Unit = {
+  private def setUpExecutorConfigMap(driverPod: Option[Pod]): Unit = {
     val configMapName = KubernetesClientUtils.configMapNameExecutor
     val confFilesMap = KubernetesClientUtils
       .buildSparkConfDirFilesMap(configMapName, conf, Map.empty)
     val labels =
       Map(SPARK_APP_ID_LABEL -> applicationId(), SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE)
     val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap, labels)
+    KubernetesUtils.addOwnerReference(driverPod.orNull, Seq(configMap))
     kubernetesClient.configMaps().create(configMap)
   }
@@ -97,7 +100,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     watchEvents.start(applicationId())
     pollEvents.start(applicationId())
     if (!conf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP)) {
-      setUpExecutorConfigMap()
+      setUpExecutorConfigMap(podAllocator.driverPod)
     }
   }
```

**KubernetesClusterSchedulerBackendSuite.scala**
```diff
@@ -112,6 +112,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
       .thenReturn(driverEndpointRef)
     when(kubernetesClient.pods()).thenReturn(podOperations)
     when(kubernetesClient.configMaps()).thenReturn(configMapsOperations)
+    when(podAllocator.driverPod).thenReturn(None)
     schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
       taskScheduler,
       sc,
```