[SPARK-35482][K8S] Use spark.blockManager.port not the wrong spark.blockmanager.port in BasicExecutorFeatureStep

### What changes were proposed in this pull request?

most spark conf keys are case sensitive, including `spark.blockManager.port`, we can not get the correct port number with `spark.blockmanager.port`.

This PR changes the wrong key to `spark.blockManager.port` in `BasicExecutorFeatureStep`.

This PR also ensures a fast fail when the port value is invalid for executor containers. When 0 is specified(it is valid as random port, but invalid as a k8s request), it should not be put in the `containerPort` field of executor pod desc. We do not expect executor pods to continuously fail to create because of invalid requests.

### Why are the changes needed?

bugfix

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new tests

Closes #32621 from yaooqinn/SPARK-35482.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Kent Yao 2021-05-21 08:27:49 -07:00 committed by Dongjoon Hyun
parent d2bdd6595e
commit d957426351
2 changed files with 44 additions and 9 deletions

View file

@ -44,7 +44,10 @@ private[spark] class BasicExecutorFeatureStep(
.getOrElse(throw new SparkException("Must specify the executor container image"))
private val blockManagerPort = kubernetesConf
.sparkConf
.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
.getInt(BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
require(blockManagerPort == 0 || (1024 <= blockManagerPort && blockManagerPort < 65536),
"port number must be 0 or in [1024, 65535]")
private val executorPodNamePrefix = kubernetesConf.resourceNamePrefix
@ -172,14 +175,17 @@ private[spark] class BasicExecutorFeatureStep(
.replaceAll(ENV_EXECUTOR_ID, kubernetesConf.executorId))
}
val requiredPorts = Seq(
(BLOCK_MANAGER_PORT_NAME, blockManagerPort))
.map { case (name, port) =>
new ContainerPortBuilder()
.withName(name)
.withContainerPort(port)
.build()
}
// 0 is invalid as kubernetes containerPort request, we shall leave it unmounted
val requiredPorts = if (blockManagerPort != 0) {
Seq(
(BLOCK_MANAGER_PORT_NAME, blockManagerPort))
.map { case (name, port) =>
new ContainerPortBuilder()
.withName(name)
.withContainerPort(port)
.build()
}
} else Nil
if (!isDefaultProfile) {
if (pod.container != null && pod.container.getResources() != null) {

View file

@ -341,6 +341,35 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
assert(!SecretVolumeUtils.podHasVolume(podConfigured.pod, SPARK_CONF_VOLUME_EXEC))
}
test("SPARK-35482: user correct block manager port for executor pods") {
try {
val initPod = SparkPod.initialPod()
val sm = new SecurityManager(baseConf)
val step1 =
new BasicExecutorFeatureStep(newExecutorConf(), sm, defaultProfile)
val containerPort1 = step1.configurePod(initPod).container.getPorts.get(0)
assert(containerPort1.getContainerPort === DEFAULT_BLOCKMANAGER_PORT,
s"should use port no. $DEFAULT_BLOCKMANAGER_PORT as default")
baseConf.set(BLOCK_MANAGER_PORT, 12345)
val step2 = new BasicExecutorFeatureStep(newExecutorConf(), sm, defaultProfile)
val containerPort2 = step2.configurePod(initPod).container.getPorts.get(0)
assert(containerPort2.getContainerPort === 12345)
baseConf.set(BLOCK_MANAGER_PORT, 1000)
val e = intercept[IllegalArgumentException] {
new BasicExecutorFeatureStep(newExecutorConf(), sm, defaultProfile)
}
assert(e.getMessage.contains("port number must be 0 or in [1024, 65535]"))
baseConf.set(BLOCK_MANAGER_PORT, 0)
val step3 = new BasicExecutorFeatureStep(newExecutorConf(), sm, defaultProfile)
assert(step3.configurePod(initPod).container.getPorts.isEmpty, "random port")
} finally {
baseConf.remove(BLOCK_MANAGER_PORT)
}
}
// There is always exactly one controller reference, and it points to the driver pod.
private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
assert(executor.getMetadata.getOwnerReferences.size() === 1)