[SPARK-22757][KUBERNETES] Enable use of remote dependencies (http, s3, gcs, etc.) in Kubernetes mode

## What changes were proposed in this pull request?

This PR expands Kubernetes mode to support remote dependencies hosted on http/https endpoints, GCS, S3, etc. It adds steps for configuring a Kubernetes init-container and appending it to the driver and executor pods to download remote dependencies.
[Init-containers](https://kubernetes.io/docs/concepts/workloads/pods/init-containers/), as the name suggests, are containers that run to completion before the main containers start, and are often used to perform initialization tasks. We use init-containers to localize remote application dependencies before the driver/executors start running; the code that the init-container runs is also included in this PR. It additionally adds steps for mounting user-specified secrets, which may store credentials for accessing data storage such as S3 and Google Cloud Storage (GCS), into the driver and executor pods.
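As a hedged illustration, the new user-facing properties might be supplied roughly as follows. The init-container image name and jar URI are hypothetical, and in practice these properties are passed to `spark-submit` via `--conf` rather than set in application code:

```scala
import org.apache.spark.SparkConf

// Hypothetical values, expressed as a SparkConf purely to illustrate the keys
// added by this change; in practice they are supplied to spark-submit via --conf.
val conf = new SparkConf()
  // Image used for the init-container injected into the driver and executor pods.
  .set("spark.kubernetes.initContainer.image", "kubespark/spark-init:latest")
  // A remote jar; URIs whose scheme is neither "local" nor "file" are downloaded
  // by the init-container before the main containers start.
  .set("spark.jars", "https://example.com/jars/spark-examples.jar")
  // Directories (mounted as emptyDir volumes) where downloaded jars/files land.
  .set("spark.kubernetes.mountDependencies.jarsDownloadDir", "/var/spark-data/spark-jars")
  .set("spark.kubernetes.mountDependencies.filesDownloadDir", "/var/spark-data/spark-files")
```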

## How was this patch tested?

* The patch contains unit tests which are passing.
* Manual testing: `./build/mvn -Pkubernetes clean package` succeeded.
* Manual testing of the following cases:
  * [x] Running SparkPi using a container-local spark-example jar.
  * [x] Running SparkPi using a container-local spark-example jar with a user-specified secret mounted (see the sketch after this list).
  * [x] Running SparkPi using a spark-example jar hosted remotely on an https endpoint.
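For the secret-mounting case above, a minimal sketch of the relevant properties, assuming a hypothetical secret name and mount path; the `spark.kubernetes.driver.secrets.*` and `spark.kubernetes.executor.secrets.*` prefixes come from this PR:

```scala
import org.apache.spark.SparkConf

// Hypothetical secret name ("aws-creds") and mount path. Each entry mounts the
// named Kubernetes secret at the given path in the corresponding container, and
// the same secrets are also shared with the init-container so it can download
// credential-protected remote dependencies.
val conf = new SparkConf()
  .set("spark.kubernetes.driver.secrets.aws-creds", "/mnt/secrets/aws-creds")
  .set("spark.kubernetes.executor.secrets.aws-creds", "/mnt/secrets/aws-creds")
```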

cc rxin felixcheung mateiz (shepherd)
k8s-big-data SIG members & contributors: mccheah foxish ash211 ssuchter varunkatta kimoonkim erikerlandson tnachen ifilonenko liyinan926
reviewers: vanzin felixcheung jiangxb1987 mridulm

Author: Yinan Li <liyinan926@gmail.com>

Closes #19954 from liyinan926/init-container.
Yinan Li 2017-12-28 13:44:44 +09:00 committed by Takuya UESHIN
parent 32ec269d08
commit 171f6ddadc
36 changed files with 1783 additions and 171 deletions


@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit
private[spark] object Config extends Logging {
@ -132,20 +131,72 @@ private[spark] object Config extends Logging {
val JARS_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
.doc("Location to download jars to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pod.")
.doc("Location to download jars to in the driver and executors. When using " +
"spark-submit, this directory must be empty and will be mounted as an empty directory " +
"volume on the driver and executor pod.")
.stringConf
.createWithDefault("/var/spark-data/spark-jars")
val FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
.doc("Location to download files to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pods.")
.doc("Location to download files to in the driver and executors. When using " +
"spark-submit, this directory must be empty and will be mounted as an empty directory " +
"volume on the driver and executor pods.")
.stringConf
.createWithDefault("/var/spark-data/spark-files")
val INIT_CONTAINER_IMAGE =
ConfigBuilder("spark.kubernetes.initContainer.image")
.doc("Image for the driver and executor's init-container for downloading dependencies.")
.stringConf
.createOptional
val INIT_CONTAINER_MOUNT_TIMEOUT =
ConfigBuilder("spark.kubernetes.mountDependencies.timeout")
.doc("Timeout before aborting the attempt to download and unpack dependencies from remote " +
"locations into the driver and executor pods.")
.timeConf(TimeUnit.SECONDS)
.createWithDefault(300)
val INIT_CONTAINER_MAX_THREAD_POOL_SIZE =
ConfigBuilder("spark.kubernetes.mountDependencies.maxSimultaneousDownloads")
.doc("Maximum number of remote dependencies to download simultaneously in a driver or " +
"executor pod.")
.intConf
.createWithDefault(5)
val INIT_CONTAINER_REMOTE_JARS =
ConfigBuilder("spark.kubernetes.initContainer.remoteJars")
.doc("Comma-separated list of jar URIs to download in the init-container. This is " +
"calculated from spark.jars.")
.internal()
.stringConf
.createOptional
val INIT_CONTAINER_REMOTE_FILES =
ConfigBuilder("spark.kubernetes.initContainer.remoteFiles")
.doc("Comma-separated list of file URIs to download in the init-container. This is " +
"calculated from spark.files.")
.internal()
.stringConf
.createOptional
val INIT_CONTAINER_CONFIG_MAP_NAME =
ConfigBuilder("spark.kubernetes.initContainer.configMapName")
.doc("Name of the config map to use in the init-container that retrieves submitted files " +
"for the executor.")
.internal()
.stringConf
.createOptional
val INIT_CONTAINER_CONFIG_MAP_KEY_CONF =
ConfigBuilder("spark.kubernetes.initContainer.configMapKey")
.doc("Key for the entry in the init container config map for submitted files that " +
"corresponds to the properties for this init-container.")
.internal()
.stringConf
.createOptional
val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"
@ -153,9 +204,11 @@ private[spark] object Config extends Logging {
val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."
val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
}


@ -69,6 +69,17 @@ private[spark] object Constants {
val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
// Bootstrapping dependencies with the init-container
val INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME = "download-jars-volume"
val INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME = "download-files-volume"
val INIT_CONTAINER_PROPERTIES_FILE_VOLUME = "spark-init-properties"
val INIT_CONTAINER_PROPERTIES_FILE_DIR = "/etc/spark-init"
val INIT_CONTAINER_PROPERTIES_FILE_NAME = "spark-init.properties"
val INIT_CONTAINER_PROPERTIES_FILE_PATH =
s"$INIT_CONTAINER_PROPERTIES_FILE_DIR/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"
// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"


@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, EnvVarBuilder, PodBuilder, VolumeMount, VolumeMountBuilder}
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
/**
* Bootstraps an init-container for downloading remote dependencies. This is separated out from
* the init-container steps API because this component can be used to bootstrap init-containers
* for both the driver and executors.
*/
private[spark] class InitContainerBootstrap(
initContainerImage: String,
imagePullPolicy: String,
jarsDownloadPath: String,
filesDownloadPath: String,
configMapName: String,
configMapKey: String,
sparkRole: String,
sparkConf: SparkConf) {
/**
* Bootstraps an init-container that downloads dependencies to be used by a main container.
*/
def bootstrapInitContainer(
original: PodWithDetachedInitContainer): PodWithDetachedInitContainer = {
val sharedVolumeMounts = Seq[VolumeMount](
new VolumeMountBuilder()
.withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
.withMountPath(jarsDownloadPath)
.build(),
new VolumeMountBuilder()
.withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
.withMountPath(filesDownloadPath)
.build())
val customEnvVarKeyPrefix = sparkRole match {
case SPARK_POD_DRIVER_ROLE => KUBERNETES_DRIVER_ENV_KEY
case SPARK_POD_EXECUTOR_ROLE => "spark.executorEnv."
case _ => throw new SparkException(s"$sparkRole is not a valid Spark pod role")
}
val customEnvVars = sparkConf.getAllWithPrefix(customEnvVarKeyPrefix).toSeq.map {
case (key, value) =>
new EnvVarBuilder()
.withName(key)
.withValue(value)
.build()
}
val initContainer = new ContainerBuilder(original.initContainer)
.withName("spark-init")
.withImage(initContainerImage)
.withImagePullPolicy(imagePullPolicy)
.addAllToEnv(customEnvVars.asJava)
.addNewVolumeMount()
.withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
.withMountPath(INIT_CONTAINER_PROPERTIES_FILE_DIR)
.endVolumeMount()
.addToVolumeMounts(sharedVolumeMounts: _*)
.addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
.build()
val podWithBasicVolumes = new PodBuilder(original.pod)
.editSpec()
.addNewVolume()
.withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
.withNewConfigMap()
.withName(configMapName)
.addNewItem()
.withKey(configMapKey)
.withPath(INIT_CONTAINER_PROPERTIES_FILE_NAME)
.endItem()
.endConfigMap()
.endVolume()
.addNewVolume()
.withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
.withEmptyDir(new EmptyDirVolumeSource())
.endVolume()
.addNewVolume()
.withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
.withEmptyDir(new EmptyDirVolumeSource())
.endVolume()
.endSpec()
.build()
val mainContainer = new ContainerBuilder(original.mainContainer)
.addToVolumeMounts(sharedVolumeMounts: _*)
.addNewEnv()
.withName(ENV_MOUNTED_FILES_DIR)
.withValue(filesDownloadPath)
.endEnv()
.build()
PodWithDetachedInitContainer(
podWithBasicVolumes,
initContainer,
mainContainer)
}
}


@ -14,13 +14,49 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit
package org.apache.spark.deploy.k8s
import java.io.File
import io.fabric8.kubernetes.api.model.{Container, Pod, PodBuilder}
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
private[spark] object KubernetesFileUtils {
private[spark] object KubernetesUtils {
/**
* Extract and parse Spark configuration properties with a given name prefix and
* return the result as a Map. Keys must not have more than one value.
*
* @param sparkConf Spark configuration
* @param prefix the given property name prefix
* @return a Map storing the configuration property keys and values
*/
def parsePrefixedKeyValuePairs(
sparkConf: SparkConf,
prefix: String): Map[String, String] = {
sparkConf.getAllWithPrefix(prefix).toMap
}
def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
}
/**
* Append the given init-container to a pod's list of init-containers.
*
* @param originalPodSpec original specification of the pod
* @param initContainer the init-container to add to the pod
* @return the pod with the init-container added to the list of InitContainers
*/
def appendInitContainer(originalPodSpec: Pod, initContainer: Container): Pod = {
new PodBuilder(originalPodSpec)
.editOrNewSpec()
.addToInitContainers(initContainer)
.endSpec()
.build()
}
/**
* For the given collection of file URIs, resolves them as follows:
@ -47,6 +83,16 @@ private[spark] object KubernetesFileUtils {
}
}
/**
* Get from a given collection of file URIs the ones that represent remote files.
*/
def getOnlyRemoteFiles(uris: Iterable[String]): Iterable[String] = {
uris.filter { uri =>
val scheme = Utils.resolveURI(uri).getScheme
scheme != "file" && scheme != "local"
}
}
private def resolveFileUri(
uri: String,
fileDownloadPath: String,


@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
/**
* Bootstraps a driver or executor container or an init-container with needed secrets mounted.
*/
private[spark] class MountSecretsBootstrap(secretNamesToMountPaths: Map[String, String]) {
/**
* Mounts Kubernetes secrets as secret volumes into the given container in the given pod.
*
* @param pod the pod into which the secret volumes are being added.
* @param container the container into which the secret volumes are being mounted.
* @return the updated pod and container with the secrets mounted.
*/
def mountSecrets(pod: Pod, container: Container): (Pod, Container) = {
var podBuilder = new PodBuilder(pod)
secretNamesToMountPaths.keys.foreach { name =>
podBuilder = podBuilder
.editOrNewSpec()
.addNewVolume()
.withName(secretVolumeName(name))
.withNewSecret()
.withSecretName(name)
.endSecret()
.endVolume()
.endSpec()
}
var containerBuilder = new ContainerBuilder(container)
secretNamesToMountPaths.foreach { case (name, path) =>
containerBuilder = containerBuilder
.addNewVolumeMount()
.withName(secretVolumeName(name))
.withMountPath(path)
.endVolumeMount()
}
(podBuilder.build(), containerBuilder.build())
}
private def secretVolumeName(secretName: String): String = {
secretName + "-volume"
}
}


@ -14,28 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s
import org.apache.spark.SparkConf
import io.fabric8.kubernetes.api.model.{Container, Pod}
private[spark] object ConfigurationUtils {
/**
* Extract and parse Spark configuration properties with a given name prefix and
* return the result as a Map. Keys must not have more than one value.
*
* @param sparkConf Spark configuration
* @param prefix the given property name prefix
* @return a Map storing the configuration property keys and values
*/
def parsePrefixedKeyValuePairs(
sparkConf: SparkConf,
prefix: String): Map[String, String] = {
sparkConf.getAllWithPrefix(prefix).toMap
}
def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
}
}
/**
* Represents a pod with a detached init-container (not yet added to the pod).
*
* @param pod the pod
* @param initContainer the init-container in the pod
* @param mainContainer the main container in the pod
*/
private[spark] case class PodWithDetachedInitContainer(
pod: Pod,
initContainer: Container,
mainContainer: Container)


@ -48,7 +48,7 @@ private[spark] object SparkKubernetesClientFactory {
.map(new File(_))
.orElse(defaultServiceAccountToken)
val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
ConfigurationUtils.requireNandDefined(
KubernetesUtils.requireNandDefined(
oauthTokenFile,
oauthTokenValue,
s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a " +


@ -21,25 +21,31 @@ import java.util.UUID
import com.google.common.primitives.Longs
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.ConfigurationUtils
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.steps._
import org.apache.spark.deploy.k8s.submit.steps.initcontainer.InitContainerConfigOrchestrator
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.SystemClock
import org.apache.spark.util.Utils
/**
* Constructs the complete list of driver configuration steps to run to deploy the Spark driver.
* Figures out and returns the complete ordered list of needed DriverConfigurationSteps to
* configure the Spark driver pod. The returned steps will be applied one by one in the given
* order to produce a final KubernetesDriverSpec that is used in KubernetesClientApplication
* to construct and create the driver pod. It uses the InitContainerConfigOrchestrator to
* configure the driver init-container if one is needed, i.e., when there are remote dependencies
* to localize.
*/
private[spark] class DriverConfigurationStepsOrchestrator(
namespace: String,
private[spark] class DriverConfigOrchestrator(
kubernetesAppId: String,
launchTime: Long,
mainAppResource: Option[MainAppResource],
appName: String,
mainClass: String,
appArgs: Array[String],
submissionSparkConf: SparkConf) {
sparkConf: SparkConf) {
// The resource name prefix is derived from the Spark application name, making it easy to connect
// the names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the
@ -49,13 +55,14 @@ private[spark] class DriverConfigurationStepsOrchestrator(
s"$appName-$uuid".toLowerCase.replaceAll("\\.", "-")
}
private val imagePullPolicy = submissionSparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
private val jarsDownloadPath = submissionSparkConf.get(JARS_DOWNLOAD_LOCATION)
private val filesDownloadPath = submissionSparkConf.get(FILES_DOWNLOAD_LOCATION)
private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
private val initContainerConfigMapName = s"$kubernetesResourceNamePrefix-init-config"
private val jarsDownloadPath = sparkConf.get(JARS_DOWNLOAD_LOCATION)
private val filesDownloadPath = sparkConf.get(FILES_DOWNLOAD_LOCATION)
def getAllConfigurationSteps(): Seq[DriverConfigurationStep] = {
val driverCustomLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
submissionSparkConf,
def getAllConfigurationSteps: Seq[DriverConfigurationStep] = {
val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_DRIVER_LABEL_PREFIX)
require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
@ -64,11 +71,15 @@ private[spark] class DriverConfigurationStepsOrchestrator(
s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
"operations.")
val secretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_DRIVER_SECRETS_PREFIX)
val allDriverLabels = driverCustomLabels ++ Map(
SPARK_APP_ID_LABEL -> kubernetesAppId,
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
val initialSubmissionStep = new BaseDriverConfigurationStep(
val initialSubmissionStep = new BasicDriverConfigurationStep(
kubernetesAppId,
kubernetesResourceNamePrefix,
allDriverLabels,
@ -76,16 +87,16 @@ private[spark] class DriverConfigurationStepsOrchestrator(
appName,
mainClass,
appArgs,
submissionSparkConf)
sparkConf)
val driverAddressStep = new DriverServiceBootstrapStep(
val serviceBootstrapStep = new DriverServiceBootstrapStep(
kubernetesResourceNamePrefix,
allDriverLabels,
submissionSparkConf,
sparkConf,
new SystemClock)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
submissionSparkConf, kubernetesResourceNamePrefix)
sparkConf, kubernetesResourceNamePrefix)
val additionalMainAppJar = if (mainAppResource.nonEmpty) {
val mayBeResource = mainAppResource.get match {
@ -98,28 +109,62 @@ private[spark] class DriverConfigurationStepsOrchestrator(
None
}
val sparkJars = submissionSparkConf.getOption("spark.jars")
val sparkJars = sparkConf.getOption("spark.jars")
.map(_.split(","))
.getOrElse(Array.empty[String]) ++
additionalMainAppJar.toSeq
val sparkFiles = submissionSparkConf.getOption("spark.files")
val sparkFiles = sparkConf.getOption("spark.files")
.map(_.split(","))
.getOrElse(Array.empty[String])
val maybeDependencyResolutionStep = if (sparkJars.nonEmpty || sparkFiles.nonEmpty) {
Some(new DependencyResolutionStep(
val dependencyResolutionStep = if (sparkJars.nonEmpty || sparkFiles.nonEmpty) {
Seq(new DependencyResolutionStep(
sparkJars,
sparkFiles,
jarsDownloadPath,
filesDownloadPath))
} else {
None
Nil
}
val initContainerBootstrapStep = if (existNonContainerLocalFiles(sparkJars ++ sparkFiles)) {
val orchestrator = new InitContainerConfigOrchestrator(
sparkJars,
sparkFiles,
jarsDownloadPath,
filesDownloadPath,
imagePullPolicy,
initContainerConfigMapName,
INIT_CONTAINER_PROPERTIES_FILE_NAME,
sparkConf)
val bootstrapStep = new DriverInitContainerBootstrapStep(
orchestrator.getAllConfigurationSteps,
initContainerConfigMapName,
INIT_CONTAINER_PROPERTIES_FILE_NAME)
Seq(bootstrapStep)
} else {
Nil
}
val mountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
Seq(new DriverMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
} else {
Nil
}
Seq(
initialSubmissionStep,
driverAddressStep,
serviceBootstrapStep,
kubernetesCredentialsStep) ++
maybeDependencyResolutionStep.toSeq
dependencyResolutionStep ++
initContainerBootstrapStep ++
mountSecretsStep
}
private def existNonContainerLocalFiles(files: Seq[String]): Boolean = {
files.exists { uri =>
Utils.resolveURI(uri).getScheme != "local"
}
}
}


@ -80,22 +80,22 @@ private[spark] object ClientArguments {
* spark.kubernetes.submission.waitAppCompletion is true.
*
* @param submissionSteps steps that collectively configure the driver
* @param submissionSparkConf the submission client Spark configuration
* @param sparkConf the submission client Spark configuration
* @param kubernetesClient the client to talk to the Kubernetes API server
* @param waitForAppCompletion a flag indicating whether the client should wait for the application
* to complete
* @param appName the application name
* @param loggingPodStatusWatcher a watcher that monitors and logs the application status
* @param watcher a watcher that monitors and logs the application status
*/
private[spark] class Client(
submissionSteps: Seq[DriverConfigurationStep],
submissionSparkConf: SparkConf,
sparkConf: SparkConf,
kubernetesClient: KubernetesClient,
waitForAppCompletion: Boolean,
appName: String,
loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
watcher: LoggingPodStatusWatcher) extends Logging {
private val driverJavaOptions = submissionSparkConf.get(
private val driverJavaOptions = sparkConf.get(
org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
/**
@ -104,7 +104,7 @@ private[spark] class Client(
* will be used to build the Driver Container, Driver Pod, and Kubernetes Resources
*/
def run(): Unit = {
var currentDriverSpec = KubernetesDriverSpec.initialSpec(submissionSparkConf)
var currentDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf)
// submissionSteps contain steps necessary to take, to resolve varying
// client arguments that are passed in, created by orchestrator
for (nextStep <- submissionSteps) {
@ -141,7 +141,7 @@ private[spark] class Client(
kubernetesClient
.pods()
.withName(resolvedDriverPod.getMetadata.getName)
.watch(loggingPodStatusWatcher)) { _ =>
.watch(watcher)) { _ =>
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
if (currentDriverSpec.otherKubernetesResources.nonEmpty) {
@ -157,7 +157,7 @@ private[spark] class Client(
if (waitForAppCompletion) {
logInfo(s"Waiting for application $appName to finish...")
loggingPodStatusWatcher.awaitCompletion()
watcher.awaitCompletion()
logInfo(s"Application $appName finished.")
} else {
logInfo(s"Deployed Spark application $appName into Kubernetes.")
@ -207,11 +207,9 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
val master = sparkConf.get("spark.master").substring("k8s://".length)
val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(
kubernetesAppId, loggingInterval)
val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
val configurationStepsOrchestrator = new DriverConfigurationStepsOrchestrator(
namespace,
val orchestrator = new DriverConfigOrchestrator(
kubernetesAppId,
launchTime,
clientArguments.mainAppResource,
@ -228,12 +226,12 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
None,
None)) { kubernetesClient =>
val client = new Client(
configurationStepsOrchestrator.getAllConfigurationSteps(),
orchestrator.getAllConfigurationSteps,
sparkConf,
kubernetesClient,
waitForAppCompletion,
appName,
loggingPodStatusWatcher)
watcher)
client.run()
}
}


@ -22,49 +22,46 @@ import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarS
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.ConfigurationUtils
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesUtils
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
import org.apache.spark.internal.config.{DRIVER_CLASS_PATH, DRIVER_MEMORY, DRIVER_MEMORY_OVERHEAD}
/**
* Represents the initial setup required for the driver.
* Performs basic configuration for the driver pod.
*/
private[spark] class BaseDriverConfigurationStep(
private[spark] class BasicDriverConfigurationStep(
kubernetesAppId: String,
kubernetesResourceNamePrefix: String,
resourceNamePrefix: String,
driverLabels: Map[String, String],
imagePullPolicy: String,
appName: String,
mainClass: String,
appArgs: Array[String],
submissionSparkConf: SparkConf) extends DriverConfigurationStep {
sparkConf: SparkConf) extends DriverConfigurationStep {
private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(s"$kubernetesResourceNamePrefix-driver")
private val driverPodName = sparkConf
.get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(s"$resourceNamePrefix-driver")
private val driverExtraClasspath = submissionSparkConf.get(
DRIVER_CLASS_PATH)
private val driverExtraClasspath = sparkConf.get(DRIVER_CLASS_PATH)
private val driverContainerImage = submissionSparkConf
private val driverContainerImage = sparkConf
.get(DRIVER_CONTAINER_IMAGE)
.getOrElse(throw new SparkException("Must specify the driver container image"))
// CPU settings
private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1")
private val driverLimitCores = sparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
// Memory settings
private val driverMemoryMiB = submissionSparkConf.get(
DRIVER_MEMORY)
private val driverMemoryString = submissionSparkConf.get(
DRIVER_MEMORY.key,
DRIVER_MEMORY.defaultValueString)
private val memoryOverheadMiB = submissionSparkConf
private val driverMemoryMiB = sparkConf.get(DRIVER_MEMORY)
private val driverMemoryString = sparkConf.get(
DRIVER_MEMORY.key, DRIVER_MEMORY.defaultValueString)
private val memoryOverheadMiB = sparkConf
.get(DRIVER_MEMORY_OVERHEAD)
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
@ -74,15 +71,13 @@ private[spark] class BaseDriverConfigurationStep(
.build()
}
val driverCustomAnnotations = ConfigurationUtils
.parsePrefixedKeyValuePairs(
submissionSparkConf,
KUBERNETES_DRIVER_ANNOTATION_PREFIX)
val driverCustomAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
" Spark bookkeeping operations.")
val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
val driverCustomEnvs = sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
.map { env =>
new EnvVarBuilder()
.withName(env._1)
@ -90,10 +85,10 @@ private[spark] class BaseDriverConfigurationStep(
.build()
}
val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
val driverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
val nodeSelector = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
val driverCpuQuantity = new QuantityBuilder(false)
.withAmount(driverCpuCores)
@ -102,7 +97,7 @@ private[spark] class BaseDriverConfigurationStep(
.withAmount(s"${driverMemoryMiB}Mi")
.build()
val driverMemoryLimitQuantity = new QuantityBuilder(false)
.withAmount(s"${driverContainerMemoryWithOverheadMiB}Mi")
.withAmount(s"${driverMemoryWithOverheadMiB}Mi")
.build()
val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
@ -142,9 +137,9 @@ private[spark] class BaseDriverConfigurationStep(
val baseDriverPod = new PodBuilder(driverSpec.driverPod)
.editOrNewMetadata()
.withName(kubernetesDriverPodName)
.withName(driverPodName)
.addToLabels(driverLabels.asJava)
.addToAnnotations(allDriverAnnotations.asJava)
.addToAnnotations(driverAnnotations.asJava)
.endMetadata()
.withNewSpec()
.withRestartPolicy("Never")
@ -153,9 +148,9 @@ private[spark] class BaseDriverConfigurationStep(
.build()
val resolvedSparkConf = driverSpec.driverSparkConf.clone()
.setIfMissing(KUBERNETES_DRIVER_POD_NAME, kubernetesDriverPodName)
.setIfMissing(KUBERNETES_DRIVER_POD_NAME, driverPodName)
.set("spark.app.id", kubernetesAppId)
.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, kubernetesResourceNamePrefix)
.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, resourceNamePrefix)
driverSpec.copy(
driverPod = baseDriverPod,


@ -21,7 +21,8 @@ import java.io.File
import io.fabric8.kubernetes.api.model.ContainerBuilder
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.{KubernetesDriverSpec, KubernetesFileUtils}
import org.apache.spark.deploy.k8s.KubernetesUtils
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
/**
* Step that configures the classpath, spark.jars, and spark.files for the driver given that the
@ -31,21 +32,22 @@ private[spark] class DependencyResolutionStep(
sparkJars: Seq[String],
sparkFiles: Seq[String],
jarsDownloadPath: String,
localFilesDownloadPath: String) extends DriverConfigurationStep {
filesDownloadPath: String) extends DriverConfigurationStep {
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val resolvedSparkJars = KubernetesFileUtils.resolveFileUris(sparkJars, jarsDownloadPath)
val resolvedSparkFiles = KubernetesFileUtils.resolveFileUris(
sparkFiles, localFilesDownloadPath)
val sparkConfResolvedSparkDependencies = driverSpec.driverSparkConf.clone()
val resolvedSparkJars = KubernetesUtils.resolveFileUris(sparkJars, jarsDownloadPath)
val resolvedSparkFiles = KubernetesUtils.resolveFileUris(sparkFiles, filesDownloadPath)
val sparkConf = driverSpec.driverSparkConf.clone()
if (resolvedSparkJars.nonEmpty) {
sparkConfResolvedSparkDependencies.set("spark.jars", resolvedSparkJars.mkString(","))
sparkConf.set("spark.jars", resolvedSparkJars.mkString(","))
}
if (resolvedSparkFiles.nonEmpty) {
sparkConfResolvedSparkDependencies.set("spark.files", resolvedSparkFiles.mkString(","))
sparkConf.set("spark.files", resolvedSparkFiles.mkString(","))
}
val resolvedClasspath = KubernetesFileUtils.resolveFilePaths(sparkJars, jarsDownloadPath)
val driverContainerWithResolvedClasspath = if (resolvedClasspath.nonEmpty) {
val resolvedClasspath = KubernetesUtils.resolveFilePaths(sparkJars, jarsDownloadPath)
val resolvedDriverContainer = if (resolvedClasspath.nonEmpty) {
new ContainerBuilder(driverSpec.driverContainer)
.addNewEnv()
.withName(ENV_MOUNTED_CLASSPATH)
@ -55,8 +57,9 @@ private[spark] class DependencyResolutionStep(
} else {
driverSpec.driverContainer
}
driverSpec.copy(
driverContainer = driverContainerWithResolvedClasspath,
driverSparkConf = sparkConfResolvedSparkDependencies)
driverContainer = resolvedDriverContainer,
driverSparkConf = sparkConf)
}
}


@ -19,7 +19,7 @@ package org.apache.spark.deploy.k8s.submit.steps
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
/**
* Represents a step in preparing the Kubernetes driver.
* Represents a step in configuring the Spark driver pod.
*/
private[spark] trait DriverConfigurationStep {


@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps
import java.io.StringWriter
import java.util.Properties
import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, ContainerBuilder, HasMetadata}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.KubernetesUtils
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
import org.apache.spark.deploy.k8s.submit.steps.initcontainer.{InitContainerConfigurationStep, InitContainerSpec}
/**
* Configures the driver init-container that localizes remote dependencies into the driver pod.
* It applies the given InitContainerConfigurationSteps in the given order to produce a final
* InitContainerSpec that is then used to configure the driver pod with the init-container attached.
* It also builds a ConfigMap that will be mounted into the init-container. The ConfigMap carries
* configuration properties for the init-container.
*/
private[spark] class DriverInitContainerBootstrapStep(
steps: Seq[InitContainerConfigurationStep],
configMapName: String,
configMapKey: String)
extends DriverConfigurationStep {
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
var initContainerSpec = InitContainerSpec(
properties = Map.empty[String, String],
driverSparkConf = Map.empty[String, String],
initContainer = new ContainerBuilder().build(),
driverContainer = driverSpec.driverContainer,
driverPod = driverSpec.driverPod,
dependentResources = Seq.empty[HasMetadata])
for (nextStep <- steps) {
initContainerSpec = nextStep.configureInitContainer(initContainerSpec)
}
val configMap = buildConfigMap(
configMapName,
configMapKey,
initContainerSpec.properties)
val resolvedDriverSparkConf = driverSpec.driverSparkConf
.clone()
.set(INIT_CONTAINER_CONFIG_MAP_NAME, configMapName)
.set(INIT_CONTAINER_CONFIG_MAP_KEY_CONF, configMapKey)
.setAll(initContainerSpec.driverSparkConf)
val resolvedDriverPod = KubernetesUtils.appendInitContainer(
initContainerSpec.driverPod, initContainerSpec.initContainer)
driverSpec.copy(
driverPod = resolvedDriverPod,
driverContainer = initContainerSpec.driverContainer,
driverSparkConf = resolvedDriverSparkConf,
otherKubernetesResources =
driverSpec.otherKubernetesResources ++
initContainerSpec.dependentResources ++
Seq(configMap))
}
private def buildConfigMap(
configMapName: String,
configMapKey: String,
config: Map[String, String]): ConfigMap = {
val properties = new Properties()
config.foreach { entry =>
properties.setProperty(entry._1, entry._2)
}
val propertiesWriter = new StringWriter()
properties.store(propertiesWriter,
s"Java properties built from Kubernetes config map with name: $configMapName " +
s"and config map key: $configMapKey")
new ConfigMapBuilder()
.withNewMetadata()
.withName(configMapName)
.endMetadata()
.addToData(configMapKey, propertiesWriter.toString)
.build()
}
}


@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps
import org.apache.spark.deploy.k8s.MountSecretsBootstrap
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
/**
* A driver configuration step for mounting user-specified secrets onto user-specified paths.
*
* @param bootstrap a utility actually handling mounting of the secrets.
*/
private[spark] class DriverMountSecretsStep(
bootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val (pod, container) = bootstrap.mountSecrets(
driverSpec.driverPod, driverSpec.driverContainer)
driverSpec.copy(
driverPod = pod,
driverContainer = container
)
}
}


@ -32,21 +32,22 @@ import org.apache.spark.util.Clock
* ports should correspond to the ports that the executor will reach the pod at for RPC.
*/
private[spark] class DriverServiceBootstrapStep(
kubernetesResourceNamePrefix: String,
resourceNamePrefix: String,
driverLabels: Map[String, String],
submissionSparkConf: SparkConf,
sparkConf: SparkConf,
clock: Clock) extends DriverConfigurationStep with Logging {
import DriverServiceBootstrapStep._
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
require(sparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind " +
"address is managed and set to the driver pod's IP address.")
require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
require(sparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " +
"managed via a Kubernetes service.")
val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX"
val preferredServiceName = s"$resourceNamePrefix$DRIVER_SVC_POSTFIX"
val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
preferredServiceName
} else {
@ -58,8 +59,8 @@ private[spark] class DriverServiceBootstrapStep(
shorterServiceName
}
val driverPort = submissionSparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)
val driverBlockManagerPort = submissionSparkConf.getInt(
val driverPort = sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)
val driverBlockManagerPort = sparkConf.getInt(
org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
val driverService = new ServiceBuilder()
.withNewMetadata()
@ -81,7 +82,7 @@ private[spark] class DriverServiceBootstrapStep(
.endSpec()
.build()
val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE)
val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
val resolvedSparkConf = driverSpec.driverSparkConf.clone()
.set(DRIVER_HOST_KEY, driverHostname)


@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps.initcontainer
import org.apache.spark.deploy.k8s.{InitContainerBootstrap, PodWithDetachedInitContainer}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.KubernetesUtils
/**
* Performs basic configuration for the driver init-container with most of the work delegated to
* the given InitContainerBootstrap.
*/
private[spark] class BasicInitContainerConfigurationStep(
sparkJars: Seq[String],
sparkFiles: Seq[String],
jarsDownloadPath: String,
filesDownloadPath: String,
bootstrap: InitContainerBootstrap)
extends InitContainerConfigurationStep {
override def configureInitContainer(spec: InitContainerSpec): InitContainerSpec = {
val remoteJarsToDownload = KubernetesUtils.getOnlyRemoteFiles(sparkJars)
val remoteFilesToDownload = KubernetesUtils.getOnlyRemoteFiles(sparkFiles)
val remoteJarsConf = if (remoteJarsToDownload.nonEmpty) {
Map(INIT_CONTAINER_REMOTE_JARS.key -> remoteJarsToDownload.mkString(","))
} else {
Map()
}
val remoteFilesConf = if (remoteFilesToDownload.nonEmpty) {
Map(INIT_CONTAINER_REMOTE_FILES.key -> remoteFilesToDownload.mkString(","))
} else {
Map()
}
val baseInitContainerConfig = Map(
JARS_DOWNLOAD_LOCATION.key -> jarsDownloadPath,
FILES_DOWNLOAD_LOCATION.key -> filesDownloadPath) ++
remoteJarsConf ++
remoteFilesConf
val bootstrapped = bootstrap.bootstrapInitContainer(
PodWithDetachedInitContainer(
spec.driverPod,
spec.initContainer,
spec.driverContainer))
spec.copy(
initContainer = bootstrapped.initContainer,
driverContainer = bootstrapped.mainContainer,
driverPod = bootstrapped.pod,
properties = spec.properties ++ baseInitContainerConfig)
}
}


@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps.initcontainer
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
/**
* Figures out and returns the complete ordered list of InitContainerConfigurationSteps required to
* configure the driver init-container. The returned steps will be applied in the given order to
* produce a final InitContainerSpec that is used to construct the driver init-container in
* DriverInitContainerBootstrapStep. This class is only used when an init-container is needed, i.e.,
* when there are remote application dependencies to localize.
*/
private[spark] class InitContainerConfigOrchestrator(
sparkJars: Seq[String],
sparkFiles: Seq[String],
jarsDownloadPath: String,
filesDownloadPath: String,
imagePullPolicy: String,
configMapName: String,
configMapKey: String,
sparkConf: SparkConf) {
private val initContainerImage = sparkConf
.get(INIT_CONTAINER_IMAGE)
.getOrElse(throw new SparkException(
"Must specify the init-container image when there are remote dependencies"))
def getAllConfigurationSteps: Seq[InitContainerConfigurationStep] = {
val initContainerBootstrap = new InitContainerBootstrap(
initContainerImage,
imagePullPolicy,
jarsDownloadPath,
filesDownloadPath,
configMapName,
configMapKey,
SPARK_POD_DRIVER_ROLE,
sparkConf)
val baseStep = new BasicInitContainerConfigurationStep(
sparkJars,
sparkFiles,
jarsDownloadPath,
filesDownloadPath,
initContainerBootstrap)
val secretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_DRIVER_SECRETS_PREFIX)
// Mount user-specified driver secrets also into the driver's init-container. The
// init-container may need credentials in the secrets to be able to download remote
// dependencies. The driver's main container and its init-container share the secrets
// because the init-container is sort of an implementation detail and this sharing
// avoids introducing a dedicated configuration property just for the init-container.
val mountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
Seq(new InitContainerMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
} else {
Nil
}
Seq(baseStep) ++ mountSecretsStep
}
}


@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps.initcontainer
/**
* Represents a step in configuring the driver init-container.
*/
private[spark] trait InitContainerConfigurationStep {
def configureInitContainer(spec: InitContainerSpec): InitContainerSpec
}


@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps.initcontainer
import org.apache.spark.deploy.k8s.MountSecretsBootstrap
/**
* An init-container configuration step for mounting user-specified secrets onto user-specified
* paths.
*
* @param bootstrap a utility actually handling mounting of the secrets
*/
private[spark] class InitContainerMountSecretsStep(
bootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep {
override def configureInitContainer(spec: InitContainerSpec) : InitContainerSpec = {
val (driverPod, initContainer) = bootstrap.mountSecrets(
spec.driverPod,
spec.initContainer)
spec.copy(
driverPod = driverPod,
initContainer = initContainer
)
}
}


@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps.initcontainer
import io.fabric8.kubernetes.api.model.{Container, HasMetadata, Pod}
/**
* Represents a specification of the init-container for the driver pod.
*
* @param properties properties that should be set on the init-container
* @param driverSparkConf Spark configuration properties that will be carried back to the driver
* @param initContainer the init-container object
* @param driverContainer the driver container object
* @param driverPod the driver pod object
* @param dependentResources resources the init-container depends on to work
*/
private[spark] case class InitContainerSpec(
properties: Map[String, String],
driverSparkConf: Map[String, String],
initContainer: Container,
driverContainer: Container,
driverPod: Pod,
dependentResources: Seq[HasMetadata])


@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.rest.k8s
import java.io.File
import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ThreadUtils, Utils}
/**
* Process that fetches files from a resource staging server and/or arbitrary remote locations.
*
* The init-container can handle fetching files from any of those sources, but not all of the
* sources need to be specified. This allows for composing multiple instances of this container
* with different configurations for different download sources, or using the same container to
* download everything at once.
*/
private[spark] class SparkPodInitContainer(
sparkConf: SparkConf,
fileFetcher: FileFetcher) extends Logging {
private val maxThreadPoolSize = sparkConf.get(INIT_CONTAINER_MAX_THREAD_POOL_SIZE)
private implicit val downloadExecutor = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("download-executor", maxThreadPoolSize))
private val jarsDownloadDir = new File(sparkConf.get(JARS_DOWNLOAD_LOCATION))
private val filesDownloadDir = new File(sparkConf.get(FILES_DOWNLOAD_LOCATION))
private val remoteJars = sparkConf.get(INIT_CONTAINER_REMOTE_JARS)
private val remoteFiles = sparkConf.get(INIT_CONTAINER_REMOTE_FILES)
private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
def run(): Unit = {
logInfo(s"Downloading remote jars: $remoteJars")
downloadFiles(
remoteJars,
jarsDownloadDir,
s"Remote jars download directory specified at $jarsDownloadDir does not exist " +
"or is not a directory.")
logInfo(s"Downloading remote files: $remoteFiles")
downloadFiles(
remoteFiles,
filesDownloadDir,
s"Remote files download directory specified at $filesDownloadDir does not exist " +
"or is not a directory.")
downloadExecutor.shutdown()
downloadExecutor.awaitTermination(downloadTimeoutMinutes, TimeUnit.MINUTES)
}
private def downloadFiles(
filesCommaSeparated: Option[String],
downloadDir: File,
errMessage: String): Unit = {
filesCommaSeparated.foreach { files =>
require(downloadDir.isDirectory, errMessage)
Utils.stringToSeq(files).foreach { file =>
Future[Unit] {
fileFetcher.fetchFile(file, downloadDir)
}
}
}
}
}
private class FileFetcher(sparkConf: SparkConf, securityManager: SparkSecurityManager) {
def fetchFile(uri: String, targetDir: File): Unit = {
Utils.fetchFile(
url = uri,
targetDir = targetDir,
conf = sparkConf,
securityMgr = securityManager,
hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf),
timestamp = System.currentTimeMillis(),
useCache = false)
}
}
object SparkPodInitContainer extends Logging {
def main(args: Array[String]): Unit = {
logInfo("Starting init-container to download Spark application dependencies.")
val sparkConf = new SparkConf(true)
if (args.nonEmpty) {
Utils.loadDefaultSparkProperties(sparkConf, args(0))
}
val securityManager = new SparkSecurityManager(sparkConf)
val fileFetcher = new FileFetcher(sparkConf, securityManager)
new SparkPodInitContainer(sparkConf, fileFetcher).run()
logInfo("Finished downloading application dependencies.")
}
}


@ -21,35 +21,35 @@ import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap, PodWithDetachedInitContainer}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.ConfigurationUtils
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
import org.apache.spark.util.Utils
/**
* A factory class for configuring and creating executor pods.
* A factory class for bootstrapping and creating executor pods with the given bootstrapping
* components.
*
* @param sparkConf Spark configuration
* @param mountSecretsBootstrap an optional component for mounting user-specified secrets onto
* user-specified paths into the executor container
* @param initContainerBootstrap an optional component for bootstrapping the executor init-container
* if one is needed, i.e., when there are remote dependencies to
* localize
* @param initContainerMountSecretsBootstrap an optional component for mounting user-specified
* secrets onto user-specified paths into the executor
* init-container
*/
private[spark] trait ExecutorPodFactory {
/**
* Configure and construct an executor pod with the given parameters.
*/
def createExecutorPod(
executorId: String,
applicationId: String,
driverUrl: String,
executorEnvs: Seq[(String, String)],
driverPod: Pod,
nodeToLocalTaskCount: Map[String, Int]): Pod
}
private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
extends ExecutorPodFactory {
private[spark] class ExecutorPodFactory(
sparkConf: SparkConf,
mountSecretsBootstrap: Option[MountSecretsBootstrap],
initContainerBootstrap: Option[InitContainerBootstrap],
initContainerMountSecretsBootstrap: Option[MountSecretsBootstrap]) {
private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH)
private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
private val executorLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_EXECUTOR_LABEL_PREFIX)
require(
@ -64,11 +64,11 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
private val executorAnnotations =
ConfigurationUtils.parsePrefixedKeyValuePairs(
KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
private val nodeSelector =
ConfigurationUtils.parsePrefixedKeyValuePairs(
KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_NODE_SELECTOR_PREFIX)
@ -94,7 +94,10 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
private val executorCores = sparkConf.getDouble("spark.executor.cores", 1)
private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
override def createExecutorPod(
/**
* Configure and construct an executor pod with the given parameters.
*/
def createExecutorPod(
executorId: String,
applicationId: String,
driverUrl: String,
@ -198,7 +201,7 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
.endSpec()
.build()
val containerWithExecutorLimitCores = executorLimitCores.map { limitCores =>
val containerWithLimitCores = executorLimitCores.map { limitCores =>
val executorCpuLimitQuantity = new QuantityBuilder(false)
.withAmount(limitCores)
.build()
@ -209,9 +212,33 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
.build()
}.getOrElse(executorContainer)
new PodBuilder(executorPod)
val (maybeSecretsMountedPod, maybeSecretsMountedContainer) =
mountSecretsBootstrap.map { bootstrap =>
bootstrap.mountSecrets(executorPod, containerWithLimitCores)
}.getOrElse((executorPod, containerWithLimitCores))
val (bootstrappedPod, bootstrappedContainer) =
initContainerBootstrap.map { bootstrap =>
val podWithInitContainer = bootstrap.bootstrapInitContainer(
PodWithDetachedInitContainer(
maybeSecretsMountedPod,
new ContainerBuilder().build(),
maybeSecretsMountedContainer))
val (pod, mayBeSecretsMountedInitContainer) =
initContainerMountSecretsBootstrap.map { bootstrap =>
bootstrap.mountSecrets(podWithInitContainer.pod, podWithInitContainer.initContainer)
}.getOrElse((podWithInitContainer.pod, podWithInitContainer.initContainer))
val bootstrappedPod = KubernetesUtils.appendInitContainer(
pod, mayBeSecretsMountedInitContainer)
(bootstrappedPod, podWithInitContainer.mainContainer)
}.getOrElse((maybeSecretsMountedPod, maybeSecretsMountedContainer))
new PodBuilder(bootstrappedPod)
.editSpec()
.addToContainers(containerWithExecutorLimitCores)
.addToContainers(bootstrappedContainer)
.endSpec()
.build()
}
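Reading the factory changes above end to end: the executor pod is assembled in layers, with user secrets mounted first, an init-container attached only when one is configured, and the main container added back into the pod spec at the end. A standalone sketch of the resulting pod shape using the fabric8 builders directly (image names, volume names, and paths here are illustrative, not values Spark uses):

```scala
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

object ExecutorPodShapeSketch {
  // Build a pod that carries a secret volume, an init-container, and the main executor
  // container, mirroring the layered bootstrap applied by ExecutorPodFactory.
  def buildPod(): Pod = {
    val mainContainer: Container = new ContainerBuilder()
      .withName("executor")
      .withImage("spark-executor:latest") // illustrative image
      .addNewVolumeMount()
        .withName("secret1-volume")
        .withMountPath("/var/secret1")
      .endVolumeMount()
      .build()

    val initContainer: Container = new ContainerBuilder()
      .withName("spark-init")
      .withImage("spark-init:latest") // illustrative image
      .addNewVolumeMount()
        .withName("secret1-volume") // the init-container shares the same secret volume
        .withMountPath("/var/secret1")
      .endVolumeMount()
      .build()

    new PodBuilder()
      .withNewMetadata().withName("executor-pod").endMetadata()
      .withNewSpec()
        .addNewVolume()
          .withName("secret1-volume")
          .withNewSecret().withSecretName("secret1").endSecret()
        .endVolume()
        .addToInitContainers(initContainer)
        .addToContainers(mainContainer)
      .endSpec()
      .build()
  }
}
```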


@ -21,9 +21,9 @@ import java.io.File
import io.fabric8.kubernetes.client.Config
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap, SparkKubernetesClientFactory}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.util.ThreadUtils
@ -45,6 +45,59 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
masterURL: String,
scheduler: TaskScheduler): SchedulerBackend = {
val sparkConf = sc.getConf
val initContainerConfigMap = sparkConf.get(INIT_CONTAINER_CONFIG_MAP_NAME)
val initContainerConfigMapKey = sparkConf.get(INIT_CONTAINER_CONFIG_MAP_KEY_CONF)
if (initContainerConfigMap.isEmpty) {
logWarning("The executor's init-container config map is not specified. Executors will " +
"therefore not attempt to fetch remote or submitted dependencies.")
}
if (initContainerConfigMapKey.isEmpty) {
logWarning("The executor's init-container config map key is not specified. Executors will " +
"therefore not attempt to fetch remote or submitted dependencies.")
}
// Only set up the bootstrap if the user has provided both the config map key and the config map
// name. The config map might not be provided if init-containers aren't being used to
// bootstrap dependencies.
val initContainerBootstrap = for {
configMap <- initContainerConfigMap
configMapKey <- initContainerConfigMapKey
} yield {
val initContainerImage = sparkConf
.get(INIT_CONTAINER_IMAGE)
.getOrElse(throw new SparkException(
"Must specify the init-container image when there are remote dependencies"))
new InitContainerBootstrap(
initContainerImage,
sparkConf.get(CONTAINER_IMAGE_PULL_POLICY),
sparkConf.get(JARS_DOWNLOAD_LOCATION),
sparkConf.get(FILES_DOWNLOAD_LOCATION),
configMap,
configMapKey,
SPARK_POD_EXECUTOR_ROLE,
sparkConf)
}
val executorSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
val mountSecretBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
} else {
None
}
// Mount user-specified executor secrets also into the executor's init-container. The
// init-container may need credentials in the secrets to be able to download remote
// dependencies. The executor's main container and its init-container share the secrets
// because the init-container is essentially an implementation detail and this sharing
// avoids introducing a dedicated configuration property just for the init-container.
val initContainerMountSecretsBootstrap = if (initContainerBootstrap.nonEmpty &&
executorSecretNamesToMountPaths.nonEmpty) {
Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
} else {
None
}
val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
KUBERNETES_MASTER_INTERNAL_URL,
@ -54,7 +107,12 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
val executorPodFactory = new ExecutorPodFactoryImpl(sparkConf)
val executorPodFactory = new ExecutorPodFactory(
sparkConf,
mountSecretBootstrap,
initContainerBootstrap,
initContainerMountSecretsBootstrap)
val allocatorExecutor = ThreadUtils
.newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(


@ -17,25 +17,27 @@
package org.apache.spark.deploy.k8s.submit
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config.DRIVER_CONTAINER_IMAGE
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.submit.steps._
class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {
class DriverConfigOrchestratorSuite extends SparkFunSuite {
private val NAMESPACE = "default"
private val DRIVER_IMAGE = "driver-image"
private val IC_IMAGE = "init-container-image"
private val APP_ID = "spark-app-id"
private val LAUNCH_TIME = 975256L
private val APP_NAME = "spark"
private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
private val APP_ARGS = Array("arg1", "arg2")
private val SECRET_FOO = "foo"
private val SECRET_BAR = "bar"
private val SECRET_MOUNT_PATH = "/etc/secrets/driver"
test("Base submission steps with a main app resource.") {
val sparkConf = new SparkConf(false)
.set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE)
val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
val orchestrator = new DriverConfigurationStepsOrchestrator(
NAMESPACE,
val orchestrator = new DriverConfigOrchestrator(
APP_ID,
LAUNCH_TIME,
Some(mainAppResource),
@ -45,7 +47,7 @@ class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {
sparkConf)
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[BasicDriverConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep]
@ -55,8 +57,7 @@ class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {
test("Base submission steps without a main app resource.") {
val sparkConf = new SparkConf(false)
.set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE)
val orchestrator = new DriverConfigurationStepsOrchestrator(
NAMESPACE,
val orchestrator = new DriverConfigOrchestrator(
APP_ID,
LAUNCH_TIME,
Option.empty,
@ -66,16 +67,62 @@ class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {
sparkConf)
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[BasicDriverConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep]
)
}
test("Submission steps with an init-container.") {
val sparkConf = new SparkConf(false)
.set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE)
.set(INIT_CONTAINER_IMAGE, IC_IMAGE)
.set("spark.jars", "hdfs://localhost:9000/var/apps/jars/jar1.jar")
val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
val orchestrator = new DriverConfigOrchestrator(
APP_ID,
LAUNCH_TIME,
Some(mainAppResource),
APP_NAME,
MAIN_CLASS,
APP_ARGS,
sparkConf)
validateStepTypes(
orchestrator,
classOf[BasicDriverConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[DriverInitContainerBootstrapStep])
}
test("Submission steps with driver secrets to mount") {
val sparkConf = new SparkConf(false)
.set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE)
.set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH)
.set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_BAR", SECRET_MOUNT_PATH)
val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
val orchestrator = new DriverConfigOrchestrator(
APP_ID,
LAUNCH_TIME,
Some(mainAppResource),
APP_NAME,
MAIN_CLASS,
APP_ARGS,
sparkConf)
validateStepTypes(
orchestrator,
classOf[BasicDriverConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[DriverMountSecretsStep])
}
private def validateStepTypes(
orchestrator: DriverConfigurationStepsOrchestrator,
orchestrator: DriverConfigOrchestrator,
types: Class[_ <: DriverConfigurationStep]*): Unit = {
val steps = orchestrator.getAllConfigurationSteps()
val steps = orchestrator.getAllConfigurationSteps
assert(steps.size === types.size)
assert(steps.map(_.getClass) === types)
}


@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{Container, Pod}
private[spark] object SecretVolumeUtils {
def podHasVolume(driverPod: Pod, volumeName: String): Boolean = {
driverPod.getSpec.getVolumes.asScala.exists(volume => volume.getName == volumeName)
}
def containerHasVolume(
driverContainer: Container,
volumeName: String,
mountPath: String): Boolean = {
driverContainer.getVolumeMounts.asScala.exists(volumeMount =>
volumeMount.getName == volumeName && volumeMount.getMountPath == mountPath)
}
}


@ -25,7 +25,7 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
class BaseDriverConfigurationStepSuite extends SparkFunSuite {
class BasicDriverConfigurationStepSuite extends SparkFunSuite {
private val APP_ID = "spark-app-id"
private val RESOURCE_NAME_PREFIX = "spark"
@ -52,7 +52,7 @@ class BaseDriverConfigurationStepSuite extends SparkFunSuite {
.set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1")
.set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2")
val submissionStep = new BaseDriverConfigurationStep(
val submissionStep = new BasicDriverConfigurationStep(
APP_ID,
RESOURCE_NAME_PREFIX,
DRIVER_LABELS,


@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps
import java.io.StringReader
import java.util.Properties
import scala.collection.JavaConverters._
import com.google.common.collect.Maps
import io.fabric8.kubernetes.api.model.{ConfigMap, ContainerBuilder, HasMetadata, PodBuilder, SecretBuilder}
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
import org.apache.spark.deploy.k8s.submit.steps.initcontainer.{InitContainerConfigurationStep, InitContainerSpec}
import org.apache.spark.util.Utils
class DriverInitContainerBootstrapStepSuite extends SparkFunSuite {
private val CONFIG_MAP_NAME = "spark-init-config-map"
private val CONFIG_MAP_KEY = "spark-init-config-map-key"
test("The init container bootstrap step should use all of the init container steps") {
val baseDriverSpec = KubernetesDriverSpec(
driverPod = new PodBuilder().build(),
driverContainer = new ContainerBuilder().build(),
driverSparkConf = new SparkConf(false),
otherKubernetesResources = Seq.empty[HasMetadata])
val initContainerSteps = Seq(
FirstTestInitContainerConfigurationStep,
SecondTestInitContainerConfigurationStep)
val bootstrapStep = new DriverInitContainerBootstrapStep(
initContainerSteps,
CONFIG_MAP_NAME,
CONFIG_MAP_KEY)
val preparedDriverSpec = bootstrapStep.configureDriver(baseDriverSpec)
assert(preparedDriverSpec.driverPod.getMetadata.getLabels.asScala ===
FirstTestInitContainerConfigurationStep.additionalLabels)
val additionalDriverEnv = preparedDriverSpec.driverContainer.getEnv.asScala
assert(additionalDriverEnv.size === 1)
assert(additionalDriverEnv.head.getName ===
FirstTestInitContainerConfigurationStep.additionalMainContainerEnvKey)
assert(additionalDriverEnv.head.getValue ===
FirstTestInitContainerConfigurationStep.additionalMainContainerEnvValue)
assert(preparedDriverSpec.otherKubernetesResources.size === 2)
assert(preparedDriverSpec.otherKubernetesResources.contains(
FirstTestInitContainerConfigurationStep.additionalKubernetesResource))
assert(preparedDriverSpec.otherKubernetesResources.exists {
case configMap: ConfigMap =>
val hasMatchingName = configMap.getMetadata.getName == CONFIG_MAP_NAME
val configMapData = configMap.getData.asScala
val hasCorrectNumberOfEntries = configMapData.size == 1
val initContainerPropertiesRaw = configMapData(CONFIG_MAP_KEY)
val initContainerProperties = new Properties()
Utils.tryWithResource(new StringReader(initContainerPropertiesRaw)) {
initContainerProperties.load(_)
}
val initContainerPropertiesMap = Maps.fromProperties(initContainerProperties).asScala
val expectedInitContainerProperties = Map(
SecondTestInitContainerConfigurationStep.additionalInitContainerPropertyKey ->
SecondTestInitContainerConfigurationStep.additionalInitContainerPropertyValue)
val hasMatchingProperties = initContainerPropertiesMap == expectedInitContainerProperties
hasMatchingName && hasCorrectNumberOfEntries && hasMatchingProperties
case _ => false
})
val initContainers = preparedDriverSpec.driverPod.getSpec.getInitContainers
assert(initContainers.size() === 1)
val initContainerEnv = initContainers.get(0).getEnv.asScala
assert(initContainerEnv.size === 1)
assert(initContainerEnv.head.getName ===
SecondTestInitContainerConfigurationStep.additionalInitContainerEnvKey)
assert(initContainerEnv.head.getValue ===
SecondTestInitContainerConfigurationStep.additionalInitContainerEnvValue)
val expectedSparkConf = Map(
INIT_CONTAINER_CONFIG_MAP_NAME.key -> CONFIG_MAP_NAME,
INIT_CONTAINER_CONFIG_MAP_KEY_CONF.key -> CONFIG_MAP_KEY,
SecondTestInitContainerConfigurationStep.additionalDriverSparkConfKey ->
SecondTestInitContainerConfigurationStep.additionalDriverSparkConfValue)
assert(preparedDriverSpec.driverSparkConf.getAll.toMap === expectedSparkConf)
}
}
private object FirstTestInitContainerConfigurationStep extends InitContainerConfigurationStep {
val additionalLabels = Map("additionalLabelkey" -> "additionalLabelValue")
val additionalMainContainerEnvKey = "TEST_ENV_MAIN_KEY"
val additionalMainContainerEnvValue = "TEST_ENV_MAIN_VALUE"
val additionalKubernetesResource = new SecretBuilder()
.withNewMetadata()
.withName("test-secret")
.endMetadata()
.addToData("secret-key", "secret-value")
.build()
override def configureInitContainer(initContainerSpec: InitContainerSpec): InitContainerSpec = {
val driverPod = new PodBuilder(initContainerSpec.driverPod)
.editOrNewMetadata()
.addToLabels(additionalLabels.asJava)
.endMetadata()
.build()
val mainContainer = new ContainerBuilder(initContainerSpec.driverContainer)
.addNewEnv()
.withName(additionalMainContainerEnvKey)
.withValue(additionalMainContainerEnvValue)
.endEnv()
.build()
initContainerSpec.copy(
driverPod = driverPod,
driverContainer = mainContainer,
dependentResources = initContainerSpec.dependentResources ++
Seq(additionalKubernetesResource))
}
}
private object SecondTestInitContainerConfigurationStep extends InitContainerConfigurationStep {
val additionalInitContainerEnvKey = "TEST_ENV_INIT_KEY"
val additionalInitContainerEnvValue = "TEST_ENV_INIT_VALUE"
val additionalInitContainerPropertyKey = "spark.initcontainer.testkey"
val additionalInitContainerPropertyValue = "testvalue"
val additionalDriverSparkConfKey = "spark.driver.testkey"
val additionalDriverSparkConfValue = "spark.driver.testvalue"
override def configureInitContainer(initContainerSpec: InitContainerSpec): InitContainerSpec = {
val initContainer = new ContainerBuilder(initContainerSpec.initContainer)
.addNewEnv()
.withName(additionalInitContainerEnvKey)
.withValue(additionalInitContainerEnvValue)
.endEnv()
.build()
val initContainerProperties = initContainerSpec.properties ++
Map(additionalInitContainerPropertyKey -> additionalInitContainerPropertyValue)
val driverSparkConf = initContainerSpec.driverSparkConf ++
Map(additionalDriverSparkConfKey -> additionalDriverSparkConfValue)
initContainerSpec.copy(
initContainer = initContainer,
properties = initContainerProperties,
driverSparkConf = driverSparkConf)
}
}
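The ConfigMap assertion in the suite above depends on one detail worth spelling out: the init-container's Spark properties are serialized as a single Java properties string stored under CONFIG_MAP_KEY, and read back the same way. A tiny round-trip sketch of that encoding (illustrative only; the property key and value are made up):

```scala
import java.io.{StringReader, StringWriter}
import java.util.Properties

import scala.collection.JavaConverters._

object ConfigMapPropertiesSketch {
  def main(args: Array[String]): Unit = {
    // Serialize a property map the way the bootstrap step stores it in the ConfigMap...
    val props = new Properties()
    props.setProperty("spark.initcontainer.testkey", "testvalue")
    val writer = new StringWriter()
    props.store(writer, "init-container properties")
    val raw = writer.toString

    // ...and load it back the way the suite (and the init-container itself) reads it.
    val reloaded = new Properties()
    reloaded.load(new StringReader(raw))
    assert(reloaded.asScala("spark.initcontainer.testkey") == "testvalue")
  }
}
```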


@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.MountSecretsBootstrap
import org.apache.spark.deploy.k8s.submit.{KubernetesDriverSpec, SecretVolumeUtils}
class DriverMountSecretsStepSuite extends SparkFunSuite {
private val SECRET_FOO = "foo"
private val SECRET_BAR = "bar"
private val SECRET_MOUNT_PATH = "/etc/secrets/driver"
test("mounts all given secrets") {
val baseDriverSpec = KubernetesDriverSpec.initialSpec(new SparkConf(false))
val secretNamesToMountPaths = Map(
SECRET_FOO -> SECRET_MOUNT_PATH,
SECRET_BAR -> SECRET_MOUNT_PATH)
val mountSecretsBootstrap = new MountSecretsBootstrap(secretNamesToMountPaths)
val mountSecretsStep = new DriverMountSecretsStep(mountSecretsBootstrap)
val configuredDriverSpec = mountSecretsStep.configureDriver(baseDriverSpec)
val driverPodWithSecretsMounted = configuredDriverSpec.driverPod
val driverContainerWithSecretsMounted = configuredDriverSpec.driverContainer
Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName =>
assert(SecretVolumeUtils.podHasVolume(driverPodWithSecretsMounted, volumeName))
}
Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName =>
assert(SecretVolumeUtils.containerHasVolume(
driverContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH))
}
}
}


@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps.initcontainer
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model._
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Matchers.any
import org.mockito.Mockito.when
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.{InitContainerBootstrap, PodWithDetachedInitContainer}
import org.apache.spark.deploy.k8s.Config._
class BasicInitContainerConfigurationStepSuite extends SparkFunSuite with BeforeAndAfter {
private val SPARK_JARS = Seq(
"hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar")
private val SPARK_FILES = Seq(
"hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt")
private val JARS_DOWNLOAD_PATH = "/var/data/jars"
private val FILES_DOWNLOAD_PATH = "/var/data/files"
private val POD_LABEL = Map("bootstrap" -> "true")
private val INIT_CONTAINER_NAME = "init-container"
private val DRIVER_CONTAINER_NAME = "driver-container"
@Mock
private var podAndInitContainerBootstrap : InitContainerBootstrap = _
before {
MockitoAnnotations.initMocks(this)
when(podAndInitContainerBootstrap.bootstrapInitContainer(
any[PodWithDetachedInitContainer])).thenAnswer(new Answer[PodWithDetachedInitContainer] {
override def answer(invocation: InvocationOnMock) : PodWithDetachedInitContainer = {
val pod = invocation.getArgumentAt(0, classOf[PodWithDetachedInitContainer])
pod.copy(
pod = new PodBuilder(pod.pod)
.withNewMetadata()
.addToLabels("bootstrap", "true")
.endMetadata()
.withNewSpec().endSpec()
.build(),
initContainer = new ContainerBuilder()
.withName(INIT_CONTAINER_NAME)
.build(),
mainContainer = new ContainerBuilder()
.withName(DRIVER_CONTAINER_NAME)
.build()
)}})
}
test("additionalDriverSparkConf with mix of remote files and jars") {
val baseInitStep = new BasicInitContainerConfigurationStep(
SPARK_JARS,
SPARK_FILES,
JARS_DOWNLOAD_PATH,
FILES_DOWNLOAD_PATH,
podAndInitContainerBootstrap)
val expectedDriverSparkConf = Map(
JARS_DOWNLOAD_LOCATION.key -> JARS_DOWNLOAD_PATH,
FILES_DOWNLOAD_LOCATION.key -> FILES_DOWNLOAD_PATH,
INIT_CONTAINER_REMOTE_JARS.key -> "hdfs://localhost:9000/app/jars/jar1.jar",
INIT_CONTAINER_REMOTE_FILES.key -> "hdfs://localhost:9000/app/files/file1.txt")
val initContainerSpec = InitContainerSpec(
Map.empty[String, String],
Map.empty[String, String],
new Container(),
new Container(),
new Pod,
Seq.empty[HasMetadata])
val returnContainerSpec = baseInitStep.configureInitContainer(initContainerSpec)
assert(expectedDriverSparkConf === returnContainerSpec.properties)
assert(returnContainerSpec.initContainer.getName === INIT_CONTAINER_NAME)
assert(returnContainerSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL)
}
}
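The expectations in this suite hinge on which URIs actually need downloading: only the hdfs:// jar and file appear under INIT_CONTAINER_REMOTE_JARS / INIT_CONTAINER_REMOTE_FILES, while file:// (and container-local) resources are left alone. A simplified sketch of that filtering, not Spark's actual utility:

```scala
import java.net.URI

object RemoteUriFilterSketch {
  // Treat anything that is not file:// or local:// as remote, which is the behaviour
  // the test above expects for its hdfs:// inputs.
  def filterRemote(uris: Seq[String]): Seq[String] =
    uris.filter { uri =>
      val scheme = URI.create(uri).getScheme
      scheme != null && scheme != "file" && scheme != "local"
    }

  def main(args: Array[String]): Unit = {
    val jars = Seq("hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar")
    assert(filterRemote(jars) == Seq("hdfs://localhost:9000/app/jars/jar1.jar"))
  }
}
```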


@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps.initcontainer
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
class InitContainerConfigOrchestratorSuite extends SparkFunSuite {
private val DOCKER_IMAGE = "init-container"
private val SPARK_JARS = Seq(
"hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar")
private val SPARK_FILES = Seq(
"hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt")
private val JARS_DOWNLOAD_PATH = "/var/data/jars"
private val FILES_DOWNLOAD_PATH = "/var/data/files"
private val DOCKER_IMAGE_PULL_POLICY: String = "IfNotPresent"
private val CUSTOM_LABEL_KEY = "customLabel"
private val CUSTOM_LABEL_VALUE = "customLabelValue"
private val INIT_CONTAINER_CONFIG_MAP_NAME = "spark-init-config-map"
private val INIT_CONTAINER_CONFIG_MAP_KEY = "spark-init-config-map-key"
private val SECRET_FOO = "foo"
private val SECRET_BAR = "bar"
private val SECRET_MOUNT_PATH = "/etc/secrets/init-container"
test("including basic configuration step") {
val sparkConf = new SparkConf(true)
.set(INIT_CONTAINER_IMAGE, DOCKER_IMAGE)
.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)
val orchestrator = new InitContainerConfigOrchestrator(
SPARK_JARS.take(1),
SPARK_FILES,
JARS_DOWNLOAD_PATH,
FILES_DOWNLOAD_PATH,
DOCKER_IMAGE_PULL_POLICY,
INIT_CONTAINER_CONFIG_MAP_NAME,
INIT_CONTAINER_CONFIG_MAP_KEY,
sparkConf)
val initSteps = orchestrator.getAllConfigurationSteps
assert(initSteps.lengthCompare(1) == 0)
assert(initSteps.head.isInstanceOf[BasicInitContainerConfigurationStep])
}
test("including step to mount user-specified secrets") {
val sparkConf = new SparkConf(false)
.set(INIT_CONTAINER_IMAGE, DOCKER_IMAGE)
.set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH)
.set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_BAR", SECRET_MOUNT_PATH)
val orchestrator = new InitContainerConfigOrchestrator(
SPARK_JARS.take(1),
SPARK_FILES,
JARS_DOWNLOAD_PATH,
FILES_DOWNLOAD_PATH,
DOCKER_IMAGE_PULL_POLICY,
INIT_CONTAINER_CONFIG_MAP_NAME,
INIT_CONTAINER_CONFIG_MAP_KEY,
sparkConf)
val initSteps = orchestrator.getAllConfigurationSteps
assert(initSteps.length === 2)
assert(initSteps.head.isInstanceOf[BasicInitContainerConfigurationStep])
assert(initSteps(1).isInstanceOf[InitContainerMountSecretsStep])
}
}
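The second test above leans on a convention used throughout this change: per-secret settings are expressed as prefixed Spark properties (KUBERNETES_DRIVER_SECRETS_PREFIX + secret name -> mount path), which the orchestrator collapses into a map before deciding whether to add InitContainerMountSecretsStep. A minimal sketch of that collapse, simplified relative to KubernetesUtils.parsePrefixedKeyValuePairs and with an assumed literal prefix:

```scala
object PrefixedConfSketch {
  def parsePrefixed(conf: Map[String, String], prefix: String): Map[String, String] =
    conf.collect { case (k, v) if k.startsWith(prefix) => k.stripPrefix(prefix) -> v }

  def main(args: Array[String]): Unit = {
    val prefix = "spark.kubernetes.driver.secrets." // assumed prefix, for illustration
    val conf = Map(
      prefix + "foo" -> "/etc/secrets/init-container",
      prefix + "bar" -> "/etc/secrets/init-container",
      "spark.app.name" -> "demo")
    val secretNamesToMountPaths = parsePrefixed(conf, prefix)
    // A non-empty result is what triggers adding the mount-secrets step.
    assert(secretNamesToMountPaths == Map(
      "foo" -> "/etc/secrets/init-container",
      "bar" -> "/etc/secrets/init-container"))
  }
}
```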


@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps.initcontainer
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.MountSecretsBootstrap
import org.apache.spark.deploy.k8s.submit.SecretVolumeUtils
class InitContainerMountSecretsStepSuite extends SparkFunSuite {
private val SECRET_FOO = "foo"
private val SECRET_BAR = "bar"
private val SECRET_MOUNT_PATH = "/etc/secrets/init-container"
test("mounts all given secrets") {
val baseInitContainerSpec = InitContainerSpec(
Map.empty,
Map.empty,
new ContainerBuilder().build(),
new ContainerBuilder().build(),
new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(),
Seq.empty)
val secretNamesToMountPaths = Map(
SECRET_FOO -> SECRET_MOUNT_PATH,
SECRET_BAR -> SECRET_MOUNT_PATH)
val mountSecretsBootstrap = new MountSecretsBootstrap(secretNamesToMountPaths)
val initContainerMountSecretsStep = new InitContainerMountSecretsStep(mountSecretsBootstrap)
val configuredInitContainerSpec = initContainerMountSecretsStep.configureInitContainer(
baseInitContainerSpec)
val podWithSecretsMounted = configuredInitContainerSpec.driverPod
val initContainerWithSecretsMounted = configuredInitContainerSpec.initContainer
Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
assert(SecretVolumeUtils.podHasVolume(podWithSecretsMounted, volumeName)))
Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
assert(SecretVolumeUtils.containerHasVolume(
initContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH)))
}
}


@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.rest.k8s
import java.io.File
import java.util.UUID
import com.google.common.base.Charsets
import com.google.common.io.Files
import org.mockito.Mockito
import org.scalatest.BeforeAndAfter
import org.scalatest.mockito.MockitoSugar._
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.util.Utils
class SparkPodInitContainerSuite extends SparkFunSuite with BeforeAndAfter {
private val DOWNLOAD_JARS_SECRET_LOCATION = createTempFile("txt")
private val DOWNLOAD_FILES_SECRET_LOCATION = createTempFile("txt")
private var downloadJarsDir: File = _
private var downloadFilesDir: File = _
private var downloadJarsSecretValue: String = _
private var downloadFilesSecretValue: String = _
private var fileFetcher: FileFetcher = _
override def beforeAll(): Unit = {
downloadJarsSecretValue = Files.toString(
new File(DOWNLOAD_JARS_SECRET_LOCATION), Charsets.UTF_8)
downloadFilesSecretValue = Files.toString(
new File(DOWNLOAD_FILES_SECRET_LOCATION), Charsets.UTF_8)
}
before {
downloadJarsDir = Utils.createTempDir()
downloadFilesDir = Utils.createTempDir()
fileFetcher = mock[FileFetcher]
}
after {
downloadJarsDir.delete()
downloadFilesDir.delete()
}
test("Downloads from remote server should invoke the file fetcher") {
val sparkConf = getSparkConfForRemoteFileDownloads
val initContainerUnderTest = new SparkPodInitContainer(sparkConf, fileFetcher)
initContainerUnderTest.run()
Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/jar1.jar", downloadJarsDir)
Mockito.verify(fileFetcher).fetchFile("hdfs://localhost:9000/jar2.jar", downloadJarsDir)
Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/file.txt", downloadFilesDir)
}
private def getSparkConfForRemoteFileDownloads: SparkConf = {
new SparkConf(true)
.set(INIT_CONTAINER_REMOTE_JARS,
"http://localhost:9000/jar1.jar,hdfs://localhost:9000/jar2.jar")
.set(INIT_CONTAINER_REMOTE_FILES,
"http://localhost:9000/file.txt")
.set(JARS_DOWNLOAD_LOCATION, downloadJarsDir.getAbsolutePath)
.set(FILES_DOWNLOAD_LOCATION, downloadFilesDir.getAbsolutePath)
}
private def createTempFile(extension: String): String = {
val dir = Utils.createTempDir()
val file = new File(dir, s"${UUID.randomUUID().toString}.$extension")
Files.write(UUID.randomUUID().toString, file, Charsets.UTF_8)
file.getAbsolutePath
}
}


@ -18,15 +18,19 @@ package org.apache.spark.scheduler.cluster.k8s
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{Pod, _}
import org.mockito.MockitoAnnotations
import io.fabric8.kubernetes.api.model._
import org.mockito.{AdditionalAnswers, MockitoAnnotations}
import org.mockito.Matchers.any
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{InitContainerBootstrap, MountSecretsBootstrap, PodWithDetachedInitContainer}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
private val driverPodName: String = "driver-pod"
private val driverPodUid: String = "driver-uid"
private val executorPrefix: String = "base"
@ -54,7 +58,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
}
test("basic executor pod has reasonable defaults") {
val factory = new ExecutorPodFactoryImpl(baseConf)
val factory = new ExecutorPodFactory(baseConf, None, None, None)
val executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
@ -85,7 +89,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX,
"loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple")
val factory = new ExecutorPodFactoryImpl(conf)
val factory = new ExecutorPodFactory(conf, None, None, None)
val executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
@ -97,7 +101,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz")
val factory = new ExecutorPodFactoryImpl(conf)
val factory = new ExecutorPodFactory(conf, None, None, None)
val executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]())
@ -108,6 +112,74 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
checkOwnerReferences(executor, driverPodUid)
}
test("executor secrets get mounted") {
val conf = baseConf.clone()
val secretsBootstrap = new MountSecretsBootstrap(Map("secret1" -> "/var/secret1"))
val factory = new ExecutorPodFactory(
conf,
Some(secretsBootstrap),
None,
None)
val executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
assert(executor.getSpec.getContainers.size() === 1)
assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1)
assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName
=== "secret1-volume")
assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
.getMountPath === "/var/secret1")
// Check that the secret volume is defined on the pod spec.
assert(executor.getSpec.getVolumes.size() === 1)
assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName === "secret1")
checkOwnerReferences(executor, driverPodUid)
}
test("init-container bootstrap step adds an init container") {
val conf = baseConf.clone()
val initContainerBootstrap = mock(classOf[InitContainerBootstrap])
when(initContainerBootstrap.bootstrapInitContainer(
any(classOf[PodWithDetachedInitContainer]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
val factory = new ExecutorPodFactory(
conf,
None,
Some(initContainerBootstrap),
None)
val executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
assert(executor.getSpec.getInitContainers.size() === 1)
checkOwnerReferences(executor, driverPodUid)
}
test("init-container with secrets mount bootstrap") {
val conf = baseConf.clone()
val initContainerBootstrap = mock(classOf[InitContainerBootstrap])
when(initContainerBootstrap.bootstrapInitContainer(
any(classOf[PodWithDetachedInitContainer]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
val secretsBootstrap = new MountSecretsBootstrap(Map("secret1" -> "/var/secret1"))
val factory = new ExecutorPodFactory(
conf,
None,
Some(initContainerBootstrap),
Some(secretsBootstrap))
val executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
assert(executor.getSpec.getInitContainers.size() === 1)
assert(executor.getSpec.getInitContainers.get(0).getVolumeMounts.get(0).getName
=== "secret1-volume")
assert(executor.getSpec.getInitContainers.get(0).getVolumeMounts.get(0)
.getMountPath === "/var/secret1")
checkOwnerReferences(executor, driverPodUid)
}
// There is always exactly one controller reference, and it points to the driver pod.
private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
assert(executor.getMetadata.getOwnerReferences.size() === 1)


@ -22,7 +22,7 @@ FROM spark-base
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark-driver:latest -f kubernetes/dockerfiles/spark-base/Dockerfile .
# docker build -t spark-driver:latest -f kubernetes/dockerfiles/driver/Dockerfile .
COPY examples /opt/spark/examples
@ -31,4 +31,5 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS


@ -22,7 +22,7 @@ FROM spark-base
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark-executor:latest -f kubernetes/dockerfiles/spark-base/Dockerfile .
# docker build -t spark-executor:latest -f kubernetes/dockerfiles/executor/Dockerfile .
COPY examples /opt/spark/examples
@ -31,4 +31,5 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP


@ -0,0 +1,24 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
FROM spark-base
# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
# command should be invoked from the top level directory of the Spark distribution. E.g.:
# docker build -t spark-init:latest -f kubernetes/dockerfiles/init-container/Dockerfile .
ENTRYPOINT [ "/opt/entrypoint.sh", "/opt/spark/bin/spark-class", "org.apache.spark.deploy.rest.k8s.SparkPodInitContainer" ]