[SPARK-27061][K8S] Expose Driver UI port on driver service to access …

## What changes were proposed in this pull request?

Expose Spark UI port on driver service to access logs from service.

## How was this patch tested?

The patch was tested using unit tests being contributed as a part of the PR

Closes #23990 from chandulal/SPARK-27061.

Authored-by: chandulal.kavar <cckavar@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
chandulal.kavar 2019-03-11 10:41:31 -07:00 committed by Marcelo Vanzin
parent 31878c9daa
commit d4542a8ba8
2 changed files with 16 additions and 2 deletions

View file

@ -54,6 +54,7 @@ private[spark] class DriverServiceFeatureStep(
config.DRIVER_PORT.key, DEFAULT_DRIVER_PORT)
private val driverBlockManagerPort = kubernetesConf.sparkConf.getInt(
config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
private val driverUIPort = kubernetesConf.get(config.UI.UI_PORT)
override def configurePod(pod: SparkPod): SparkPod = pod
@ -82,6 +83,11 @@ private[spark] class DriverServiceFeatureStep(
.withPort(driverBlockManagerPort)
.withNewTargetPort(driverBlockManagerPort)
.endPort()
.addNewPort()
.withName(UI_PORT_NAME)
.withPort(driverUIPort)
.withNewTargetPort(driverUIPort)
.endPort()
.endSpec()
.build()
Seq(driverService)

View file

@ -27,6 +27,7 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
import org.apache.spark.util.ManualClock
class DriverServiceFeatureStepSuite extends SparkFunSuite {
@ -38,10 +39,11 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
"label1key" -> "label1value",
"label2key" -> "label2value")
test("Headless service has a port for the driver RPC and the block manager.") {
test("Headless service has a port for the driver RPC, the block manager and driver ui.") {
val sparkConf = new SparkConf(false)
.set(DRIVER_PORT, 9000)
.set(DRIVER_BLOCK_MANAGER_PORT, 8080)
.set(UI_PORT, 4080)
val kconf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
labels = DRIVER_LABELS)
@ -56,6 +58,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
verifyService(
9000,
8080,
4080,
s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
driverService)
}
@ -85,6 +88,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
verifyService(
DEFAULT_DRIVER_PORT,
DEFAULT_BLOCKMANAGER_PORT,
UI_PORT.defaultValue.get,
s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
resolvedService)
val additionalProps = configurationStep.getAdditionalPodSystemProperties()
@ -152,6 +156,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
private def verifyService(
driverPort: Int,
blockManagerPort: Int,
drierUIPort: Int,
expectedServiceName: String,
service: Service): Unit = {
assert(service.getMetadata.getName === expectedServiceName)
@ -159,7 +164,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
DRIVER_LABELS.foreach { case (k, v) =>
assert(service.getSpec.getSelector.get(k) === v)
}
assert(service.getSpec.getPorts.size() === 2)
assert(service.getSpec.getPorts.size() === 3)
val driverServicePorts = service.getSpec.getPorts.asScala
assert(driverServicePorts.head.getName === DRIVER_PORT_NAME)
assert(driverServicePorts.head.getPort.intValue() === driverPort)
@ -167,5 +172,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
assert(driverServicePorts(1).getName === BLOCK_MANAGER_PORT_NAME)
assert(driverServicePorts(1).getPort.intValue() === blockManagerPort)
assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort)
assert(driverServicePorts(2).getName === UI_PORT_NAME)
assert(driverServicePorts(2).getPort.intValue() === drierUIPort)
assert(driverServicePorts(2).getTargetPort.getIntVal === drierUIPort)
}
}