[SPARK-36075][K8S] Support for specifying executor/driver node selector

### What changes were proposed in this pull request?

Add the support for specifying executor/driver node selector:
- spark.kubernetes.driver.node.selector.
- spark.kubernetes.executor.node.selector.

### Why are the changes needed?
Now we can only use "spark.kubernetes.node.selector" to set labels for executor/driver pods. Sometimes, we need to set executor and driver node selectors separately.

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
- KubernetesConfSuite for new added configure
- BasicDriverFeatureStepSuite to make sure driver pods node selector set properly
- BasicExecutorFeatureStepSuite to make sure executor pods node selector set properly

Closes #33283 from Yikun/SPARK-36075.

Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
Yikun Jiang 2021-07-18 15:59:34 -07:00 committed by Dongjoon Hyun
parent a9e2156ee5
commit f85855c115
8 changed files with 94 additions and 0 deletions

View file

@ -972,6 +972,28 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>2.3.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.node.selector.[labelKey]</code></td>
<td>(none)</td>
<td>
Adds to the driver node selector of the driver pod, with key <code>labelKey</code> and the value as the
configuration's value. For example, setting <code>spark.kubernetes.driver.node.selector.identifier</code> to <code>myIdentifier</code>
will result in the driver pod having a node selector with key <code>identifier</code> and value
<code>myIdentifier</code>. Multiple driver node selector keys can be added by setting multiple configurations with this prefix.
</td>
<td>3.3.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.node.selector.[labelKey]</code></td>
<td>(none)</td>
<td>
Adds to the executor node selector of the executor pods, with key <code>labelKey</code> and the value as the
configuration's value. For example, setting <code>spark.kubernetes.executor.node.selector.identifier</code> to <code>myIdentifier</code>
will result in the executors having a node selector with key <code>identifier</code> and value
<code>myIdentifier</code>. Multiple executor node selector keys can be added by setting multiple configurations with this prefix.
</td>
<td>3.3.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code></td>
<td>(none)</td>

View file

@ -514,6 +514,10 @@ private[spark] object Config extends Logging {
// Prefix for node selector entries applied to BOTH driver and executor pods.
val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
// Prefix for node selector entries applied only to the driver pod (SPARK-36075).
val KUBERNETES_DRIVER_NODE_SELECTOR_PREFIX = "spark.kubernetes.driver.node.selector."
// Prefix for node selector entries applied only to executor pods (SPARK-36075).
val KUBERNETES_EXECUTOR_NODE_SELECTOR_PREFIX = "spark.kubernetes.executor.node.selector."
val KUBERNETES_DELETE_EXECUTORS =
ConfigBuilder("spark.kubernetes.executor.deleteOnTermination")
.doc("If set to false then executor pods will not be deleted in case " +

View file

@ -82,6 +82,9 @@ private[spark] class KubernetesDriverConf(
val proxyUser: Option[String])
extends KubernetesConf(sparkConf) {
/**
 * Driver-only node selector entries, parsed from all
 * `spark.kubernetes.driver.node.selector.*` keys in the Spark configuration.
 * Applied in addition to the shared `nodeSelector` entries.
 */
def driverNodeSelector: Map[String, String] =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_NODE_SELECTOR_PREFIX)
override val resourceNamePrefix: String = {
val custom = if (Utils.isTesting) get(KUBERNETES_DRIVER_POD_NAME_PREFIX) else None
custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName))
@ -137,6 +140,9 @@ private[spark] class KubernetesExecutorConf(
val resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID)
extends KubernetesConf(sparkConf) with Logging {
/**
 * Executor-only node selector entries, parsed from all
 * `spark.kubernetes.executor.node.selector.*` keys in the Spark configuration.
 * Applied in addition to the shared `nodeSelector` entries.
 */
def executorNodeSelector: Map[String, String] =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_NODE_SELECTOR_PREFIX)
override val resourceNamePrefix: String = {
get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX).getOrElse(
KubernetesConf.getResourceNamePrefix(appName))

View file

@ -147,6 +147,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
.editOrNewSpec()
.withRestartPolicy("Never")
.addToNodeSelector(conf.nodeSelector.asJava)
.addToNodeSelector(conf.driverNodeSelector.asJava)
.addToImagePullSecrets(conf.imagePullSecrets: _*)
.endSpec()
.build()

View file

@ -271,6 +271,7 @@ private[spark] class BasicExecutorFeatureStep(
.withHostname(hostname)
.withRestartPolicy("Never")
.addToNodeSelector(kubernetesConf.nodeSelector.asJava)
.addToNodeSelector(kubernetesConf.executorNodeSelector.asJava)
.addToImagePullSecrets(kubernetesConf.imagePullSecrets: _*)
val executorPod = if (disableConfigMap) {
executorPodBuilder.endSpec().build()

View file

@ -28,6 +28,15 @@ import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
class KubernetesConfSuite extends SparkFunSuite {
private val APP_ARGS = Array("arg1", "arg2")
// Fixture: selector entries set under the shared prefix (applied to all pods).
private val CUSTOM_NODE_SELECTOR = Map(
"nodeSelectorKey1" -> "nodeSelectorValue1",
"nodeSelectorKey2" -> "nodeSelectorValue2")
// Fixture: selector entries set under the driver-only prefix.
private val CUSTOM_DRIVER_NODE_SELECTOR = Map(
"driverNodeSelectorKey1" -> "driverNodeSelectorValue1",
"driverNodeSelectorKey2" -> "driverNodeSelectorValue2")
// Fixture: selector entries set under the executor-only prefix.
private val CUSTOM_EXECUTOR_NODE_SELECTOR = Map(
"execNodeSelectorKey1" -> "execNodeSelectorValue1",
"execNodeSelectorKey2" -> "execNodeSelectorValue2")
private val CUSTOM_LABELS = Map(
"customLabel1Key" -> "customLabel1Value",
"customLabel2Key" -> "customLabel2Value")
@ -170,4 +179,23 @@ class KubernetesConfSuite extends SparkFunSuite {
"executorEnvVars4-var4" -> "executorEnvVars4",
"executorEnvVars5-var5" -> "executorEnvVars5/var5"))
}
// Fix: test name string was misspelled/truncated ("executorNodeSelect").
test("SPARK-36075: Set nodeSelector, driverNodeSelector, executorNodeSelector") {
  val sparkConf = new SparkConf(false)
  // Populate the shared, driver-only, and executor-only selector prefixes.
  CUSTOM_NODE_SELECTOR.foreach { case (key, value) =>
    sparkConf.set(s"$KUBERNETES_NODE_SELECTOR_PREFIX$key", value)
  }
  CUSTOM_DRIVER_NODE_SELECTOR.foreach { case (key, value) =>
    sparkConf.set(s"$KUBERNETES_DRIVER_NODE_SELECTOR_PREFIX$key", value)
  }
  CUSTOM_EXECUTOR_NODE_SELECTOR.foreach { case (key, value) =>
    sparkConf.set(s"$KUBERNETES_EXECUTOR_NODE_SELECTOR_PREFIX$key", value)
  }
  // Executor conf exposes the shared map plus only the executor-specific entries.
  val execConf = KubernetesTestConf.createExecutorConf(sparkConf)
  assert(execConf.nodeSelector === CUSTOM_NODE_SELECTOR)
  assert(execConf.executorNodeSelector === CUSTOM_EXECUTOR_NODE_SELECTOR)
  // Driver conf exposes the shared map plus only the driver-specific entries.
  val driverConf = KubernetesTestConf.createDriverConf(sparkConf)
  assert(driverConf.nodeSelector === CUSTOM_NODE_SELECTOR)
  assert(driverConf.driverNodeSelector === CUSTOM_DRIVER_NODE_SELECTOR)
}
}

View file

@ -232,6 +232,21 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
assert(portMap2(BLOCK_MANAGER_PORT_NAME) === 1235)
}
test("SPARK-36075: Check driver pod respects nodeSelector/driverNodeSelector") {
  // Configure all three selector prefixes; only the generic and driver-specific
  // entries should land on the driver pod spec — the executor-only one must not.
  val conf = new SparkConf()
    .set(CONTAINER_IMAGE, "spark-driver:latest")
    .set(s"${KUBERNETES_NODE_SELECTOR_PREFIX}nodeLabelKey", "nodeLabelValue")
    .set(s"${KUBERNETES_DRIVER_NODE_SELECTOR_PREFIX}driverNodeLabelKey", "driverNodeLabelValue")
    .set(s"${KUBERNETES_EXECUTOR_NODE_SELECTOR_PREFIX}execNodeLabelKey", "execNodeLabelValue")
  val step = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(conf))
  val configuredDriver = step.configurePod(SparkPod.initialPod())
  val expectedSelector = Map(
    "nodeLabelKey" -> "nodeLabelValue",
    "driverNodeLabelKey" -> "driverNodeLabelValue"
  )
  assert(configuredDriver.pod.getSpec.getNodeSelector.asScala === expectedSelector)
}
def containerPort(name: String, portNumber: Int): ContainerPort =
new ContainerPortBuilder()
.withName(name)

View file

@ -419,6 +419,23 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
}
test("SPARK-36075: Check executor pod respects nodeSelector/executorNodeSelector") {
  // Configure generic, executor-specific and driver-specific selectors; the
  // driver-only entry must not leak onto the executor pod spec.
  val conf = new SparkConf()
    .set(CONTAINER_IMAGE, "spark-driver:latest")
    .set(s"${KUBERNETES_NODE_SELECTOR_PREFIX}nodeLabelKey", "nodeLabelValue")
    .set(s"${KUBERNETES_EXECUTOR_NODE_SELECTOR_PREFIX}execNodeLabelKey", "execNodeLabelValue")
    .set(s"${KUBERNETES_DRIVER_NODE_SELECTOR_PREFIX}driverNodeLabelKey", "driverNodeLabelValue")
  val step = new BasicExecutorFeatureStep(
    KubernetesTestConf.createExecutorConf(conf), new SecurityManager(baseConf), defaultProfile)
  val configuredExecutor = step.configurePod(SparkPod.initialPod())
  val expectedSelector = Map(
    "nodeLabelKey" -> "nodeLabelValue",
    "execNodeLabelKey" -> "execNodeLabelValue"
  )
  assert(configuredExecutor.pod.getSpec.getNodeSelector.asScala === expectedSelector)
}
// There is always exactly one controller reference, and it points to the driver pod.
private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
assert(executor.getMetadata.getOwnerReferences.size() === 1)