[SPARK-25355][K8S] Add proxy user to driver if present on spark-submit

### What changes were proposed in this pull request?

This PR adds the proxy user on the spark-submit command to the childArgs, so the proxy user can be retrieved and used in the KubernetesAplication to add the proxy user in the driver container args

### Why are the changes needed?

The proxy user when used on the spark submit doesn't work on the Kubernetes environment since it doesn't add the `--proxy-user` argument on the driver container and when I added it manually to the Pod definition it worked just fine.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Tests were added

Closes #27422 from PedroRossi/SPARK-25355.

Authored-by: Pedro Rossi <pgrr@cin.ufpe.br>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
Pedro Rossi 2020-03-16 21:53:58 -07:00 committed by Dongjoon Hyun
parent d7b97a1d0d
commit ed06d98044
No known key found for this signature in database
GPG key ID: EDA00CE834F0FC5C
8 changed files with 89 additions and 11 deletions

View file

@ -773,6 +773,10 @@ private[spark] class SparkSubmit extends Logging {
childArgs += ("--arg", arg)
}
}
// Pass the proxyUser to the k8s app so it is possible to add it to the driver args
if (args.proxyUser != null) {
childArgs += ("--proxy-user", args.proxyUser)
}
}
// Load any properties specified through --conf and the default properties file

View file

@ -429,6 +429,7 @@ class SparkSubmitSuite
test("handles k8s cluster mode") {
val clArgs = Seq(
"--deploy-mode", "cluster",
"--proxy-user", "test.user",
"--master", "k8s://host:port",
"--executor-memory", "5g",
"--class", "org.SomeClass",
@ -444,6 +445,7 @@ class SparkSubmitSuite
childArgsMap.get("--primary-java-resource") should be (Some("file:/home/thejar.jar"))
childArgsMap.get("--main-class") should be (Some("org.SomeClass"))
childArgsMap.get("--arg") should be (Some("arg1"))
childArgsMap.get("--proxy-user") should be (Some("test.user"))
mainClass should be (KUBERNETES_CLUSTER_SUBMIT_CLASS)
classpath should have length (0)
conf.get("spark.master") should be ("k8s://https://host:port")

View file

@ -77,7 +77,8 @@ private[spark] class KubernetesDriverConf(
val appId: String,
val mainAppResource: MainAppResource,
val mainClass: String,
val appArgs: Array[String])
val appArgs: Array[String],
val proxyUser: Option[String])
extends KubernetesConf(sparkConf) {
override val resourceNamePrefix: String = {
@ -193,11 +194,18 @@ private[spark] object KubernetesConf {
appId: String,
mainAppResource: MainAppResource,
mainClass: String,
appArgs: Array[String]): KubernetesDriverConf = {
appArgs: Array[String],
proxyUser: Option[String]): KubernetesDriverConf = {
// Parse executor volumes in order to verify configuration before the driver pod is created.
KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
new KubernetesDriverConf(sparkConf.clone(), appId, mainAppResource, mainClass, appArgs)
new KubernetesDriverConf(
sparkConf.clone(),
appId,
mainAppResource,
mainClass,
appArgs,
proxyUser)
}
def createExecutorConf(

View file

@ -92,8 +92,14 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
} else {
resource
}
var proxyUserArgs = Seq[String]()
if (!conf.proxyUser.isEmpty) {
proxyUserArgs = proxyUserArgs :+ "--proxy-user"
proxyUserArgs = proxyUserArgs :+ conf.proxyUser.get
}
new ContainerBuilder(pod.container)
.addToArgs("driver")
.addToArgs(proxyUserArgs: _*)
.addToArgs("--properties-file", SPARK_CONF_PATH)
.addToArgs("--class", conf.mainClass)
.addToArgs(resolvedResource)

View file

@ -43,7 +43,8 @@ import org.apache.spark.util.Utils
private[spark] case class ClientArguments(
mainAppResource: MainAppResource,
mainClass: String,
driverArgs: Array[String])
driverArgs: Array[String],
proxyUser: Option[String])
private[spark] object ClientArguments {
@ -51,6 +52,7 @@ private[spark] object ClientArguments {
var mainAppResource: MainAppResource = JavaMainAppResource(None)
var mainClass: Option[String] = None
val driverArgs = mutable.ArrayBuffer.empty[String]
var proxyUser: Option[String] = None
args.sliding(2, 2).toList.foreach {
case Array("--primary-java-resource", primaryJavaResource: String) =>
@ -63,6 +65,8 @@ private[spark] object ClientArguments {
mainClass = Some(clazz)
case Array("--arg", arg: String) =>
driverArgs += arg
case Array("--proxy-user", user: String) =>
proxyUser = Some(user)
case other =>
val invalid = other.mkString(" ")
throw new RuntimeException(s"Unknown arguments: $invalid")
@ -73,7 +77,8 @@ private[spark] object ClientArguments {
ClientArguments(
mainAppResource,
mainClass.get,
driverArgs.toArray)
driverArgs.toArray,
proxyUser)
}
}
@ -199,7 +204,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
kubernetesAppId,
clientArguments.mainAppResource,
clientArguments.mainClass,
clientArguments.driverArgs)
clientArguments.driverArgs,
clientArguments.proxyUser)
// The master URL has been checked for validity already in SparkSubmit.
// We just need to get rid of the "k8s://" prefix here.
val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))

View file

@ -75,7 +75,8 @@ class KubernetesConfSuite extends SparkFunSuite {
KubernetesTestConf.APP_ID,
JavaMainAppResource(None),
KubernetesTestConf.MAIN_CLASS,
APP_ARGS)
APP_ARGS,
None)
assert(conf.labels === Map(
SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID,
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++

View file

@ -50,7 +50,8 @@ object KubernetesTestConf {
annotations: Map[String, String] = Map.empty,
secretEnvNamesToKeyRefs: Map[String, String] = Map.empty,
secretNamesToMountPaths: Map[String, String] = Map.empty,
volumes: Seq[KubernetesVolumeSpec] = Seq.empty): KubernetesDriverConf = {
volumes: Seq[KubernetesVolumeSpec] = Seq.empty,
proxyUser: Option[String] = None): KubernetesDriverConf = {
val conf = sparkConf.clone()
resourceNamePrefix.foreach { prefix =>
@ -63,7 +64,7 @@ object KubernetesTestConf {
setPrefixedConfigs(conf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX, secretEnvNamesToKeyRefs)
setVolumeSpecs(conf, KUBERNETES_DRIVER_VOLUMES_PREFIX, volumes)
new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs)
new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs, proxyUser)
}
// scalastyle:on argcount

View file

@ -76,14 +76,64 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
mainResource, "5", "7", "9"))
}
test("SPARK-25355: java resource args with proxy-user") {
val mainResource = "local:/main.jar"
val spec = applyFeatureStep(
JavaMainAppResource(Some(mainResource)),
appArgs = Array("5", "7"),
proxyUser = Some("test.user"))
assert(spec.pod.container.getArgs.asScala === List(
"driver",
"--proxy-user", "test.user",
"--properties-file", SPARK_CONF_PATH,
"--class", KubernetesTestConf.MAIN_CLASS,
mainResource, "5", "7"))
}
test("SPARK-25355: python resource args with proxy-user") {
val mainResource = "local:/main.py"
val sparkConf = new SparkConf(false)
.set(PYSPARK_MAJOR_PYTHON_VERSION, "2")
val spec = applyFeatureStep(
PythonMainAppResource(mainResource),
conf = sparkConf,
appArgs = Array("5", "7", "9"),
proxyUser = Some("test.user"))
assert(spec.pod.container.getArgs.asScala === List(
"driver",
"--proxy-user", "test.user",
"--properties-file", SPARK_CONF_PATH,
"--class", KubernetesTestConf.MAIN_CLASS,
mainResource, "5", "7", "9"))
}
test("SPARK-25355: R resource args with proxy-user") {
val mainResource = "local:/main.R"
val spec = applyFeatureStep(
RMainAppResource(mainResource),
appArgs = Array("5", "7", "9"),
proxyUser = Some("test.user"))
assert(spec.pod.container.getArgs.asScala === List(
"driver",
"--proxy-user", "test.user",
"--properties-file", SPARK_CONF_PATH,
"--class", KubernetesTestConf.MAIN_CLASS,
mainResource, "5", "7", "9"))
}
private def applyFeatureStep(
resource: MainAppResource,
conf: SparkConf = new SparkConf(false),
appArgs: Array[String] = Array()): KubernetesDriverSpec = {
appArgs: Array[String] = Array(),
proxyUser: Option[String] = None): KubernetesDriverSpec = {
val kubernetesConf = KubernetesTestConf.createDriverConf(
sparkConf = conf,
mainAppResource = resource,
appArgs = appArgs)
appArgs = appArgs,
proxyUser = proxyUser)
val step = new DriverCommandFeatureStep(kubernetesConf)
val pod = step.configurePod(SparkPod.initialPod())
val props = step.getAdditionalPodSystemProperties()