[SPARK-22646][K8S] Spark on Kubernetes - basic submission client

This PR contains the implementation of the basic submission client for the cluster mode of Spark on Kubernetes. It is step 2 of the step-wise plan documented [here](https://github.com/apache-spark-on-k8s/spark/issues/441#issuecomment-330802935).
This addition is covered by the [SPIP](http://apache-spark-developers-list.1001551.n3.nabble.com/SPIP-Spark-on-Kubernetes-td22147.html) vote, which passed on Aug 31.

This PR and #19468 together form an MVP of Spark on Kubernetes that allows users to run Spark applications that use resources locally within the driver and executor containers on Kubernetes 1.6 and up. Some changes to the POM and build/test setup are copied over from #19468 to make this PR self-contained and testable.

The submission client is mainly responsible for creating the Kubernetes pod that runs the Spark driver. It follows a step-based approach to construct the driver pod, as the code under the `submit.steps` package shows. The steps are orchestrated by `DriverConfigurationStepsOrchestrator`. `Client` creates the driver pod and waits for the application to complete if it's configured to do so, which is the case by default.
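
In essence, the orchestrator produces a sequence of `DriverConfigurationStep`s, and `Client` threads an initial `KubernetesDriverSpec` through them before creating the pod. A simplified sketch of that flow (the `orchestrator` and `sparkConf` values here are placeholders; see `Client.run` in the diff for the actual loop):

```scala
// Simplified sketch of the step-based driver construction.
val steps: Seq[DriverConfigurationStep] = orchestrator.getAllConfigurationSteps()
val finalSpec = steps.foldLeft(KubernetesDriverSpec.initialSpec(sparkConf)) {
  (spec, step) => step.configureDriver(spec)
}
// finalSpec.driverPod, finalSpec.driverContainer, and finalSpec.otherKubernetesResources
// are then created through the Kubernetes API server.
```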

This PR also contains Dockerfiles of the driver and executor images. They are included because some of the environment variables set in the code would not make sense without referring to the Dockerfiles.

* The patch contains unit tests, which are passing.
* Manual testing: `./build/mvn -Pkubernetes clean package` succeeded.
* This is a subset of the entire changelist hosted at http://github.com/apache-spark-on-k8s/spark, which is in active use in several organizations.
* Integration testing is enabled in the fork, currently hosted by PepperData, and is being moved over to RiseLAB CI.
* Detailed documentation on trying out the patch in its entirety is available at https://apache-spark-on-k8s.github.io/userdocs/running-on-kubernetes.html

cc rxin felixcheung mateiz (shepherd)
k8s-big-data SIG members & contributors: mccheah foxish ash211 ssuchter varunkatta kimoonkim erikerlandson tnachen ifilonenko liyinan926

Author: Yinan Li <liyinan926@gmail.com>

Closes #19717 from liyinan926/spark-kubernetes-4.
Commit 3f4060c340 (parent c235b5f977), authored by Yinan Li on 2017-12-11 15:14:59 -08:00 and committed by Marcelo Vanzin.
39 changed files with 2566 additions and 67 deletions.


@ -148,6 +148,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>kubernetes</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<dependencies>


@ -668,7 +668,11 @@ private[spark] object SparkConf extends Logging {
MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq(
AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")),
LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq(
AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3"))
AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3")),
DRIVER_MEMORY_OVERHEAD.key -> Seq(
AlternateConfig("spark.yarn.driver.memoryOverhead", "2.3")),
EXECUTOR_MEMORY_OVERHEAD.key -> Seq(
AlternateConfig("spark.yarn.executor.memoryOverhead", "2.3"))
)
/**


@ -18,7 +18,6 @@
package org.apache.spark
import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}


@ -76,7 +76,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
private val KUBERNETES = 16
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES
// Deploy modes
private val CLIENT = 1
@ -97,6 +98,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
"org.apache.spark.deploy.yarn.YarnClusterApplication"
private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"
// scalastyle:off println
private[spark] def printVersionAndExit(): Unit = {
@ -257,9 +260,10 @@ object SparkSubmit extends CommandLineUtils with Logging {
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
printErrorAndExit("Master must either be yarn or start with spark, mesos, k8s, or local")
-1
}
@ -294,6 +298,16 @@ object SparkSubmit extends CommandLineUtils with Logging {
}
}
if (clusterManager == KUBERNETES) {
args.master = Utils.checkAndGetK8sMasterUrl(args.master)
// Make sure KUBERNETES is included in our build if we're trying to use it
if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
printErrorAndExit(
"Could not load KUBERNETES classes. " +
"This copy of Spark may not have been compiled with KUBERNETES support.")
}
}
// Fail fast, the following modes are not supported or applicable
(clusterManager, deployMode) match {
case (STANDALONE, CLUSTER) if args.isPython =>
@ -302,6 +316,12 @@ object SparkSubmit extends CommandLineUtils with Logging {
case (STANDALONE, CLUSTER) if args.isR =>
printErrorAndExit("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (KUBERNETES, _) if args.isPython =>
printErrorAndExit("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
printErrorAndExit("R applications are currently not supported for Kubernetes.")
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
@ -322,6 +342,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
if (!isMesosCluster && !isStandAloneCluster) {
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
@ -557,19 +578,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),
// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.cores"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
confKey = "spark.files"),
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, confKey = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
confKey = "spark.driver.supervise"),
@ -703,6 +724,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
}
}
if (isKubernetesCluster) {
childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
}
childArgs ++= Array("--main-class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg =>
childArgs += ("--arg", arg)
}
}
}
// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sparkConf.setIfMissing(k, v)


@ -515,8 +515,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
outStream.println(
s"""
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local
| (Default: local[*]).
| --master MASTER_URL spark://host:port, mesos://host:port, yarn,
| k8s://https://host:port, or local (Default: local[*]).
| --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
| on one of the worker machines inside the cluster ("cluster")
| (Default: client).


@ -41,6 +41,10 @@ package object config {
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")
private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.driver.memoryOverhead")
.bytesConf(ByteUnit.MiB)
.createOptional
private[spark] val EVENT_LOG_COMPRESS =
ConfigBuilder("spark.eventLog.compress")
.booleanConf
@ -80,6 +84,10 @@ package object config {
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")
private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.executor.memoryOverhead")
.bytesConf(ByteUnit.MiB)
.createOptional
private[spark] val MEMORY_OFFHEAP_ENABLED = ConfigBuilder("spark.memory.offHeap.enabled")
.doc("If true, Spark will attempt to use off-heap memory for certain operations. " +
"If off-heap memory use is enabled, then spark.memory.offHeap.size must be positive.")


@ -2744,6 +2744,42 @@ private[spark] object Utils extends Logging {
}
}
/**
* Check the validity of the given Kubernetes master URL and return the resolved URL. Prefix
* "k8s:" is appended to the resolved URL as the prefix is used by KubernetesClusterManager
* in canCreate to determine if the KubernetesClusterManager should be used.
*/
def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
require(rawMasterURL.startsWith("k8s://"),
"Kubernetes master URL must start with k8s://.")
val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length)
// To handle master URLs, e.g., k8s://host:port.
if (!masterWithoutK8sPrefix.contains("://")) {
val resolvedURL = s"https://$masterWithoutK8sPrefix"
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
s"URL is $resolvedURL.")
return s"k8s:$resolvedURL"
}
val masterScheme = new URI(masterWithoutK8sPrefix).getScheme
val resolvedURL = masterScheme.toLowerCase match {
case "https" =>
masterWithoutK8sPrefix
case "http" =>
logWarning("Kubernetes master URL uses HTTP instead of HTTPS.")
masterWithoutK8sPrefix
case null =>
val resolvedURL = s"https://$masterWithoutK8sPrefix"
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
s"URL is $resolvedURL.")
resolvedURL
case _ =>
throw new IllegalArgumentException("Invalid Kubernetes master scheme: " + masterScheme)
}
return s"k8s:$resolvedURL"
}
}
private[util] object CallerContext extends Logging {


@ -550,6 +550,12 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
}
test("client mode with a k8s master url") {
intercept[SparkException] {
sc = new SparkContext("k8s://https://host:port", "test", new SparkConf())
}
}
testCancellingTasks("that raise interrupted exception on cancel") {
Thread.sleep(9999999)
}


@ -388,6 +388,33 @@ class SparkSubmitSuite
conf.get("spark.ui.enabled") should be ("false")
}
test("handles k8s cluster mode") {
val clArgs = Seq(
"--deploy-mode", "cluster",
"--master", "k8s://host:port",
"--executor-memory", "5g",
"--class", "org.SomeClass",
"--driver-memory", "4g",
"--conf", "spark.kubernetes.namespace=spark",
"--conf", "spark.kubernetes.driver.docker.image=bar",
"/home/thejar.jar",
"arg1")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)
val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap
childArgsMap.get("--primary-java-resource") should be (Some("file:/home/thejar.jar"))
childArgsMap.get("--main-class") should be (Some("org.SomeClass"))
childArgsMap.get("--arg") should be (Some("arg1"))
mainClass should be (KUBERNETES_CLUSTER_SUBMIT_CLASS)
classpath should have length (0)
conf.get("spark.master") should be ("k8s:https://host:port")
conf.get("spark.executor.memory") should be ("5g")
conf.get("spark.driver.memory") should be ("4g")
conf.get("spark.kubernetes.namespace") should be ("spark")
conf.get("spark.kubernetes.driver.docker.image") should be ("bar")
}
test("handles confs with flag equivalents") {
val clArgs = Seq(
"--deploy-mode", "cluster",


@ -1146,6 +1146,27 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
}
}
test("check Kubernetes master URL") {
val k8sMasterURLHttps = Utils.checkAndGetK8sMasterUrl("k8s://https://host:port")
assert(k8sMasterURLHttps === "k8s:https://host:port")
val k8sMasterURLHttp = Utils.checkAndGetK8sMasterUrl("k8s://http://host:port")
assert(k8sMasterURLHttp === "k8s:http://host:port")
val k8sMasterURLWithoutScheme = Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1:8443")
assert(k8sMasterURLWithoutScheme === "k8s:https://127.0.0.1:8443")
val k8sMasterURLWithoutScheme2 = Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1")
assert(k8sMasterURLWithoutScheme2 === "k8s:https://127.0.0.1")
intercept[IllegalArgumentException] {
Utils.checkAndGetK8sMasterUrl("k8s:https://host:port")
}
intercept[IllegalArgumentException] {
Utils.checkAndGetK8sMasterUrl("k8s://foo://host:port")
}
}
}
private class SimpleExtension


@ -157,6 +157,16 @@ of the most common options to set are:
or in your default properties file.
</td>
</tr>
<tr>
<td><code>spark.driver.memoryOverhead</code></td>
<td>driverMemory * 0.10, with minimum of 384 </td>
<td>
The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is
memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
This tends to grow with the container size (typically 6-10%). This option is currently supported
on YARN and Kubernetes.
</td>
</tr>
<tr>
<td><code>spark.executor.memory</code></td>
<td>1g</td>
@ -164,6 +174,16 @@ of the most common options to set are:
Amount of memory to use per executor process (e.g. <code>2g</code>, <code>8g</code>).
</td>
</tr>
<tr>
<td><code>spark.executor.memoryOverhead</code></td>
<td>executorMemory * 0.10, with minimum of 384 </td>
<td>
The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that
accounts for things like VM overheads, interned strings, other native overheads, etc. This tends
to grow with the executor size (typically 6-10%). This option is currently supported on YARN and
Kubernetes.
</td>
</tr>
<tr>
<td><code>spark.extraListeners</code></td>
<td>(none)</td>


@ -227,25 +227,11 @@ To use a custom metrics.properties for the application master and executors, upd
The number of executors for static allocation. With <code>spark.dynamicAllocation.enabled</code>, the initial set of executors will be at least this large.
</td>
</tr>
<tr>
<td><code>spark.yarn.executor.memoryOverhead</code></td>
<td>executorMemory * 0.10, with minimum of 384 </td>
<td>
The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
</td>
</tr>
<tr>
<td><code>spark.yarn.driver.memoryOverhead</code></td>
<td>driverMemory * 0.10, with minimum of 384 </td>
<td>
The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
</td>
</tr>
<tr>
<td><code>spark.yarn.am.memoryOverhead</code></td>
<td>AM memory * 0.10, with minimum of 384 </td>
<td>
Same as <code>spark.yarn.driver.memoryOverhead</code>, but for the YARN Application Master in client mode.
Same as <code>spark.driver.memoryOverhead</code>, but for the YARN Application Master in client mode.
</td>
</tr>
<tr>


@ -82,7 +82,7 @@ class SparkSubmitOptionParser {
* name of the option, passed to {@link #handle(String, String)}.
* <p>
* Options not listed here nor in the "switch" list below will result in a call to
* {@link $#handleUnknown(String)}.
* {@link #handleUnknown(String)}.
* <p>
* These two arrays are visible for tests.
*/


@ -16,6 +16,8 @@
*/
package org.apache.spark.deploy.k8s
import java.util.concurrent.TimeUnit
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit
@ -24,12 +26,16 @@ private[spark] object Config extends Logging {
val KUBERNETES_NAMESPACE =
ConfigBuilder("spark.kubernetes.namespace")
.doc("The namespace that will be used for running the driver and executor pods. When using " +
"spark-submit in cluster mode, this can also be passed to spark-submit via the " +
"--kubernetes-namespace command line argument.")
.doc("The namespace that will be used for running the driver and executor pods.")
.stringConf
.createWithDefault("default")
val DRIVER_DOCKER_IMAGE =
ConfigBuilder("spark.kubernetes.driver.docker.image")
.doc("Docker image to use for the driver. Specify this using the standard Docker tag format.")
.stringConf
.createOptional
val EXECUTOR_DOCKER_IMAGE =
ConfigBuilder("spark.kubernetes.executor.docker.image")
.doc("Docker image to use for the executors. Specify this using the standard Docker tag " +
@ -44,9 +50,9 @@ private[spark] object Config extends Logging {
.checkValues(Set("Always", "Never", "IfNotPresent"))
.createWithDefault("IfNotPresent")
val APISERVER_AUTH_DRIVER_CONF_PREFIX =
val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
"spark.kubernetes.authenticate.driver"
val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
"spark.kubernetes.authenticate.driver.mounted"
val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
@ -55,7 +61,7 @@ private[spark] object Config extends Logging {
val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
val KUBERNETES_SERVICE_ACCOUNT_NAME =
ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
.doc("Service account that is used when running the driver pod. The driver pod uses " +
"this service account when requesting executor pods from the API server. If specific " +
"credentials are given for the driver pod to use, the driver will favor " +
@ -63,19 +69,17 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional
// Note that while we set a default for this when we start up the
// scheduler, the specific default value is dynamically determined
// based on the executor memory.
val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
ConfigBuilder("spark.kubernetes.executor.memoryOverhead")
.doc("The amount of off-heap memory (in megabytes) to be allocated per executor. This " +
"is memory that accounts for things like VM overheads, interned strings, other native " +
"overheads, etc. This tends to grow with the executor size. (typically 6-10%).")
.bytesConf(ByteUnit.MiB)
val KUBERNETES_DRIVER_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.driver.limit.cores")
.doc("Specify the hard cpu limit for the driver pod")
.stringConf
.createOptional
val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
val KUBERNETES_EXECUTOR_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.executor.limit.cores")
.doc("Specify the hard cpu limit for each executor pod")
.stringConf
.createOptional
val KUBERNETES_DRIVER_POD_NAME =
ConfigBuilder("spark.kubernetes.driver.pod.name")
@ -104,12 +108,6 @@ private[spark] object Config extends Logging {
.checkValue(value => value > 0, "Allocation batch delay should be a positive integer")
.createWithDefault(1)
val KUBERNETES_EXECUTOR_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.executor.limit.cores")
.doc("Specify the hard cpu limit for a single executor pod")
.stringConf
.createOptional
val KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS =
ConfigBuilder("spark.kubernetes.executor.lostCheck.maxAttempts")
.doc("Maximum number of attempts allowed for checking the reason of an executor loss " +
@ -119,5 +117,46 @@ private[spark] object Config extends Logging {
"must be a positive integer")
.createWithDefault(10)
val WAIT_FOR_APP_COMPLETION =
ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
.doc("In cluster mode, whether to wait for the application to finish before exiting the " +
"launcher process.")
.booleanConf
.createWithDefault(true)
val REPORT_INTERVAL =
ConfigBuilder("spark.kubernetes.report.interval")
.doc("Interval between reports of the current app status in cluster mode.")
.timeConf(TimeUnit.MILLISECONDS)
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
.createWithDefaultString("1s")
val JARS_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
.doc("Location to download jars to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pod.")
.stringConf
.createWithDefault("/var/spark-data/spark-jars")
val FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
.doc("Location to download files to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pods.")
.stringConf
.createWithDefault("/var/spark-data/spark-files")
val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"
val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
}
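
For reference, a minimal sketch of how these options could be set programmatically; the keys match the definitions above, while the master URL, namespace, and image tags are placeholder values, not defaults from this PR:

```scala
import org.apache.spark.SparkConf

// Illustrative values only.
val conf = new SparkConf()
  .setMaster("k8s://https://kubernetes.example.com:6443")
  .set("spark.kubernetes.namespace", "spark")
  .set("spark.kubernetes.driver.docker.image", "my-repo/spark-driver:latest")
  .set("spark.kubernetes.executor.docker.image", "my-repo/spark-executor:latest")
  .set("spark.kubernetes.submission.waitAppCompletion", "true")
```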


@ -25,9 +25,30 @@ private[spark] object Constants {
val SPARK_POD_DRIVER_ROLE = "driver"
val SPARK_POD_EXECUTOR_ROLE = "executor"
// Annotations
val SPARK_APP_NAME_ANNOTATION = "spark-app-name"
// Credentials secrets
val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =
"/mnt/secrets/spark-kubernetes-credentials"
val DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME = "ca-cert"
val DRIVER_CREDENTIALS_CA_CERT_PATH =
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME"
val DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME = "client-key"
val DRIVER_CREDENTIALS_CLIENT_KEY_PATH =
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME"
val DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME = "client-cert"
val DRIVER_CREDENTIALS_CLIENT_CERT_PATH =
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME"
val DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME = "oauth-token"
val DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH =
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME"
val DRIVER_CREDENTIALS_SECRET_VOLUME_NAME = "kubernetes-credentials"
// Default and fixed ports
val DEFAULT_DRIVER_PORT = 7078
val DEFAULT_BLOCKMANAGER_PORT = 7079
val DRIVER_PORT_NAME = "driver-rpc-port"
val BLOCK_MANAGER_PORT_NAME = "blockmanager"
val EXECUTOR_PORT_NAME = "executor"
@ -42,9 +63,16 @@ private[spark] object Constants {
val ENV_EXECUTOR_EXTRA_CLASSPATH = "SPARK_EXECUTOR_EXTRA_CLASSPATH"
val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH"
val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
val ENV_SUBMIT_EXTRA_CLASSPATH = "SPARK_SUBMIT_EXTRA_CLASSPATH"
val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS"
val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
val MEMORY_OVERHEAD_FACTOR = 0.10
val MEMORY_OVERHEAD_MIN_MIB = 384L
}


@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit
import java.util.UUID
import com.google.common.primitives.Longs
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.ConfigurationUtils
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.steps._
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.SystemClock
/**
* Constructs the complete list of driver configuration steps to run to deploy the Spark driver.
*/
private[spark] class DriverConfigurationStepsOrchestrator(
namespace: String,
kubernetesAppId: String,
launchTime: Long,
mainAppResource: Option[MainAppResource],
appName: String,
mainClass: String,
appArgs: Array[String],
submissionSparkConf: SparkConf) {
// The resource name prefix is derived from the Spark application name, making it easy to connect
// the names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the
// application the user submitted.
private val kubernetesResourceNamePrefix = {
val uuid = UUID.nameUUIDFromBytes(Longs.toByteArray(launchTime)).toString.replaceAll("-", "")
s"$appName-$uuid".toLowerCase.replaceAll("\\.", "-")
}
private val dockerImagePullPolicy = submissionSparkConf.get(DOCKER_IMAGE_PULL_POLICY)
private val jarsDownloadPath = submissionSparkConf.get(JARS_DOWNLOAD_LOCATION)
private val filesDownloadPath = submissionSparkConf.get(FILES_DOWNLOAD_LOCATION)
def getAllConfigurationSteps(): Seq[DriverConfigurationStep] = {
val driverCustomLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
submissionSparkConf,
KUBERNETES_DRIVER_LABEL_PREFIX)
require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
"operations.")
require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
"operations.")
val allDriverLabels = driverCustomLabels ++ Map(
SPARK_APP_ID_LABEL -> kubernetesAppId,
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
val initialSubmissionStep = new BaseDriverConfigurationStep(
kubernetesAppId,
kubernetesResourceNamePrefix,
allDriverLabels,
dockerImagePullPolicy,
appName,
mainClass,
appArgs,
submissionSparkConf)
val driverAddressStep = new DriverServiceBootstrapStep(
kubernetesResourceNamePrefix,
allDriverLabels,
submissionSparkConf,
new SystemClock)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
submissionSparkConf, kubernetesResourceNamePrefix)
val additionalMainAppJar = if (mainAppResource.nonEmpty) {
val mayBeResource = mainAppResource.get match {
case JavaMainAppResource(resource) if resource != SparkLauncher.NO_RESOURCE =>
Some(resource)
case _ => None
}
mayBeResource
} else {
None
}
val sparkJars = submissionSparkConf.getOption("spark.jars")
.map(_.split(","))
.getOrElse(Array.empty[String]) ++
additionalMainAppJar.toSeq
val sparkFiles = submissionSparkConf.getOption("spark.files")
.map(_.split(","))
.getOrElse(Array.empty[String])
val maybeDependencyResolutionStep = if (sparkJars.nonEmpty || sparkFiles.nonEmpty) {
Some(new DependencyResolutionStep(
sparkJars,
sparkFiles,
jarsDownloadPath,
filesDownloadPath))
} else {
None
}
Seq(
initialSubmissionStep,
driverAddressStep,
kubernetesCredentialsStep) ++
maybeDependencyResolutionStep.toSeq
}
}


@ -0,0 +1,240 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit
import java.util.{Collections, UUID}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.NonFatal
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkApplication
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
/**
* Encapsulates arguments to the submission client.
*
* @param mainAppResource the main application resource if any
* @param mainClass the main class of the application to run
* @param driverArgs arguments to the driver
*/
private[spark] case class ClientArguments(
mainAppResource: Option[MainAppResource],
mainClass: String,
driverArgs: Array[String])
private[spark] object ClientArguments {
def fromCommandLineArgs(args: Array[String]): ClientArguments = {
var mainAppResource: Option[MainAppResource] = None
var mainClass: Option[String] = None
val driverArgs = mutable.ArrayBuffer.empty[String]
args.sliding(2, 2).toList.foreach {
case Array("--primary-java-resource", primaryJavaResource: String) =>
mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
case Array("--main-class", clazz: String) =>
mainClass = Some(clazz)
case Array("--arg", arg: String) =>
driverArgs += arg
case other =>
val invalid = other.mkString(" ")
throw new RuntimeException(s"Unknown arguments: $invalid")
}
require(mainClass.isDefined, "Main class must be specified via --main-class")
ClientArguments(
mainAppResource,
mainClass.get,
driverArgs.toArray)
}
}
/**
* Submits a Spark application to run on Kubernetes by creating the driver pod and starting a
* watcher that monitors and logs the application status. Waits for the application to terminate if
* spark.kubernetes.submission.waitAppCompletion is true.
*
* @param submissionSteps steps that collectively configure the driver
* @param submissionSparkConf the submission client Spark configuration
* @param kubernetesClient the client to talk to the Kubernetes API server
* @param waitForAppCompletion a flag indicating whether the client should wait for the application
* to complete
* @param appName the application name
* @param loggingPodStatusWatcher a watcher that monitors and logs the application status
*/
private[spark] class Client(
submissionSteps: Seq[DriverConfigurationStep],
submissionSparkConf: SparkConf,
kubernetesClient: KubernetesClient,
waitForAppCompletion: Boolean,
appName: String,
loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
private val driverJavaOptions = submissionSparkConf.get(
org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
/**
* Run command that initializes a DriverSpec that will be updated after each
* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec
* will be used to build the Driver Container, Driver Pod, and Kubernetes Resources
*/
def run(): Unit = {
var currentDriverSpec = KubernetesDriverSpec.initialSpec(submissionSparkConf)
// submissionSteps contain steps necessary to take, to resolve varying
// client arguments that are passed in, created by orchestrator
for (nextStep <- submissionSteps) {
currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
}
val resolvedDriverJavaOpts = currentDriverSpec
.driverSparkConf
// Remove this as the options are instead extracted and set individually below using
// environment variables with prefix SPARK_JAVA_OPT_.
.remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
.getAll
.map {
case (confKey, confValue) => s"-D$confKey=$confValue"
} ++ driverJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
val driverJavaOptsEnvs: Seq[EnvVar] = resolvedDriverJavaOpts.zipWithIndex.map {
case (option, index) =>
new EnvVarBuilder()
.withName(s"$ENV_JAVA_OPT_PREFIX$index")
.withValue(option)
.build()
}
val resolvedDriverContainer = new ContainerBuilder(currentDriverSpec.driverContainer)
.addAllToEnv(driverJavaOptsEnvs.asJava)
.build()
val resolvedDriverPod = new PodBuilder(currentDriverSpec.driverPod)
.editSpec()
.addToContainers(resolvedDriverContainer)
.endSpec()
.build()
Utils.tryWithResource(
kubernetesClient
.pods()
.withName(resolvedDriverPod.getMetadata.getName)
.watch(loggingPodStatusWatcher)) { _ =>
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
if (currentDriverSpec.otherKubernetesResources.nonEmpty) {
val otherKubernetesResources = currentDriverSpec.otherKubernetesResources
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
}
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
throw e
}
if (waitForAppCompletion) {
logInfo(s"Waiting for application $appName to finish...")
loggingPodStatusWatcher.awaitCompletion()
logInfo(s"Application $appName finished.")
} else {
logInfo(s"Deployed Spark application $appName into Kubernetes.")
}
}
}
// Add a OwnerReference to the given resources making the driver pod an owner of them so when
// the driver pod is deleted, the resources are garbage collected.
private def addDriverOwnerReference(driverPod: Pod, resources: Seq[HasMetadata]): Unit = {
val driverPodOwnerReference = new OwnerReferenceBuilder()
.withName(driverPod.getMetadata.getName)
.withApiVersion(driverPod.getApiVersion)
.withUid(driverPod.getMetadata.getUid)
.withKind(driverPod.getKind)
.withController(true)
.build()
resources.foreach { resource =>
val originalMetadata = resource.getMetadata
originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
}
}
}
/**
* Main class and entry point of application submission in KUBERNETES mode.
*/
private[spark] class KubernetesClientApplication extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
val parsedArguments = ClientArguments.fromCommandLineArgs(args)
run(parsedArguments, conf)
}
private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
// For constructing the app ID, we can't use the Spark application name, as the app ID is going
// to be added as a label to group resources belonging to the same application. Label values are
// considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
// a unique app ID (captured by spark.app.id) in the format below.
val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
val launchTime = System.currentTimeMillis()
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
// The master URL has been checked for validity already in SparkSubmit.
// We just need to get rid of the "k8s:" prefix here.
val master = sparkConf.get("spark.master").substring("k8s:".length)
val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(
kubernetesAppId, loggingInterval)
val configurationStepsOrchestrator = new DriverConfigurationStepsOrchestrator(
namespace,
kubernetesAppId,
launchTime,
clientArguments.mainAppResource,
appName,
clientArguments.mainClass,
clientArguments.driverArgs,
sparkConf)
Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
master,
Some(namespace),
KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
sparkConf,
None,
None)) { kubernetesClient =>
val client = new Client(
configurationStepsOrchestrator.getAllConfigurationSteps(),
sparkConf,
kubernetesClient,
waitForAppCompletion,
appName,
loggingPodStatusWatcher)
client.run()
}
}
}


@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, HasMetadata, Pod, PodBuilder}
import org.apache.spark.SparkConf
/**
* Represents the components and characteristics of a Spark driver. The driver can be considered
* as being comprised of the driver pod itself, any other Kubernetes resources that the driver
* pod depends on, and the SparkConf that should be supplied to the Spark application. The driver
* container should be operated on via the specific field of this case class as opposed to trying
* to edit the container directly on the pod. The driver container should be attached at the
* end of executing all submission steps.
*/
private[spark] case class KubernetesDriverSpec(
driverPod: Pod,
driverContainer: Container,
otherKubernetesResources: Seq[HasMetadata],
driverSparkConf: SparkConf)
private[spark] object KubernetesDriverSpec {
def initialSpec(initialSparkConf: SparkConf): KubernetesDriverSpec = {
KubernetesDriverSpec(
// Set new metadata and a new spec so that submission steps can use
// PodBuilder#editMetadata() and/or PodBuilder#editSpec() safely.
new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(),
new ContainerBuilder().build(),
Seq.empty[HasMetadata],
initialSparkConf.clone())
}
}


@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit
import java.io.File
import org.apache.spark.util.Utils
private[spark] object KubernetesFileUtils {
/**
* For the given collection of file URIs, resolves them as follows:
* - File URIs with scheme file:// are resolved to the given download path.
* - File URIs with scheme local:// resolve to just the path of the URI.
* - Otherwise, the URIs are returned as-is.
*/
def resolveFileUris(
fileUris: Iterable[String],
fileDownloadPath: String): Iterable[String] = {
fileUris.map { uri =>
resolveFileUri(uri, fileDownloadPath, false)
}
}
/**
* If any file uri has any scheme other than local:// it is mapped as if the file
* was downloaded to the file download path. Otherwise, it is mapped to the path
* part of the URI.
*/
def resolveFilePaths(fileUris: Iterable[String], fileDownloadPath: String): Iterable[String] = {
fileUris.map { uri =>
resolveFileUri(uri, fileDownloadPath, true)
}
}
private def resolveFileUri(
uri: String,
fileDownloadPath: String,
assumesDownloaded: Boolean): String = {
val fileUri = Utils.resolveURI(uri)
val fileScheme = Option(fileUri.getScheme).getOrElse("file")
fileScheme match {
case "local" =>
fileUri.getPath
case _ =>
if (assumesDownloaded || fileScheme == "file") {
val fileName = new File(fileUri.getPath).getName
s"$fileDownloadPath/$fileName"
} else {
uri
}
}
}
}
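
To illustrate the resolution rules above with the default download directory from `Config.scala` (the URIs below are hypothetical):

```scala
// Assuming jarsDownloadPath = "/var/spark-data/spark-jars" (the default):
// resolveFileUris(Seq("file:///opt/app.jar", "local:///opt/dep.jar", "https://repo.example.com/remote.jar"), jarsDownloadPath)
//   -> Seq("/var/spark-data/spark-jars/app.jar", "/opt/dep.jar", "https://repo.example.com/remote.jar")
// resolveFilePaths(Seq("file:///opt/app.jar", "local:///opt/dep.jar", "https://repo.example.com/remote.jar"), jarsDownloadPath)
//   -> Seq("/var/spark-data/spark-jars/app.jar", "/opt/dep.jar", "/var/spark-data/spark-jars/remote.jar")
```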


@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.util.ThreadUtils
private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
def awaitCompletion(): Unit
}
/**
* A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
* every state change and also at an interval for liveness.
*
* @param appId application ID.
* @param maybeLoggingInterval ms between each state request. If provided, must be a positive
* number.
*/
private[k8s] class LoggingPodStatusWatcherImpl(
appId: String,
maybeLoggingInterval: Option[Long])
extends LoggingPodStatusWatcher with Logging {
private val podCompletedFuture = new CountDownLatch(1)
// start timer for periodic logging
private val scheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
private val logRunnable: Runnable = new Runnable {
override def run() = logShortStatus()
}
private var pod = Option.empty[Pod]
private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
def start(): Unit = {
maybeLoggingInterval.foreach { interval =>
scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
}
}
override def eventReceived(action: Action, pod: Pod): Unit = {
this.pod = Option(pod)
action match {
case Action.DELETED | Action.ERROR =>
closeWatch()
case _ =>
logLongStatus()
if (hasCompleted()) {
closeWatch()
}
}
}
override def onClose(e: KubernetesClientException): Unit = {
logDebug(s"Stopping watching application $appId with last-observed phase $phase")
closeWatch()
}
private def logShortStatus() = {
logInfo(s"Application status for $appId (phase: $phase)")
}
private def logLongStatus() = {
logInfo("State changed, new state: " + pod.map(formatPodState).getOrElse("unknown"))
}
private def hasCompleted(): Boolean = {
phase == "Succeeded" || phase == "Failed"
}
private def closeWatch(): Unit = {
podCompletedFuture.countDown()
scheduler.shutdown()
}
private def formatPodState(pod: Pod): String = {
val details = Seq[(String, String)](
// pod metadata
("pod name", pod.getMetadata.getName),
("namespace", pod.getMetadata.getNamespace),
("labels", pod.getMetadata.getLabels.asScala.mkString(", ")),
("pod uid", pod.getMetadata.getUid),
("creation time", formatTime(pod.getMetadata.getCreationTimestamp)),
// spec details
("service account name", pod.getSpec.getServiceAccountName),
("volumes", pod.getSpec.getVolumes.asScala.map(_.getName).mkString(", ")),
("node name", pod.getSpec.getNodeName),
// status
("start time", formatTime(pod.getStatus.getStartTime)),
("container images",
pod.getStatus.getContainerStatuses
.asScala
.map(_.getImage)
.mkString(", ")),
("phase", pod.getStatus.getPhase),
("status", pod.getStatus.getContainerStatuses.toString)
)
formatPairsBundle(details)
}
private def formatPairsBundle(pairs: Seq[(String, String)]) = {
// Use more loggable format if value is null or empty
pairs.map {
case (k, v) => s"\n\t $k: ${Option(v).filter(_.nonEmpty).getOrElse("N/A")}"
}.mkString("")
}
override def awaitCompletion(): Unit = {
podCompletedFuture.await()
logInfo(pod.map { p =>
s"Container final statuses:\n\n${containersDescription(p)}"
}.getOrElse("No containers were found in the driver pod."))
}
private def containersDescription(p: Pod): String = {
p.getStatus.getContainerStatuses.asScala.map { status =>
Seq(
("Container name", status.getName),
("Container image", status.getImage)) ++
containerStatusDescription(status)
}.map(formatPairsBundle).mkString("\n\n")
}
private def containerStatusDescription(
containerStatus: ContainerStatus): Seq[(String, String)] = {
val state = containerStatus.getState
Option(state.getRunning)
.orElse(Option(state.getTerminated))
.orElse(Option(state.getWaiting))
.map {
case running: ContainerStateRunning =>
Seq(
("Container state", "Running"),
("Container started at", formatTime(running.getStartedAt)))
case waiting: ContainerStateWaiting =>
Seq(
("Container state", "Waiting"),
("Pending reason", waiting.getReason))
case terminated: ContainerStateTerminated =>
Seq(
("Container state", "Terminated"),
("Exit code", terminated.getExitCode.toString))
case unknown =>
throw new SparkException(s"Unexpected container status type ${unknown.getClass}.")
}.getOrElse(Seq(("Container state", "N/A")))
}
private def formatTime(time: Time): String = {
if (time != null) time.getTime else "N/A"
}
}


@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit
private[spark] sealed trait MainAppResource
private[spark] case class JavaMainAppResource(primaryResource: String) extends MainAppResource


@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.ConfigurationUtils
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
import org.apache.spark.internal.config.{DRIVER_CLASS_PATH, DRIVER_MEMORY, DRIVER_MEMORY_OVERHEAD}
/**
* Represents the initial setup required for the driver.
*/
private[spark] class BaseDriverConfigurationStep(
kubernetesAppId: String,
kubernetesResourceNamePrefix: String,
driverLabels: Map[String, String],
dockerImagePullPolicy: String,
appName: String,
mainClass: String,
appArgs: Array[String],
submissionSparkConf: SparkConf) extends DriverConfigurationStep {
private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(s"$kubernetesResourceNamePrefix-driver")
private val driverExtraClasspath = submissionSparkConf.get(
DRIVER_CLASS_PATH)
private val driverDockerImage = submissionSparkConf
.get(DRIVER_DOCKER_IMAGE)
.getOrElse(throw new SparkException("Must specify the driver Docker image"))
// CPU settings
private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
// Memory settings
private val driverMemoryMiB = submissionSparkConf.get(
DRIVER_MEMORY)
private val driverMemoryString = submissionSparkConf.get(
DRIVER_MEMORY.key,
DRIVER_MEMORY.defaultValueString)
private val memoryOverheadMiB = submissionSparkConf
.get(DRIVER_MEMORY_OVERHEAD)
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
new EnvVarBuilder()
.withName(ENV_SUBMIT_EXTRA_CLASSPATH)
.withValue(classPath)
.build()
}
val driverCustomAnnotations = ConfigurationUtils
.parsePrefixedKeyValuePairs(
submissionSparkConf,
KUBERNETES_DRIVER_ANNOTATION_PREFIX)
require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
" Spark bookkeeping operations.")
val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
.map { env =>
new EnvVarBuilder()
.withName(env._1)
.withValue(env._2)
.build()
}
val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
val driverCpuQuantity = new QuantityBuilder(false)
.withAmount(driverCpuCores)
.build()
val driverMemoryQuantity = new QuantityBuilder(false)
.withAmount(s"${driverMemoryMiB}Mi")
.build()
val driverMemoryLimitQuantity = new QuantityBuilder(false)
.withAmount(s"${driverContainerMemoryWithOverheadMiB}Mi")
.build()
val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
}
val driverContainer = new ContainerBuilder(driverSpec.driverContainer)
.withName(DRIVER_CONTAINER_NAME)
.withImage(driverDockerImage)
.withImagePullPolicy(dockerImagePullPolicy)
.addAllToEnv(driverCustomEnvs.asJava)
.addToEnv(driverExtraClasspathEnv.toSeq: _*)
.addNewEnv()
.withName(ENV_DRIVER_MEMORY)
.withValue(driverMemoryString)
.endEnv()
.addNewEnv()
.withName(ENV_DRIVER_MAIN_CLASS)
.withValue(mainClass)
.endEnv()
.addNewEnv()
.withName(ENV_DRIVER_ARGS)
.withValue(appArgs.map(arg => "\"" + arg + "\"").mkString(" "))
.endEnv()
.addNewEnv()
.withName(ENV_DRIVER_BIND_ADDRESS)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef("v1", "status.podIP")
.build())
.endEnv()
.withNewResources()
.addToRequests("cpu", driverCpuQuantity)
.addToRequests("memory", driverMemoryQuantity)
.addToLimits("memory", driverMemoryLimitQuantity)
.addToLimits(maybeCpuLimitQuantity.toMap.asJava)
.endResources()
.build()
val baseDriverPod = new PodBuilder(driverSpec.driverPod)
.editOrNewMetadata()
.withName(kubernetesDriverPodName)
.addToLabels(driverLabels.asJava)
.addToAnnotations(allDriverAnnotations.asJava)
.endMetadata()
.withNewSpec()
.withRestartPolicy("Never")
.withNodeSelector(nodeSelector.asJava)
.endSpec()
.build()
val resolvedSparkConf = driverSpec.driverSparkConf.clone()
.setIfMissing(KUBERNETES_DRIVER_POD_NAME, kubernetesDriverPodName)
.set("spark.app.id", kubernetesAppId)
.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, kubernetesResourceNamePrefix)
driverSpec.copy(
driverPod = baseDriverPod,
driverSparkConf = resolvedSparkConf,
driverContainer = driverContainer)
}
}
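
As a worked example of the memory sizing above (the 4g driver memory is an assumed value; the overhead factor and minimum come from `Constants.scala`):

```scala
// spark.driver.memory = 4g  => driverMemoryMiB = 4096
// memoryOverheadMiB         = max((0.10 * 4096).toInt, 384) = 409
// container memory request  = 4096Mi
// container memory limit    = 4096 + 409 = 4505Mi
```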


@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps
import java.io.File
import io.fabric8.kubernetes.api.model.ContainerBuilder
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.{KubernetesDriverSpec, KubernetesFileUtils}
/**
* Step that configures the classpath, spark.jars, and spark.files for the driver given that the
* user may provide remote files or files with local:// schemes.
*/
private[spark] class DependencyResolutionStep(
sparkJars: Seq[String],
sparkFiles: Seq[String],
jarsDownloadPath: String,
localFilesDownloadPath: String) extends DriverConfigurationStep {
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val resolvedSparkJars = KubernetesFileUtils.resolveFileUris(sparkJars, jarsDownloadPath)
val resolvedSparkFiles = KubernetesFileUtils.resolveFileUris(
sparkFiles, localFilesDownloadPath)
val sparkConfResolvedSparkDependencies = driverSpec.driverSparkConf.clone()
if (resolvedSparkJars.nonEmpty) {
sparkConfResolvedSparkDependencies.set("spark.jars", resolvedSparkJars.mkString(","))
}
if (resolvedSparkFiles.nonEmpty) {
sparkConfResolvedSparkDependencies.set("spark.files", resolvedSparkFiles.mkString(","))
}
val resolvedClasspath = KubernetesFileUtils.resolveFilePaths(sparkJars, jarsDownloadPath)
val driverContainerWithResolvedClasspath = if (resolvedClasspath.nonEmpty) {
new ContainerBuilder(driverSpec.driverContainer)
.addNewEnv()
.withName(ENV_MOUNTED_CLASSPATH)
.withValue(resolvedClasspath.mkString(File.pathSeparator))
.endEnv()
.build()
} else {
driverSpec.driverContainer
}
driverSpec.copy(
driverContainer = driverContainerWithResolvedClasspath,
driverSparkConf = sparkConfResolvedSparkDependencies)
}
}
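
A rough usage sketch of the resolution rules this step applies (the paths are hypothetical; the behaviour matches DependencyResolutionStepSuite later in this patch): remote URIs are kept as-is, file:// URIs are rewritten to the in-pod download directory, and local:// URIs become plain container-local paths.

// Sketch only.
val step = new DependencyResolutionStep(
  sparkJars = Seq("hdfs://host:9000/jars/a.jar", "file:///home/user/b.jar", "local:///opt/jars/c.jar"),
  sparkFiles = Seq("file:///home/user/data.txt"),
  jarsDownloadPath = "/mnt/spark-data/jars",
  localFilesDownloadPath = "/mnt/spark-data/files")
// After configureDriver, the driver Spark conf would contain roughly:
//   spark.jars  = hdfs://host:9000/jars/a.jar,/mnt/spark-data/jars/b.jar,/opt/jars/c.jar
//   spark.files = /mnt/spark-data/files/data.txt
// and ENV_MOUNTED_CLASSPATH lists the resolved jar paths joined with the platform path separator.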


@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
/**
* Represents a step in preparing the Kubernetes driver.
*/
private[spark] trait DriverConfigurationStep {
/**
* Apply a transformation to the previous state of the driver spec to add a new feature to it.
*/
def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec
}
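
A minimal sketch of how these steps compose (the actual orchestration lives in DriverConfigurationStepsOrchestrator and Client, outside this hunk): each step receives the spec produced by the previous one and returns an augmented copy.

// Sketch only: fold the configured steps over an initial driver spec.
def buildDriverSpec(
    steps: Seq[DriverConfigurationStep],
    initialSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
  steps.foldLeft(initialSpec) { (spec, step) => step.configureDriver(spec) }
}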


@ -0,0 +1,245 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps
import java.io.File
import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._
import scala.language.implicitConversions
import com.google.common.io.{BaseEncoding, Files}
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
/**
* Mounts Kubernetes credentials into the driver pod. The driver uses these mounted credentials
* to request executor pods from the Kubernetes API server.
*/
private[spark] class DriverKubernetesCredentialsStep(
submissionSparkConf: SparkConf,
kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
private val maybeMountedClientCertFile = submissionSparkConf.getOption(
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
private val maybeMountedCaCertFile = submissionSparkConf.getOption(
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val driverSparkConf = driverSpec.driverSparkConf.clone()
val oauthTokenBase64 = submissionSparkConf
.getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
.map { token =>
BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
}
val caCertDataBase64 = safeFileConfToBase64(
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
"Driver CA cert file")
val clientKeyDataBase64 = safeFileConfToBase64(
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
"Driver client key file")
val clientCertDataBase64 = safeFileConfToBase64(
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
"Driver client cert file")
val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations(
driverSparkConf,
oauthTokenBase64,
caCertDataBase64,
clientKeyDataBase64,
clientCertDataBase64)
val kubernetesCredentialsSecret = createCredentialsSecret(
oauthTokenBase64,
caCertDataBase64,
clientKeyDataBase64,
clientCertDataBase64)
val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret =>
new PodBuilder(driverSpec.driverPod)
.editOrNewSpec()
.addNewVolume()
.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
.withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
.endVolume()
.endSpec()
.build()
}.getOrElse(
driverServiceAccount.map { account =>
new PodBuilder(driverSpec.driverPod)
.editOrNewSpec()
.withServiceAccount(account)
.withServiceAccountName(account)
.endSpec()
.build()
}.getOrElse(driverSpec.driverPod)
)
val driverContainerWithMountedSecretVolume = kubernetesCredentialsSecret.map { secret =>
new ContainerBuilder(driverSpec.driverContainer)
.addNewVolumeMount()
.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
.withMountPath(DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
.endVolumeMount()
.build()
}.getOrElse(driverSpec.driverContainer)
driverSpec.copy(
driverPod = driverPodWithMountedKubernetesCredentials,
otherKubernetesResources =
driverSpec.otherKubernetesResources ++ kubernetesCredentialsSecret.toSeq,
driverSparkConf = driverSparkConfWithCredentialsLocations,
driverContainer = driverContainerWithMountedSecretVolume)
}
private def createCredentialsSecret(
driverOAuthTokenBase64: Option[String],
driverCaCertDataBase64: Option[String],
driverClientKeyDataBase64: Option[String],
driverClientCertDataBase64: Option[String]): Option[Secret] = {
val allSecretData =
resolveSecretData(
driverClientKeyDataBase64,
DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME) ++
resolveSecretData(
driverClientCertDataBase64,
DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME) ++
resolveSecretData(
driverCaCertDataBase64,
DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME) ++
resolveSecretData(
driverOAuthTokenBase64,
DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME)
if (allSecretData.isEmpty) {
None
} else {
Some(new SecretBuilder()
.withNewMetadata()
.withName(s"$kubernetesResourceNamePrefix-kubernetes-credentials")
.endMetadata()
.withData(allSecretData.asJava)
.build())
}
}
private def setDriverPodKubernetesCredentialLocations(
driverSparkConf: SparkConf,
driverOauthTokenBase64: Option[String],
driverCaCertDataBase64: Option[String],
driverClientKeyDataBase64: Option[String],
driverClientCertDataBase64: Option[String]): SparkConf = {
val resolvedMountedOAuthTokenFile = resolveSecretLocation(
maybeMountedOAuthTokenFile,
driverOauthTokenBase64,
DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH)
val resolvedMountedClientKeyFile = resolveSecretLocation(
maybeMountedClientKeyFile,
driverClientKeyDataBase64,
DRIVER_CREDENTIALS_CLIENT_KEY_PATH)
val resolvedMountedClientCertFile = resolveSecretLocation(
maybeMountedClientCertFile,
driverClientCertDataBase64,
DRIVER_CREDENTIALS_CLIENT_CERT_PATH)
val resolvedMountedCaCertFile = resolveSecretLocation(
maybeMountedCaCertFile,
driverCaCertDataBase64,
DRIVER_CREDENTIALS_CA_CERT_PATH)
val sparkConfWithCredentialLocations = driverSparkConf
.setOption(
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
resolvedMountedCaCertFile)
.setOption(
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
resolvedMountedClientKeyFile)
.setOption(
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
resolvedMountedClientCertFile)
.setOption(
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX",
resolvedMountedOAuthTokenFile)
// Redact all OAuth token values
sparkConfWithCredentialLocations
.getAll
.filter(_._1.endsWith(OAUTH_TOKEN_CONF_SUFFIX)).map(_._1)
.foreach {
sparkConfWithCredentialLocations.set(_, "<present_but_redacted>")
}
sparkConfWithCredentialLocations
}
private def safeFileConfToBase64(conf: String, fileType: String): Option[String] = {
submissionSparkConf.getOption(conf)
.map(new File(_))
.map { file =>
require(file.isFile, String.format("%s provided at %s does not exist or is not a file.",
fileType, file.getAbsolutePath))
BaseEncoding.base64().encode(Files.toByteArray(file))
}
}
private def resolveSecretLocation(
mountedUserSpecified: Option[String],
valueMountedFromSubmitter: Option[String],
mountedCanonicalLocation: String): Option[String] = {
mountedUserSpecified.orElse(valueMountedFromSubmitter.map { _ =>
mountedCanonicalLocation
})
}
/**
* Resolve a Kubernetes secret data entry from an optional client credential used by the
* driver to talk to the Kubernetes API server.
*
* @param userSpecifiedCredential the optional user-specified client credential.
* @param secretName name of the Kubernetes secret storing the client credential.
* @return a secret data entry in the form of a map from the secret name to the secret data,
* which may be empty if the user-specified credential is empty.
*/
private def resolveSecretData(
userSpecifiedCredential: Option[String],
secretName: String): Map[String, String] = {
userSpecifiedCredential.map { valueBase64 =>
Map(secretName -> valueBase64)
}.getOrElse(Map.empty[String, String])
}
private implicit def augmentSparkConf(sparkConf: SparkConf): OptionSettableSparkConf = {
new OptionSettableSparkConf(sparkConf)
}
}
private class OptionSettableSparkConf(sparkConf: SparkConf) {
def setOption(configEntry: String, option: Option[String]): SparkConf = {
option.foreach { opt =>
sparkConf.set(configEntry, opt)
}
sparkConf
}
}
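
A configuration sketch for this step (the file paths and resource prefix are made up): credentials given under the non-mounted prefix are read on the submission client, base64-encoded into a secret named <prefix>-kubernetes-credentials, and mounted into the driver, whereas the mounted prefix simply points the driver at files already present in the pod.

// Sketch only; hypothetical submission-side settings.
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._

val submissionConf = new SparkConf()
  .set(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX", "my-oauth-token")
  .set(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX", "/path/to/ca.pem")
  .set(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX", "/path/to/key.pem")
  .set(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX", "/path/to/cert.pem")
val credentialsStep = new DriverKubernetesCredentialsStep(submissionConf, "spark-pi-1234")
// configureDriver adds a Secret named "spark-pi-1234-kubernetes-credentials" to
// otherKubernetesResources and mounts it at DRIVER_CREDENTIALS_SECRETS_BASE_DIR in the driver.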


@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.ServiceBuilder
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
import org.apache.spark.internal.Logging
import org.apache.spark.util.Clock
/**
* Allows the driver to be reachable by executor pods through a headless service. The service's
* ports correspond to the ports that executors use to reach the driver pod over RPC.
*/
private[spark] class DriverServiceBootstrapStep(
kubernetesResourceNamePrefix: String,
driverLabels: Map[String, String],
submissionSparkConf: SparkConf,
clock: Clock) extends DriverConfigurationStep with Logging {
import DriverServiceBootstrapStep._
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind " +
"address is managed and set to the driver pod's IP address.")
require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " +
"managed via a Kubernetes service.")
val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX"
val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
preferredServiceName
} else {
val randomServiceId = clock.getTimeMillis()
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
s"$shorterServiceName as the driver service's name.")
shorterServiceName
}
val driverPort = submissionSparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)
val driverBlockManagerPort = submissionSparkConf.getInt(
org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
val driverService = new ServiceBuilder()
.withNewMetadata()
.withName(resolvedServiceName)
.endMetadata()
.withNewSpec()
.withClusterIP("None")
.withSelector(driverLabels.asJava)
.addNewPort()
.withName(DRIVER_PORT_NAME)
.withPort(driverPort)
.withNewTargetPort(driverPort)
.endPort()
.addNewPort()
.withName(BLOCK_MANAGER_PORT_NAME)
.withPort(driverBlockManagerPort)
.withNewTargetPort(driverBlockManagerPort)
.endPort()
.endSpec()
.build()
val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE)
val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
val resolvedSparkConf = driverSpec.driverSparkConf.clone()
.set(DRIVER_HOST_KEY, driverHostname)
.set("spark.driver.port", driverPort.toString)
.set(
org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, driverBlockManagerPort)
driverSpec.copy(
driverSparkConf = resolvedSparkConf,
otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(driverService))
}
}
private[spark] object DriverServiceBootstrapStep {
val DRIVER_BIND_ADDRESS_KEY = org.apache.spark.internal.config.DRIVER_BIND_ADDRESS.key
val DRIVER_HOST_KEY = org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key
val DRIVER_SVC_POSTFIX = "-driver-svc"
val MAX_SERVICE_NAME_LENGTH = 63
}
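
For example (the prefix and namespace below are made-up values), the names this step derives look like the following, giving executors a stable DNS name for the driver:

// Sketch only.
val resourcePrefix = "spark-pi-1234"
val namespace = "default"
val serviceName = s"$resourcePrefix${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}"
// serviceName == "spark-pi-1234-driver-svc"
val driverHost = s"$serviceName.$namespace.svc.cluster.local"
// spark.driver.host is set to driverHost; the headless service exposes spark.driver.port and the
// block manager port so executors can reach the driver at this address.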


@ -24,6 +24,7 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.ConfigurationUtils
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
import org.apache.spark.util.Utils
/**
@ -46,8 +47,7 @@ private[spark] trait ExecutorPodFactory {
private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
extends ExecutorPodFactory {
private val executorExtraClasspath =
sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH)
private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
sparkConf,
@ -81,13 +81,12 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
private val executorMemoryMiB = sparkConf.get(EXECUTOR_MEMORY)
private val executorMemoryString = sparkConf.get(
org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
private val memoryOverheadMiB = sparkConf
.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
.get(EXECUTOR_MEMORY_OVERHEAD)
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
@ -129,7 +128,7 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
.build()
}
val executorExtraJavaOptionsEnv = sparkConf
.get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS)
.get(EXECUTOR_JAVA_OPTIONS)
.map { opts =>
val delimitedOpts = Utils.splitCommandString(opts)
delimitedOpts.zipWithIndex.map {
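
The executor memory overhead above mirrors the driver-side handling: when the executor memory overhead setting (EXECUTOR_MEMORY_OVERHEAD) is unset, it falls back to max(factor * executor memory, minimum). A worked example, assuming the conventional factor of 0.10 and minimum of 384 MiB (MEMORY_OVERHEAD_FACTOR and MEMORY_OVERHEAD_MIN_MIB are defined elsewhere in the patch, so these exact values are an assumption here):

// Assumed constants, not shown in this hunk: factor 0.10, minimum 384 MiB.
val executorMemoryMiB = 4096L
val memoryOverheadMiB = math.max((0.10 * executorMemoryMiB).toInt, 384)  // 409 MiB
val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB   // 4505 MiB for the executor pod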


@ -20,7 +20,7 @@ import java.io.File
import io.fabric8.kubernetes.client.Config
import org.apache.spark.SparkContext
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
@ -33,6 +33,10 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
if (masterURL.startsWith("k8s") && sc.deployMode == "client") {
throw new SparkException("Client mode is currently not supported for Kubernetes.")
}
new TaskSchedulerImpl(sc)
}
@ -45,7 +49,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
KUBERNETES_MASTER_INTERNAL_URL,
Some(sparkConf.get(KUBERNETES_NAMESPACE)),
APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
sparkConf,
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))


@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit
import scala.collection.JavaConverters._
import com.google.common.collect.Iterables
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.Mockito.{doReturn, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfter
import org.scalatest.mockito.MockitoSugar._
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
class ClientSuite extends SparkFunSuite with BeforeAndAfter {
private val DRIVER_POD_UID = "pod-id"
private val DRIVER_POD_API_VERSION = "v1"
private val DRIVER_POD_KIND = "pod"
private type ResourceList = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
HasMetadata, Boolean]
private type Pods = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
@Mock
private var kubernetesClient: KubernetesClient = _
@Mock
private var podOperations: Pods = _
@Mock
private var namedPods: PodResource[Pod, DoneablePod] = _
@Mock
private var loggingPodStatusWatcher: LoggingPodStatusWatcher = _
@Mock
private var resourceList: ResourceList = _
private val submissionSteps = Seq(FirstTestConfigurationStep, SecondTestConfigurationStep)
private var createdPodArgumentCaptor: ArgumentCaptor[Pod] = _
private var createdResourcesArgumentCaptor: ArgumentCaptor[HasMetadata] = _
before {
MockitoAnnotations.initMocks(this)
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withName(FirstTestConfigurationStep.podName)).thenReturn(namedPods)
createdPodArgumentCaptor = ArgumentCaptor.forClass(classOf[Pod])
createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
when(podOperations.create(createdPodArgumentCaptor.capture())).thenAnswer(new Answer[Pod] {
override def answer(invocation: InvocationOnMock): Pod = {
new PodBuilder(invocation.getArgumentAt(0, classOf[Pod]))
.editMetadata()
.withUid(DRIVER_POD_UID)
.endMetadata()
.withApiVersion(DRIVER_POD_API_VERSION)
.withKind(DRIVER_POD_KIND)
.build()
}
})
when(podOperations.withName(FirstTestConfigurationStep.podName)).thenReturn(namedPods)
when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
doReturn(resourceList)
.when(kubernetesClient)
.resourceList(createdResourcesArgumentCaptor.capture())
}
test("The client should configure the pod with the submission steps.") {
val submissionClient = new Client(
submissionSteps,
new SparkConf(false),
kubernetesClient,
false,
"spark",
loggingPodStatusWatcher)
submissionClient.run()
val createdPod = createdPodArgumentCaptor.getValue
assert(createdPod.getMetadata.getName === FirstTestConfigurationStep.podName)
assert(createdPod.getMetadata.getLabels.asScala ===
Map(FirstTestConfigurationStep.labelKey -> FirstTestConfigurationStep.labelValue))
assert(createdPod.getMetadata.getAnnotations.asScala ===
Map(SecondTestConfigurationStep.annotationKey ->
SecondTestConfigurationStep.annotationValue))
assert(createdPod.getSpec.getContainers.size() === 1)
assert(createdPod.getSpec.getContainers.get(0).getName ===
SecondTestConfigurationStep.containerName)
}
test("The client should create the secondary Kubernetes resources.") {
val submissionClient = new Client(
submissionSteps,
new SparkConf(false),
kubernetesClient,
false,
"spark",
loggingPodStatusWatcher)
submissionClient.run()
val createdPod = createdPodArgumentCaptor.getValue
val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues
assert(otherCreatedResources.size === 1)
val createdResource = Iterables.getOnlyElement(otherCreatedResources).asInstanceOf[Secret]
assert(createdResource.getMetadata.getName === FirstTestConfigurationStep.secretName)
assert(createdResource.getData.asScala ===
Map(FirstTestConfigurationStep.secretKey -> FirstTestConfigurationStep.secretData))
val ownerReference = Iterables.getOnlyElement(createdResource.getMetadata.getOwnerReferences)
assert(ownerReference.getName === createdPod.getMetadata.getName)
assert(ownerReference.getKind === DRIVER_POD_KIND)
assert(ownerReference.getUid === DRIVER_POD_UID)
assert(ownerReference.getApiVersion === DRIVER_POD_API_VERSION)
}
test("The client should attach the driver container with the appropriate JVM options.") {
val sparkConf = new SparkConf(false)
.set("spark.logConf", "true")
.set(
org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS,
"-XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails")
val submissionClient = new Client(
submissionSteps,
sparkConf,
kubernetesClient,
false,
"spark",
loggingPodStatusWatcher)
submissionClient.run()
val createdPod = createdPodArgumentCaptor.getValue
val driverContainer = Iterables.getOnlyElement(createdPod.getSpec.getContainers)
assert(driverContainer.getName === SecondTestConfigurationStep.containerName)
val driverJvmOptsEnvs = driverContainer.getEnv.asScala.filter { env =>
env.getName.startsWith(ENV_JAVA_OPT_PREFIX)
}.sortBy(_.getName)
assert(driverJvmOptsEnvs.size === 4)
val expectedJvmOptsValues = Seq(
"-Dspark.logConf=true",
s"-D${SecondTestConfigurationStep.sparkConfKey}=" +
s"${SecondTestConfigurationStep.sparkConfValue}",
"-XX:+HeapDumpOnOutOfMemoryError",
"-XX:+PrintGCDetails")
driverJvmOptsEnvs.zip(expectedJvmOptsValues).zipWithIndex.foreach {
case ((resolvedEnv, expectedJvmOpt), index) =>
assert(resolvedEnv.getName === s"$ENV_JAVA_OPT_PREFIX$index")
assert(resolvedEnv.getValue === expectedJvmOpt)
}
}
test("Waiting for app completion should stall on the watcher") {
val submissionClient = new Client(
submissionSteps,
new SparkConf(false),
kubernetesClient,
true,
"spark",
loggingPodStatusWatcher)
submissionClient.run()
verify(loggingPodStatusWatcher).awaitCompletion()
}
}
private object FirstTestConfigurationStep extends DriverConfigurationStep {
val podName = "test-pod"
val secretName = "test-secret"
val labelKey = "first-submit"
val labelValue = "true"
val secretKey = "secretKey"
val secretData = "secretData"
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val modifiedPod = new PodBuilder(driverSpec.driverPod)
.editMetadata()
.withName(podName)
.addToLabels(labelKey, labelValue)
.endMetadata()
.build()
val additionalResource = new SecretBuilder()
.withNewMetadata()
.withName(secretName)
.endMetadata()
.addToData(secretKey, secretData)
.build()
driverSpec.copy(
driverPod = modifiedPod,
otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(additionalResource))
}
}
private object SecondTestConfigurationStep extends DriverConfigurationStep {
val annotationKey = "second-submit"
val annotationValue = "submitted"
val sparkConfKey = "spark.custom-conf"
val sparkConfValue = "custom-conf-value"
val containerName = "driverContainer"
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val modifiedPod = new PodBuilder(driverSpec.driverPod)
.editMetadata()
.addToAnnotations(annotationKey, annotationValue)
.endMetadata()
.build()
val resolvedSparkConf = driverSpec.driverSparkConf.clone().set(sparkConfKey, sparkConfValue)
val modifiedContainer = new ContainerBuilder(driverSpec.driverContainer)
.withName(containerName)
.build()
driverSpec.copy(
driverPod = modifiedPod,
driverSparkConf = resolvedSparkConf,
driverContainer = modifiedContainer)
}
}


@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config.DRIVER_DOCKER_IMAGE
import org.apache.spark.deploy.k8s.submit.steps._
class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {
private val NAMESPACE = "default"
private val DRIVER_IMAGE = "driver-image"
private val APP_ID = "spark-app-id"
private val LAUNCH_TIME = 975256L
private val APP_NAME = "spark"
private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
private val APP_ARGS = Array("arg1", "arg2")
test("Base submission steps with a main app resource.") {
val sparkConf = new SparkConf(false)
.set(DRIVER_DOCKER_IMAGE, DRIVER_IMAGE)
val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
val orchestrator = new DriverConfigurationStepsOrchestrator(
NAMESPACE,
APP_ID,
LAUNCH_TIME,
Some(mainAppResource),
APP_NAME,
MAIN_CLASS,
APP_ARGS,
sparkConf)
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep]
)
}
test("Base submission steps without a main app resource.") {
val sparkConf = new SparkConf(false)
.set(DRIVER_DOCKER_IMAGE, DRIVER_IMAGE)
val orchestrator = new DriverConfigurationStepsOrchestrator(
NAMESPACE,
APP_ID,
LAUNCH_TIME,
Option.empty,
APP_NAME,
MAIN_CLASS,
APP_ARGS,
sparkConf)
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep]
)
}
private def validateStepTypes(
orchestrator: DriverConfigurationStepsOrchestrator,
types: Class[_ <: DriverConfigurationStep]*): Unit = {
val steps = orchestrator.getAllConfigurationSteps()
assert(steps.size === types.size)
assert(steps.map(_.getClass) === types)
}
}


@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder}
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
class BaseDriverConfigurationStepSuite extends SparkFunSuite {
private val APP_ID = "spark-app-id"
private val RESOURCE_NAME_PREFIX = "spark"
private val DRIVER_LABELS = Map("labelkey" -> "labelvalue")
private val DOCKER_IMAGE_PULL_POLICY = "IfNotPresent"
private val APP_NAME = "spark-test"
private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
private val APP_ARGS = Array("arg1", "arg2", "arg 3")
private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1"
private val DRIVER_CUSTOM_ENV_KEY2 = "customDriverEnv2"
test("Set all possible configurations from the user.") {
val sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
.set(org.apache.spark.internal.config.DRIVER_CLASS_PATH, "/opt/spark/spark-examples.jar")
.set("spark.driver.cores", "2")
.set(KUBERNETES_DRIVER_LIMIT_CORES, "4")
.set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "256M")
.set(org.apache.spark.internal.config.DRIVER_MEMORY_OVERHEAD, 200L)
.set(DRIVER_DOCKER_IMAGE, "spark-driver:latest")
.set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
.set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1")
.set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2")
val submissionStep = new BaseDriverConfigurationStep(
APP_ID,
RESOURCE_NAME_PREFIX,
DRIVER_LABELS,
DOCKER_IMAGE_PULL_POLICY,
APP_NAME,
MAIN_CLASS,
APP_ARGS,
sparkConf)
val basePod = new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build()
val baseDriverSpec = KubernetesDriverSpec(
driverPod = basePod,
driverContainer = new ContainerBuilder().build(),
driverSparkConf = new SparkConf(false),
otherKubernetesResources = Seq.empty[HasMetadata])
val preparedDriverSpec = submissionStep.configureDriver(baseDriverSpec)
assert(preparedDriverSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
assert(preparedDriverSpec.driverContainer.getImage === "spark-driver:latest")
assert(preparedDriverSpec.driverContainer.getImagePullPolicy === DOCKER_IMAGE_PULL_POLICY)
assert(preparedDriverSpec.driverContainer.getEnv.size === 7)
val envs = preparedDriverSpec.driverContainer
.getEnv
.asScala
.map(env => (env.getName, env.getValue))
.toMap
assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-examples.jar")
assert(envs(ENV_DRIVER_MEMORY) === "256M")
assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
assert(envs(ENV_DRIVER_ARGS) === "\"arg1\" \"arg2\" \"arg 3\"")
assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")
assert(preparedDriverSpec.driverContainer.getEnv.asScala.exists(envVar =>
envVar.getName.equals(ENV_DRIVER_BIND_ADDRESS) &&
envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") &&
envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP")))
val resourceRequirements = preparedDriverSpec.driverContainer.getResources
val requests = resourceRequirements.getRequests.asScala
assert(requests("cpu").getAmount === "2")
assert(requests("memory").getAmount === "256Mi")
val limits = resourceRequirements.getLimits.asScala
assert(limits("memory").getAmount === "456Mi")
assert(limits("cpu").getAmount === "4")
val driverPodMetadata = preparedDriverSpec.driverPod.getMetadata
assert(driverPodMetadata.getName === "spark-driver-pod")
assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS)
val expectedAnnotations = Map(
CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE,
SPARK_APP_NAME_ANNOTATION -> APP_NAME)
assert(driverPodMetadata.getAnnotations.asScala === expectedAnnotations)
assert(preparedDriverSpec.driverPod.getSpec.getRestartPolicy === "Never")
val resolvedSparkConf = preparedDriverSpec.driverSparkConf.getAll.toMap
val expectedSparkConf = Map(
KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
"spark.app.id" -> APP_ID,
KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX)
assert(resolvedSparkConf === expectedSparkConf)
}
}


@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps
import java.io.File
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder}
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
class DependencyResolutionStepSuite extends SparkFunSuite {
private val SPARK_JARS = Seq(
"hdfs://localhost:9000/apps/jars/jar1.jar",
"file:///home/user/apps/jars/jar2.jar",
"local:///var/apps/jars/jar3.jar")
private val SPARK_FILES = Seq(
"file:///home/user/apps/files/file1.txt",
"hdfs://localhost:9000/apps/files/file2.txt",
"local:///var/apps/files/file3.txt")
private val JARS_DOWNLOAD_PATH = "/mnt/spark-data/jars"
private val FILES_DOWNLOAD_PATH = "/mnt/spark-data/files"
test("Added dependencies should be resolved in Spark configuration and environment") {
val dependencyResolutionStep = new DependencyResolutionStep(
SPARK_JARS,
SPARK_FILES,
JARS_DOWNLOAD_PATH,
FILES_DOWNLOAD_PATH)
val driverPod = new PodBuilder().build()
val baseDriverSpec = KubernetesDriverSpec(
driverPod = driverPod,
driverContainer = new ContainerBuilder().build(),
driverSparkConf = new SparkConf(false),
otherKubernetesResources = Seq.empty[HasMetadata])
val preparedDriverSpec = dependencyResolutionStep.configureDriver(baseDriverSpec)
assert(preparedDriverSpec.driverPod === driverPod)
assert(preparedDriverSpec.otherKubernetesResources.isEmpty)
val resolvedSparkJars = preparedDriverSpec.driverSparkConf.get("spark.jars").split(",").toSet
val expectedResolvedSparkJars = Set(
"hdfs://localhost:9000/apps/jars/jar1.jar",
s"$JARS_DOWNLOAD_PATH/jar2.jar",
"/var/apps/jars/jar3.jar")
assert(resolvedSparkJars === expectedResolvedSparkJars)
val resolvedSparkFiles = preparedDriverSpec.driverSparkConf.get("spark.files").split(",").toSet
val expectedResolvedSparkFiles = Set(
s"$FILES_DOWNLOAD_PATH/file1.txt",
s"hdfs://localhost:9000/apps/files/file2.txt",
s"/var/apps/files/file3.txt")
assert(resolvedSparkFiles === expectedResolvedSparkFiles)
val driverEnv = preparedDriverSpec.driverContainer.getEnv.asScala
assert(driverEnv.size === 1)
assert(driverEnv.head.getName === ENV_MOUNTED_CLASSPATH)
val resolvedDriverClasspath = driverEnv.head.getValue.split(File.pathSeparator).toSet
val expectedResolvedDriverClasspath = Set(
s"$JARS_DOWNLOAD_PATH/jar1.jar",
s"$JARS_DOWNLOAD_PATH/jar2.jar",
"/var/apps/jars/jar3.jar")
assert(resolvedDriverClasspath === expectedResolvedDriverClasspath)
}
}


@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps
import java.io.File
import scala.collection.JavaConverters._
import com.google.common.base.Charsets
import com.google.common.io.{BaseEncoding, Files}
import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, Secret}
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
import org.apache.spark.util.Utils
class DriverKubernetesCredentialsStepSuite extends SparkFunSuite with BeforeAndAfter {
private val KUBERNETES_RESOURCE_NAME_PREFIX = "spark"
private var credentialsTempDirectory: File = _
private val BASE_DRIVER_SPEC = new KubernetesDriverSpec(
driverPod = new PodBuilder().build(),
driverContainer = new ContainerBuilder().build(),
driverSparkConf = new SparkConf(false),
otherKubernetesResources = Seq.empty[HasMetadata])
before {
credentialsTempDirectory = Utils.createTempDir()
}
after {
credentialsTempDirectory.delete()
}
test("Don't set any credentials") {
val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
new SparkConf(false), KUBERNETES_RESOURCE_NAME_PREFIX)
val preparedDriverSpec = kubernetesCredentialsStep.configureDriver(BASE_DRIVER_SPEC)
assert(preparedDriverSpec.driverPod === BASE_DRIVER_SPEC.driverPod)
assert(preparedDriverSpec.driverContainer === BASE_DRIVER_SPEC.driverContainer)
assert(preparedDriverSpec.otherKubernetesResources.isEmpty)
assert(preparedDriverSpec.driverSparkConf.getAll.isEmpty)
}
test("Only set credentials that are manually mounted.") {
val submissionSparkConf = new SparkConf(false)
.set(
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX",
"/mnt/secrets/my-token.txt")
.set(
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
"/mnt/secrets/my-key.pem")
.set(
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
"/mnt/secrets/my-cert.pem")
.set(
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
"/mnt/secrets/my-ca.pem")
val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
submissionSparkConf, KUBERNETES_RESOURCE_NAME_PREFIX)
val preparedDriverSpec = kubernetesCredentialsStep.configureDriver(BASE_DRIVER_SPEC)
assert(preparedDriverSpec.driverPod === BASE_DRIVER_SPEC.driverPod)
assert(preparedDriverSpec.driverContainer === BASE_DRIVER_SPEC.driverContainer)
assert(preparedDriverSpec.otherKubernetesResources.isEmpty)
assert(preparedDriverSpec.driverSparkConf.getAll.toMap === submissionSparkConf.getAll.toMap)
}
test("Mount credentials from the submission client as a secret.") {
val caCertFile = writeCredentials("ca.pem", "ca-cert")
val clientKeyFile = writeCredentials("key.pem", "key")
val clientCertFile = writeCredentials("cert.pem", "cert")
val submissionSparkConf = new SparkConf(false)
.set(
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX",
"token")
.set(
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
clientKeyFile.getAbsolutePath)
.set(
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
clientCertFile.getAbsolutePath)
.set(
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
caCertFile.getAbsolutePath)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
submissionSparkConf, KUBERNETES_RESOURCE_NAME_PREFIX)
val preparedDriverSpec = kubernetesCredentialsStep.configureDriver(
BASE_DRIVER_SPEC.copy(driverSparkConf = submissionSparkConf))
val expectedSparkConf = Map(
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX" -> "<present_but_redacted>",
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX" ->
DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH,
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX" ->
DRIVER_CREDENTIALS_CLIENT_KEY_PATH,
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX" ->
DRIVER_CREDENTIALS_CLIENT_CERT_PATH,
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX" ->
DRIVER_CREDENTIALS_CA_CERT_PATH,
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX" ->
clientKeyFile.getAbsolutePath,
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX" ->
clientCertFile.getAbsolutePath,
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX" ->
caCertFile.getAbsolutePath)
assert(preparedDriverSpec.driverSparkConf.getAll.toMap === expectedSparkConf)
assert(preparedDriverSpec.otherKubernetesResources.size === 1)
val credentialsSecret = preparedDriverSpec.otherKubernetesResources.head.asInstanceOf[Secret]
assert(credentialsSecret.getMetadata.getName ===
s"$KUBERNETES_RESOURCE_NAME_PREFIX-kubernetes-credentials")
val decodedSecretData = credentialsSecret.getData.asScala.map { data =>
(data._1, new String(BaseEncoding.base64().decode(data._2), Charsets.UTF_8))
}
val expectedSecretData = Map(
DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME -> "ca-cert",
DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME -> "token",
DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME -> "key",
DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME -> "cert")
assert(decodedSecretData === expectedSecretData)
val driverPodVolumes = preparedDriverSpec.driverPod.getSpec.getVolumes.asScala
assert(driverPodVolumes.size === 1)
assert(driverPodVolumes.head.getName === DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
assert(driverPodVolumes.head.getSecret != null)
assert(driverPodVolumes.head.getSecret.getSecretName === credentialsSecret.getMetadata.getName)
val driverContainerVolumeMount = preparedDriverSpec.driverContainer.getVolumeMounts.asScala
assert(driverContainerVolumeMount.size === 1)
assert(driverContainerVolumeMount.head.getName === DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
assert(driverContainerVolumeMount.head.getMountPath === DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
}
private def writeCredentials(credentialsFileName: String, credentialsContents: String): File = {
val credentialsFile = new File(credentialsTempDirectory, credentialsFileName)
Files.write(credentialsContents, credentialsFile, Charsets.UTF_8)
credentialsFile
}
}


@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.Service
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Mockito.when
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
import org.apache.spark.util.Clock
class DriverServiceBootstrapStepSuite extends SparkFunSuite with BeforeAndAfter {
private val SHORT_RESOURCE_NAME_PREFIX =
"a" * (DriverServiceBootstrapStep.MAX_SERVICE_NAME_LENGTH -
DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX.length)
private val LONG_RESOURCE_NAME_PREFIX =
"a" * (DriverServiceBootstrapStep.MAX_SERVICE_NAME_LENGTH -
DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX.length + 1)
private val DRIVER_LABELS = Map(
"label1key" -> "label1value",
"label2key" -> "label2value")
@Mock
private var clock: Clock = _
private var sparkConf: SparkConf = _
before {
MockitoAnnotations.initMocks(this)
sparkConf = new SparkConf(false)
}
test("Headless service has a port for the driver RPC and the block manager.") {
val configurationStep = new DriverServiceBootstrapStep(
SHORT_RESOURCE_NAME_PREFIX,
DRIVER_LABELS,
sparkConf
.set("spark.driver.port", "9000")
.set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080),
clock)
val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
assert(resolvedDriverSpec.otherKubernetesResources.size === 1)
assert(resolvedDriverSpec.otherKubernetesResources.head.isInstanceOf[Service])
val driverService = resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service]
verifyService(
9000,
8080,
s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}",
driverService)
}
test("Hostname and ports are set according to the service name.") {
val configurationStep = new DriverServiceBootstrapStep(
SHORT_RESOURCE_NAME_PREFIX,
DRIVER_LABELS,
sparkConf
.set("spark.driver.port", "9000")
.set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080)
.set(KUBERNETES_NAMESPACE, "my-namespace"),
clock)
val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX
val expectedHostName = s"$expectedServiceName.my-namespace.svc.cluster.local"
verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName)
}
test("Ports should resolve to defaults in SparkConf and in the service.") {
val configurationStep = new DriverServiceBootstrapStep(
SHORT_RESOURCE_NAME_PREFIX,
DRIVER_LABELS,
sparkConf,
clock)
val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
verifyService(
DEFAULT_DRIVER_PORT,
DEFAULT_BLOCKMANAGER_PORT,
s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}",
resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service])
assert(resolvedDriverSpec.driverSparkConf.get("spark.driver.port") ===
DEFAULT_DRIVER_PORT.toString)
assert(resolvedDriverSpec.driverSparkConf.get(
org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT) === DEFAULT_BLOCKMANAGER_PORT)
}
test("Long prefixes should switch to using a generated name.") {
val configurationStep = new DriverServiceBootstrapStep(
LONG_RESOURCE_NAME_PREFIX,
DRIVER_LABELS,
sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"),
clock)
when(clock.getTimeMillis()).thenReturn(10000)
val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
val driverService = resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service]
val expectedServiceName = s"spark-10000${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}"
assert(driverService.getMetadata.getName === expectedServiceName)
val expectedHostName = s"$expectedServiceName.my-namespace.svc.cluster.local"
verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName)
}
test("Disallow bind address and driver host to be set explicitly.") {
val configurationStep = new DriverServiceBootstrapStep(
LONG_RESOURCE_NAME_PREFIX,
DRIVER_LABELS,
sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"),
clock)
try {
configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf))
fail("The driver bind address should not be allowed.")
} catch {
case e: Throwable =>
assert(e.getMessage ===
s"requirement failed: ${DriverServiceBootstrapStep.DRIVER_BIND_ADDRESS_KEY} is" +
" not supported in Kubernetes mode, as the driver's bind address is managed" +
" and set to the driver pod's IP address.")
}
sparkConf.remove(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS)
sparkConf.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, "host")
try {
configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf))
fail("The driver host address should not be allowed.")
} catch {
case e: Throwable =>
assert(e.getMessage ===
s"requirement failed: ${DriverServiceBootstrapStep.DRIVER_HOST_KEY} is" +
" not supported in Kubernetes mode, as the driver's hostname will be managed via" +
" a Kubernetes service.")
}
}
private def verifyService(
driverPort: Int,
blockManagerPort: Int,
expectedServiceName: String,
service: Service): Unit = {
assert(service.getMetadata.getName === expectedServiceName)
assert(service.getSpec.getClusterIP === "None")
assert(service.getSpec.getSelector.asScala === DRIVER_LABELS)
assert(service.getSpec.getPorts.size() === 2)
val driverServicePorts = service.getSpec.getPorts.asScala
assert(driverServicePorts.head.getName === DRIVER_PORT_NAME)
assert(driverServicePorts.head.getPort.intValue() === driverPort)
assert(driverServicePorts.head.getTargetPort.getIntVal === driverPort)
assert(driverServicePorts(1).getName === BLOCK_MANAGER_PORT_NAME)
assert(driverServicePorts(1).getPort.intValue() === blockManagerPort)
assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort)
}
private def verifySparkConfHostNames(
driverSparkConf: SparkConf, expectedHostName: String): Unit = {
assert(driverSparkConf.get(
org.apache.spark.internal.config.DRIVER_HOST_ADDRESS) === expectedHostName)
}
}


@ -0,0 +1,34 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
FROM spark-base
# Before building the docker image, first build and make a Spark distribution following
# the instructions in http://spark.apache.org/docs/latest/building-spark.html.
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark-driver:latest -f dockerfiles/driver/Dockerfile .
COPY examples /opt/spark/examples
CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS


@ -0,0 +1,34 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
FROM spark-base
# Before building the docker image, first build and make a Spark distribution following
# the instructions in http://spark.apache.org/docs/latest/building-spark.html.
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark-executor:latest -f dockerfiles/executor/Dockerfile .
COPY examples /opt/spark/examples
CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP


@ -0,0 +1,47 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
FROM openjdk:8-alpine
# Before building the docker image, first build and make a Spark distribution following
# the instructions in http://spark.apache.org/docs/latest/building-spark.html.
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark-base:latest -f dockerfiles/spark-base/Dockerfile .
RUN set -ex && \
apk upgrade --no-cache && \
apk add --no-cache bash tini libc6-compat && \
mkdir -p /opt/spark && \
mkdir -p /opt/spark/work-dir && \
touch /opt/spark/RELEASE && \
rm /bin/sh && \
ln -sv /bin/bash /bin/sh && \
chgrp root /etc/passwd && chmod ug+rw /etc/passwd
COPY jars /opt/spark/jars
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY conf /opt/spark/conf
COPY dockerfiles/spark-base/entrypoint.sh /opt/
ENV SPARK_HOME /opt/spark
WORKDIR /opt/spark/work-dir
ENTRYPOINT [ "/opt/entrypoint.sh" ]


@ -0,0 +1,37 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# echo commands to the terminal output
set -ex
# Check whether there is a passwd entry for the container UID
myuid=$(id -u)
mygid=$(id -g)
uidentry=$(getent passwd $myuid)
# If there is no passwd entry for the container UID, attempt to create one
if [ -z "$uidentry" ] ; then
if [ -w /etc/passwd ] ; then
echo "$myuid:x:$myuid:$mygid:anonymous uid:$SPARK_HOME:/bin/false" >> /etc/passwd
else
echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID"
fi
fi
# Execute the container CMD under tini for better hygiene
/sbin/tini -s -- "$@"


@ -217,20 +217,12 @@ package object config {
.intConf
.createWithDefault(1)
private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.driver.memoryOverhead")
.bytesConf(ByteUnit.MiB)
.createOptional
/* Executor configuration. */
private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores")
.intConf
.createWithDefault(1)
private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.executor.memoryOverhead")
.bytesConf(ByteUnit.MiB)
.createOptional
private[spark] val EXECUTOR_NODE_LABEL_EXPRESSION =
ConfigBuilder("spark.yarn.executor.nodeLabelExpression")
.doc("Node label expression for executors.")