[SPARK-27754][K8S] Introduce additional config (spark.kubernetes.driver.request.cores) for driver request cores for spark on k8s
## What changes were proposed in this pull request? Spark on k8s supports config for specifying the executor cpu requests (spark.kubernetes.executor.request.cores) but a similar config is missing for the driver. Instead, currently the `spark.driver.cores` value is used, which accepts only integer values. Although `pod spec` can have `cpu` for the fine-grained control like the following, this PR proposes additional configuration `spark.kubernetes.driver.request.cores` for driver request cores. ``` resources: requests: memory: "64Mi" cpu: "250m" ``` ## How was this patch tested? Unit tests Closes #24630 from arunmahadevan/SPARK-27754. Authored-by: Arun Mahadevan <arunm@apache.org> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
parent
9bca99b29b
commit
1a8c09334d
|
@@ -793,6 +793,15 @@ See the [configuration page](configuration.html) for information on Spark configurations.
     Interval between reports of the current Spark job status in cluster mode.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.driver.request.cores</code></td>
+  <td>(none)</td>
+  <td>
+    Specify the cpu request for the driver pod. Values conform to the Kubernetes <a href="https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu">convention</a>.
+    Example values include 0.1, 500m, 1.5, 5, etc., with the definition of cpu units documented in <a href="https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units">CPU units</a>.
+    This takes precedence over <code>spark.driver.cores</code> for specifying the driver pod cpu request if set.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.driver.limit.cores</code></td>
   <td>(none)</td>

@@ -125,6 +125,12 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional

+  val KUBERNETES_DRIVER_REQUEST_CORES =
+    ConfigBuilder("spark.kubernetes.driver.request.cores")
+      .doc("Specify the cpu request for the driver pod")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_DRIVER_SUBMIT_CHECK =
     ConfigBuilder("spark.kubernetes.submitInDriver")
       .internal()

@@ -43,7 +43,10 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
     .getOrElse(throw new SparkException("Must specify the driver container image"))

   // CPU settings
-  private val driverCpuCores = conf.get(DRIVER_CORES.key, "1")
+  private val driverCpuCores = conf.get(DRIVER_CORES)
+  private val driverCoresRequest = conf
+    .get(KUBERNETES_DRIVER_REQUEST_CORES)
+    .getOrElse(driverCpuCores.toString)
   private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES)

   // Memory settings
@@ -77,7 +80,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
   }

   val driverCpuQuantity = new QuantityBuilder(false)
-    .withAmount(driverCpuCores)
+    .withAmount(driverCoresRequest)
     .build()
   val driverMemoryQuantity = new QuantityBuilder(false)
     .withAmount(s"${driverMemoryWithOverheadMiB}Mi")

@@ -117,6 +117,38 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf)
   }

+  test("Check driver pod respects kubernetes driver request cores") {
+    val sparkConf = new SparkConf()
+      .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+
+    val basePod = SparkPod.initialPod()
+    // if spark.driver.cores is not set default is 1
+    val requests1 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf))
+      .configurePod(basePod)
+      .container.getResources
+      .getRequests.asScala
+    assert(requests1("cpu").getAmount === "1")
+
+    // if spark.driver.cores is set it should be used
+    sparkConf.set(DRIVER_CORES, 10)
+    val requests2 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf))
+      .configurePod(basePod)
+      .container.getResources
+      .getRequests.asScala
+    assert(requests2("cpu").getAmount === "10")
+
+    // spark.kubernetes.driver.request.cores should be preferred over spark.driver.cores
+    Seq("0.1", "100m").foreach { value =>
+      sparkConf.set(KUBERNETES_DRIVER_REQUEST_CORES, value)
+      val requests3 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf))
+        .configurePod(basePod)
+        .container.getResources
+        .getRequests.asScala
+      assert(requests3("cpu").getAmount === value)
+    }
+  }
+
   test("Check appropriate entrypoint rerouting for various bindings") {
     val javaSparkConf = new SparkConf()
       .set(DRIVER_MEMORY.key, "4g")
Loading…
Reference in a new issue