[SPARK-35131][K8S] Support early driver service clean-up during app termination

### What changes were proposed in this pull request?

This PR aims to support a new configuration, `spark.kubernetes.driver.service.deleteOnTermination`, to clean up the `Driver Service` resource during application termination.
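
As a rough sketch of the mechanism (simplified, not the exact Spark code; the label key, client wiring, and application id below are illustrative assumptions based on the fabric8 client API that Spark's K8s module uses), the driver service is labeled with the application id when it is created and then deleted by that label when the application terminates:

```scala
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient}

// Sketch only. `deleteOnTermination` stands in for the new configuration
// `spark.kubernetes.driver.service.deleteOnTermination` (default: true).
val client: KubernetesClient = new DefaultKubernetesClient()
val appIdLabel = "spark-app-selector"   // assumed app-id label key (SPARK_APP_ID_LABEL)
val appId = "spark-application-0001"    // illustrative application id
val deleteOnTermination = true

if (deleteOnTermination) {
  // Remove every service carrying this application's id label, i.e. the driver service.
  client.services()
    .withLabel(appIdLabel, appId)
    .delete()
}
```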

### Why are the changes needed?

A K8s `Service` is an important resource, and the number of services is sometimes limited by a resource quota:
```
$ k describe quota
Name:       service
Namespace:  default
Resource    Used  Hard
--------    ----  ----
services    1     3
```

Apache Spark creates a service for the driver whose lifecycle is the same as the driver pod's.
This means a new Spark job submission fails once the services left behind by completed Spark jobs use up the service quota.

**BEFORE**
```
$ k get pod
NAME                                                        READY   STATUS      RESTARTS   AGE
org-apache-spark-examples-sparkpi-a32c9278e7061b4d-driver   0/1     Completed   0          31m
org-apache-spark-examples-sparkpi-a9f1f578e721ef62-driver   0/1     Completed   0          78s

$ k get svc
NAME                                                            TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                      AGE
kubernetes                                                      ClusterIP   10.96.0.1    <none>        443/TCP                      80m
org-apache-spark-examples-sparkpi-a32c9278e7061b4d-driver-svc   ClusterIP   None         <none>        7078/TCP,7079/TCP,4040/TCP   31m
org-apache-spark-examples-sparkpi-a9f1f578e721ef62-driver-svc   ClusterIP   None         <none>        7078/TCP,7079/TCP,4040/TCP   80s

$ k describe quota
Name:       service
Namespace:  default
Resource    Used  Hard
--------    ----  ----
services    3     3

$ bin/spark-submit...
Exception in thread "main" io.fabric8.kubernetes.client.KubernetesClientException:
Failure executing: POST at: https://192.168.64.50:8443/api/v1/namespaces/default/services.
Message: Forbidden! User minikube doesn't have permission.
services "org-apache-spark-examples-sparkpi-843f6978e722819c-driver-svc" is forbidden:
exceeded quota: service, requested: services=1, used: services=3, limited: services=3.
```

**AFTER**
```
$ k get pod
NAME                                                        READY   STATUS      RESTARTS   AGE
org-apache-spark-examples-sparkpi-23d5f278e77731a7-driver   0/1     Completed   0          26s
org-apache-spark-examples-sparkpi-d1292278e7768ed4-driver   0/1     Completed   0          67s
org-apache-spark-examples-sparkpi-e5bedf78e776ea9d-driver   0/1     Completed   0          44s

$ k get svc
NAME         TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)   AGE
kubernetes   ClusterIP   10.96.0.1    <none>        443/TCP   172m

$ k describe quota
Name:       service
Namespace:  default
Resource    Used  Hard
--------    ----  ----
services    1     3
```

### Does this PR introduce _any_ user-facing change?

Yes, this PR adds a new configuration, `spark.kubernetes.driver.service.deleteOnTermination`, and enables it by default.
The change is documented in the migration guide.
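
For example, a minimal sketch of restoring the pre-3.2 behavior from user code (only the `deleteOnTermination` key comes from this PR; the other settings are illustrative, and the same key can also be passed via `--conf` to `spark-submit`):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kubernetes.namespace", "default")   // illustrative
  // New in Spark 3.2 and enabled by default. Setting it to false keeps the
  // pre-3.2 behavior: the driver service is cleaned up only with the driver pod.
  .set("spark.kubernetes.driver.service.deleteOnTermination", "false")
```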

### How was this patch tested?

Pass the CIs.

This was also tested manually with the K8s integration tests:

```
KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Use SparkLauncher.NO_RESOURCE
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- All pods have the same service account by default
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j.properties
- Run SparkPi with env and mount secrets.
- Run PySpark on simple pi.py example
- Run PySpark to test a pyfiles example
- Run PySpark with memory customization
- Run in client mode.
- Start pod creation from template
- PVs with local storage
- Launcher client dependencies
- SPARK-33615: Launcher client archives
- SPARK-33748: Launcher python client respecting PYSPARK_PYTHON
- SPARK-33748: Launcher python client respecting spark.pyspark.python and spark.pyspark.driver.python
- Launcher python client dependencies using a zip file
- Test basic decommissioning
- Test basic decommissioning with shuffle cleanup
- Test decommissioning with dynamic allocation & shuffle cleanups
- Test decommissioning timeouts
- Run SparkR on simple dataframe.R example
Run completed in 19 minutes, 9 seconds.
Total number of tests run: 27
Suites: completed 2, aborted 0
Tests: succeeded 27, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

Closes #32226 from dongjoon-hyun/SPARK-35131.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>

5 changed files with 27 additions and 0 deletions:

```
@@ -34,6 +34,8 @@ license: |
 
 - In Spark 3.2, support for Apache Mesos as a resource manager is deprecated and will be removed in a future version.
 
+- In Spark 3.2, Spark will delete K8s driver service resource when the application terminates by itself. To restore the behavior before Spark 3.2, you can set `spark.kubernetes.driver.service.deleteOnTermination` to `false`.
+
 ## Upgrading from Core 3.0 to 3.1
 
 - In Spark 3.0 and below, `SparkContext` can be created in executors. Since Spark 3.1, an exception will be thrown when creating `SparkContext` in executors. You can allow it by setting the configuration `spark.executor.allowSparkContext` when creating `SparkContext` in executors.
```

```
@@ -54,6 +54,14 @@ private[spark] object Config extends Logging {
       .stringConf
       .createWithDefault(KUBERNETES_MASTER_INTERNAL_URL)
 
+  val KUBERNETES_DRIVER_SERVICE_DELETE_ON_TERMINATION =
+    ConfigBuilder("spark.kubernetes.driver.service.deleteOnTermination")
+      .doc("If true, driver service will be deleted on Spark application termination. " +
+        "If false, it will be cleaned up when the driver pod is deletion.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val KUBERNETES_NAMESPACE =
     ConfigBuilder("spark.kubernetes.namespace")
       .doc("The namespace that will be used for running the driver and executor pods.")
```

```
@@ -70,6 +70,7 @@ private[spark] class DriverServiceFeatureStep(
       .withNewMetadata()
         .withName(resolvedServiceName)
         .addToAnnotations(kubernetesConf.serviceAnnotations.asJava)
+        .addToLabels(SPARK_APP_ID_LABEL, kubernetesConf.appId)
         .endMetadata()
       .withNewSpec()
         .withClusterIP("None")
```

```
@@ -58,6 +58,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
 
+  private val shouldDeleteDriverService = conf.get(KUBERNETES_DRIVER_SERVICE_DELETE_ON_TERMINATION)
+
   private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)
 
   private val defaultProfile = scheduler.sc.resourceProfileManager.defaultResourceProfile
@@ -123,6 +125,15 @@
       pollEvents.stop()
     }
 
+    if (shouldDeleteDriverService) {
+      Utils.tryLogNonFatalError {
+        kubernetesClient
+          .services()
+          .withLabel(SPARK_APP_ID_LABEL, applicationId())
+          .delete()
+      }
+    }
+
     if (shouldDeleteExecutors) {
       Utils.tryLogNonFatalError {
         kubernetesClient
```

```
@@ -63,6 +63,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
       8080,
       4080,
       s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
+      kconf.appId,
       driverService)
   }
@@ -95,6 +96,7 @@
       DEFAULT_BLOCKMANAGER_PORT,
       UI_PORT.defaultValue.get,
       s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
+      kconf.appId,
       resolvedService)
     val additionalProps = configurationStep.getAdditionalPodSystemProperties()
     assert(additionalProps(DRIVER_PORT.key) === DEFAULT_DRIVER_PORT.toString)
@@ -163,8 +165,11 @@
       blockManagerPort: Int,
       drierUIPort: Int,
       expectedServiceName: String,
+      appId: String,
       service: Service): Unit = {
     assert(service.getMetadata.getName === expectedServiceName)
+    assert(service.getMetadata.getLabels.containsKey(SPARK_APP_ID_LABEL) &&
+      service.getMetadata.getLabels.get(SPARK_APP_ID_LABEL).equals(appId))
     assert(service.getSpec.getClusterIP === "None")
     DRIVER_LABELS.foreach { case (k, v) =>
       assert(service.getSpec.getSelector.get(k) === v)
```