[SPARK-26194][K8S] Auto generate auth secret for k8s apps.

This change modifies the logic in the SecurityManager to do two
things:

- also generate unique app secrets when k8s is being used
- store the secret in the user's UGI only when running on YARN

The latter is needed so that k8s won't unnecessarily create
k8s secrets for the UGI credentials when only the auth secret
is stored there.

On the k8s side, the secret is propagated to executors using
an environment variable instead. This ensures it works in both
client and cluster mode.
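
As a rough sketch of the executor side of this mechanism (a simplified
illustration, not the actual Spark code; the variable name
`_SPARK_AUTH_SECRET` is assumed here and corresponds to
SecurityManager.ENV_AUTH_SECRET used in the BasicExecutorFeatureStep diff
below):

    import org.apache.spark.SparkConf

    // Simplified fallback chain for resolving the auth secret on an executor:
    // prefer the env var injected by the driver on k8s, then the static config.
    def resolveAuthSecret(conf: SparkConf): String =
      Option(conf.getenv("_SPARK_AUTH_SECRET"))              // set by the driver on k8s
        .orElse(conf.getOption("spark.authenticate.secret")) // manually configured
        .getOrElse(throw new IllegalArgumentException(
          "A secret key must be specified via the spark.authenticate.secret config."))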

The security doc was updated to mention the feature and to clarify
that proper access control must be set up in k8s for it to be secure.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #23174 from vanzin/SPARK-26194.
Commit dbd90e5440 (parent b14a26ee57)
Authored by Marcelo Vanzin on 2018-12-06 14:17:13 -08:00, committed by mcheah
13 changed files with 205 additions and 113 deletions

File: SecurityManager.scala

@@ -348,15 +348,23 @@ private[spark] class SecurityManager(
    */
   def initializeAuth(): Unit = {
     import SparkMasterRegex._
+    val k8sRegex = "k8s.*".r

     if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
       return
     }

+    // TODO: this really should be abstracted somewhere else.
     val master = sparkConf.get(SparkLauncher.SPARK_MASTER, "")
-    master match {
+    val storeInUgi = master match {
       case "yarn" | "local" | LOCAL_N_REGEX(_) | LOCAL_N_FAILURES_REGEX(_, _) =>
-        // Secret generation allowed here
+        true
+
+      case k8sRegex() =>
+        // Don't propagate the secret through the user's credentials in kubernetes. That conflicts
+        // with the way k8s handles propagation of delegation tokens.
+        false
+
       case _ =>
         require(sparkConf.contains(SPARK_AUTH_SECRET_CONF),
           s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config.")
@@ -364,9 +372,12 @@ private[spark] class SecurityManager(
     }

     secretKey = Utils.createSecret(sparkConf)
-    val creds = new Credentials()
-    creds.addSecretKey(SECRET_LOOKUP_KEY, secretKey.getBytes(UTF_8))
-    UserGroupInformation.getCurrentUser().addCredentials(creds)
+
+    if (storeInUgi) {
+      val creds = new Credentials()
+      creds.addSecretKey(SECRET_LOOKUP_KEY, secretKey.getBytes(UTF_8))
+      UserGroupInformation.getCurrentUser().addCredentials(creds)
+    }
   }

   // Default SecurityManager only has a single secret key, so ignore appId.
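
For illustration, the driver-side behavior this hunk enables can be sketched
as follows, mirroring the unit test below (SecurityManager, initializeAuth()
and getSecretKey() are the real APIs; the master URL is a placeholder):

    import org.apache.spark.{SecurityManager, SparkConf}

    val conf = new SparkConf()
      .set("spark.authenticate", "true")
      .set("spark.master", "k8s://https://127.0.0.1:6443")
    val secMgr = new SecurityManager(conf)
    // On a k8s master this now generates a random per-app secret instead of
    // requiring spark.authenticate.secret, and skips storing it in the UGI.
    secMgr.initializeAuth()
    assert(secMgr.getSecretKey() != null)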

File: SecurityManagerSuite.scala

@@ -395,15 +395,23 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
     assert(keyFromEnv === new SecurityManager(conf2).getSecretKey())
   }

-  test("secret key generation") {
-    Seq(
-      ("yarn", true),
-      ("local", true),
-      ("local[*]", true),
-      ("local[1, 2]", true),
-      ("local-cluster[2, 1, 1024]", false),
-      ("invalid", false)
-    ).foreach { case (master, shouldGenerateSecret) =>
+  // How is the secret expected to be generated and stored.
+  object SecretTestType extends Enumeration {
+    val MANUAL, AUTO, UGI = Value
+  }
+
+  import SecretTestType._
+
+  Seq(
+    ("yarn", UGI),
+    ("local", UGI),
+    ("local[*]", UGI),
+    ("local[1, 2]", UGI),
+    ("k8s://127.0.0.1", AUTO),
+    ("local-cluster[2, 1, 1024]", MANUAL),
+    ("invalid", MANUAL)
+  ).foreach { case (master, secretType) =>
+    test(s"secret key generation: master '$master'") {
       val conf = new SparkConf()
         .set(NETWORK_AUTH_ENABLED, true)
         .set(SparkLauncher.SPARK_MASTER, master)
@@ -412,19 +420,26 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
       UserGroupInformation.createUserForTesting("authTest", Array()).doAs(
         new PrivilegedExceptionAction[Unit]() {
           override def run(): Unit = {
-            if (shouldGenerateSecret) {
-              mgr.initializeAuth()
-              val creds = UserGroupInformation.getCurrentUser().getCredentials()
-              val secret = creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY)
-              assert(secret != null)
-              assert(new String(secret, UTF_8) === mgr.getSecretKey())
-            } else {
-              intercept[IllegalArgumentException] {
-                mgr.initializeAuth()
-              }
-              intercept[IllegalArgumentException] {
-                mgr.getSecretKey()
-              }
+            secretType match {
+              case UGI =>
+                mgr.initializeAuth()
+                val creds = UserGroupInformation.getCurrentUser().getCredentials()
+                val secret = creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY)
+                assert(secret != null)
+                assert(new String(secret, UTF_8) === mgr.getSecretKey())
+
+              case AUTO =>
+                mgr.initializeAuth()
+                val creds = UserGroupInformation.getCurrentUser().getCredentials()
+                assert(creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY) === null)
+
+              case MANUAL =>
+                intercept[IllegalArgumentException] {
+                  mgr.initializeAuth()
+                }
+                intercept[IllegalArgumentException] {
+                  mgr.getSecretKey()
+                }
             }
           }
         }

File: security.md

@@ -26,21 +26,29 @@ not documented, Spark does not support.
 Spark currently supports authentication for RPC channels using a shared secret. Authentication can
 be turned on by setting the `spark.authenticate` configuration parameter.

-The exact mechanism used to generate and distribute the shared secret is deployment-specific.
+The exact mechanism used to generate and distribute the shared secret is deployment-specific. Unless
+specified below, the secret must be defined by setting the `spark.authenticate.secret` config
+option. The same secret is shared by all Spark applications and daemons in that case, which limits
+the security of these deployments, especially on multi-tenant clusters.

-For Spark on [YARN](running-on-yarn.html) and local deployments, Spark will automatically handle
-generating and distributing the shared secret. Each application will use a unique shared secret. In
-the case of YARN, this feature relies on YARN RPC encryption being enabled for the distribution of
-secrets to be secure.
+The REST Submission Server and the MesosClusterDispatcher do not support authentication. You should
+ensure that all network access to the REST API & MesosClusterDispatcher (port 6066 and 7077
+respectively by default) are restricted to hosts that are trusted to submit jobs.

-For other resource managers, `spark.authenticate.secret` must be configured on each of the nodes.
-This secret will be shared by all the daemons and applications, so this deployment configuration is
-not as secure as the above, especially when considering multi-tenant clusters. In this
-configuration, a user with the secret can effectively impersonate any other user.
+### YARN

-The Rest Submission Server and the MesosClusterDispatcher do not support authentication. You should
-ensure that all network access to the REST API & MesosClusterDispatcher (port 6066 and 7077
-respectively by default) are restricted to hosts that are trusted to submit jobs.
+For Spark on [YARN](running-on-yarn.html), Spark will automatically handle generating and
+distributing the shared secret. Each application will use a unique shared secret. In
+the case of YARN, this feature relies on YARN RPC encryption being enabled for the distribution of
+secrets to be secure.
+
+### Kubernetes
+
+On Kubernetes, Spark will also automatically generate an authentication secret unique to each
+application. The secret is propagated to executor pods using environment variables. This means
+that any user that can list pods in the namespace where the Spark application is running can
+also see their authentication secret. Access control rules should be properly set up by the
+Kubernetes admin to ensure that Spark authentication is secure.

 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
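
To make the manually-configured case above concrete, a minimal sketch (the
property names are the real ones from the doc; the secret value is a
placeholder):

    import org.apache.spark.SparkConf

    // Deployments without auto-generation (e.g. standalone) must set the same
    // shared secret for every application and daemon.
    val conf = new SparkConf()
      .set("spark.authenticate", "true")
      .set("spark.authenticate.secret", "<shared-secret>")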
@@ -738,10 +746,10 @@ tokens for supported will be created.
 ## Secure Interaction with Kubernetes

 When talking to Hadoop-based services behind Kerberos, it was noted that Spark needs to obtain delegation tokens
-so that non-local processes can authenticate. These delegation tokens in Kubernetes are stored in Secrets that are 
-shared by the Driver and its Executors. As such, there are three ways of submitting a Kerberos job: 
+so that non-local processes can authenticate. These delegation tokens in Kubernetes are stored in Secrets that are
+shared by the Driver and its Executors. As such, there are three ways of submitting a Kerberos job:

 In all cases you must define the environment variable: `HADOOP_CONF_DIR` or
-`spark.kubernetes.hadoop.configMapName.` 
+`spark.kubernetes.hadoop.configMapName.`

 It also important to note that the KDC needs to be visible from inside the containers.
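
As a hedged illustration of the requirement above (the config key and env var
are the real names from the doc; the values are placeholders):

    import org.apache.spark.SparkConf

    // Option 1: point Spark at an existing ConfigMap holding the Hadoop config.
    val conf = new SparkConf()
      .set("spark.kubernetes.hadoop.configMapName", "<hadoop-conf-map>")

    // Option 2: rely on HADOOP_CONF_DIR being set in the submission
    // environment, e.g. export HADOOP_CONF_DIR=/etc/hadoop/conf.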

File: BasicExecutorFeatureStep.scala

@@ -20,7 +20,7 @@ import scala.collection.JavaConverters._

 import io.fabric8.kubernetes.api.model._

-import org.apache.spark.SparkException
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
@@ -29,11 +29,12 @@ import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils

-private[spark] class BasicExecutorFeatureStep(kubernetesConf: KubernetesExecutorConf)
+private[spark] class BasicExecutorFeatureStep(
+    kubernetesConf: KubernetesExecutorConf,
+    secMgr: SecurityManager)
   extends KubernetesFeatureConfigStep {

   // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf
-  private val executorExtraClasspath = kubernetesConf.get(EXECUTOR_CLASS_PATH)
   private val executorContainerImage = kubernetesConf
     .get(EXECUTOR_CONTAINER_IMAGE)
     .getOrElse(throw new SparkException("Must specify the executor container image"))
@@ -87,44 +88,61 @@ private[spark] class BasicExecutorFeatureStep(kubernetesConf: KubernetesExecutor
     val executorCpuQuantity = new QuantityBuilder(false)
       .withAmount(executorCoresRequest)
       .build()
-    val executorExtraClasspathEnv = executorExtraClasspath.map { cp =>
-      new EnvVarBuilder()
-        .withName(ENV_CLASSPATH)
-        .withValue(cp)
-        .build()
-    }
-    val executorExtraJavaOptionsEnv = kubernetesConf
-      .get(EXECUTOR_JAVA_OPTIONS)
-      .map { opts =>
-        val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
-          kubernetesConf.executorId)
-        val delimitedOpts = Utils.splitCommandString(subsOpts)
-        delimitedOpts.zipWithIndex.map {
-          case (opt, index) =>
-            new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
-        }
-      }.getOrElse(Seq.empty[EnvVar])
-    val executorEnv = (Seq(
-      (ENV_DRIVER_URL, driverUrl),
-      (ENV_EXECUTOR_CORES, executorCores.toString),
-      (ENV_EXECUTOR_MEMORY, executorMemoryString),
-      (ENV_APPLICATION_ID, kubernetesConf.appId),
-      // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
-      (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
-      (ENV_EXECUTOR_ID, kubernetesConf.executorId)) ++
-      kubernetesConf.environment)
-      .map(env => new EnvVarBuilder()
-        .withName(env._1)
-        .withValue(env._2)
-        .build()
-      ) ++ Seq(
-      new EnvVarBuilder()
-        .withName(ENV_EXECUTOR_POD_IP)
-        .withValueFrom(new EnvVarSourceBuilder()
-          .withNewFieldRef("v1", "status.podIP")
-          .build())
-        .build()
-    ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
+
+    val executorEnv: Seq[EnvVar] = {
+        (Seq(
+          (ENV_DRIVER_URL, driverUrl),
+          (ENV_EXECUTOR_CORES, executorCores.toString),
+          (ENV_EXECUTOR_MEMORY, executorMemoryString),
+          (ENV_APPLICATION_ID, kubernetesConf.appId),
+          // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
+          (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
+          (ENV_EXECUTOR_ID, kubernetesConf.executorId)
+        ) ++ kubernetesConf.environment).map { case (k, v) =>
+          new EnvVarBuilder()
+            .withName(k)
+            .withValue(v)
+            .build()
+        }
+      } ++ {
+        Seq(new EnvVarBuilder()
+          .withName(ENV_EXECUTOR_POD_IP)
+          .withValueFrom(new EnvVarSourceBuilder()
+            .withNewFieldRef("v1", "status.podIP")
+            .build())
+          .build())
+      } ++ {
+        Option(secMgr.getSecretKey()).map { authSecret =>
+          new EnvVarBuilder()
+            .withName(SecurityManager.ENV_AUTH_SECRET)
+            .withValue(authSecret)
+            .build()
+        }
+      } ++ {
+        kubernetesConf.get(EXECUTOR_CLASS_PATH).map { cp =>
+          new EnvVarBuilder()
+            .withName(ENV_CLASSPATH)
+            .withValue(cp)
+            .build()
+        }
+      } ++ {
+        val userOpts = kubernetesConf.get(EXECUTOR_JAVA_OPTIONS).toSeq.flatMap { opts =>
+          val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
+            kubernetesConf.executorId)
+          Utils.splitCommandString(subsOpts)
+        }
+
+        val sparkOpts = Utils.sparkJavaOpts(kubernetesConf.sparkConf,
+          SparkConf.isExecutorStartupConf)
+
+        (userOpts ++ sparkOpts).zipWithIndex.map { case (opt, index) =>
+          new EnvVarBuilder()
+            .withName(s"$ENV_JAVA_OPT_PREFIX$index")
+            .withValue(opt)
+            .build()
+        }
+      }
+
     val requiredPorts = Seq(
       (BLOCK_MANAGER_PORT_NAME, blockManagerPort))
       .map { case (name, port) =>

File: ExecutorPodsAllocator.scala

@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.PodBuilder
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.collection.mutable

-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.KubernetesConf
@@ -31,6 +31,7 @@ import org.apache.spark.util.{Clock, Utils}

 private[spark] class ExecutorPodsAllocator(
     conf: SparkConf,
+    secMgr: SecurityManager,
     executorBuilder: KubernetesExecutorBuilder,
     kubernetesClient: KubernetesClient,
     snapshotsStore: ExecutorPodsSnapshotsStore,
@@ -135,7 +136,7 @@ private[spark] class ExecutorPodsAllocator(
         newExecutorId.toString,
         applicationId,
         driverPod)
-      val executorPod = executorBuilder.buildFromFeatures(executorConf)
+      val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr)
       val podWithAttachedContainer = new PodBuilder(executorPod.pod)
         .editOrNewSpec()
         .addToContainers(executorPod.container)

File: KubernetesClusterManager.scala

@@ -94,6 +94,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     val executorPodsAllocator = new ExecutorPodsAllocator(
       sc.conf,
+      sc.env.securityManager,
       KubernetesExecutorBuilder(kubernetesClient, sc.conf),
       kubernetesClient,
       snapshotsStore,
@@ -110,7 +111,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     new KubernetesClusterSchedulerBackend(
       scheduler.asInstanceOf[TaskSchedulerImpl],
-      sc.env.rpcEnv,
+      sc,
       kubernetesClient,
       requestExecutorsService,
       snapshotsStore,

File: KubernetesClusterSchedulerBackend.scala

@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.concurrent.{ExecutionContext, Future}

+import org.apache.spark.SparkContext
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
@@ -30,7 +31,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}

 private[spark] class KubernetesClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
-    rpcEnv: RpcEnv,
+    sc: SparkContext,
     kubernetesClient: KubernetesClient,
     requestExecutorsService: ExecutorService,
     snapshotsStore: ExecutorPodsSnapshotsStore,
@@ -38,7 +39,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     lifecycleEventHandler: ExecutorPodsLifecycleManager,
     watchEvents: ExecutorPodsWatchSnapshotSource,
     pollEvents: ExecutorPodsPollingSnapshotSource)
-  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
     requestExecutorsService)

File: KubernetesExecutorBuilder.scala

@@ -20,14 +20,14 @@ import java.io.File

 import io.fabric8.kubernetes.client.KubernetesClient

-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.features._

 private[spark] class KubernetesExecutorBuilder(
-    provideBasicStep: (KubernetesExecutorConf => BasicExecutorFeatureStep) =
-      new BasicExecutorFeatureStep(_),
+    provideBasicStep: (KubernetesExecutorConf, SecurityManager) => BasicExecutorFeatureStep =
+      new BasicExecutorFeatureStep(_, _),
     provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) =
       new MountSecretsFeatureStep(_),
     provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) =
@@ -44,13 +44,16 @@ private[spark] class KubernetesExecutorBuilder(
       new HadoopSparkUserExecutorFeatureStep(_),
     provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) {

-  def buildFromFeatures(kubernetesConf: KubernetesExecutorConf): SparkPod = {
+  def buildFromFeatures(
+      kubernetesConf: KubernetesExecutorConf,
+      secMgr: SecurityManager): SparkPod = {
     val sparkConf = kubernetesConf.sparkConf
     val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME)
     val maybeDTSecretName = sparkConf.getOption(KERBEROS_DT_SECRET_NAME)
     val maybeDTDataItem = sparkConf.getOption(KERBEROS_DT_SECRET_KEY)

-    val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf))
+    val baseFeatures = Seq(provideBasicStep(kubernetesConf, secMgr),
+      provideLocalDirsStep(kubernetesConf))
     val secretFeature = if (kubernetesConf.secretNamesToMountPaths.nonEmpty) {
       Seq(provideSecretsStep(kubernetesConf))
     } else Nil

File: BasicExecutorFeatureStepSuite.scala

@@ -21,13 +21,14 @@ import scala.collection.JavaConverters._
 import io.fabric8.kubernetes.api.model._
 import org.scalatest.BeforeAndAfter

-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config._
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils

 class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
@@ -63,7 +64,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   private var baseConf: SparkConf = _

   before {
-    baseConf = new SparkConf()
+    baseConf = new SparkConf(false)
       .set(KUBERNETES_DRIVER_POD_NAME, DRIVER_POD_NAME)
       .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, RESOURCE_NAME_PREFIX)
       .set(CONTAINER_IMAGE, EXECUTOR_IMAGE)
@@ -84,7 +85,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   }

   test("basic executor pod has reasonable defaults") {
-    val step = new BasicExecutorFeatureStep(newExecutorConf())
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
     val executor = step.configurePod(SparkPod.initialPod())

     // The executor pod name and default labels.
@@ -106,7 +107,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     assert(executor.pod.getSpec.getNodeSelector.isEmpty)
     assert(executor.pod.getSpec.getVolumes.isEmpty)

-    checkEnv(executor, Map())
+    checkEnv(executor, baseConf, Map())
     checkOwnerReferences(executor.pod, DRIVER_POD_UID)
   }

@@ -114,7 +115,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     val longPodNamePrefix = "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple"
     baseConf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, longPodNamePrefix)
-    val step = new BasicExecutorFeatureStep(newExecutorConf())
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
     assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
   }

@@ -122,10 +123,10 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     baseConf.set(EXECUTOR_JAVA_OPTIONS, "foo=bar")
     baseConf.set(EXECUTOR_CLASS_PATH, "bar=baz")
     val kconf = newExecutorConf(environment = Map("qux" -> "quux"))
-    val step = new BasicExecutorFeatureStep(kconf)
+    val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(baseConf))
     val executor = step.configurePod(SparkPod.initialPod())

-    checkEnv(executor,
+    checkEnv(executor, baseConf,
       Map("SPARK_JAVA_OPT_0" -> "foo=bar",
         ENV_CLASSPATH -> "bar=baz",
         "qux" -> "quux"))
@@ -136,12 +137,27 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     baseConf.set("spark.kubernetes.resource.type", "python")
     baseConf.set(PYSPARK_EXECUTOR_MEMORY, 42L)

-    val step = new BasicExecutorFeatureStep(newExecutorConf())
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
     val executor = step.configurePod(SparkPod.initialPod())
     // This is checking that basic executor + executorMemory = 1408 + 42 = 1450
     assert(executor.container.getResources.getRequests.get("memory").getAmount === "1450Mi")
   }

+  test("auth secret propagation") {
+    val conf = baseConf.clone()
+      .set(NETWORK_AUTH_ENABLED, true)
+      .set("spark.master", "k8s://127.0.0.1")
+
+    val secMgr = new SecurityManager(conf)
+    secMgr.initializeAuth()
+
+    val step = new BasicExecutorFeatureStep(KubernetesTestConf.createExecutorConf(sparkConf = conf),
+      secMgr)
+
+    val executor = step.configurePod(SparkPod.initialPod())
+    checkEnv(executor, conf, Map(SecurityManager.ENV_AUTH_SECRET -> secMgr.getSecretKey()))
+  }
+
   // There is always exactly one controller reference, and it points to the driver pod.
   private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
     assert(executor.getMetadata.getOwnerReferences.size() === 1)
@@ -150,7 +166,10 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   }

   // Check that the expected environment variables are present.
-  private def checkEnv(executorPod: SparkPod, additionalEnvVars: Map[String, String]): Unit = {
+  private def checkEnv(
+      executorPod: SparkPod,
+      conf: SparkConf,
+      additionalEnvVars: Map[String, String]): Unit = {
     val defaultEnvs = Map(
       ENV_EXECUTOR_ID -> "1",
       ENV_DRIVER_URL -> DRIVER_ADDRESS.toString,
@@ -160,10 +179,15 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
       ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars

-    assert(executorPod.container.getEnv.size() === defaultEnvs.size)
+    val extraJavaOptsStart = additionalEnvVars.keys.count(_.startsWith(ENV_JAVA_OPT_PREFIX))
+    val extraJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
+    val extraJavaOptsEnvs = extraJavaOpts.zipWithIndex.map { case (opt, ind) =>
+      s"$ENV_JAVA_OPT_PREFIX${ind + extraJavaOptsStart}" -> opt
+    }.toMap
+
     val mapEnvs = executorPod.container.getEnv.asScala.map {
       x => (x.getName, x.getValue)
     }.toMap
-    assert(defaultEnvs === mapEnvs)
+    assert((defaultEnvs ++ extraJavaOptsEnvs) === mapEnvs)
   }
 }

File: ExecutorPodsAllocatorSuite.scala

@@ -20,13 +20,13 @@ import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
 import org.mockito.{ArgumentMatcher, Matchers, Mock, MockitoAnnotations}
-import org.mockito.Matchers.any
+import org.mockito.Matchers.{any, eq => meq}
 import org.mockito.Mockito.{never, times, verify, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.BeforeAndAfter

-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
@@ -52,6 +52,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
   private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
   private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000L)
+  private val secMgr = new SecurityManager(conf)

   private var waitForExecutorPodsClock: ManualClock = _
@@ -79,12 +80,12 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     when(kubernetesClient.pods()).thenReturn(podOperations)
     when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
     when(driverPodOperations.get).thenReturn(driverPod)
-    when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf])))
+    when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr)))
       .thenAnswer(executorPodAnswer())
     snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
     waitForExecutorPodsClock = new ManualClock(0L)
     podsAllocatorUnderTest = new ExecutorPodsAllocator(
-      conf, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+      conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
     podsAllocatorUnderTest.start(TEST_SPARK_APP_ID)
   }

File: KubernetesClusterSchedulerBackendSuite.scala

@@ -23,7 +23,7 @@ import org.mockito.Matchers.{eq => mockitoEq}
 import org.mockito.Mockito.{never, verify, when}
 import org.scalatest.BeforeAndAfter

-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
@@ -41,6 +41,9 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   @Mock
   private var sc: SparkContext = _

+  @Mock
+  private var env: SparkEnv = _
+
   @Mock
   private var rpcEnv: RpcEnv = _

@@ -81,6 +84,8 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     MockitoAnnotations.initMocks(this)
     when(taskScheduler.sc).thenReturn(sc)
     when(sc.conf).thenReturn(sparkConf)
+    when(sc.env).thenReturn(env)
+    when(env.rpcEnv).thenReturn(rpcEnv)
     driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
     when(rpcEnv.setupEndpoint(
       mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
@@ -88,7 +93,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     when(kubernetesClient.pods()).thenReturn(podOperations)
     schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
       taskScheduler,
-      rpcEnv,
+      sc,
       kubernetesClient,
       requestExecutorsService,
       eventQueue,

File: KubernetesExecutorBuilderSuite.scala

@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{Config => _, _}
 import io.fabric8.kubernetes.client.KubernetesClient
 import org.mockito.Mockito.{mock, never, verify}

-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.features._
@@ -39,6 +39,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
   private val KERBEROS_CONF_STEP_TYPE = "kerberos-step"
   private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"

+  private val secMgr = new SecurityManager(new SparkConf(false))
+
   private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     BASIC_STEP_TYPE, classOf[BasicExecutorFeatureStep])
   private val mountSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
@@ -57,7 +59,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
     MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep])

   private val builderUnderTest = new KubernetesExecutorBuilder(
-    _ => basicFeatureStep,
+    (_, _) => basicFeatureStep,
     _ => mountSecretsStep,
     _ => envSecretsStep,
     _ => localDirsStep,
@@ -69,7 +71,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
   test("Basic steps are consistently applied.") {
     val conf = KubernetesTestConf.createExecutorConf()
     validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
+      builderUnderTest.buildFromFeatures(conf, secMgr), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
   }

   test("Apply secrets step if secrets are present.") {
@@ -77,7 +79,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       secretEnvNamesToKeyRefs = Map("secret-name" -> "secret-key"),
       secretNamesToMountPaths = Map("secret" -> "secretMountPath"))
     validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf),
+      builderUnderTest.buildFromFeatures(conf, secMgr),
       BASIC_STEP_TYPE,
       LOCAL_DIRS_STEP_TYPE,
       SECRETS_STEP_TYPE,
@@ -94,7 +96,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
     val conf = KubernetesTestConf.createExecutorConf(
       volumes = Seq(volumeSpec))
     validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf),
+      builderUnderTest.buildFromFeatures(conf, secMgr),
       BASIC_STEP_TYPE,
       LOCAL_DIRS_STEP_TYPE,
       MOUNT_VOLUMES_STEP_TYPE)
@@ -107,7 +109,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       .set(HADOOP_CONFIG_MAP_NAME, "hadoop-conf-map-name")
       .set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name"))
     validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf),
+      builderUnderTest.buildFromFeatures(conf, secMgr),
       BASIC_STEP_TYPE,
       LOCAL_DIRS_STEP_TYPE,
       HADOOP_CONF_STEP_TYPE,
@@ -123,7 +125,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       .set(KERBEROS_DT_SECRET_NAME, "dt-secret")
       .set(KERBEROS_DT_SECRET_KEY, "dt-key" ))
     validateStepTypesApplied(
-      builderUnderTest.buildFromFeatures(conf),
+      builderUnderTest.buildFromFeatures(conf, secMgr),
       BASIC_STEP_TYPE,
       LOCAL_DIRS_STEP_TYPE,
       HADOOP_CONF_STEP_TYPE,
@@ -154,7 +156,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
         .endMetadata()
       .build()))
     val sparkPod = KubernetesExecutorBuilder(kubernetesClient, sparkConf)
-      .buildFromFeatures(kubernetesConf)
+      .buildFromFeatures(kubernetesConf, secMgr)
     PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(sparkPod)
   }
 }

File: KubernetesSuite.scala

@@ -36,6 +36,7 @@ import org.apache.spark.{SPARK_VERSION, SparkFunSuite}
 import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
 import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._

 class KubernetesSuite extends SparkFunSuite
   with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
@@ -138,6 +139,7 @@ class KubernetesSuite extends SparkFunSuite
       .set("spark.kubernetes.driver.pod.name", driverPodName)
       .set("spark.kubernetes.driver.label.spark-app-locator", appLocator)
       .set("spark.kubernetes.executor.label.spark-app-locator", appLocator)
+      .set(NETWORK_AUTH_ENABLED.key, "true")
     if (!kubernetesTestComponents.hasUserSpecifiedNamespace) {
       kubernetesTestComponents.createNamespace()
     }