[SPARK-35493][K8S] make spark.blockManager.port fallback for spark.driver.blockManager.port as same as other cluster managers

### What changes were proposed in this pull request?

`spark.blockManager.port` does not work for k8s driver pods now, we should make it work as other cluster managers.

### Why are the changes needed?

`spark.blockManager.port` should be able to work for spark driver pod

### Does this PR introduce _any_ user-facing change?

yes, `spark.blockManager.port` will be respect iff it is present  && `spark.driver.blockManager.port` is absent

### How was this patch tested?

new tests

Closes #32639 from yaooqinn/SPARK-35493.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Kent Yao 2021-05-23 08:07:57 -07:00 committed by Dongjoon Hyun
parent 1a43415d8d
commit 96b0548ab6
2 changed files with 20 additions and 1 deletions

View file

@ -96,7 +96,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
val driverPort = conf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT)
val driverBlockManagerPort = conf.sparkConf.getInt(
DRIVER_BLOCK_MANAGER_PORT.key,
DEFAULT_BLOCKMANAGER_PORT
conf.sparkConf.getInt(BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
)
val driverUIPort = SparkUI.getUIPort(conf.sparkConf)
val driverContainer = new ContainerBuilder(pod.container)

View file

@ -213,6 +213,25 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
}
}
test("SPARK-35493: make spark.blockManager.port be able to be fallen back to in driver pod") {
val initPod = SparkPod.initialPod()
val sparkConf = new SparkConf()
.set(CONTAINER_IMAGE, "spark-driver:latest")
.set(BLOCK_MANAGER_PORT, 1234)
val driverConf1 = KubernetesTestConf.createDriverConf(sparkConf)
val pod1 = new BasicDriverFeatureStep(driverConf1).configurePod(initPod)
val portMap1 =
pod1.container.getPorts.asScala.map { cp => (cp.getName -> cp.getContainerPort) }.toMap
assert(portMap1(BLOCK_MANAGER_PORT_NAME) === 1234, s"fallback to $BLOCK_MANAGER_PORT.key")
val driverConf2 =
KubernetesTestConf.createDriverConf(sparkConf.set(DRIVER_BLOCK_MANAGER_PORT, 1235))
val pod2 = new BasicDriverFeatureStep(driverConf2).configurePod(initPod)
val portMap2 =
pod2.container.getPorts.asScala.map { cp => (cp.getName -> cp.getContainerPort) }.toMap
assert(portMap2(BLOCK_MANAGER_PORT_NAME) === 1235)
}
def containerPort(name: String, portNumber: Int): ContainerPort =
new ContainerPortBuilder()
.withName(name)